Миграция данных из Parquet в Elasticsearch с помощью Apache NiFi
В предыдущих статьях Практическое введение в Apache NiFi и Миграция данных из MariaDB в Parquet с помощью Apache NiFi были рассмотрены базовые концепции NiFi и практический сценарий batch‑выгрузки данных из OLTP БД в аналитический формат Parquet.
В этой статье продолжим логическую цепочку и разберем следующий шаг типового data‑pipeline: загрузка данных из Parquet‑файлов в Elasticsearch с использованием Apache NiFi.
Сценарий полностью batch‑ориентированный и подходит для:
- первичной инициализации индекса;
- полной переиндексации;
- восстановления Elasticsearch из data lake или файлового хранилища.
Теоретические основы Apache NiFi, FlowFile и Controller Services здесь не повторяются.
Общая схема потока
Используется минимальный и наглядный набор процессоров:
[ListFile]
|
v
[FetchFile]
|
v
[ConvertRecord]
|
v
[PutElasticsearchRecord]
Где:
- ListFile - обнаруживает Parquet‑файлы в каталоге;
- FetchFile - читает содержимое файла;
- ConvertRecord - преобразует Parquet в JSON с явной схемой;
- PutElasticsearchRecord - индексирует записи в Elasticsearch.
Каждый Parquet‑файл обрабатывается как независимый batch.
Исходные данные
На вход подаются Parquet‑файлы, полученные на предыдущем этапе пайплайна (из MariaDB), со следующей логической структурой:
| Поле | Тип данных |
|---|---|
| Id | int |
| Content | string |
На этом этапе появляется важный нюанс интеграции с Elasticsearch, который необходимо учитывать при проектировании потока.
Elasticsearch чувствителен к типам полей, особенно если поле используется как _id документа.
Если поле Id остаётся числовым (int):
- Elasticsearch интерпретирует его иначе, чем строковый
_id; - при повторной загрузке данные не перезаписываются;
- каждый запуск создаёт новые документы.
Чтобы обеспечить идемпотентность загрузки, Id необходимо привести к типу string до записи в Elasticsearch.
Для этого используется процессор ConvertRecord с кастомной схемой.
Настройка процессоров
Настройка процессоров ListFile и FetchFile осуществляется аналогично как в статье Практическое введение в Apache NiFi.
ListFile
Назначение: обнаружение новых Parquet‑файлов. ListFile гарантирует, что каждый файл будет обработан только один раз.
Ключевые настройки:
- Input Directory:
/result/article_documents - Listing Strategy:
Tracking Timestamps - Entity Tracking State Cache:
MapCacheClientService(требуется настройка общего ресурса - локального сервераMapCacheServerс настройками по умолчанию) - File Filter:
(.*)\.parquet
FetchFile
Назначение: загрузка содержимого файла, ранее обнаруженного ListFile, в FlowFile content.
Ключевая настройка:
- File to Fetch:
${absolute.path}/${filename}
ConvertRecord
Назначение:
- преобразование Parquet в JSON;
- изменение типа данных поля
Idсintнаstring.
Используемые сервисы:
- Record Reader -
ParquetReader; - Record Writer -
JsonRecordSetWriter.
Кастомная схема JsonRecordSetWriter
{
"type": "record",
"name": "MyRecord",
"fields": [
{ "name": "Id", "type": "string" },
{ "name": "Content", "type": "string" }
]
}
ConvertRecord в этом сценарии выступает как явная точка контроля схемы и типов данных.
PutElasticsearchRecord
Назначение: запись records в Elasticsearch.
Ключевые настройки:
- Index Operation:
index - Index:
test_article - Record Reader:
JsonTreeReader - ID Record Path:
/Id - Client Service -
ElasticSearchClientServiceImpl;
Параметр ID Record Path используется для сопоставления record с _id документа в Elasticsearch и обеспечивает корректное обновление данных.
Controller Services
ElasticSearchClientServiceImpl
Используется стандартный клиент NiFi для подключения к Elasticsearch.
Минимальные параметры:
- Elasticsearch Hosts:
http://elasticsearch:9200; - Authorization Scheme:
Basic - Username:
elastic - Password:
elastic_password
В статье используется Elasticsearch 8, который запущен в docker со следующей конфигурацией:
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.19.9
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=true
- ELASTIC_PASSWORD=elastic_password
- xpack.security.http.ssl.enabled=false
- xpack.security.transport.ssl.enabled=false
- ES_JAVA_OPTS=-Xms1g -Xmx1g
ports:
- "9200:9200"
volumes:
- esdata:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
networks:
- elastic
kibana:
image: docker.elastic.co/kibana/kibana:8.19.9
container_name: kibana
depends_on:
- elasticsearch
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=kibana_password
ports:
- "5601:5601"
networks:
- elastic
volumes:
esdata:
networks:
elastic:
name: elastic
driver: bridge
Для связи Apache NiFi и Elasticsearch необходимо также добавить общую сеть в сервисы nifi и elasticsearch, либо объединить их в единый docker-compose.yml.
Проверка результата
После запуска потока:
- Parquet‑файлы читаются ровно один раз;
- каждая запись преобразуется в JSON‑документ;
- поле
Idиспользуется как_idв Elasticsearch; - при повторном запуске данные перезаписываются, а не дублируются.
Для проверки результата можно использовать Kibana по адресу http://127.0.0.1:5601/.
Методические замечания
- ConvertRecord - обязательная точка контроля схемы;
- NiFi особенно хорошо подходит для batch‑переиндексации, а для streaming‑сценариев лучше использовать Kafka‑ориентированные решения.
Заключение
Apache NiFi позволяет без написания кода реализовать надежную batch‑загрузку данных из Parquet‑файлов в Elasticsearch.
Apache NiFi в данном сценарии выступает как:
- транспортный слой;
- преобразователь форматов;
- контроллер идемпотентной загрузки данных.
Использование явной схемы и управления типами данных является ключевым фактором корректной интеграции Parquet и Elasticsearch, особенно при повторных загрузках и переиндексации.