开篇
日常开发使用队列的场景不少了吧,至于如何使用,我想文档已经写的很清楚了,毕业一年多了,七月份换一家新公司的时候开始使用 Laravel,因为项目中场景经常使用到 Laravel 中的队列,结合自己阅读的一丝队列的源码,写了这篇文章。(公司一直用的 5.5 所以文章的版本你懂的。)
也不知道从哪讲起,那就从一个最基础的例子开始吧。创建一个最简单的任务类 SendMessage。继承 Illuminate\Contracts\Queue\ShouldQueue 接口。
简单 demo 开始
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Support\Facades\Log; class SendMessage implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $message; /** * Create a new job instance. * * @return void */ public function __construct($message) { $this->message = $message; } /** * Execute the job. * * @return void */ public function handle() { Log::info($this->message); } }
这里直接分发一个任务类到队列中,并没有指定分发到哪个队列,那么会直接分发给默认的队列。直接从这里开始分析吧。
public function test() { $msg = '吴亲库里'; SendMessage::dispatch($msg); }
首先 SendMessage 并没有 dispatch 这个静态方法, 但是它 use dispatchable 这样的 Trait 类,我们可以点开 dispatchable 类查看 dispatch 方法。
trait Dispatchable { /** * Dispatch the job with the given arguments. * * @return \Illuminate\Foundation\Bus\PendingDispatch */ public static function dispatch() { return new PendingDispatch(new static(...func_get_args())); } /** * Set the jobs that should run if this job is successful. * * @param array $chain * @return \Illuminate\Foundation\Bus\PendingChain */ public static function withChain($chain) { return new PendingChain(get_called_class(), $chain); } }
可以看到在 dispatch 方法中 实例化另一个 PendingDispatch 类,并且根据传入的参数实例化任务 SendMessage 作为 PendingDispatch 类的参数。我们接着看,它是咋么分派任务?外层控制器现在只调用了 dispatch, 看看 PendingDispatch 类中有什么
<?php namespace Illuminate\Foundation\Bus; use Illuminate\Contracts\Bus\Dispatcher; use Illuminate\Support\Facades\Log; class PendingDispatch { /** * The job. * * @var mixed */ protected $job; /** * Create a new pending job dispatch. * * @param mixed $job * @return void */ public function __construct($job) { $this->job = $job; } /** * Set the desired connection for the job. * * @param string|null $connection * @return $this */ public function onConnection($connection) { $this->job->onConnection($connection); return $this; } /** * Set the desired queue for the job. * * @param string|null $queue * @return $this */ public function onQueue($queue) { $this->job->onQueue($queue); return $this; } /** * Set the desired connection for the chain. * * @param string|null $connection * @return $this */ public function allOnConnection($connection) { $this->job->allOnConnection($connection); return $this; } /** * Set the desired queue for the chain. * * @param string|null $queue * @return $this */ public function allOnQueue($queue) { $this->job->allOnQueue($queue); return $this; } /** * Set the desired delay for the job. * * @param \DateTime|int|null $delay * @return $this */ public function delay($delay) { $this->job->delay($delay); return $this; } /** * Set the jobs that should run if this job is successful. * * @param array $chain * @return $this */ public function chain($chain) { $this->job->chain($chain); return $this; } /** * Handle the object's destruction. * * @return void */ public function __destruct() { app(Dispatcher::class)->dispatch($this->job); } }
这里查看它的构造和析构函数。好吧从析构函数上已经能看出来,执行推送任务的在这里,app(Dispatcher::class) 这又是什么鬼?看来还得从运行机制开始看。Laravel 底层提供了一个强大的 IOC 容器,我们这里通过辅助函数 app() 的形式访问它,并通过传递参数解析出一个服务对象。这里我们传递 Dispatcher::class 得到的是一个什么服务?这个服务又是在哪里被注册进去的。让我们把目光又转移到根目录下的 index.php 文件。因为这篇文章不是说运行流程,所以一些流程会跳过。
注册服务
/* |-------------------------------------------------------------------------- | Run The Application |-------------------------------------------------------------------------- | | Once we have the application, we can handle the incoming request | through the kernel, and send the associated response back to | the client's browser allowing them to enjoy the creative | and wonderful application we have prepared for them. | */ $kernel = $app->make(Illuminate\Contracts\Http\Kernel::class); $response = $kernel->handle( $request = Illuminate\Http\Request::capture() );
这个服务实际上执行了 handle 方法之后才有的 (别问我为什么,如果和我一样笨的话,多打断点😂), 这里的 $kernel 实际上得到的是一个 Illuminate\Foundation\Http\Kernel 类,让我们进去看看这个类里面的 handle 方法。
/** * Handle an incoming HTTP request. * * @param \Illuminate\Http\Request $request * @return \Illuminate\Http\Response */ public function handle($request) { try { $request->enableHttpMethodParameterOverride(); $response = $this->sendRequestThroughRouter($request); } catch (Exception $e) { $this->reportException($e); $response = $this->renderException($request, $e); } catch (Throwable $e) { $this->reportException($e = new FatalThrowableError($e)); $response = $this->renderException($request, $e); } $this->app['events']->dispatch( new Events\RequestHandled($request, $response) ); return $response; }