Kafka - Elasticsearch: асинхронная индексация статей в ASP.NET Core

Иван Гурин Технологии интеграции информационных систем 10 мин

В предыдущей статье Подключение полнотекстового поиска из Elasticsearch в ASP.NET Core данные индексировались в Elasticsearch напрямую из HTTP-запроса. Такой подход прост, но плохо масштабируется и делает API синхронно зависимым от состояния Elasticsearch.

В этой статье мы рассмотрим более устойчивый и production-близкий вариант:

HTTP → ASP.NET Core → Kafka → Consumer → Elasticsearch

Цель:

  • HTTP-слой отвечает только за приём данных
  • Kafka - за буферизацию и надёжную доставку
  • Consumer - за индексацию

Фокус статьи - как взять сообщения из Kafka и положить их в Elasticsearch.

Данный пример написан для официального .NET клиента - библиотеки Elastic.Clients.Elasticsearch версии 9.x.

Архитектура примера

Мы используем один ASP.NET Core проект, внутри которого:

  • Метод контроллера через сервис публикует события в Kafka
  • BackgroundService читает Kafka и индексирует документы

Установка пакетов

Добавьте NuGet-пакеты (через консоль диспетчера пакетов):

Install-Package Elastic.Clients.Elasticsearch 
Install-Package Confluent.Kafka

Поток данных

  1. HTTP API принимает статью
  2. Статья сериализуется в JSON и отправляется в Kafka
  3. Hosted Service читает сообщения
  4. Данные кладутся в Elasticsearch с upsert по Id

Обратите внимание:

  • HTTP API не знает, когда документ появится в Elasticsearch
  • Система работает в режиме eventual consistency

Модель документа

public class ArticleDocument
{
    // Используется и как бизнес-ID, и как Elasticsearch _id
    public int Id { get; set; }

    public string Content { get; set; } = string.Empty;
}

Принципиально важный момент:

  • _id в Elasticsearch всегда строка
  • Числовой Id будет автоматически приведён

Kafka-сообщение

Мы передаём полный текст статьи, без обращения к БД из consumer’а.

{
  "id": 123,
  "content": "Полный текст статьи"
}

Это упрощает:

  • replay сообщений
  • тестирование
  • независимость consumer от БД

Kafka Producer

Отправку в Kafka считаем уже знакомой по предыдущей статье Подключение к Kafka в ASP.NET Core (Confluent.Kafka). В ней можно найти конфигурацию kafka для развертывания в docker. Приводим минимальный фрагмент для целостности примера.

using Confluent.Kafka;
using System.Text.Json;

public class ArticleKafkaProducer
{
    private readonly IProducer<string, string> _producer;

    public ArticleKafkaProducer(ProducerConfig config)
    {
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task PublishAsync(ArticleDocument article)
    {
        var json = JsonSerializer.Serialize(article);

        await _producer.ProduceAsync("articles", new Message<string, string>
        {
            Key = article.Id.ToString(),
            Value = json
        });
    }
}

Далее необходимо добавить регистрацию сервиса в Program.cs:

builder.Services.AddSingleton(new ProducerConfig
{
    BootstrapServers = "localhost:9094"
});

builder.Services.AddSingleton<ArticleKafkaProducer>();

Использование Kafka Producer из контроллера

Producer вызывается синхронно с HTTP-запросом, но не выполняет индексацию - он только публикует событие.

using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("api/article")]
public class ArticleController : ControllerBase
{
    private readonly ArticleKafkaProducer _producer;

    public ArticleController(ArticleKafkaProducer producer)
    {
        _producer = producer;
    }

    [HttpPost]
    public async Task<IActionResult> CreateArticle([FromBody] ArticleDocument article)
    {
        // Валидация и бизнес-логика опущены
        await _producer.PublishAsync(article);

        return Accepted();
    }
}

Метод возвращает ответ 202 Accepted по следующим причинам:

  • операция индексирования асинхронная
  • данные сначала попадают в Kafka
  • Elasticsearch будет обновлён позже

Таким ответом API явно сигнализирует клиенту: запрос принят, но ещё не полностью обработан.

Elasticsearch клиент

var settings = new ElasticsearchClientSettings(new Uri("http://localhost:9200"))
    .Authentication(new BasicAuthentication("elastic", "elastic_password"))
    .DefaultIndex("articles");

