【PHP】1つのCSVファイルの中身をデータベースへインポートする処理を並列化する方法

  • 2022年8月19日
  • PHP

 この記事で紹介する方法はいささかトリッキーな構造のコードになるため、これよりも先にアルゴリズムや実行する SQL 高速にしておく、リクエストと独立して非同期で行う様にしてユーザーが待つのを辛くなくす、などの方法を先に検討した方がよいかと思います。

 紹介するのは PHP でしばしばあるCSVファイルの中身をデータベースにインポートする際、それを高速化する方法です。ここではこの高速化を並列化によって実現します。この並列化のざっくばらんな流れは次です。

  1. いくらかの固まった行単位でファイルを分割する
  2. 分割したそれぞれのファイルについて別々のプロセスで処理

分割は割と曲者で、セル内の改行と分割用の区切りが重なるとデータが壊れます。このためこの記事の手法はセル内に改行が現れない形式のCSVにのみ使うべきです。

 実際のコードが次です。

<?php

/**
 * parent.php
 * 並列実行用の親ファイル
 * ファイルを受入、分割し、子の INSERT 用プログラムに渡し、子の管理をします。
 */

$start = microtime(true);
require_once __DIR__.'/vendor/autoload.php';

// 処理対象のCSVファイル
$filePath = __DIR__.'/tmp/x-ken-all.csv';
// ファイルを 5000 行ごとに分割
$dir   = dirname($filePath);
$fname = basename($filePath);
// 分割後のファイルは $prefix. "-00" , $prefix. "-01", ... となる
$prefix = base64_encode(random_bytes(6)).'-';
// linux の split コマンドを用いて行単位での分割を行う
// これを作った環境が特殊でカレントディレクトリを変えてから名前を指定する形
// 環境次第では単に絶対パスでファイルを指定してもOK
exec("cd {$dir} && split -l 5000 -d \"{$fname}\" {$prefix}");

// 分割後の各CSVファイルのフルパスを取得
$tgtCsvList = glob("{$dir}/{$prefix}*");

// CSVファイル処理をするPHPスクリプトを分割した各ファイルについて起動
$processList = [];
foreach ($tgtCsvList as $csv) {
    $p = new \Symfony\Component\Process\Process(['php', __DIR__.'/child.php', $csv]);
    $p->start(static fn ($type, $msg) => print($msg));
    $p->setTimeout(null);
    $processList[] = $p;
}
// 分割したファイルに対して同時に走っているスクリプトが全て終わるのを待つ
// ここでは貪欲に全てを同時に実行しています。マシンリソースと対象次第ではメモリ等が枯渇します。
// 実際はプロセス数制限を付けるなどして安全にした方がよいです。
foreach ($processList as $p) {
    $p->wait();
}

// 終わったら分割したファイルを削除
foreach ($tgtCsvList as $path) {
    unlink($path);
}

echo(microtime(true) - $start)."sec\n";
<?php

/**
 * child.php
 * INSERT用のファイルです。
 * よくある CSV を読んでバルクインサートするアレで、特に工夫はありません。
 */

require_once __DIR__.'/vendor/autoload.php';

// x-ken-all.csv を MySQL に INSERT するコード例

$path = $argv[1];
$spf  = new SplFileObject($path);
$spf->setFlags(SplFileObject::READ_CSV);
$insert = [];
$dotenv = \Dotenv\Dotenv::createImmutable('./');
$dotenv->load();
$pdo = new PDO(
    "mysql:dbname={$_ENV['DB_DATABASE']};host={$_ENV['DB_HOST']}:{$_ENV['DB_PORT']}",
    $_ENV['DB_USERNAME'],
    $_ENV['DB_PASSWORD'],
);
$sql = $sqlInit = 'INSERT INTO x_ken_all (
public_code,
zip_old,
zip,
pref_kana,
city_kana,
area_kana,
pref_kanji,
city_kanji,
area_kanji,
address_kanji,
has_many_zip,
has_detail_banchi,
has_choume,
is_zip_to_many_area,
state_of_update,
reason_of_change
) VALUES
';
$placeholder = [];
$i           = 0;
foreach ($spf as $row) {
    if(empty($row) || $row[0] == null){
        continue;
    }
    $sql .= '(';
    if (count($row) < 16) {
        for ($i = count($row); $i < 16; ++$i) {
            $row[$i] = 0;
        }
    }
    foreach ($row as $cell) {
        $placeholder[] = $cell;
        $sql .= '?,';
    }
    $sql = rtrim($sql, ',');
    $sql .= '),';

    ++$i;
    if ($i % 100 === 0) {
        echo $i."\n";
        $sql = rtrim($sql, ',').';';
        $pdo->prepare($sql)->execute($placeholder);
        $sql         = $sqlInit;
        $placeholder = [];
    }
}
$sql = rtrim($sql, ',').';';
$pdo->prepare($sql)->execute($placeholder);

 この方法は巨大ファイルに対して高速化が可能な点で便利ですが、マシンのメモリを食いつくしやすくもあり、そこは注意が必要です。PHPの1プロセスに割り当てられているメモリの最大量とマシンのメモリの量と相談して親プログラム側で同時実行する数を制御した方が安全です。

>株式会社シーポイントラボ

株式会社シーポイントラボ

TEL:053-543-9889
営業時間:9:00~18:00(月〜金)
住所:〒432-8003
   静岡県浜松市中央区和地山3-1-7
   浜松イノベーションキューブ 315
※ご来社の際はインターホンで「316」をお呼びください

CTR IMG