Обработка огромных CSV-файлов в Laravel

Обработка огромных CSV-файлов в Laravel

Мне регулярно приходится работать с CSV, состоящими из миллионов строк и десятков полей. Эти файлы могут весить по два-три гигабайта. С такими объемами нужно работать аккуратно и эффективно. Я поделюсь с вами стратегией, которую я разработал для этого за последние пару лет работы.

Используйте CSV-библиотеку

Моя первая и самая важная рекомендация может показаться простой, но, наверняка у всех нас появлялась мысль «Это всего лишь CSV, что там может быть сложного — отпарсю его сам!».

Правильный ответ: это сложно!

Если вы мне не верите, то прочтите книгу «Ложные представления программистов о CSV» и спросите себя: вы правда готовы обработать все эти тонкости? Да я даже о большинстве из них не знал!

Поэтому просто используйте любой подходящий CSV-парсер. Рекомендую League/CSV. Он фантастически удобен, прост в использовании, протестирован и активно поддерживается.

Никогда не доверяйте поставщику данных

CSV-файлы, которые мы загружаем в нашу систему, в основном, экспортируются из других систем, в том числе государственных. Возможно звучит резко «Никогда не доверяй поставщику данных», но будьте уверены, они постоянно будут что-то в них менять. И лучше быть к этому готовым.

Без уведомлений они могут:

  • изменить порядок полей
  • изменить имя поля
  • добавить/удалить поле
  • начать экранировать определенные символы

Мы загружаем данные автоматически по расписанию в нерабочее время. Система получает файл, открывает его и импортирует данные. Это означает, что нам нужно убедиться, что файл имеет правильный формат, прежде чем мы начнём загрузку миллионов строк.

Согласованность заголовков

Если вы используете пакет League/CSV, то не имееет значения порядок полей в исходнике, потому что вы в итоге получите ассоциативный массив на поле. Но имеет значение, если изменяется имя поля или оно вообще удаляется. Потом этом может привести к ошибкам, когда не окажется нужных данных.

Важно выявить эти проблемы, вместо того чтобы пропускать пустые данные.

Мы делаем это следующим образом: настраиваем наши «ожидания» заголовков полей, а если они не совпадают, то генерируем исключение и останавливаем процесс импорта.

Обычно мы получаем их прямо из файла:

protected function validateHeaders($path, $expectedHeaders)
{
    $headersFromFile = trim(shell_exec("head -1 $path"));

    if ($expectedHeaders !== $headersFromFile) {
        Log::error('Expected: ' . $expectedHeaders);
        Log::error('Actual: ' . $headersFromFile);

        throw new Exception("Заголовки не совпадают. Проверьте логи.");
    }
}

Также можно проверить первую запись, возвращаемую объектом Reader.

protected function validateHeaders($reader, $expectedHeaders)
{
    $headersFromFile = array_keys($reader->fetchOne());

    if($expectedHeaders !== $headersFromFile) {
        Log::error('Expected: ' . $this->originalHeaders);
        Log::error('Actual: ' . $headersFromFile);

        throw new Exception("Заголовки не совпадают. Проверьте логи.");
    }
}

Рекомендую проверять заголовки, чтобы убедиться, что вы работаете с нужными данными.

Нет заголовков?

Сложно проверить заголовки, если их вообще нет!

Такое часто бывает. В этом случае мы обрабатываем первую строку данных как заголовок и отслеживаем изменения в ней, как если бы это был заголовок. Большинство данных, которые мы получаем, относительно стабильны, поэтому, хоть мы и получаем изредка ложные исключения, но нам не нужно часто обновлять эти «фиксированные» данные.

Корректное Экранирование

Пакет League/CSV делает бОльшую часть работы за вас, но бывают случаи, когда поставщик данных закодировал данные нестандартным способом. Всегда нужно их проверять, а не загружать слепо в систему.

