Практическое введение в Apache NiFi

Иван Гурин Технологии интеграции информационных систем 8 мин

Apache NiFi - это инструмент для построения и управления потоками данных (dataflow) между различными системами. Его ключевая задача - надёжно, воспроизводимо и наглядно перемещать, преобразовывать и маршрутизировать данные.

NiFi изначально разрабатывался для сценариев data ingestion (доставка данных) и data integration (приведение данных в единое, согласованное представление), где важно:

  • гарантированно не потерять данные;
  • видеть текущее состояние потоков;
  • быстро вносить изменения без перекомпиляции и деплоя кода.

Чтобы понять NiFi, полезно сначала зафиксировать его место в классическом ETL-процессе:

  • Extract - получение данных из источников;
  • Transform - преобразование формата, структуры, значений;
  • Load - запись данных в целевую систему.

NiFi в первую очередь решает задачи Extract и Load, а также простые Transform. Он не предназначен для сложных аналитических вычислений, но отлично подходит как транспортный и интеграционный слой между системами.

NiFi обычно используют вместе со Spark или Airflow: NiFi принимает и подготавливает данные, Spark выполняет вычислительную обработку, а Airflow управляет оркестрацией задач.

Сравнение систем через призму ETL

Система Роль в ETL
Apache NiFi Extract + Load + простые Transform
Apache Airflow Оркестрация ETL‑задач
Apache Spark Transform (вычисления)
Kafka Транспорт / буфер

Базовые понятия NiFi

  • FlowFile - это минимальная единица данных в NiFi. FlowFile всегда имеет Content (данные) и Attributes (метаданные). NiFi не работает «со всей таблицей сразу» - он обрабатывает поток FlowFile. Каждый FlowFile можно маршрутизировать, повторить, отложить или отправить в ошибку.
  • Processor - это один логический шаг обработки FlowFile. Примеры процессоров: прочитать файл из директории, прочитать данные из базы, выполнить SQL-запрос и др.
  • Connection - это очередь FlowFile между процессорами. Она разделяет этапы обработки, создавая очередь сообщений между процессорами.
  • Controller Service (общий ресурс) - это переиспользуемый сервис, который не обрабатывает FlowFile напрямую. Например, JDBC Connection Pool, CSV Reader, Json Writer и др. Общий ресурс упрощает настройку и сопровождение.
  • Process Group - это контейнер для логически связанного потока. Он применяется для декомпозиции потоков.

Типичные кейсы применения NiFi

  • загрузка данных из файлов в БД;
  • доставка событий в Kafka;
  • промежуточная нормализация данных;
  • CDC-пайплайны (как транспортный слой).

Практический пример: запись данных из CSV в MariaDB

Задача - загрузить CSV-файл в таблицу MariaDB, используя файловую модель.

В данном примере будет использован Apache NiFi 2.7.2.

Поток данных

[ListFile]
    |
    v
[FetchFile]
    |
    v
[PutDatabaseRecord]

Каждый FlowFile соответствует одному CSV-файлу.

CSV-файл data.csv

Id,Content
1001,Hello world
1002,Apache NiFi is cool
1003,CSV to MariaDB

Таблица MariaDB

CREATE TABLE `ArticleDocuments` (
	`Id` INT(11) NOT NULL AUTO_INCREMENT,
	`Content` LONGTEXT NOT NULL COLLATE 'utf8mb4_general_ci',
	PRIMARY KEY (`Id`)
) COLLATE='utf8mb4_general_ci' ENGINE=InnoDB;

Настройка процессоров

ListFile

Назначение: обнаружить новые файлы в каталоге и создать FlowFile без содержимого.

Ключевая идея:

ListFile отвечает только за факт появления файла, а не за его чтение.

Ключевые настройки:

  • Input Directory: /result/article_documents
  • Listing Strategy: Tracking Timestamps
  • Entity Tracking State Cache: MapCacheClientService (требуется настройка общего ресурса - локального сервера MapCacheServer с настройками по умолчанию)

Методический комментарий:

  • файл не будет обработан повторно;
  • состояние хранится внутри NiFi.

FetchFile

Назначение: считать содержимое файла, ранее обнаруженного ListFile.

