之前有需要使用tp开发一个消息队列功能,用来异步处理订单,发送一些消息等。因为是使用的是Thinkphp6,消息队列我用的thinkphp官方的think-queue消息队列,结合 supervisor 进程管理使队列进程常驻。记录一下,顺便分享给大家。
安装 thinkphp-queue
composer install thinkphp-queue
存储消息环境
thinkphp-queue的消息存储环境,可以有多种形式, Redis,Database,Topthink ,Sync这四种驱动。为了速度,我们一般都使用redis,当然你也可以数据库,创建提供的表结构就行了。
我们这里使用了Redis,具体安装,大家可以使用集成系统,省事一些,具体不再多做赘述啦。
在config/queue.php中,将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动。
配置如下
添加生产者
修改app/controller/Index.php里使用Queue::later或者Queue::push发布任务。
namespace app\controller; use app\BaseController; use think\facade\Queue; class Index extends BaseController { public function index() { //当前任务将由哪个类来负责处理 $jobHandlerClassName = 'app\job\Job1'; //业务数据 对象需要手动转序列化 $jobData = ['ts' => time()]; //队列名称 $jobQueueName = "OrderJob"; //入队列,later延时发送,单位秒。push立即发送 $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName); //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false if( $isPushed !== false ){ echo '执行成功'; }else{ echo '执行失败'; } //php think queue:listen --queue OrderJob 执行队列 } }
创建消费者
编写 消费者类,用于处理 OrderJob 队列中的任务,并编写其 fire() 方法
class Job1 { public function fire(Job $job, $data) { //业务处理代码,具体不贴出来了 $isJobDone = $this->jobDone($data); //执行成功删除 if($isJobDone){ $job->delete(); print("任务已经被执行成功并且删除"); }else{ $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行 print("任务3s后再次被执行"); } //通过这个方法可以检查任务重试了几次 if ($job->attempts() > 3) { print("Job has been retried more than 3 times!"); $job->delete(); } } public function failed($data) { // ...任务达到最大重试次数后,失败了 } //job private function jobDone($data){ Log::write('这是数据 ' . json_encode($data)); return true; }
出队列(消费任务)
在项目根目录执行命令
php think queue:work --queue OrderJob
supervisor的安装和配置
yum安装
# yum install epel-release # yum install supervisor //设置成开机自动启动 # systemctl enable supervisord
在 /var/supervisor/conf 创建一个 conf配置文件
[program:queue_worker] ;项目名称 directory = /www/wwwroot/tp6 ; 程序的启动目录,项目根目录的上一级 command = php think queue:work --queue OrderJob --daemon ; 启动命令 queueName就是队列名 process_name=%(program_name)s_%(process_num)02d numprocs = 3 ; 开启的进程数量 autostart = true ; 在 supervisord 启动的时候也自动启动 startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了 autorestart = true ; 程序异常退出后自动重启 startretries = 3 ; 启动失败自动重试次数,默认是 3 user = root ; 用哪个用户启动 redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false stdout_logfile_maxbytes = 50MB ; stdout 日志文件大小,默认 50MB stdout_logfile_backups = 20 ; stdout 日志文件备份数 ; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件) stdout_logfile = /var/supervisor/log/queue_worker.log loglevel=info
重启下
# systemctl restart supervisord