Практическое введение в Apache NiFi
Apache NiFi - это инструмент для построения и управления потоками данных (dataflow) между различными системами. Его ключевая задача - надёжно, воспроизводимо и наглядно перемещать, преобразовывать и маршрутизировать данные.
NiFi изначально разрабатывался для сценариев data ingestion (доставка данных) и data integration (приведение данных в единое, согласованное представление), где важно:
- гарантированно не потерять данные;
- видеть текущее состояние потоков;
- быстро вносить изменения без перекомпиляции и деплоя кода.
Чтобы понять NiFi, полезно сначала зафиксировать его место в классическом ETL-процессе:
- Extract - получение данных из источников;
- Transform - преобразование формата, структуры, значений;
- Load - запись данных в целевую систему.
NiFi в первую очередь решает задачи Extract и Load, а также простые Transform. Он не предназначен для сложных аналитических вычислений, но отлично подходит как транспортный и интеграционный слой между системами.
NiFi обычно используют вместе со Spark или Airflow: NiFi принимает и подготавливает данные, Spark выполняет вычислительную обработку, а Airflow управляет оркестрацией задач.
Сравнение систем через призму ETL
| Система | Роль в ETL |
|---|---|
| Apache NiFi | Extract + Load + простые Transform |
| Apache Airflow | Оркестрация ETL‑задач |
| Apache Spark | Transform (вычисления) |
| Kafka | Транспорт / буфер |
Базовые понятия NiFi
- FlowFile - это минимальная единица данных в NiFi. FlowFile всегда имеет Content (данные) и Attributes (метаданные). NiFi не работает «со всей таблицей сразу» - он обрабатывает поток FlowFile. Каждый FlowFile можно маршрутизировать, повторить, отложить или отправить в ошибку.
- Processor - это один логический шаг обработки FlowFile. Примеры процессоров: прочитать файл из директории, прочитать данные из базы, выполнить SQL-запрос и др.
- Connection - это очередь FlowFile между процессорами. Она разделяет этапы обработки, создавая очередь сообщений между процессорами.
- Controller Service (общий ресурс) - это переиспользуемый сервис, который не обрабатывает FlowFile напрямую. Например, JDBC Connection Pool, CSV Reader, Json Writer и др. Общий ресурс упрощает настройку и сопровождение.
- Process Group - это контейнер для логически связанного потока. Он применяется для декомпозиции потоков.
Типичные кейсы применения NiFi
- загрузка данных из файлов в БД;
- доставка событий в Kafka;
- промежуточная нормализация данных;
- CDC-пайплайны (как транспортный слой).
Практический пример: запись данных из CSV в MariaDB
Задача - загрузить CSV-файл в таблицу MariaDB, используя файловую модель.
В данном примере будет использован Apache NiFi 2.7.2.
Поток данных
[ListFile]
|
v
[FetchFile]
|
v
[PutDatabaseRecord]
Каждый FlowFile соответствует одному CSV-файлу.
CSV-файл data.csv
Id,Content
1001,Hello world
1002,Apache NiFi is cool
1003,CSV to MariaDB
Таблица MariaDB
CREATE TABLE `ArticleDocuments` (
`Id` INT(11) NOT NULL AUTO_INCREMENT,
`Content` LONGTEXT NOT NULL COLLATE 'utf8mb4_general_ci',
PRIMARY KEY (`Id`)
) COLLATE='utf8mb4_general_ci' ENGINE=InnoDB;
Настройка процессоров
ListFile
Назначение: обнаружить новые файлы в каталоге и создать FlowFile без содержимого.
Ключевая идея:
ListFile отвечает только за факт появления файла, а не за его чтение.
Ключевые настройки:
- Input Directory:
/result/article_documents - Listing Strategy:
Tracking Timestamps - Entity Tracking State Cache:
MapCacheClientService(требуется настройка общего ресурса - локального сервераMapCacheServerс настройками по умолчанию)
Методический комментарий:
- файл не будет обработан повторно;
- состояние хранится внутри NiFi.
FetchFile
Назначение: считать содержимое файла, ранее обнаруженного ListFile.
Как это работает:
- FlowFile от ListFile содержит атрибуты
pathиfilename - FetchFile использует их для чтения файла и заполнения content
Ключевые настройки:
- File to Fetch:
${absolute.path}/${filename}
Методический комментарий:
Разделение ListFile и FetchFile - типовой прием надежного batch-ETL.
PutDatabaseRecord
Назначение: загрузить CSV-файл в таблицу MariaDB.
Особенности примера:
- используется file-based модель;
- CSV должен соответствовать структуре таблицы;
- отсутствует валидация данных.
Ключевые настройки:
- Database Connection Pooling Service:
MariaDBConnectionPool - Table Name:
ArticleDocuments - Statement Type:
INSERT - Record Reader:
CSVReader
Важно понимать:
PutDatabaseRecord работает с records, но преобразование происходит внутри процессора.
Комментарий:
Для production рекомендуется явное использование ConvertRecord и схем.
Настройка общего ресурса MariaDBConnectionPool
- Database Connection URL:
jdbc:mariadb://mariadb:3306/article - Database Driver Class Name:
org.mariadb.jdbc.Driver - Database Driver Locations:
/opt/nifi/nifi-current/drivers/org.mariadb.jdbc_mariadb-java-client-3.3.3.jar - Database User:
root
Инфраструктура
СУБД MariaDB
Запуск MariaDB вынесен в отдельный файл docker-compose.yml:
services:
mariadb:
image: mariadb:11.2
container_name: mariadb
hostname: mariadb
restart: always
ports:
- "3306:3306"
environment:
MARIADB_ROOT_PASSWORD: root
MARIADB_DATABASE: article
MARIADB_USER: user
MARIADB_PASSWORD: password
volumes:
- mariadb-data:/var/lib/mysql
networks:
- mariadbnet
volumes:
mariadb-data:
networks:
mariadbnet:
name: mariadbnet
Apache NiFi
NiFi также можно запускать в Docker-контейнере. Это самый простой путь для локальных экспериментов.
NiFi в Docker поддерживает подключение кастомных расширений:
- JDBC-драйверы;
- собственные NAR-модули.
Соберем образ, который включает драйвер для работы с MariaDB. NAR-модули в данном примере не используются, поэтому папка nifi/nars/ может быть пустой.
FROM apache/nifi:2.7.2
# NARs
COPY nifi/nars/*.nar /opt/nifi/nifi-current/nar_extensions/
# MariaDB JDBC driver
COPY nifi/drivers/org.mariadb.jdbc_mariadb-java-client-3.3.3.jar \
/opt/nifi/nifi-current/drivers/
Создадим docker-compose.yml в этой же папке для запуска NiFi. Он связан единой сетью mariadbnet с MariaDB:
services:
nifi:
build: .
container_name: nifi
ports:
- "8443:8443"
environment:
- SINGLE_USER_CREDENTIALS_USERNAME=admin
# Минимальная длина пароля - 12 символов
- SINGLE_USER_CREDENTIALS_PASSWORD=password12345
volumes:
- ./nifi/drivers:/opt/nifi/nifi-current/drivers
- ./result:/result
networks:
- mariadbnet
networks:
mariadbnet:
name: mariadbnet
external: true
Проверка работы
- Запустить
docker compose up -d - Открыть веб-интерфейс:
https://localhost:8443/nifi/ - Создать процессоры и соединить их согласно схеме с типом отношения
success - Включить Controller Services
- Положить CSV-файл в каталог
./result/article_documents - Запустить процессоры
- Проверить данные в MariaDB:
SELECT * FROM ArticleDocuments;
Методические ремарки
- NiFi не заменяет полноценную ETL-логику
- Большие трансформации сложнее поддерживать визуально
- В production рекомендуется использовать явные схемы, обрабатывать возможные ошибки в отдельных процессорах
Заключение
Apache NiFi - это инструмент управления потоками данных, а не вычислительная платформа. Он хорошо подходит для data ingestion и data integration.