Как это работает:

  • FlowFile от ListFile содержит атрибуты path и filename
  • FetchFile использует их для чтения файла и заполнения content

Ключевые настройки:

  • File to Fetch: ${absolute.path}/${filename}

Методический комментарий:

Разделение ListFile и FetchFile - типовой прием надежного batch-ETL.

PutDatabaseRecord

Назначение: загрузить CSV-файл в таблицу MariaDB.

Особенности примера:

  • используется file-based модель;
  • CSV должен соответствовать структуре таблицы;
  • отсутствует валидация данных.

Ключевые настройки:

  • Database Connection Pooling Service: MariaDBConnectionPool
  • Table Name: ArticleDocuments
  • Statement Type: INSERT
  • Record Reader: CSVReader

Важно понимать:

PutDatabaseRecord работает с records, но преобразование происходит внутри процессора.

Комментарий:

Для production рекомендуется явное использование ConvertRecord и схем.

Настройка общего ресурса MariaDBConnectionPool

  • Database Connection URL: jdbc:mariadb://mariadb:3306/article
  • Database Driver Class Name: org.mariadb.jdbc.Driver
  • Database Driver Locations: /opt/nifi/nifi-current/drivers/org.mariadb.jdbc_mariadb-java-client-3.3.3.jar
  • Database User: root

Инфраструктура

СУБД MariaDB

Запуск MariaDB вынесен в отдельный файл docker-compose.yml:

services:
  mariadb:
    image: mariadb:11.2
    container_name: mariadb
    hostname: mariadb
    restart: always
    ports:
      - "3306:3306"
    environment:
      MARIADB_ROOT_PASSWORD: root
      MARIADB_DATABASE: article
      MARIADB_USER: user
      MARIADB_PASSWORD: password
    volumes:
      - mariadb-data:/var/lib/mysql
    networks:
      - mariadbnet

volumes:
  mariadb-data:

networks:
  mariadbnet:
    name: mariadbnet

Apache NiFi

NiFi также можно запускать в Docker-контейнере. Это самый простой путь для локальных экспериментов.

NiFi в Docker поддерживает подключение кастомных расширений:

  • JDBC-драйверы;
  • собственные NAR-модули.

Соберем образ, который включает драйвер для работы с MariaDB. NAR-модули в данном примере не используются, поэтому папка nifi/nars/ может быть пустой.

FROM apache/nifi:2.7.2

# NARs
COPY nifi/nars/*.nar /opt/nifi/nifi-current/nar_extensions/

# MariaDB JDBC driver
COPY nifi/drivers/org.mariadb.jdbc_mariadb-java-client-3.3.3.jar \
     /opt/nifi/nifi-current/drivers/

Создадим docker-compose.yml в этой же папке для запуска NiFi. Он связан единой сетью mariadbnet с MariaDB:

services:
  nifi:
    build: .
    container_name: nifi
    ports:
      - "8443:8443"
    environment:
      - SINGLE_USER_CREDENTIALS_USERNAME=admin
      # Минимальная длина пароля - 12 символов
      - SINGLE_USER_CREDENTIALS_PASSWORD=password12345
    volumes:
      - ./nifi/drivers:/opt/nifi/nifi-current/drivers
      - ./result:/result
    networks:
      - mariadbnet

networks:
  mariadbnet:
    name: mariadbnet
    external: true

Проверка работы

  1. Запустить docker compose up -d
  2. Открыть веб-интерфейс: https://localhost:8443/nifi/
  3. Создать процессоры и соединить их согласно схеме с типом отношения success
  4. Включить Controller Services
  5. Положить CSV-файл в каталог ./result/article_documents
  6. Запустить процессоры
  7. Проверить данные в MariaDB:
SELECT * FROM ArticleDocuments;

Методические ремарки

  • NiFi не заменяет полноценную ETL-логику
  • Большие трансформации сложнее поддерживать визуально
  • В production рекомендуется использовать явные схемы, обрабатывать возможные ошибки в отдельных процессорах

Заключение

Apache NiFi - это инструмент управления потоками данных, а не вычислительная платформа. Он хорошо подходит для data ingestion и data integration.