Интеграция MariaDB и Elasticsearch с помощью Kafka Connect

Иван Гурин Технологии интеграции информационных систем 13 мин

Kafka Connect - это отдельный runtime-сервис, а не библиотека. Он запускается независимо от ASP.NET Core приложения и управляется через REST API. Его основное назначение - построение интеграций между Kafka и внешними системами без написания собственного producer- и consumer-кода.

В рамках данной статьи будет использован один Source Connector и один Sink Connector:

  • Source Connector - для получения изменений данных из MariaDB;
  • Sink Connector - для загрузки этих изменений в Elasticsearch.

Данная статья является логическим продолжением материала: Kafka → Elasticsearch: асинхронная индексация статей в ASP.NET Core. В предыдущем подходе индексация выполнялась кастомным Kafka consumer’ом, реализованным через HostedService. Здесь показано, как заменить consumer инфраструктурным решением на базе Kafka Connect, не меняя API и формат сообщений.

Для получения изменений из базы данных используется Debezium - платформа Change Data Capture (CDC). Debezium читает binlog транзакционных баз данных и публикует изменения в Kafka. Он поддерживает такие системы, как MySQL, MariaDB, PostgreSQL, MongoDB и SQL Server.

Архитектура примера

Архитектура решения состоит из следующих компонентов:

  • ASP.NET Core веб-приложение - принимает HTTP-запросы и сохраняет данные в базу;
  • MariaDB - основное хранилище данных;
  • Kafka - транспорт для событий;
  • Kafka Connect - инфраструктурный сервис для интеграций;
  • Debezium MariaDB Source Connector - источник CDC-событий;
  • Elasticsearch Sink Connector - загрузка данных в Elasticsearch;
  • Elasticsearch - поисковый движок и хранилище индекса.

Приложение пишет данные только в MariaDB. Все дальнейшие шаги - публикация событий, доставка в Elasticsearch и повторные попытки, - выполняются инфраструктурой.

Инфраструктура

Сервер MariaDB (настройка для Debezium)

services:
  mariadb:
    image: mariadb:11.2
    container_name: mariadb
    hostname: mariadb
    restart: always
    ports:
      - "3306:3306"
    environment:
      MARIADB_ROOT_PASSWORD: root
      MARIADB_DATABASE: article
      MARIADB_USER: debezium
      MARIADB_PASSWORD: dbz
    command:
      # Обязательные параметры для Debezium
      - --server-id=223344
      - --log-bin=mysql-bin
      - --binlog-format=ROW
      - --binlog-row-image=FULL
      - --binlog-expire-logs-seconds=604800
      - --binlog-annotate-row-events=ON
    volumes:
      - mariadb-data:/var/lib/mysql
    networks:
      - mariadbnet

volumes:
  mariadb-data:

networks:
  mariadbnet:
    name: mariadbnet

Elasticsearch и Kibana

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.19.9
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=elastic_password
      - xpack.security.http.ssl.enabled=false
      - xpack.security.transport.ssl.enabled=false
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:8.19.9
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=kibana_password
    ports:
      - "5601:5601"
    networks:
      - elastic

volumes:
  esdata:

networks:
  elastic:
    name: elastic
    driver: bridge

Kafka и Kafka Connect

Kafka Connect подключён одновременно к Kafka, MariaDB и Elasticsearch.

services:
  kafka-ui:
    container_name: kafka-ui
    hostname: kafka-ui
    image: provectuslabs/kafka-ui:latest
    restart: always
    ports:
      - 10025:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    volumes:
      - './config-ui.yaml:/etc/kafkaui/dynamic_config.yaml'
    networks:
      - kafkanet
      
  kafka:
    image: apache/kafka:3.9.1
    restart: always
    hostname: kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_LISTENERS: CLIENT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - kafkanet
      
  kafka-connect:
    build: ./kafka-connect/ 
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_GROUP_ID: "kafka-connect-elastic"
      CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
    depends_on:
      - kafka
    networks:
      - kafkanet
      - elastic
      - mariadbnet
    
networks:
  kafkanet:
    name: kafkanet
  elastic:
    name: elastic
    external: true
  mariadbnet:
    name: mariadbnet
    external: true

Dockerfile для Kafka Connect

Образ Kafka Connect включает установленный из confluent-hub плагин для подключения к Elasticsearch и плагин Debezium MariaDB Connector, который устанавливается из архива.

FROM confluentinc/cp-kafka-connect:7.7.7

# Устанавка Elasticsearch Sink Connector
# https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:15.1.0

ENV DEBEZIUM_VERSION=3.4.0.Final
ENV PLUGIN_DIR=/usr/share/java

USER root

