【Laravel】キューを必要なだけ非同期実行する

 Laravel にはバックグラウンドで自動で処理を実行するためのキューという仕組みがあります。キューはデータベースなどどこかしらに実行すべき処理を示すジョブが貯められ、その処理を実行するプロセスであるワーカーがジョブを順次実行していきます。
キュー 6.x Laravel
 このワーカーは一度に一つのジョブしか実行しません。複数のジョブを実行するためには複数のワーカーを立ち上げる必要があります。一方でワーカーはジョブが貯まっているかを監視します。ワーカーが多いほど何かしらのI/O(入出力)処理が増えます。特に監視間隔が短い場合は無視できません。必要な時に必要なだけワーカーを走らせる仕組みが欲しくなります。
 必要な時に必要なだけワーカーを走らせる仕組みを実現したライブラリはいくつかあります。例えば、laravel-async-queueです。
barryvdh/laravel-async-queue: Laravel Async Queue Driver
 このライブラリは上述の必要な時に必要なだけワーカーを走らせる仕組みを提供してくれます。

 この記事ではこの必要な時に必要なだけワーカーを走らせる仕組みの根本を紹介します。
 作るのはジョブを監視し、ワーカーを別プロセスとして走らせるキューマネージャ―ともいうべきコマンドです。ジョブを監視するプロセスはキューマネージャー唯一つのためI/Oを気にする必要はありません。行うべきジョブをキューマネージャーが発見した場合、都度別プロセスでワーカーが走るため常に複数のジョブ(何らかのすべき処理を行うプロセス)を実行できます。

<?php


namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Models\Eloquents\Job;
use Illuminate\Support\Collection;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Process\Process;

class AsyncJobWorkerManager extends BaseCommand
{
    protected $name = 'queue:async-worker';
    protected $description = '非同期で queue:work を走らせる';
    protected Collection $runningWorkers;

    protected function getOptions()
    {
        return [
            ['max', 'm', InputOption::VALUE_OPTIONAL, '同時に走る worker の最大数']
        ];
    }

    public function handle(): void
    {
        // 今動作しているワーカーを保持しておく Collection
        $this->runningWorkers = new Collection([]);
        // 同時に走らせるワーカーの最大数
        $maxWorkerCount = +$this->option('max') ?: 100;
        while (true) {
            // 走っているワーカーだけを保持する(止まったワーカーは捨てる)
            $this->runningWorkers = $this->runningWorkers
                ->filter(static fn(Process $worker) => $worker->isRunning());
            // 現在実行中のワーカー数が同時に走らせるワーカー数を超えておらず, 未試行の Job を見つけたら実行
            if ($this->runningWorkers->count() < $maxWorkerCount && Job::whereAttempts(0)->count() > 0) {
        //  一つのジョブを行ったら消えるワーカーコマンドを非同期実行
                $worker = new Process(['php', 'artisan', 'queue:work', 'database', '--once']);
                $worker->setTimeout(null);// タイムアウトなし
                $worker->start(fn($msg) => \Log::info($msg));// ワーカー内での出力はログファイル送り
                // 保持するワーカーに新たなワーカーを追加
                $this->runningWorkers->push($worker);
            }
            // 処理を休憩(秒)
            sleep(5);
        }
    }
}

 これで監視用プロセスを一つに絞ったまま非同期でワーカーを実行できます。このコードを元に色々と拡張することで要件にあった処理を実現できます。例えば上記コードは 5 秒に 1 つのワーカーしか走らせないコードです。場合によっては 1 ループで複数ワーカーを走らせたり、監視間隔を短くする必要があるかもしれません。サブプロセスとして php コマンドを動かしていますが docker run をかませる必要がるかもしれません。

>株式会社シーポイントラボ

株式会社シーポイントラボ

TEL:053-543-9889
営業時間:9:00~18:00(月〜金)
住所:〒432-8003
   静岡県浜松市中央区和地山3-1-7
   浜松イノベーションキューブ 315
※ご来社の際はインターホンで「316」をお呼びください

CTR IMG