在开发应用程序的时候,你可能需要执行一些任务,例如解析和存储一些大型文件,这些任务在Web 请求期间需要耗费很长时间才能执行完成。 这时候我们就可以用消息队列来创建在后台处理的队列任务。 通过将时间密集型任务移至队列,这样你的应用程序可以以极快的速度响应 Web 请求,并为你的程序提供更好的用户体验
传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。
一般来说,可以抽离的任务具有以下的特点:
●允许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式)
○允许延后:抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务;
○允许异步:业务处理过程中的邮件,短信等通知
○允许并行:用户支付成功之后,邮件通知,微信通知,短信通知可以由多个不同的消费者并行执行,通知到达的时间不要求先后顺序。
●允许失败和重试
○强一致性的业务放入核心流程处理
○无一致性要求或最终一致即可的业务放入队列处理
1.当前队列有三种驱动方式
| 驱动方式 | 说明 |
|---|---|
| sync | 同步执行,有新队列任务则通过事件 Event 来直接触发执行,不存储任务,直接执行 |
| atabase | 数据库存储,新队列任务数据存储到数据库,队列执行程序再从数据库中读取任务数据 |
| redis(推荐) | Redis 存储,新队列任务数据存储到 Redis,队列执行程序再从 Redis 中读取任务数据 |
2.先打开config下的queue.php文件
<?php
return [
'default' => 'database',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
// 驱动方式
'type' => 'redis',
'queue' => 'default',
// 服务器地址
'host' => env('redis.redis_hostname', '127.0.0.1'),
// 端口
'port' => env('redis.port', '6379'),
// 密码
'password' => env('redis.redis_password', ''),
// 缓存有效期 0表示永久缓存
'expire' => 0 ,
// 缓存前缀
'prefix' => 'QUERY',
// 缓存标签前缀
'tag_prefix' => 'QUERY:',
// 数据库 0号数据库
'select' => env('redis.select', 0),
// 服务端主动关闭
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'database',// none 不记录失败任务 database 将失败任务迁移到失败任务表
'table' => 'jobs_failed',
],
];
default值:当前启动的队列驱动
1.在app/job目录下创建一个任务消费类
例如:
<?php
namespace app\job;
use core\base\BaseJob;
/**
* 订单延时关闭任务
*/
class OrderClose extends BaseJob
{
/**
* 消费
* @param $order_id 订单id
* @param $close_time 关闭时间
* @return true
*/
protected function doJob($order_id, $close_time)
{
//业务代码
//......
return true;
}
}
在业务控制器中新增一个控制器,创建一条消息,并推送到队列
1、新增一个控制器\app\adminapi\controller\test\JobTest.php,在该控制器中添加 helloWorldJob 方法,实例化刚刚创建的任务类,并调用dispatch函数
<?php
namespace app\adminapi\controller\test;
use app\job\OrderClose;
use core\base\BaseAdminController;
use think\Response;
/**
* 测试队列任务
*/
class JobTest extends BaseAdminController
{
/**
* 创建消息
* @return Response
*/
public function helloWorldJob()
{
$order_id = 1;//订单id
$close_time = time();//订单关闭时间
$is_pushed = OrderClose::dispatch(['order_id' => $order_id, 'close_time' => $close_time]);
if( $is_pushed !== false ){
echo date('Y-m-d H:i:s') . " Hello World Job is Pushed.";
}else{
echo 'Hello World Job Error.';
}
}
}
dispatch函数的参数介绍
<?php
use think\facade\Route;
Route::group('job', function () {
//获取本地插件
Route::get('test', 'test.JobTest/helloWorldJob');
});
至此,所有的代码都已准备完毕,在运行消息队列之前,我们先看一下现在的目录结构:

在浏览器中访问 http://your.project.com/adminapi/job/test ,可以看到消息推送成功。

切换当前终端窗口的目录到项目niucloud目录下,执行
php think queue:listen
可以看到执行的结果类似如下:

php think queue:listen \
--queue helloJobQueue \ //监听的队列的名称
--delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 \ //该进程允许使用的内存上限,以 M 为单位
--sleep 3 \ //如果队列中无任务,则多长时间后重新检查
--tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60 // work 进程允许执行的最长时间,以秒为单位
这儿需要重点说明的是