# Скачать и распаковать Debezium MariaDB Connector
RUN mkdir -p ${PLUGIN_DIR}/debezium-connector-mariadb \
    && curl -fSL https://repo1.maven.org/maven2/io/debezium/debezium-connector-mariadb/${DEBEZIUM_VERSION}/debezium-connector-mariadb-${DEBEZIUM_VERSION}-plugin.tar.gz \
    | tar -xz --strip-components=1 -C ${PLUGIN_DIR}/debezium-connector-mariadb

# Проверка, что jar-файлы распаковались
RUN ls -la ${PLUGIN_DIR}/debezium-connector-mariadb

ASP.NET Core приложение

DbContext

using Microsoft.EntityFrameworkCore;

public class ArticleDbContext : DbContext
{
    public virtual DbSet<ArticleDocument> ArticleDocuments { get; set; }

    public ArticleDbContext(DbContextOptions<ArticleDbContext> options) : base(options)
    {
        base.Database.SetCommandTimeout(30);
    }
}

Модель

public class ArticleDocument
{
    public int Id { get; set; }

    // Основное поле для полнотекстового поиска
    public string Content { get; set; } = string.Empty;
}

Регистрация DbContext

var connectionString = builder.Configuration.GetConnectionString("ArticleDbConnection");
builder.Services.AddDbContext<ArticleDbContext>(options =>
{
    options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
});

Контроллер

[ApiController]
[Route("api/article")]
public class ArticleController : ControllerBase
{
    private readonly ArticleDbContext _context;

    public ArticleController(ArticleDbContext context)
    {
        _context = context;
    }

    [HttpPost]
    public async Task<IActionResult> CreateArticle([FromBody] ArticleDocument article)
    {
        await _context.ArticleDocuments.AddAsync(article);
        await _context.SaveChangesAsync();
        return Accepted();
    }
}

База данных

База данных создаётся с использованием подхода Code-First. Структура созданной таблицы:

CREATE TABLE `ArticleDocuments` (
	`Id` INT(11) NOT NULL AUTO_INCREMENT,
	`Content` LONGTEXT NOT NULL COLLATE 'utf8mb4_general_ci',
	PRIMARY KEY (`Id`) USING BTREE
)
COLLATE='utf8mb4_general_ci' ENGINE=InnoDB;

Для работы Debezium в режиме репликации необходимо выдать дополнительные права:

docker exec -it mariadb mariadb -uroot -proot
GRANT
  SELECT,
  REPLICATION SLAVE,
  REPLICATION CLIENT,
  BINLOG MONITOR,
  RELOAD
ON *.* TO 'debezium'@'%';

FLUSH PRIVILEGES;

Настройка Kafka Connect

Source Connector (Debezium)

Обратите внимание, что server.id для Debezium должен отличаться от значения в MariaDB. В топике schema-changes.article будут сохраняться изменения схемы базы данных, а в топике cdc.article.ArticleDocuments данные из таблицы ArticleDocuments в базе данных article.

curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "mariadb-cdc",
    "config": {
      "connector.class": "io.debezium.connector.mariadb.MariaDbConnector",
      "database.hostname": "mariadb",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "223355",
      "database.server.name": "mariadb",
      "database.include.list": "article",
      "table.include.list": "article.ArticleDocuments",
      "snapshot.mode": "initial",
      "include.schema.changes": "false",
      "topic.prefix": "cdc",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.article",
      "decimal.handling.mode": "double",
      "time.precision.mode": "connect",
      "tombstones.on.delete": "true",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
    }
}'

Sink Connector (Elasticsearch)

curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "article-elasticsearch-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "cdc.article.ArticleDocuments",
      "connection.url": "http://elasticsearch:9200",
      "connection.username": "elastic",
      "connection.password": "elastic_password",
      "key.ignore": "false",
      "schema.ignore": "true",
      "write.method": "upsert",
      "behavior.on.null.values": "delete",
      "transforms": "extractKey",
      "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.extractKey.field": "Id",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false"
    }
}'

Проверка работы

  1. Запустить все сервисы через Docker Compose.
  2. Убедиться, что Kafka Connect доступен на http://localhost:8083.
  3. Проверить статус обоих коннекторов (/status).
  4. Отправить POST-запрос в ASP.NET Core API.
  5. Проверить появление документа в Elasticsearch.

Методические ремарки

  • Пример намеренно упрощён: отсутствуют DLQ, мониторинг и schema registry.
  • Kafka Connect не заменяет бизнес-логику, а только транспорт данных.
  • Для production следует выделять отдельные topic’и для ошибок и ретраев.
  • Важно внимательно работать с правами MariaDB - Debezium чувствителен к ним.

Заключение

Kafka Connect позволяет выстроить надежный и расширяемый data-pipeline между MariaDB и Elasticsearch без изменения кода приложения. Использование Debezium делает интеграцию событийно-ориентированной, а Elasticsearch Sink Connector берет на себя задачу индексации. Далее пример можно развивать, добавляя mapping, alias, масштабирование и отказоустойчивость.