Недавно мы столкнулись с проблемой, когда поставщик неправильно экранировал символ, из-за чего две записи объединялись в одну. Естественно, что процесс не остановился с ошибкой, ведь откуда библиотеке знать об этом, она просто следуюет CSV-спецификации. Мы обнаружили это только когда попытались вставить полученное значение в базу данных и получили ошибку «слишком длинное значение». Стало понятно, что парсинг работает неправильно.

Способ, которым мы отлавливаем подобное сейчас, немного сложен, но я кратко расскажу о нём.

Первое, что мы делаем при загрузке нового CSV-файла — проверяем сколько записей в нём находится. Раньше мы просто считали количество строк, но в некоторых полях могут быть переносы, поэтому пришлось придумать другой способ. Каждая строка в конкретном файле содержит легко идентифицируемый шаблон, например | 043,2020, и, если мы посчитаем сколько раз он появится, то узнаем сколько записей должно быть.

public function linesInFile($path) 
{
    $pattern = " \| 043,2020,";
    
    //  'E' — включает regex
    //  'a' — заставляет его работать с файлом, как с текстом (которым он и является).
    //  'o' — говорит ему показывать только совпадения, а не всю строку
    return (int)trim(exec("grep -oaE '$pattern' $path | wc -l"));
}

Зная, сколько записей должно быть, мы сравниваем с тем, сколько у нас записей фактически получилось:

public function ensureCsvParsed($path)
{
    $reader = $this->getReader($path);
    $lines = $this->linesInFile($path);

    if ($lines == $reader->count()) {
        // CSV-документ содержит сколько же записей, 
        // сколько и строк в файле.
        return;
    }
    
    // Выясняем, какая строка содержит ошибку...

    throw new Exception('Строка [X] содержит ошибку.');
}

Если эти цифры не совпали, то значит где-то произошла ошибка.

Мы выясняем, в какой строке ошибка путём разделения файла пополам и новых проверок. В итоге получаем номер строки с неверным экранированием.

Я не буду размещать здесь весь код, выполнящий эту работу, но в целом объясню как это делается. Начинаем с самого конца файла и получаем одну строку:

$lineFromFile = exec("sed -n '{$line},{$line}p' $path");

Затем получаем строку которая находится по адресу $line - 2 (вычитаем заголовок и компенсируем нулевой индекс).

$lineFromReader = $reader->fetchOne($line - 2);

Сравниваем эти две строки и проверяем, содержат ли они одинаковые значения. Если да, то ошибка находится ПЕРЕД этой строкой, поэтому переходим на середину файла и пробуем снова. Если они совпадают, то значит мы зашли слишком далеко. Устаналиваем переменную на последнюю «хорошую» строку и прыгаем вперёд.

Повторяем это пока не доберемся до «плохой строки», которая обычно заполнена случайным количеством обратных слешей и кавычек. Поздравляем, вы её нашли!

Потоковая передача для экономии памяти

Если вес файла больше десятка мегабайт, то вам нужно убедиться, что вы используете потоковый режим, а не загружаете всё сразу в память.

Потоковая передача файла означает, что он будет загружаться в память небольшими частями. Это делается довольно просто и значительно увеличивает вашу пропускную способность.

// Загружает всё в память (Не делайте так!)
$reader = Reader::createFromString(file_get_contents('/path/to/my/file.csv'));

// Открывает как поток (Делайте так!)
$reader = Reader::createFromPath('/path/to/my/file.csv', 'r');

К счастью, пакет от PHP League решает все проблемы за нас. Нужно просто убедиться, что вызывается правильный метод.

Даже если ваши файлы не расположены локально — вы всё равно можете использовать потоковый режим.

Например, если они на S3, то вы можете использовать AWS stream wrapper. Вот как можно получить поток с S3 на Laravel:

public function getStream($disk, $path)
{
    // Получаем S3-адаптер.
    $adapter = $disk->getDriver()->getAdapter();

    // Регистрируем обёртку потока s3://
    $adapter->getClient()->registerStreamWrapper();

    // Теперь мы можем использовать протокол s3://
    $path = 's3://' . $adapter->getBucket() . '/' . $path;

    // Возвращаем поток
    return fopen($path, 'r', false, stream_context_create([
        's3' => ['seekable' => true]
    ]));
}

