Подключение к Kafka в ASP.NET Core (Confluent.Kafka)

Иван Гурин Технологии интеграции информационных систем 6 мин

Apache Kafka - это распределённая платформа потоковой передачи сообщений (event streaming), предназначенная для высокопроизводительной, отказоустойчивой и масштабируемой передачи данных между сервисами. Kafka широко используется для:

  • асинхронного взаимодействия микросервисов;
  • обработки событий (event-driven архитектура);
  • логирования и аналитики;
  • интеграции между различными системами.

В этой статье мы разберём базовое подключение к Kafka из ASP.NET Core приложения и реализуем простой пример отправки сообщения в Kafka Topic с использованием библиотеки Confluent.Kafka.

Стек технологий

  • ASP.NET Core
  • NuGet-пакет Confluent.Kafka
  • Apache Kafka
  • Docker / Docker Compose (для локального запуска Kafka)

Архитектура примера

Поток данных будет следующим:

HTTP GET /home/send
        ↓
HomeController
        ↓
KafkaProducerService
        ↓
Kafka Topic (demo-topic)

Мы не используем интерфейсы и DI-абстракции ради простоты - только конкретный сервис.

Установка пакета Confluent.Kafka

Добавьте NuGet-пакет (через консоль диспетчера пакетов):

Install-Package Confluent.Kafka

KafkaProducerService

Создадим сервис, который отвечает за отправку сообщений в Kafka.

using Confluent.Kafka;

public class KafkaProducerService
{
    private readonly IProducer<Null, string> _producer;

    public KafkaProducerService(ProducerConfig config)
    {
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    public async Task ProduceAsync(string topic, string message)
    {
        var kafkaMessage = new Message<Null, string>
        {
            Value = message
        };

        await _producer.ProduceAsync(topic, kafkaMessage);
    }
}

Регистрация сервиса в DI

В Program.cs зарегистрируем сервис как Singleton:

/* в реальном проекте лучше вынести это в `appsettings.json` */

builder.Services.AddSingleton(new ProducerConfig
{
    BootstrapServers = "localhost:9094"
});

builder.Services.AddSingleton<KafkaProducerService>();

HomeController

Контроллер, который отправляет сообщение в Kafka при GET-запросе.

using Microsoft.AspNetCore.Mvc;

public class HomeController : Controller
{
    private readonly KafkaProducerService _kafkaProducer;

    public HomeController(KafkaProducerService kafkaProducer)
    {
        _kafkaProducer = kafkaProducer;
    }

    [HttpGet]
    public async Task<IActionResult> Send()
    {
        await _kafkaProducer.ProduceAsync(
            topic: "demo-topic",
            message: "Hello from ASP.NET Core"
        );

        return Ok("Message sent to Kafka");
    }

    public IActionResult Index()
    {
        return View();
    }
}

После запуска приложения вызовите:

GET http://localhost:<port>/home/send

Docker Compose для Kafka

services:
  kafka-ui:
    container_name: kafka-ui
    hostname: kafka-ui
    image: provectuslabs/kafka-ui:latest
    restart: always
    ports:
      - 10025:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    volumes:
      - './config-ui.yaml:/etc/kafkaui/dynamic_config.yaml'
    networks:
      - kafkanet
      
  kafka:
    image: apache/kafka:3.9.1
    restart: always
    hostname: kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_LISTENERS: CLIENT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - kafkanet
    
networks:
  kafkanet:
    name: kafkanet

Конфигурация Kafka UI

auth:
  type: DISABLED
kafka:
  clusters:
  - bootstrapServers: kafka:9092
    name: idx
    properties:
      security.protocol: PLAINTEXT
    readOnly: false
rbac:
  roles: []
webclient: {}

Проверка работы

  1. Запустите Kafka через Docker Compose
  2. Запустите ASP.NET Core приложение
  3. Откройте /home/send
  4. Убедитесь, что сообщение появилось в demo-topic через Kafka UI

Заключение

В этой статье мы:

  • кратко разобрали, что такое Kafka;
  • подключили Kafka к ASP.NET Core с помощью Confluent.Kafka;
  • реализовали producer-сервис;
  • отправили сообщение в Kafka Topic через HTTP endpoint.

Этот пример - минимальная основа, которую можно расширять:

  • добавить конфигурации ретраев;
  • внедрить IHostedService;
  • использовать ключи сообщений;
  • добавить consumers и обработку ошибок.