OpenForm от abgroup.tech — система автоматизации отчётности | Реестр ПО
Оптимизация пайплайна с Apache Iceberg и S3: как ускорить обработку Parquet в 60 раз | Кейс Data Engineering
Исходная задача и контекст
Перед нами стояла типичная для дата‑инженеров задача: обработать поток Parquet‑файлов с данными о внутренних технических процессах заказчика. Ключевой запрос — извлечь метаданные из таблицы, чтобы в дальнейшем работать с ними быстрее и удобнее.
В качестве основного инструмента мы выбрали Apache Iceberg — проверенный в наших проектах фреймворк для управления метаданными. Изначально пайплайн выглядел так:
▪️ Парсинг и сохранение метаданных в Postgres БД.
▪️ Выгрузка файлов в память (порциями размером row_group).
▪️ Разделение файлов на «чанки».
▪️ Добавление колонки file_name для идентификации источника.
▪️ Запись и регистрация файлов в Iceberg.
▪️ Пометка файлов как прочитанных.
На первый взгляд — стандартная схема. Но при масштабировании вскрылись серьёзные проблемы.
Проблемы первой реализации
Когда мы перешли к параллельным загрузкам, Iceberg начал выдавать ошибки. В документации предлагалось решение через retry (optimistic concurrency) — повторные попытки записи при конфликтах. Однако в Python‑библиотеке (PyIceberg) эта функциональность отсутствовала в нужной форме(время обработки не соответствовало ожиданиям из-за того, что система тратила ресурсы на излишние операции загрузки данных в S3 при ошибках) и нам пришлось реализовывать её самостоятельно.
Поиск решения: разделение операций
Мы решили перестроить процесс, разделив два этапа:
▪️ Загрузка данных (в нашем случае через PyArrow).
▪️ Обновление метаданных (через PyIceberg).
Как это работает:
▪️ Файлы отдельно записываются в S3.
▪️ PyIceberg занимается исключительно регистрацией изменений в метаданных: проверяет уникальность, обновляет ссылки, но не трогает сами файлы.
__________________________________________________________________________________________________________ from pyiceberg.io.pyarrow import parquet_files_to_data_files . . . table = catalog.load_table((self.namespace, table_name)) data_file = next( parquet_files_to_data_files( io=table.io, file_paths=iter([file_path]), table_metadata=table.metadata, ) ) . . . @retry( wait=wait_random_exponential(multiplier=0.5, max=2), stop=stop_after_attempt(20), reraise=True, before_sleep=_log_retry, ) def _add_data_file_with_retry( self, table: IcebergTable, data_file: DataFile ) -> None: table.refresh() with table.transaction() as transaction: transaction.update_snapshot().fast_append().append_data_file( data_file ).commit() _add_data_file_with_retry(table=table, data_file=data_file) __________________________________________________________________________________________________________
Отметим, при создании таблицы и добавлении данных pyiceberg формируются две логические «папки»:
▪️ metadata/ — хранит метаинформацию (схемы, партиции, версии);
▪️ data/ — содержит файлы в S3, но файлы могут находиться в любом месте S3‑бакета (даже в другом бакете). Iceberg лишь фиксирует их пути в метаданных.
Например:
«Файл data_part_1.parquet лежит по пути s3://bucket/raw/2024/, и в нём содержатся данные за январь 2024 года».
Это даёт гибкость: можно не перемещать файлы внутри S3 — достаточно обновить метаданные.
Новая схема: минимум операций, максимум скорости
Мы отказались от громоздкой схемы с выгрузкой/загрузкой и перешли к упрощённому пайплайну:
▪️ Парсинг и сохранение метаданных в Postgres БД (как и раньше).
▪️ Регистрация файлов в Iceberg с указанием их текущих путей в S3.
Что изменилось:
▪️ Исчезла необходимость выгружать файлы из S3 для обработки.
▪️ Iceberg анализирует метаданные напрямую в бакете, без перемещения данных.
▪️ Операции выполняются за доли секунды — основное время уходит на межсервисное взаимодействие.
После внедрения новой схемы:
▪️ Скорость обработки выросла в разы: операция, занимавшая минуты, теперь выполняется за полсекунды.
▪️ Нагрузка на инфраструктуру упала: нет избыточных перемещений данных, память и сеть не перегружены.
▪️ Гибкость системы повысилась: файлы можно не перемещать внутри S3, а Iceberg автоматически актуализирует ссылки.
▪️ Масштабируемость улучшилась: система готова к росту объёмов без срочного апгрейда железа.
Этот кейс показывает, что иногда решение проблемы лежит не в наращивании ресурсов, а в пересмотре архитектуры. Связка Iceberg + S3 позволила:
▪️ сократить время обработки данных;
▪️ снизить затраты на инфраструктуру;
▪️ оставить задел для будущего роста.
Теперь система работает быстрее, потребляет меньше ресурсов и остаётся понятной для поддержки.
.jpg&w=1024&q=75)