//...

$stream = $this->getStream($disk, $path);
$reader = Reader::createFromStream($stream);

Теперь вы можете работать с S3-файлами чанк за чанком, вместо того чтобы его скачивать.

Обратите внимание, что многие методы в этому уроке работают только с локальными файлами. Поэтому, в случае использования S3, вам нужно загрузить файл к себе и работать с ним локально.

Постраничная обработка

Если у вас пара миллионов строк, которые вы пытаетесь обработать за разумное время, то вы обнаружите, что ограничены попыткой прочесть весь файл по строке за раз в одном потоке.

Вместо этого возьмите большую страницу в 50.000-100.000 записей и обрабатывайте её в отдельном процессе. Вы можете создать несколько процессов для параллельной обработки нескольких страниц, а не работать с ними последовательно.

Это делается с помощью одной команды, отвечающей за создание «дочерних» команд для импорта отдельных страниц:

public function importFile($path)
{
    // Полностью зависит от вас и ваших требований
    $maxProcesses = 10;
    $perPage = 50000;

    $pages = ceil($this->linesInFile($path) / $perPage);
    $page = 1;

    $processes = [];

    while ($page <= $pages || count($processes)) {
        $processes = $this->getRunningProcesses($processes);

        // Если есть место для запуска еще одного процесса 
        // и нам нужно импортировать еще одну страницу, 
        // то запускаем новый процесс
        if ($page <= $pages && count($processes) < $maxProcesses) {
            $command = "php artisan import:page $page $path";

            // Выполняем команду в фоновом режиме
            exec($command . ' > /dev/null 2>&1 & echo $!', $processes);

            $page++;
        }

        sleep(10);
    }
}

А затем, в «дочернем» процессе, вы можете воспользоваться операторами для получения записей конкретной страницы:

public function records($reader)
{
    return (new Statement)
        ->offset($this->page * $this->perPage)
        ->limit($this->perPage)
        ->process($reader);
}

Используя схему постраничной обработки, вы контролируете скорость и пропускную способность, опираясь на имеющиеся у вас системные ресурсы.

Обработка через Задачи, Команды и Демоны

Где лучше обрабатывать CVS-файлы — вопрос пока открытый. Но у меня есть несколько советов.

Задачи

Выполнение обработки внутри очереди. Самый простой способ. Запустил и забыл. Идеально подходит для небольшого импорта, иницированного дискретным действием, например загрузка файла пользователем.

Но есть недостатки. Первый — тайм-аут задачи. Если импорт огромный, то вы быстро превысите дефолтный 90-секундный тайм-аут для задач в очереди. Естественно, можно увеличить long или import, но, скорей всего вы снова упретесь в ограничитель. Например, некоторые из наших импортов работают по несколько часов.

Возможно это удастся обойти, если разделять длительные задачи на мелкие, но тогда придется позаботиться о координации между ними. Как узнать какой импорт уже завершён? Возможно, здесь может помочь новое пакетирование задач из Laravel 8, но у меня пока нет такого опыта.

Команды

Если импортируемые данные доступны программно, например через API или лежат на FTP, то вы можете запланировать команду, которая возьмёт их и начнёт обработку. Это удобно, ведь вам не нужно беспокоиться о тайм-аутах. Обязательно вызывайте через ->runInBackground()->withoutOverlapping(), если планируемое время работы исчисляется часами.

Этот метод мы используем для ночного импорта самых больших файлов. Днём заливаем файлы на S3, а ночью команда просыпается и смотрит, есть ли для неё работа.

Всё прекрасно работает, нет нужды беспокоиться о тайм-аутах, мы можем одной «родительской» командой порождать несколько дочерних процессов и получать параллельный импорт.

Демоны

Как вы знаете, демон — это просто процесс, который выполняется в фоновом режиме, а не тот, кто находится под прямым контролем пользователя. Команда horizon:work в Laravel обычно настраивается как демон.

