Оптимизация пайплайна с Apache Iceberg и S3: как ускорить обработку Parquet в 60 раз | Кейс Data Engineering

Оптимизация пайплайна с Apache Iceberg и S3: как ускорить обработку Parquet в 60 раз | Кейс Data Engineering

Обзоры

Исходная задача и контекст

Перед нами стояла типичная для дата‑инженеров задача: обработать поток Parquet‑файлов с данными о внутренних технических процессах заказчика. Ключевой запрос — извлечь метаданные из таблицы, чтобы в дальнейшем работать с ними быстрее и удобнее.

В качестве основного инструмента мы выбрали Apache Iceberg — проверенный в наших проектах фреймворк для управления метаданными. Изначально пайплайн выглядел так:

▪️ Парсинг и сохранение метаданных в Postgres БД.

▪️ Выгрузка файлов в память (порциями размером row_group).

▪️ Разделение файлов на «чанки».

▪️ Добавление колонки file_name для идентификации источника.

▪️ Запись и регистрация файлов в Iceberg.

▪️ Пометка файлов как прочитанных.

На первый взгляд — стандартная схема. Но при масштабировании вскрылись серьёзные проблемы.

Проблемы первой реализации

Когда мы перешли к параллельным загрузкам, Iceberg начал выдавать ошибки. В документации предлагалось решение через retry (optimistic concurrency) — повторные попытки записи при конфликтах. Однако в Python‑библиотеке (PyIceberg) эта функциональность отсутствовала в нужной форме(время обработки не соответствовало ожиданиям из-за того, что система тратила ресурсы на излишние операции загрузки данных в S3 при ошибках) и нам пришлось реализовывать её самостоятельно.

Поиск решения: разделение операций

Мы решили перестроить процесс, разделив два этапа:

▪️ Загрузка данных (в нашем случае через PyArrow).

▪️ Обновление метаданных (через PyIceberg).

Как это работает:

▪️ Файлы отдельно записываются в S3.

▪️ PyIceberg занимается исключительно регистрацией изменений в метаданных: проверяет уникальность, обновляет ссылки, но не трогает сами файлы.

__________________________________________________________________________________________________________
from pyiceberg.io.pyarrow import parquet_files_to_data_files

.
.
.


table = catalog.load_table((self.namespace, table_name))

data_file = next(
	parquet_files_to_data_files(
 io=table.io,
 file_paths=iter([file_path]),
 table_metadata=table.metadata,
	)
)

.
.
.

@retry(
 wait=wait_random_exponential(multiplier=0.5, max=2),
 stop=stop_after_attempt(20),
 reraise=True,
 before_sleep=_log_retry,
)
def _add_data_file_with_retry(
 self, table: IcebergTable, data_file: DataFile
) -> None:
	table.refresh()
 with table.transaction() as transaction:
 transaction.update_snapshot().fast_append().append_data_file(
 data_file
 ).commit()

_add_data_file_with_retry(table=table, data_file=data_file)
__________________________________________________________________________________________________________

Отметим, при создании таблицы и добавлении данных pyiceberg формируются две логические «папки»:

▪️ metadata/ — хранит метаинформацию (схемы, партиции, версии);

▪️ data/ — содержит файлы в S3, но файлы могут находиться в любом месте S3‑бакета (даже в другом бакете). Iceberg лишь фиксирует их пути в метаданных.

Например:

«Файл data_part_1.parquet лежит по пути s3://bucket/raw/2024/, и в нём содержатся данные за январь 2024 года».

Это даёт гибкость: можно не перемещать файлы внутри S3 — достаточно обновить метаданные.

Новая схема: минимум операций, максимум скорости

Мы отказались от громоздкой схемы с выгрузкой/загрузкой и перешли к упрощённому пайплайну:

▪️ Парсинг и сохранение метаданных в Postgres БД (как и раньше).

▪️ Регистрация файлов в Iceberg с указанием их текущих путей в S3.

Что изменилось:

▪️ Исчезла необходимость выгружать файлы из S3 для обработки.

▪️ Iceberg анализирует метаданные напрямую в бакете, без перемещения данных.

▪️ Операции выполняются за доли секунды — основное время уходит на межсервисное взаимодействие.

После внедрения новой схемы:

▪️ Скорость обработки выросла в разы: операция, занимавшая минуты, теперь выполняется за полсекунды.

▪️ Нагрузка на инфраструктуру упала: нет избыточных перемещений данных, память и сеть не перегружены.

▪️ Гибкость системы повысилась: файлы можно не перемещать внутри S3, а Iceberg автоматически актуализирует ссылки.

▪️ Масштабируемость улучшилась: система готова к росту объёмов без срочного апгрейда железа.

Этот кейс показывает, что иногда решение проблемы лежит не в наращивании ресурсов, а в пересмотре архитектуры. Связка Iceberg + S3 позволила:

▪️ сократить время обработки данных;

▪️ снизить затраты на инфраструктуру;

▪️ оставить задел для будущего роста.

Теперь система работает быстрее, потребляет меньше ресурсов и остаётся понятной для поддержки.

Читать также