Kafka - Elasticsearch: асинхронная индексация статей в ASP.NET Core
В предыдущей статье Подключение полнотекстового поиска из Elasticsearch в ASP.NET Core данные индексировались в Elasticsearch напрямую из HTTP-запроса. Такой подход прост, но плохо масштабируется и делает API синхронно зависимым от состояния Elasticsearch.
В этой статье мы рассмотрим более устойчивый и production-близкий вариант:
HTTP → ASP.NET Core → Kafka → Consumer → Elasticsearch
Цель:
- HTTP-слой отвечает только за приём данных
- Kafka - за буферизацию и надёжную доставку
- Consumer - за индексацию
Фокус статьи - как взять сообщения из Kafka и положить их в Elasticsearch.
Данный пример написан для официального .NET клиента - библиотеки Elastic.Clients.Elasticsearch версии 9.x.
Архитектура примера
Мы используем один ASP.NET Core проект, внутри которого:
- Метод контроллера через сервис публикует события в Kafka
- BackgroundService читает Kafka и индексирует документы
Установка пакетов
Добавьте NuGet-пакеты (через консоль диспетчера пакетов):
Install-Package Elastic.Clients.Elasticsearch
Install-Package Confluent.Kafka
Поток данных
- HTTP API принимает статью
- Статья сериализуется в JSON и отправляется в Kafka
- Hosted Service читает сообщения
- Данные кладутся в Elasticsearch с upsert по Id
Обратите внимание:
- HTTP API не знает, когда документ появится в Elasticsearch
- Система работает в режиме eventual consistency
Модель документа
public class ArticleDocument
{
// Используется и как бизнес-ID, и как Elasticsearch _id
public int Id { get; set; }
public string Content { get; set; } = string.Empty;
}
Принципиально важный момент:
_idв Elasticsearch всегда строка- Числовой
Idбудет автоматически приведён
Kafka-сообщение
Мы передаём полный текст статьи, без обращения к БД из consumer’а.
{
"id": 123,
"content": "Полный текст статьи"
}
Это упрощает:
- replay сообщений
- тестирование
- независимость consumer от БД
Kafka Producer
Отправку в Kafka считаем уже знакомой по предыдущей статье Подключение к Kafka в ASP.NET Core (Confluent.Kafka). В ней можно найти конфигурацию kafka для развертывания в docker. Приводим минимальный фрагмент для целостности примера.
using Confluent.Kafka;
using System.Text.Json;
public class ArticleKafkaProducer
{
private readonly IProducer<string, string> _producer;
public ArticleKafkaProducer(ProducerConfig config)
{
_producer = new ProducerBuilder<string, string>(config).Build();
}
public async Task PublishAsync(ArticleDocument article)
{
var json = JsonSerializer.Serialize(article);
await _producer.ProduceAsync("articles", new Message<string, string>
{
Key = article.Id.ToString(),
Value = json
});
}
}
Далее необходимо добавить регистрацию сервиса в Program.cs:
builder.Services.AddSingleton(new ProducerConfig
{
BootstrapServers = "localhost:9094"
});
builder.Services.AddSingleton<ArticleKafkaProducer>();
Использование Kafka Producer из контроллера
Producer вызывается синхронно с HTTP-запросом, но не выполняет индексацию - он только публикует событие.
using Microsoft.AspNetCore.Mvc;
[ApiController]
[Route("api/article")]
public class ArticleController : ControllerBase
{
private readonly ArticleKafkaProducer _producer;
public ArticleController(ArticleKafkaProducer producer)
{
_producer = producer;
}
[HttpPost]
public async Task<IActionResult> CreateArticle([FromBody] ArticleDocument article)
{
// Валидация и бизнес-логика опущены
await _producer.PublishAsync(article);
return Accepted();
}
}
Метод возвращает ответ 202 Accepted по следующим причинам:
- операция индексирования асинхронная
- данные сначала попадают в Kafka
- Elasticsearch будет обновлён позже
Таким ответом API явно сигнализирует клиенту: запрос принят, но ещё не полностью обработан.
Elasticsearch клиент
var settings = new ElasticsearchClientSettings(new Uri("http://localhost:9200"))
.Authentication(new BasicAuthentication("elastic", "elastic_password"))
.DefaultIndex("articles");
builder.Services.AddSingleton(new ElasticsearchClient(settings));
Клиент:
- потокобезопасен
- регистрируется как Singleton
Kafka Consumer как Hosted Service
Это ключевой элемент всей архитектуры.
using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using System.Text.Json;
public class KafkaToElasticHostedService : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly ElasticsearchClient _client;
public KafkaToElasticHostedService(
IConsumer<string, string> consumer,
ElasticsearchClient client)
{
_consumer = consumer;
_client = client;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("articles");
// Даем хосту завершить старт
await Task.Yield();
while (!stoppingToken.IsCancellationRequested)
{
var result = _consumer.Consume(stoppingToken);
if (result?.Message == null)
continue;
var document = JsonSerializer.Deserialize<ArticleDocument>(result.Message.Value);
if (document == null)
continue;
// Upsert по Id
await _client.IndexAsync(document, i => i
.Id(document.Id)
.Index("articles")
);
// Коммит offset ТОЛЬКО после успешной записи
_consumer.Commit(result);
}
}
}
Ключевые моменты
- Consumer живёт в
BackgroundService - Offset коммитится после записи в Elasticsearch
- Повторное сообщение = безопасный upsert
Регистрация Kafka Consumer
builder.Services.AddSingleton<IConsumer<string, string>>(_ =>
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9094",
GroupId = "article-indexer",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
return new ConsumerBuilder<string, string>(config)
.Build();
});
builder.Services.AddHostedService<KafkaToElasticHostedService>();
Почему отключён auto-commit:
- контролируется момент подтверждения
- избегаем потери сообщений
В примере consumer в KafkaToElasticHostedService подписывается на topic
articlesсразу при старте. Если topic не существует, Kafka consumer выбросит исключение.В production-решениях обычно:
- управляют созданием топиков отдельно
- используют AutoCreateTopicsEnable
В учебном примере topic необходимо создать заранее, либо закомментировать добавление
AddHostedServiceдо отправки первого сообщения.
Индекс Elasticsearch
Для примера используется автосоздание индекса. Для production рекомендуется создать его явно.
Проверка работы
- Запустить Kafka
- Запустить Elasticsearch
- Запустить ASP.NET Core
- Отправить статью в HTTP API
- Убедиться, что сообщение появилось в Kafka
- Убедиться, что документ появился в Elasticsearch
- Выполнить поиск
Для отправки сообщения в kafka можно использовать следующий запрос:
curl --location 'http://localhost/api/article' \
--header 'Content-Type: application/json' \
--data '{
"id": 123,
"content": "Тестовая статья. Kafka - Elasticsearch: асинхронная индексация статей в ASP.NET Core"
}'
Для поиска используйте метод api/search из предыдущей статьи:
curl --location 'http://localhost/api/search/?q=kafka'
Результат должен быть таким:
[
{
"id": 123,
"content": "Тестовая статья. Kafka - Elasticsearch: асинхронная индексация статей в ASP.NET Core"
}
]
Методические ремарки
Что важно для production
- отдельный Worker Service
- health-check Kafka и Elasticsearch
- контроль схем сообщений
- мониторинг consumer lag
- ручное управление refresh
Типичные ошибки
- индексирование без Id
- логика consumer в контроллерах
- использование Elasticsearch как primary DB
Альтернатива: Kafka Connect + Elasticsearch Sink
В production-системах возможна индексация через Kafka Connect + Elasticsearch Sink Connector, без написания consumer’а.
Плюсы:
- меньше кода
- готовые retry-механизмы
Минусы:
- отдельная инфраструктура
- ограниченная бизнес-логика
- сложнее отлаживать
Для учебных и backend-ориентированных проектов кастомный consumer даёт лучший контроль и понимание процесса.
Заключение
В этой статье мы:
- вынесли индексацию из HTTP слоя
- добавили Kafka как буфер
- реализовали consumer через Hosted Service
- сделали идемпотентный upsert в Elasticsearch
Получилась архитектура:
- устойчивая
- расширяемая
- близкая к production
Эта статья логически продолжает предыдущую и показывает, как эволюционирует поиск в реальных системах.