Замена Kafka Consumer на Kafka Connect Elasticsearch Sink

Иван Гурин Технологии интеграции информационных систем 12 мин

Эта статья является продолжением материала Kafka → Elasticsearch: асинхронная индексация статей в ASP.NET Core.

В исходной реализации индексация выполнялась кастомным Kafka consumer’ом, реализованным через HostedService. В этой статье показано, как заменить только consumer, используя Kafka Connect + Elasticsearch Sink, не меняя API, producer и формат сообщений.

В предыдущей статье использовалась следующая архитектура:

ASP.NET Core → Kafka → Custom Consumer (HostedService) → Elasticsearch

Такой подход хорошо подходит для backend-ориентированных сценариев и даёт полный контроль над логикой обработки сообщений, но требует поддержки consumer-кода.

Kafka Connect предлагает альтернативный, инфраструктурный подход:

ASP.NET Core → Kafka → Kafka Connect → Elasticsearch

Цель статьи:

  • заменить Kafka consumer на Kafka Connect;
  • сохранить ASP.NET Core API без изменений;
  • сохранить Kafka producer и формат сообщений;
  • показать минимальные изменения инфраструктуры.

В этой статье мы будем использовать Elasticsearch 8.19.9, потому что на момент написания статьи Elasticsearch Sink Connector совместим только с Elasticsearch 7/8. При использовании Elasticsearch 9.x Kafka Connect завершается с ошибкой при проверке индекса.

Kafka Connect

Kafka Connect - это отдельный runtime-сервис, а не библиотека. Он запускается независимо от ASP.NET Core приложения и управляется через REST API.

Kafka Connect предназначен для построения интеграций между Kafka и внешними системами без написания прикладного producer- и consumer-кода. Он берет на себя задачи чтения и записи данных, управление offset’ами, повторные попытки доставки и параллельную обработку сообщений.

В Kafka Connect существуют два типа коннекторов:

  • Source-коннекторы используются для загрузки данных в Kafka из внешних источников, таких как базы данных или файловые системы.
  • Sink-коннекторы используются для выгрузки данных из Kafka во внешние системы, например, поисковые индексы или хранилища данных.

В данной статье мы подключим Sink-коннектор для загрузки данных в Elasticsearch.

При использовании Kafka Connect:

  • consumer-код полностью исчезает из приложения;
  • логика обработки сообщений описывается конфигурацией;
  • ответственность за доставку и retries переносится на инфраструктуру.

Перед заменой consumer’а зафиксируем, какие части системы не изменяются.

  1. ASP.NET Core приложение:

    • HTTP-контроллеры;
    • ArticleKafkaProducer;
    • возвращаемый HTTP статус 202 Accepted.
  2. Формат Kafka-сообщений

    {
      "id": 123,
      "content": "Полный текст статьи"
    }
    

Kafka record:

  • key: строковое представление id
  • value: JSON статьи

Этот формат полностью совместим с Elasticsearch Sink Connector и позволяет выполнять upsert без дополнительной логики.

Из ASP.NET Core проекта удаляются:

  • KafkaToElasticHostedService;
  • регистрация IConsumer<,>;

Новая инфраструктура

Для перехода на Kafka Connect требуется добавить один новый сервис.

Минимальный набор компонентов

Компонент Статус
Kafka уже используется
Elasticsearch уже используется
Kafka Connect новый компонент
Elasticsearch Sink Connector плагин Kafka Connect

Kafka Connect:

  • запускается как отдельный процесс (обычно в Docker);
  • хранит конфигурацию и offset’ы в Kafka;
  • масштабируется независимо от ASP.NET Core.

docker-compose для Elasticsearch

docker-compose для запуска Elasticsearch включает контейнер kibana для визуализации данных, хранящихся в Elasticsearch.

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.19.9
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=elastic_password
      - xpack.security.http.ssl.enabled=false
      - xpack.security.transport.ssl.enabled=false
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:8.19.9
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=kibana_password
    ports:
      - "5601:5601"
    networks:
      - elastic

volumes:
  esdata:

networks:
  elastic:
    name: elastic
    driver: bridge

docker-compose для Kafka Connect

