Миграция данных из Parquet в Elasticsearch с помощью Apache NiFi

Иван Гурин Технологии интеграции информационных систем 7 мин

В предыдущих статьях Практическое введение в Apache NiFi и Миграция данных из MariaDB в Parquet с помощью Apache NiFi были рассмотрены базовые концепции NiFi и практический сценарий batch‑выгрузки данных из OLTP БД в аналитический формат Parquet.

В этой статье продолжим логическую цепочку и разберем следующий шаг типового data‑pipeline: загрузка данных из Parquet‑файлов в Elasticsearch с использованием Apache NiFi.

Сценарий полностью batch‑ориентированный и подходит для:

  • первичной инициализации индекса;
  • полной переиндексации;
  • восстановления Elasticsearch из data lake или файлового хранилища.

Теоретические основы Apache NiFi, FlowFile и Controller Services здесь не повторяются.

Общая схема потока

Используется минимальный и наглядный набор процессоров:

[ListFile]
      |
      v
[FetchFile]
      |
      v
[ConvertRecord]
      |
      v
[PutElasticsearchRecord]

Где:

  • ListFile - обнаруживает Parquet‑файлы в каталоге;
  • FetchFile - читает содержимое файла;
  • ConvertRecord - преобразует Parquet в JSON с явной схемой;
  • PutElasticsearchRecord - индексирует записи в Elasticsearch.

Каждый Parquet‑файл обрабатывается как независимый batch.

Исходные данные

На вход подаются Parquet‑файлы, полученные на предыдущем этапе пайплайна (из MariaDB), со следующей логической структурой:

Поле Тип данных
Id int
Content string

На этом этапе появляется важный нюанс интеграции с Elasticsearch, который необходимо учитывать при проектировании потока.

Elasticsearch чувствителен к типам полей, особенно если поле используется как _id документа.

Если поле Id остаётся числовым (int):

  • Elasticsearch интерпретирует его иначе, чем строковый _id;
  • при повторной загрузке данные не перезаписываются;
  • каждый запуск создаёт новые документы.

Чтобы обеспечить идемпотентность загрузки, Id необходимо привести к типу string до записи в Elasticsearch.

Для этого используется процессор ConvertRecord с кастомной схемой.

Настройка процессоров

Настройка процессоров ListFile и FetchFile осуществляется аналогично как в статье Практическое введение в Apache NiFi.

ListFile

Назначение: обнаружение новых Parquet‑файлов. ListFile гарантирует, что каждый файл будет обработан только один раз.

Ключевые настройки:

  • Input Directory: /result/article_documents
  • Listing Strategy: Tracking Timestamps
  • Entity Tracking State Cache: MapCacheClientService (требуется настройка общего ресурса - локального сервера MapCacheServer с настройками по умолчанию)
  • File Filter: (.*)\.parquet

FetchFile

Назначение: загрузка содержимого файла, ранее обнаруженного ListFile, в FlowFile content.

Ключевая настройка:

  • File to Fetch: ${absolute.path}/${filename}

ConvertRecord

Назначение:

  • преобразование Parquet в JSON;
  • изменение типа данных поля Id с int на string.

Используемые сервисы:

  • Record Reader - ParquetReader;
  • Record Writer - JsonRecordSetWriter.

Кастомная схема JsonRecordSetWriter

{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    { "name": "Id", "type": "string" },
    { "name": "Content", "type": "string" }
  ]
}

ConvertRecord в этом сценарии выступает как явная точка контроля схемы и типов данных.

PutElasticsearchRecord

Назначение: запись records в Elasticsearch.

Ключевые настройки:

  • Index Operation: index
  • Index: test_article
  • Record Reader: JsonTreeReader
  • ID Record Path: /Id
  • Client Service - ElasticSearchClientServiceImpl;

Параметр ID Record Path используется для сопоставления record с _id документа в Elasticsearch и обеспечивает корректное обновление данных.

Controller Services

ElasticSearchClientServiceImpl

Используется стандартный клиент NiFi для подключения к Elasticsearch.

Минимальные параметры:

  • Elasticsearch Hosts: http://elasticsearch:9200;
  • Authorization Scheme: Basic
  • Username: elastic
  • Password: elastic_password

В статье используется Elasticsearch 8, который запущен в docker со следующей конфигурацией:

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.19.9
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=elastic_password
      - xpack.security.http.ssl.enabled=false
      - xpack.security.transport.ssl.enabled=false
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:8.19.9
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=kibana_password
    ports:
      - "5601:5601"
    networks:
      - elastic

volumes:
  esdata:

networks:
  elastic:
    name: elastic
    driver: bridge

Для связи Apache NiFi и Elasticsearch необходимо также добавить общую сеть в сервисы nifi и elasticsearch, либо объединить их в единый docker-compose.yml.

Проверка результата

После запуска потока:

  1. Parquet‑файлы читаются ровно один раз;
  2. каждая запись преобразуется в JSON‑документ;
  3. поле Id используется как _id в Elasticsearch;
  4. при повторном запуске данные перезаписываются, а не дублируются.

Для проверки результата можно использовать Kibana по адресу http://127.0.0.1:5601/.

Методические замечания

  • ConvertRecord - обязательная точка контроля схемы;
  • NiFi особенно хорошо подходит для batch‑переиндексации, а для streaming‑сценариев лучше использовать Kafka‑ориентированные решения.

Заключение

Apache NiFi позволяет без написания кода реализовать надежную batch‑загрузку данных из Parquet‑файлов в Elasticsearch.

Apache NiFi в данном сценарии выступает как:

  • транспортный слой;
  • преобразователь форматов;
  • контроллер идемпотентной загрузки данных.

Использование явной схемы и управления типами данных является ключевым фактором корректной интеграции Parquet и Elasticsearch, особенно при повторных загрузках и переиндексации.