Подключение к Kafka в ASP.NET Core (Confluent.Kafka)
Apache Kafka - это распределённая платформа потоковой передачи сообщений (event streaming), предназначенная для высокопроизводительной, отказоустойчивой и масштабируемой передачи данных между сервисами. Kafka широко используется для:
- асинхронного взаимодействия микросервисов;
- обработки событий (event-driven архитектура);
- логирования и аналитики;
- интеграции между различными системами.
В этой статье мы разберём базовое подключение к Kafka из ASP.NET Core приложения и реализуем простой пример отправки сообщения в Kafka Topic с использованием библиотеки Confluent.Kafka.
Стек технологий
- ASP.NET Core
- NuGet-пакет
Confluent.Kafka - Apache Kafka
- Docker / Docker Compose (для локального запуска Kafka)
Архитектура примера
Поток данных будет следующим:
HTTP GET /home/send
↓
HomeController
↓
KafkaProducerService
↓
Kafka Topic (demo-topic)
Мы не используем интерфейсы и DI-абстракции ради простоты - только конкретный сервис.
Установка пакета Confluent.Kafka
Добавьте NuGet-пакет (через консоль диспетчера пакетов):
Install-Package Confluent.Kafka
KafkaProducerService
Создадим сервис, который отвечает за отправку сообщений в Kafka.
using Confluent.Kafka;
public class KafkaProducerService
{
private readonly IProducer<Null, string> _producer;
public KafkaProducerService(ProducerConfig config)
{
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task ProduceAsync(string topic, string message)
{
var kafkaMessage = new Message<Null, string>
{
Value = message
};
await _producer.ProduceAsync(topic, kafkaMessage);
}
}
Регистрация сервиса в DI
В Program.cs зарегистрируем сервис как Singleton:
/* в реальном проекте лучше вынести это в `appsettings.json` */
builder.Services.AddSingleton(new ProducerConfig
{
BootstrapServers = "localhost:9094"
});
builder.Services.AddSingleton<KafkaProducerService>();
HomeController
Контроллер, который отправляет сообщение в Kafka при GET-запросе.
using Microsoft.AspNetCore.Mvc;
public class HomeController : Controller
{
private readonly KafkaProducerService _kafkaProducer;
public HomeController(KafkaProducerService kafkaProducer)
{
_kafkaProducer = kafkaProducer;
}
[HttpGet]
public async Task<IActionResult> Send()
{
await _kafkaProducer.ProduceAsync(
topic: "demo-topic",
message: "Hello from ASP.NET Core"
);
return Ok("Message sent to Kafka");
}
public IActionResult Index()
{
return View();
}
}
После запуска приложения вызовите:
GET http://localhost:<port>/home/send
Docker Compose для Kafka
services:
kafka-ui:
container_name: kafka-ui
hostname: kafka-ui
image: provectuslabs/kafka-ui:latest
restart: always
ports:
- 10025:8080
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
- './config-ui.yaml:/etc/kafkaui/dynamic_config.yaml'
networks:
- kafkanet
kafka:
image: apache/kafka:3.9.1
restart: always
hostname: kafka
ports:
- "9094:9094"
environment:
KAFKA_NODE_ID: 0
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_LISTENERS: CLIENT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
networks:
- kafkanet
networks:
kafkanet:
name: kafkanet
Конфигурация Kafka UI
auth:
type: DISABLED
kafka:
clusters:
- bootstrapServers: kafka:9092
name: idx
properties:
security.protocol: PLAINTEXT
readOnly: false
rbac:
roles: []
webclient: {}
Проверка работы
- Запустите Kafka через Docker Compose
- Запустите ASP.NET Core приложение
- Откройте
/home/send - Убедитесь, что сообщение появилось в
demo-topicчерез Kafka UI
Заключение
В этой статье мы:
- кратко разобрали, что такое Kafka;
- подключили Kafka к ASP.NET Core с помощью
Confluent.Kafka; - реализовали producer-сервис;
- отправили сообщение в Kafka Topic через HTTP endpoint.
Этот пример - минимальная основа, которую можно расширять:
- добавить конфигурации ретраев;
- внедрить
IHostedService; - использовать ключи сообщений;
- добавить consumers и обработку ошибок.