Ниже приведён docker-compose для запуска Kafka Connect.

Предполагается, что Kafka доступна на порту 9094, как и в предыдущих статьях. Контейнер kafka-connect использует сеть elastic для подключения к Elasticsearch.

services:
  kafka-ui:
    container_name: kafka-ui
    hostname: kafka-ui
    image: provectuslabs/kafka-ui:latest
    restart: always
    ports:
      - 10025:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    volumes:
      - './config-ui.yaml:/etc/kafkaui/dynamic_config.yaml'
    networks:
      - kafkanet
      
  kafka:
    image: apache/kafka:3.9.1
    restart: always
    hostname: kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_LISTENERS: CLIENT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - kafkanet
      
  kafka-connect:
    build: ./kafka-connect/ 
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_GROUP_ID: "kafka-connect-elastic"
      CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
    depends_on:
      - kafka
    networks:
      - kafkanet
      - elastic
    
networks:
  kafkanet:
    name: kafkanet
  elastic:
    name: elastic
    external: true

Сервис Kafka Connect собирается из следующего Dockerfile:

FROM confluentinc/cp-kafka-connect:7.7.7

# Устанавка Elasticsearch Sink Connector
# https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:15.1.0

Настройка Elasticsearch Sink Connector

Elasticsearch Sink Connector настраивается через REST API Kafka Connect. Конфигурация определяет, как сообщения из Kafka попадают в Elasticsearch.

Создание коннектора

Конфигурация контейнера включает указание типа коннектора и конфигурацию подключения. После запуска Kafka Connect создайте коннектор через REST API:

curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "articles-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "articles",
    "connection.url": "http://elasticsearch:9200",
    "connection.username": "elastic",
    "connection.password": "elastic_password",
    "key.ignore": "false",
    "schema.ignore": "true",
    "write.method": "upsert",
    "data.stream.type": "none",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}'

Пояснение параметров:

  • topics - Kafka topic со статьями
  • key.ignore=false - Kafka message key используется как Elasticsearch _id
  • write.method=upsert - повторные сообщения безопасно обновляют документ
  • schema.ignore=true - используется JSON без Schema Registry

Удаление коннектора

Если коннектор создан некорректно, то его необходимо удалить и создать заново. Запрос для удаления коннектора:

curl --location --request DELETE 'http://localhost:8083/connectors/articles-elasticsearch-sink'

Проверка статуса работы коннектора

Для проверки статуса работы коннектора и диагностики ошибок используйте запрос:

curl --location 'http://localhost:8083/connectors/articles-elasticsearch-sink/status'

Дополнительные запросы

Если коннектор не создается, для диагностики причины могут помочь запросы:

Список установленных плагинов:

curl --location 'http://localhost:8083/connector-plugins'

Список созданных коннекторов:

curl --location 'http://localhost:8083/connectors'

Проверка работы

  1. Запустить Elasticsearch
  2. Запустить Kafka и Kafka Connect
  3. Создать Elasticsearch Sink Connector
  4. Запустить ASP.NET Core приложение
  5. Отправить статью через HTTP API
  6. Проверить наличие документа в Elasticsearch

Преимущества и недостатки нового подхода

Преимущества Kafka Connect

  • отсутствие consumer-кода в приложении;
  • production-ready retries и batching;
  • независимый lifecycle от ASP.NET Core;
  • стандартный инструмент data pipeline.

Недостатки

  • ограниченные возможности бизнес-логики;
  • сложнее локальная отладка;
  • дополнительный инфраструктурный компонент;
  • логика обработки описывается конфигурацией, а не кодом.

Заключение

Переход от кастомного Kafka consumer’а к Kafka Connect:

  • не требует изменений API и producer’а;
  • сохраняет формат сообщений;
  • снижает объём прикладного кода;
  • переносит ответственность за доставку данных на инфраструктуру.

Kafka Connect - хороший выбор для data-pipeline сценариев. Кастомный consumer остаётся предпочтительным, когда требуется сложная доменная логика обработки сообщений.

Эта статья логически дополняет предыдущий материал и демонстрирует эволюцию архитектуры от application-driven к infrastructure-driven подходу.