Замена Kafka Consumer на Kafka Connect Elasticsearch Sink
Эта статья является продолжением материала Kafka → Elasticsearch: асинхронная индексация статей в ASP.NET Core.
В исходной реализации индексация выполнялась кастомным Kafka consumer’ом, реализованным через
HostedService. В этой статье показано, как заменить только consumer, используя Kafka Connect + Elasticsearch Sink, не меняя API, producer и формат сообщений.
В предыдущей статье использовалась следующая архитектура:
ASP.NET Core → Kafka → Custom Consumer (HostedService) → Elasticsearch
Такой подход хорошо подходит для backend-ориентированных сценариев и даёт полный контроль над логикой обработки сообщений, но требует поддержки consumer-кода.
Kafka Connect предлагает альтернативный, инфраструктурный подход:
ASP.NET Core → Kafka → Kafka Connect → Elasticsearch
Цель статьи:
- заменить Kafka consumer на Kafka Connect;
- сохранить ASP.NET Core API без изменений;
- сохранить Kafka producer и формат сообщений;
- показать минимальные изменения инфраструктуры.
В этой статье мы будем использовать Elasticsearch 8.19.9, потому что на момент написания статьи Elasticsearch Sink Connector совместим только с Elasticsearch 7/8. При использовании Elasticsearch 9.x Kafka Connect завершается с ошибкой при проверке индекса.
Kafka Connect
Kafka Connect - это отдельный runtime-сервис, а не библиотека. Он запускается независимо от ASP.NET Core приложения и управляется через REST API.
Kafka Connect предназначен для построения интеграций между Kafka и внешними системами без написания прикладного producer- и consumer-кода. Он берет на себя задачи чтения и записи данных, управление offset’ами, повторные попытки доставки и параллельную обработку сообщений.
В Kafka Connect существуют два типа коннекторов:
- Source-коннекторы используются для загрузки данных в Kafka из внешних источников, таких как базы данных или файловые системы.
- Sink-коннекторы используются для выгрузки данных из Kafka во внешние системы, например, поисковые индексы или хранилища данных.
В данной статье мы подключим Sink-коннектор для загрузки данных в Elasticsearch.
При использовании Kafka Connect:
- consumer-код полностью исчезает из приложения;
- логика обработки сообщений описывается конфигурацией;
- ответственность за доставку и retries переносится на инфраструктуру.
Перед заменой consumer’а зафиксируем, какие части системы не изменяются.
ASP.NET Core приложение:
- HTTP-контроллеры;
ArticleKafkaProducer;- возвращаемый HTTP статус
202 Accepted.
Формат Kafka-сообщений
{ "id": 123, "content": "Полный текст статьи" }
Kafka record:
- key: строковое представление
id - value: JSON статьи
Этот формат полностью совместим с Elasticsearch Sink Connector и позволяет выполнять upsert без дополнительной логики.
Из ASP.NET Core проекта удаляются:
KafkaToElasticHostedService;- регистрация
IConsumer<,>;
Новая инфраструктура
Для перехода на Kafka Connect требуется добавить один новый сервис.
Минимальный набор компонентов
| Компонент | Статус |
|---|---|
| Kafka | уже используется |
| Elasticsearch | уже используется |
| Kafka Connect | новый компонент |
| Elasticsearch Sink Connector | плагин Kafka Connect |
Kafka Connect:
- запускается как отдельный процесс (обычно в Docker);
- хранит конфигурацию и offset’ы в Kafka;
- масштабируется независимо от ASP.NET Core.
docker-compose для Elasticsearch
docker-compose для запуска Elasticsearch включает контейнер kibana для визуализации данных, хранящихся в Elasticsearch.
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.19.9
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=true
- ELASTIC_PASSWORD=elastic_password
- xpack.security.http.ssl.enabled=false
- xpack.security.transport.ssl.enabled=false
- ES_JAVA_OPTS=-Xms1g -Xmx1g
ports:
- "9200:9200"
volumes:
- esdata:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
networks:
- elastic
kibana:
image: docker.elastic.co/kibana/kibana:8.19.9
container_name: kibana
depends_on:
- elasticsearch
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=kibana_password
ports:
- "5601:5601"
networks:
- elastic
volumes:
esdata:
networks:
elastic:
name: elastic
driver: bridge
docker-compose для Kafka Connect
Ниже приведён docker-compose для запуска Kafka Connect.
Предполагается, что Kafka доступна на порту
9094, как и в предыдущих статьях. Контейнерkafka-connectиспользует сеть elastic для подключения к Elasticsearch.
services:
kafka-ui:
container_name: kafka-ui
hostname: kafka-ui
image: provectuslabs/kafka-ui:latest
restart: always
ports:
- 10025:8080
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
- './config-ui.yaml:/etc/kafkaui/dynamic_config.yaml'
networks:
- kafkanet
kafka:
image: apache/kafka:3.9.1
restart: always
hostname: kafka
ports:
- "9094:9094"
environment:
KAFKA_NODE_ID: 0
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_LISTENERS: CLIENT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
networks:
- kafkanet
kafka-connect:
build: ./kafka-connect/
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_GROUP_ID: "kafka-connect-elastic"
CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
depends_on:
- kafka
networks:
- kafkanet
- elastic
networks:
kafkanet:
name: kafkanet
elastic:
name: elastic
external: true
Сервис Kafka Connect собирается из следующего Dockerfile:
FROM confluentinc/cp-kafka-connect:7.7.7
# Устанавка Elasticsearch Sink Connector
# https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:15.1.0
Настройка Elasticsearch Sink Connector
Elasticsearch Sink Connector настраивается через REST API Kafka Connect. Конфигурация определяет, как сообщения из Kafka попадают в Elasticsearch.
Создание коннектора
Конфигурация контейнера включает указание типа коннектора и конфигурацию подключения. После запуска Kafka Connect создайте коннектор через REST API:
curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "articles-elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "articles",
"connection.url": "http://elasticsearch:9200",
"connection.username": "elastic",
"connection.password": "elastic_password",
"key.ignore": "false",
"schema.ignore": "true",
"write.method": "upsert",
"data.stream.type": "none",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
Пояснение параметров:
topics- Kafka topic со статьямиkey.ignore=false- Kafka message key используется как Elasticsearch_idwrite.method=upsert- повторные сообщения безопасно обновляют документschema.ignore=true- используется JSON без Schema Registry
Удаление коннектора
Если коннектор создан некорректно, то его необходимо удалить и создать заново. Запрос для удаления коннектора:
curl --location --request DELETE 'http://localhost:8083/connectors/articles-elasticsearch-sink'
Проверка статуса работы коннектора
Для проверки статуса работы коннектора и диагностики ошибок используйте запрос:
curl --location 'http://localhost:8083/connectors/articles-elasticsearch-sink/status'
Дополнительные запросы
Если коннектор не создается, для диагностики причины могут помочь запросы:
Список установленных плагинов:
curl --location 'http://localhost:8083/connector-plugins'
Список созданных коннекторов:
curl --location 'http://localhost:8083/connectors'
Проверка работы
- Запустить Elasticsearch
- Запустить Kafka и Kafka Connect
- Создать Elasticsearch Sink Connector
- Запустить ASP.NET Core приложение
- Отправить статью через HTTP API
- Проверить наличие документа в Elasticsearch
Преимущества и недостатки нового подхода
Преимущества Kafka Connect
- отсутствие consumer-кода в приложении;
- production-ready retries и batching;
- независимый lifecycle от ASP.NET Core;
- стандартный инструмент data pipeline.
Недостатки
- ограниченные возможности бизнес-логики;
- сложнее локальная отладка;
- дополнительный инфраструктурный компонент;
- логика обработки описывается конфигурацией, а не кодом.
Заключение
Переход от кастомного Kafka consumer’а к Kafka Connect:
- не требует изменений API и producer’а;
- сохраняет формат сообщений;
- снижает объём прикладного кода;
- переносит ответственность за доставку данных на инфраструктуру.
Kafka Connect - хороший выбор для data-pipeline сценариев. Кастомный consumer остаётся предпочтительным, когда требуется сложная доменная логика обработки сообщений.
Эта статья логически дополняет предыдущий материал и демонстрирует эволюцию архитектуры от application-driven к infrastructure-driven подходу.