1. 首页
  2. >
  3. 编程技术
  4. >
  5. PHP

ThinkPHP6+Supervisor实现进程常驻消息队列

之前有需要使用tp开发一个消息队列功能,用来异步处理订单,发送一些消息等。因为是使用的是Thinkphp6,消息队列我用的thinkphp官方的think-queue消息队列,结合 supervisor 进程管理使队列进程常驻。记录一下,顺便分享给大家。

安装 thinkphp-queue

composer install thinkphp-queue

存储消息环境

thinkphp-queue的消息存储环境,可以有多种形式, Redis,Database,Topthink ,Sync这四种驱动。为了速度,我们一般都使用redis,当然你也可以数据库,创建提供的表结构就行了。

我们这里使用了Redis,具体安装,大家可以使用集成系统,省事一些,具体不再多做赘述啦。

在config/queue.php中,将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动。

ThinkPHP6+Supervisor实现进程常驻消息队列

配置如下

添加生产者

修改app/controller/Index.php里使用Queue::later或者Queue::push发布任务。

namespace app\controller;  use app\BaseController; use think\facade\Queue;  class Index extends BaseController {     public function index()     {         //当前任务将由哪个类来负责处理         $jobHandlerClassName = 'app\job\Job1';         //业务数据 对象需要手动转序列化         $jobData = ['ts' => time()];         //队列名称         $jobQueueName = "OrderJob";         //入队列,later延时发送,单位秒。push立即发送         $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);         //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );         // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false         if( $isPushed !== false ){             echo '执行成功';         }else{             echo '执行失败';         }         //php think queue:listen --queue OrderJob  执行队列     } }

创建消费者

编写 消费者类,用于处理 OrderJob 队列中的任务,并编写其 fire() 方法

class Job1 {     public function fire(Job $job, $data)     {         //业务处理代码,具体不贴出来了         $isJobDone = $this->jobDone($data);         //执行成功删除         if($isJobDone){             $job->delete();             print("任务已经被执行成功并且删除");         }else{             $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行             print("任务3s后再次被执行");         }         //通过这个方法可以检查任务重试了几次         if ($job->attempts() > 3) {             print("Job has been retried more than 3 times!");             $job->delete();         }     }       public function failed($data)     {         // ...任务达到最大重试次数后,失败了     }     //job     private function jobDone($data){         Log::write('这是数据 ' . json_encode($data));         return true;     }

出队列(消费任务)

在项目根目录执行命令

php think queue:work --queue OrderJob

supervisor的安装和配置

yum安装

# yum install epel-release # yum install supervisor //设置成开机自动启动 # systemctl enable supervisord

/var/supervisor/conf 创建一个 conf配置文件

[program:queue_worker] ;项目名称 directory = /www/wwwroot/tp6 ; 程序的启动目录,项目根目录的上一级 command = php think queue:work --queue OrderJob --daemon ; 启动命令 queueName就是队列名 process_name=%(program_name)s_%(process_num)02d numprocs = 3         ; 开启的进程数量 autostart = true     ; 在 supervisord 启动的时候也自动启动 startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了 autorestart = true   ; 程序异常退出后自动重启 startretries = 3     ; 启动失败自动重试次数,默认是 3 user = root          ; 用哪个用户启动 redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB stdout_logfile_backups = 20     ; stdout 日志文件备份数 ; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件) stdout_logfile = /var/supervisor/log/queue_worker.log loglevel=info

重启下

# systemctl restart supervisord