Интеграция MariaDB и Elasticsearch с помощью Kafka Connect
Kafka Connect - это отдельный runtime-сервис, а не библиотека. Он запускается независимо от ASP.NET Core приложения и управляется через REST API. Его основное назначение - построение интеграций между Kafka и внешними системами без написания собственного producer- и consumer-кода.
В рамках данной статьи будет использован один Source Connector и один Sink Connector:
- Source Connector - для получения изменений данных из MariaDB;
- Sink Connector - для загрузки этих изменений в Elasticsearch.
Данная статья является логическим продолжением материала:
Kafka → Elasticsearch: асинхронная индексация статей в ASP.NET Core. В предыдущем подходе индексация выполнялась кастомным Kafka consumer’ом, реализованным через HostedService. Здесь показано, как заменить consumer инфраструктурным решением на базе Kafka Connect, не меняя API и формат сообщений.
Для получения изменений из базы данных используется Debezium - платформа Change Data Capture (CDC). Debezium читает binlog транзакционных баз данных и публикует изменения в Kafka. Он поддерживает такие системы, как MySQL, MariaDB, PostgreSQL, MongoDB и SQL Server.
Архитектура примера
Архитектура решения состоит из следующих компонентов:
- ASP.NET Core веб-приложение - принимает HTTP-запросы и сохраняет данные в базу;
- MariaDB - основное хранилище данных;
- Kafka - транспорт для событий;
- Kafka Connect - инфраструктурный сервис для интеграций;
- Debezium MariaDB Source Connector - источник CDC-событий;
- Elasticsearch Sink Connector - загрузка данных в Elasticsearch;
- Elasticsearch - поисковый движок и хранилище индекса.
Приложение пишет данные только в MariaDB. Все дальнейшие шаги - публикация событий, доставка в Elasticsearch и повторные попытки, - выполняются инфраструктурой.
Инфраструктура
Сервер MariaDB (настройка для Debezium)
services:
mariadb:
image: mariadb:11.2
container_name: mariadb
hostname: mariadb
restart: always
ports:
- "3306:3306"
environment:
MARIADB_ROOT_PASSWORD: root
MARIADB_DATABASE: article
MARIADB_USER: debezium
MARIADB_PASSWORD: dbz
command:
# Обязательные параметры для Debezium
- --server-id=223344
- --log-bin=mysql-bin
- --binlog-format=ROW
- --binlog-row-image=FULL
- --binlog-expire-logs-seconds=604800
- --binlog-annotate-row-events=ON
volumes:
- mariadb-data:/var/lib/mysql
networks:
- mariadbnet
volumes:
mariadb-data:
networks:
mariadbnet:
name: mariadbnet
Elasticsearch и Kibana
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.19.9
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=true
- ELASTIC_PASSWORD=elastic_password
- xpack.security.http.ssl.enabled=false
- xpack.security.transport.ssl.enabled=false
- ES_JAVA_OPTS=-Xms1g -Xmx1g
ports:
- "9200:9200"
volumes:
- esdata:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
networks:
- elastic
kibana:
image: docker.elastic.co/kibana/kibana:8.19.9
container_name: kibana
depends_on:
- elasticsearch
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=kibana_password
ports:
- "5601:5601"
networks:
- elastic
volumes:
esdata:
networks:
elastic:
name: elastic
driver: bridge
Kafka и Kafka Connect
Kafka Connect подключён одновременно к Kafka, MariaDB и Elasticsearch.
services:
kafka-ui:
container_name: kafka-ui
hostname: kafka-ui
image: provectuslabs/kafka-ui:latest
restart: always
ports:
- 10025:8080
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
- './config-ui.yaml:/etc/kafkaui/dynamic_config.yaml'
networks:
- kafkanet
kafka:
image: apache/kafka:3.9.1
restart: always
hostname: kafka
ports:
- "9094:9094"
environment:
KAFKA_NODE_ID: 0
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_LISTENERS: CLIENT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
networks:
- kafkanet
kafka-connect:
build: ./kafka-connect/
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_GROUP_ID: "kafka-connect-elastic"
CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
depends_on:
- kafka
networks:
- kafkanet
- elastic
- mariadbnet
networks:
kafkanet:
name: kafkanet
elastic:
name: elastic
external: true
mariadbnet:
name: mariadbnet
external: true
Dockerfile для Kafka Connect
Образ Kafka Connect включает установленный из confluent-hub плагин для подключения к Elasticsearch и плагин Debezium MariaDB Connector, который устанавливается из архива.
FROM confluentinc/cp-kafka-connect:7.7.7
# Устанавка Elasticsearch Sink Connector
# https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:15.1.0
ENV DEBEZIUM_VERSION=3.4.0.Final
ENV PLUGIN_DIR=/usr/share/java
USER root
# Скачать и распаковать Debezium MariaDB Connector
RUN mkdir -p ${PLUGIN_DIR}/debezium-connector-mariadb \
&& curl -fSL https://repo1.maven.org/maven2/io/debezium/debezium-connector-mariadb/${DEBEZIUM_VERSION}/debezium-connector-mariadb-${DEBEZIUM_VERSION}-plugin.tar.gz \
| tar -xz --strip-components=1 -C ${PLUGIN_DIR}/debezium-connector-mariadb
# Проверка, что jar-файлы распаковались
RUN ls -la ${PLUGIN_DIR}/debezium-connector-mariadb
ASP.NET Core приложение
DbContext
using Microsoft.EntityFrameworkCore;
public class ArticleDbContext : DbContext
{
public virtual DbSet<ArticleDocument> ArticleDocuments { get; set; }
public ArticleDbContext(DbContextOptions<ArticleDbContext> options) : base(options)
{
base.Database.SetCommandTimeout(30);
}
}
Модель
public class ArticleDocument
{
public int Id { get; set; }
// Основное поле для полнотекстового поиска
public string Content { get; set; } = string.Empty;
}
Регистрация DbContext
var connectionString = builder.Configuration.GetConnectionString("ArticleDbConnection");
builder.Services.AddDbContext<ArticleDbContext>(options =>
{
options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString));
});
Контроллер
[ApiController]
[Route("api/article")]
public class ArticleController : ControllerBase
{
private readonly ArticleDbContext _context;
public ArticleController(ArticleDbContext context)
{
_context = context;
}
[HttpPost]
public async Task<IActionResult> CreateArticle([FromBody] ArticleDocument article)
{
await _context.ArticleDocuments.AddAsync(article);
await _context.SaveChangesAsync();
return Accepted();
}
}
База данных
База данных создаётся с использованием подхода Code-First. Структура созданной таблицы:
CREATE TABLE `ArticleDocuments` (
`Id` INT(11) NOT NULL AUTO_INCREMENT,
`Content` LONGTEXT NOT NULL COLLATE 'utf8mb4_general_ci',
PRIMARY KEY (`Id`) USING BTREE
)
COLLATE='utf8mb4_general_ci' ENGINE=InnoDB;
Для работы Debezium в режиме репликации необходимо выдать дополнительные права:
docker exec -it mariadb mariadb -uroot -proot
GRANT
SELECT,
REPLICATION SLAVE,
REPLICATION CLIENT,
BINLOG MONITOR,
RELOAD
ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
Настройка Kafka Connect
Source Connector (Debezium)
Обратите внимание, что server.id для Debezium должен отличаться от значения в MariaDB.
В топике schema-changes.article будут сохраняться изменения схемы базы данных, а в топике cdc.article.ArticleDocuments данные из таблицы ArticleDocuments в базе данных article.
curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "mariadb-cdc",
"config": {
"connector.class": "io.debezium.connector.mariadb.MariaDbConnector",
"database.hostname": "mariadb",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "223355",
"database.server.name": "mariadb",
"database.include.list": "article",
"table.include.list": "article.ArticleDocuments",
"snapshot.mode": "initial",
"include.schema.changes": "false",
"topic.prefix": "cdc",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.article",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
Sink Connector (Elasticsearch)
curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "article-elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "cdc.article.ArticleDocuments",
"connection.url": "http://elasticsearch:9200",
"connection.username": "elastic",
"connection.password": "elastic_password",
"key.ignore": "false",
"schema.ignore": "true",
"write.method": "upsert",
"behavior.on.null.values": "delete",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "Id",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
}
}'
Проверка работы
- Запустить все сервисы через Docker Compose.
- Убедиться, что Kafka Connect доступен на
http://localhost:8083. - Проверить статус обоих коннекторов (
/status). - Отправить POST-запрос в ASP.NET Core API.
- Проверить появление документа в Elasticsearch.
Методические ремарки
- Пример намеренно упрощён: отсутствуют DLQ, мониторинг и schema registry.
- Kafka Connect не заменяет бизнес-логику, а только транспорт данных.
- Для production следует выделять отдельные topic’и для ошибок и ретраев.
- Важно внимательно работать с правами MariaDB - Debezium чувствителен к ним.
Заключение
Kafka Connect позволяет выстроить надежный и расширяемый data-pipeline между MariaDB и Elasticsearch без изменения кода приложения. Использование Debezium делает интеграцию событийно-ориентированной, а Elasticsearch Sink Connector берет на себя задачу индексации. Далее пример можно развивать, добавляя mapping, alias, масштабирование и отказоустойчивость.