Использование демона для импорта удобно по нескольким причинам. Во-первых, они, как и команды, не имеют тайм-аута. Во-вторых — это то, чего не хватает командам — демон может начать работать немедленно.

Если ваш импорт основан на файлах, закачиваемых пользователем, то вам скорей всего не захочется ждать полуночи, чтобы начать его импортировать. Фактически, даже ежеминутная проверка это недостаточно быстро. Пользователи хотят видеть немедленную реакцию.

Для пользовательского импорта мы держим демона, который ждёт новые необработанные данные в базе данные. Как только он их видит, то начинает обработку.

Если вы реализуете подобную стратегию, то убедитесь, что используете блокировку, иначе два демона могут начать импортировать один тот же документ, что приведёт к дублированию данных.

public function handleNextImport()
{
    // Убедимся, что мы единственный процесс, который ищет новый импорт
    $handler = Cache::lock('selectNextImport')->get(function () {
        $import = Import::processable()->first();
        
        if (!$import) {
            return;
        }

        $import->markAsProcessing();

        return $import->makeHandler();
    });

    optional($handler)->handle();
}

По правде говоря, мы не используем настоящего демона, потому что его настройка это такой гемморой. Мы запускаем псевдо-демона, о чём я писал в статье «Псевдо-демоны Laravel».

Очистка данных

Все ваши данные будут представлены в виде строк, потому что так работает CSV. Скорей всего вы не захотите хранить данные в таком формате, а приведете их к нужным примитивным типам. Я расскажу вам о некоторых проблемах, с которыми я столкнулся при этом.

Удаление лишних пробелов

Первое, что я всегда делаю — utf8_encode и избавляюсь от лишних пробелов. Кодировка UTF-8 убирает некоторые нестандартные значения, которые могут присутствовать в тексте, например \x00.

На этом этапе я также конвертирую любые пустые строки в null, чем они по сути и являются.

public function clean($record)
{
    foreach ($record as $key => $value) {
        $value = utf8_encode($value);
        $value = trim($value);
        $value = $value === '' ? null : $value;

        $record[$key] = $value;
    }

    return $record;
}

Теперь, когда данные более-менее достоверны, можем перейти к приведению их к правильным примитивам.

Булев (Логический тип)

Если ваше поле true/false, то вы должны обратить внимание на:
yes / y / Y / YES
no / n / NO / N
true / TRUE
false / FALSE
1
0
(пусто)

Числа

Тут приведение зависит от того, должно ли быть ваше число целым или с плавающей запятой. Но в любом случае нужно удалить запятые (,) и индикаторы валют ($).

Если вам нужны только целые числа, то я удаляю десятичные дроби, а затем использую функцию digits_only.

function digits_only($val, $replacement = '')
{
    return preg_replace('/\D/', $replacement, $val);
}

$value = explode('.', $value);
$value = head($value);
$value = digits_only($value);

Если нужно обрабатывать как целые числа, так и с плавающей запятой, то вы можете удалять запятые и валютные индикаторы, а затем воспользоваться простым приведением PHP-типов.

$value = 0 + $value;

Если $value это строка float (123.45), то мы и получим float, а если это строка int (123), то получим int.

Даты

Надеемся, что формат дат будет одинаков во всем файле данных. Если это так, то используем метод createFromFormat из Carbon.

public function date($record, $key, $format)
{
    if (Arr::has($record, $key)) {
        $record[$key] = Carbon::createFromFormat($format, $record[$key]);
    }

    return $record;
}

Если Carbon не может созать дату из полученной строки, то он выбросит исключение, что нам и нужно. Лучше получить явную ошибку, чем неверные данные.

EOF

CSV просто фантастический формат для переноса данных. Почти каждая система может экспортировать в него. И, если знать несколько тонкостей, то импорт этих данные тоже пройдет без проблем.

Автор: Aaron Francis
Перевод: Алексей Широков

Наш Телеграм-канал — следите за новостями о Laravel.