builder.Services.AddSingleton(new ElasticsearchClient(settings));

Клиент:

  • потокобезопасен
  • регистрируется как Singleton

Kafka Consumer как Hosted Service

Это ключевой элемент всей архитектуры.

using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using System.Text.Json;

public class KafkaToElasticHostedService : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ElasticsearchClient _client;

    public KafkaToElasticHostedService(
        IConsumer<string, string> consumer,
        ElasticsearchClient client)
    {
        _consumer = consumer;
        _client = client;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("articles");

        // Даем хосту завершить старт
        await Task.Yield();

        while (!stoppingToken.IsCancellationRequested)
        {
            var result = _consumer.Consume(stoppingToken);

            if (result?.Message == null)
                continue;

            var document = JsonSerializer.Deserialize<ArticleDocument>(result.Message.Value);

            if (document == null)
                continue;

            // Upsert по Id
            await _client.IndexAsync(document, i => i
                .Id(document.Id)
                .Index("articles")
            );

            // Коммит offset ТОЛЬКО после успешной записи
            _consumer.Commit(result);
        }
    }
}

Ключевые моменты

  • Consumer живёт в BackgroundService
  • Offset коммитится после записи в Elasticsearch
  • Повторное сообщение = безопасный upsert

Регистрация Kafka Consumer

builder.Services.AddSingleton<IConsumer<string, string>>(_ =>
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9094",
        GroupId = "article-indexer",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false
    };

    return new ConsumerBuilder<string, string>(config)
    .Build();
});

builder.Services.AddHostedService<KafkaToElasticHostedService>();

Почему отключён auto-commit:

  • контролируется момент подтверждения
  • избегаем потери сообщений

В примере consumer в KafkaToElasticHostedService подписывается на topic articles сразу при старте. Если topic не существует, Kafka consumer выбросит исключение.

В production-решениях обычно:

  • управляют созданием топиков отдельно
  • используют AutoCreateTopicsEnable

В учебном примере topic необходимо создать заранее, либо закомментировать добавление AddHostedService до отправки первого сообщения.

Индекс Elasticsearch

Для примера используется автосоздание индекса. Для production рекомендуется создать его явно.

Проверка работы

  1. Запустить Kafka
  2. Запустить Elasticsearch
  3. Запустить ASP.NET Core
  4. Отправить статью в HTTP API
  5. Убедиться, что сообщение появилось в Kafka
  6. Убедиться, что документ появился в Elasticsearch
  7. Выполнить поиск

Для отправки сообщения в kafka можно использовать следующий запрос:

curl --location 'http://localhost/api/article' \
--header 'Content-Type: application/json' \
--data '{
    "id": 123,
    "content": "Тестовая статья. Kafka - Elasticsearch: асинхронная индексация статей в ASP.NET Core"
}'

Для поиска используйте метод api/search из предыдущей статьи:

curl --location 'http://localhost/api/search/?q=kafka'

Результат должен быть таким:

[
    {
        "id": 123,
        "content": "Тестовая статья. Kafka - Elasticsearch: асинхронная индексация статей в ASP.NET Core"
    }
]

Методические ремарки

Что важно для production

  • отдельный Worker Service
  • health-check Kafka и Elasticsearch
  • контроль схем сообщений
  • мониторинг consumer lag
  • ручное управление refresh

Типичные ошибки

  • индексирование без Id
  • логика consumer в контроллерах
  • использование Elasticsearch как primary DB

Альтернатива: Kafka Connect + Elasticsearch Sink

В production-системах возможна индексация через Kafka Connect + Elasticsearch Sink Connector, без написания consumer’а.

Плюсы:

  • меньше кода
  • готовые retry-механизмы

Минусы:

  • отдельная инфраструктура
  • ограниченная бизнес-логика
  • сложнее отлаживать

Для учебных и backend-ориентированных проектов кастомный consumer даёт лучший контроль и понимание процесса.

Заключение

В этой статье мы:

  • вынесли индексацию из HTTP слоя
  • добавили Kafka как буфер
  • реализовали consumer через Hosted Service
  • сделали идемпотентный upsert в Elasticsearch

Получилась архитектура:

  • устойчивая
  • расширяемая
  • близкая к production

Эта статья логически продолжает предыдущую и показывает, как эволюционирует поиск в реальных системах.