Phone: 158 4018 8888 Email: 407593529@qq.com

消息队列

2026-02-25 15:08:13 2

消息队列

在开发应用程序的时候,你可能需要执行一些任务,例如解析和存储一些大型文件,这些任务在Web 请求期间需要耗费很长时间才能执行完成。 这时候我们就可以用消息队列来创建在后台处理的队列任务。 通过将时间密集型任务移至队列,这样你的应用程序可以以极快的速度响应 Web 请求,并为你的程序提供更好的用户体验
传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。
一般来说,可以抽离的任务具有以下的特点:
●允许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式)
○允许延后:抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务;
○允许异步:业务处理过程中的邮件,短信等通知
○允许并行:用户支付成功之后,邮件通知,微信通知,短信通知可以由多个不同的消费者并行执行,通知到达的时间不要求先后顺序。
●允许失败和重试
○强一致性的业务放入核心流程处理
○无一致性要求或最终一致即可的业务放入队列处理

1.配置

1.先打开.env文件,将queue下的state改成true(如果.env文件中没有queue配置的话要手动添加)

2.队列必须配置redis,打开.env文件,redis下就是redis的配置

2.消息的消费和删除

1.在app/job目录下创建一个任务消费类
例如:

  <?php
namespace app\job;

use core\base\BaseJob;

/**
 * 订单延时关闭任务
 */
class OrderClose extends BaseJob
{


    /**
     * 消费
     * @param $order_id 订单id
     * @param $close_time 关闭时间
     * @return true
     */
    protected function doJob($order_id, $close_time)
    {
      	//业务代码
      	//......
        return true;
    }
}


3.消息的创建与推送

在业务控制器中新增一个控制器,创建一条消息,并推送到队列
1、新增一个控制器\app\adminapi\controller\test\JobTest.php,在该控制器中添加 helloWorldJob 方法,实例化刚刚创建的任务类,并调用dispatch函数

  <?php
namespace app\adminapi\controller\test;

use app\job\OrderClose;
use core\base\BaseAdminController;
use think\Response;

/**
 * 测试队列任务
 */
class JobTest extends BaseAdminController
{
    /**
     * 创建消息
     * @return Response
     */
    public function helloWorldJob()
    {
        $order_id = 1;//订单id
        $close_time = time();//订单关闭时间
        $is_pushed = OrderClose::dispatch(['order_id' => $order_id, 'close_time' => $close_time]);
        if( $is_pushed !== false ){
            echo date('Y-m-d H:i:s') . " Hello World Job is Pushed.";
        }else{
            echo 'Hello World Job Error.';
        }
    }


}


dispatch函数的参数介绍

  • action 任务消费类内部的方法, 非必填,不填写或类型为数组的话的话默认会调用doJob函数,如果是数组的话则视为当前值为data参数
  • data 数据传参, 他会将这儿的数组作为可变数量的参数传递给对应函数的给定变量参数
  • secs 延时执行时间(单位为秒),默认为0, 支持延时执行和定时执行,如果为0就是普通的异步任务,如果传值则视为当前时间的*秒后延时执行
  • queue_name 当前任务归属的队列名称,如果为新队列,会自动创建,结合命令行使用
  • is_async 默认为true,当前任务是否需要异步执行,最高优先级
    2、新建一个路由配置文件\app\adminapi\route\job.php
<?php
use think\facade\Route;

Route::group('job', function () {
    //获取本地插件
    Route::get('test', 'test.JobTest/helloWorldJob');
});


至此,所有的代码都已准备完毕,在运行消息队列之前,我们先看一下现在的目录结构:

4.发布任务

在浏览器中访问http://your.project.com/adminapi/job/test,可以看到消息推送成功。

5.启动任务

切换当前终端窗口的目录到项目niucloud目录下,执行

php think workerman

可以看到执行的结果类似如下:

通常的,我们需要将消息队列放到守护进程Supervisor常驻内存运行。

至此,我们成功地完成了一个消息的 创建 -> 推送 -> 消费 -> 删除 的流程

特别注意:修改传参后需要重启重启 workerman 在可生效!!!

选择样式

选择布局
选择颜色
选择背景
选择背景