Подключение RabbitMQ в ASP.NET Core

Иван Гурин Технологии интеграции информационных систем 7 мин

RabbitMQ - это брокер сообщений, реализующий протокол AMQP (Advanced Message Queuing Protocol). Он используется для асинхронного взаимодействия между компонентами системы и отлично подходит для:

  • фоновых задач;
  • очередей сообщений;
  • балансировки нагрузки между воркерами;
  • интеграции сервисов.

В основе RabbitMQ лежит концепция exchange → queue → consumer:

  • Exchange принимает сообщения от producer’ов;
  • Queue хранит сообщения;
  • Consumer читает сообщения из очереди.

Типы exchange

  • direct - сообщение попадает в очередь, если routingKey совпадает;
  • fanout - сообщение рассылается во все связанные очереди;
  • topic - маршрутизация по шаблонам (order.*, *.created).

В этой статье мы рассмотрим упрощённый вариант с использованием default exchange и прямой отправкой сообщения в очередь.

Стек технологий

  • ASP.NET Core
  • NuGet-пакет RabbitMQ.Client
  • RabbitMQ
  • Docker / Docker Compose

Данный пример написан для RabbitMQ.Client версии 7.x. Начиная с версии 7.0 библиотека перешла на async-only API:

  • отсутствуют синхронные методы (CreateModel, QueueDeclare, BasicPublish);
  • используются только асинхронные методы (CreateConnectionAsync, CreateChannelAsync, QueueDeclareAsync, BasicPublishAsync).

Архитектура примера

HTTP GET /home/send
        ↓
HomeController
        ↓
RabbitMqProducerService
        ↓
RabbitMQ Queue (demo-queue)

Пример сознательно упрощён: без интерфейсов и дополнительных абстракций.

Установка пакета RabbitMQ.Client

Добавьте NuGet-пакет:

Install-Package RabbitMQ.Client

RabbitMqProducerService

Сервис отвечает за подключение к RabbitMQ и отправку сообщений в очередь.

using RabbitMQ.Client;
using System.Text;

namespace IntegrationProject.Services;

public class RabbitMqProducerService
{
    private readonly ConnectionFactory _connectionFactory;
    private IConnection? _connection;
    private IChannel? _channel;

    public RabbitMqProducerService(ConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    public async Task InitializeAsync()
    {
        _connection ??= await _connectionFactory.CreateConnectionAsync();

        // Создаём канал (logical connection) - все операции выполняются через него
        _channel = await _connection.CreateChannelAsync();

        // ⚠️ Объявление очереди
        // В RabbitMQ producer обязан гарантировать, что очередь существует.
        // QueueDeclare - идемпотентная операция:
        // если очередь уже есть с теми же параметрами, ничего не произойдёт
        await _channel.QueueDeclareAsync(
            queue: "demo-queue",

            // durable: true  → очередь переживёт рестарт брокера
            // durable: false → очередь будет удалена при перезапуске RabbitMQ
            durable: false,

            // exclusive: true  → очередь доступна только этому соединению
            // exclusive: false → очередь доступна другим подключениям
            exclusive: false,

            // autoDelete: true  → очередь будет удалена,
            //                    когда отключится последний consumer
            // autoDelete: false → очередь живёт до явного удаления
            autoDelete: false,

            arguments: null
        );
    }

    public async Task SendAsync(string message)
    {
        if (_channel is null)
        {
            await InitializeAsync();
        }

        var body = Encoding.UTF8.GetBytes(message);

        // Отправка сообщения в default exchange ("")
        await _channel.BasicPublishAsync(
            exchange: "",
            routingKey: "demo-queue",
            body: body
        );
    }
}

⚠️ В данном примере используется ленивая асинхронная инициализация соединения и канала. Такой подход упрощает код и подходит для учебных целей, но в production рекомендуется:

  • инициализировать соединение при старте приложения;
  • использовать отдельный канал на поток или синхронизацию;
  • явно управлять жизненным циклом ресурсов.

Регистрация в DI

В Program.cs зарегистрируем ConnectionFactory и сервис:

builder.Services.AddSingleton(new ConnectionFactory
{
    HostName = "localhost",
    Port = 5672,
    UserName = "rabbituser",
    Password = "rabbitpassword"
});

builder.Services.AddSingleton<RabbitMqProducerService>();

HomeController

Контроллер отправляет сообщение в очередь RabbitMQ.

using Microsoft.AspNetCore.Mvc;

public class HomeController : Controller
{
    private readonly RabbitMqProducerService _rabbitMqProducer;

    public HomeController(RabbitMqProducerService rabbitMqProducer)
    {
        _rabbitMqProducer = rabbitMqProducer;
    }

    [HttpGet]
    public async Task<IActionResult> Send()
    {
        await _rabbitMqProducer.SendAsync("Hello from ASP.NET Core");

        return Ok("Message sent to RabbitMQ");
    }

    public IActionResult Index()
    {
        return View();
    }
}

Docker Compose для RabbitMQ

services:
  rabbitmq:
    image: "rabbitmq:4.2.2-management"
    hostname: rabbitmq
    restart: always
    environment:
      - RABBITMQ_DEFAULT_USER=rabbituser
      - RABBITMQ_DEFAULT_PASS=rabbitpassword
      - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 2147483648
    volumes:
      - ./rabbitmq:/var/lib/rabbitmq
    ports:
      - "15672:15672"
      - "5672:5672"

Проверка работы

  1. Запустите RabbitMQ через Docker Compose
  2. Откройте RabbitMQ Management UI (http://localhost:15672)
  3. Запустите ASP.NET Core приложение
  4. Вызовите /home/send
  5. Убедитесь, что сообщение появилось в очереди demo-queue

Сравнение Kafka и RabbitMQ

Критерий Kafka RabbitMQ
Модель Event streaming Message queue
Хранение сообщений Да Опционально
Replay сообщений Да Нет
Задержка Очень низкая Низкая
Простота Сложнее Проще

Заключение

В этой статье мы:

  • разобрали, что такое RabbitMQ;
  • подключили RabbitMQ к ASP.NET Core;
  • реализовали producer-сервис;
  • отправили сообщение в очередь через HTTP endpoint.

Данный пример - хорошая отправная точка. В production обычно добавляют:

  • подтверждения доставки (ack);
  • durable-очереди;
  • exchange-типы (direct, fanout, topic);
  • consumers и retry-механику.