Skip to content

Kafka-консьюмеры (Входящие потоки)

Подсистема поддерживает получение сообщений из топиков Apache Kafka через механизм консьюмеров. Это позволяет организовать входящие потоки данных из внешних систем.

Принцип работы

Архитектура

┌──────────────────────────────────────────────────────────────────────────────┐
│                      Внешняя система (Producer)                               │
└───────────────────────────────┬──────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────────────────┐
│                         Apache Kafka                                          │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐               │
│  │   Topic A       │  │   Topic B       │  │   Topic C       │               │
│  │  (orders)       │  │  (inventory)    │  │  (customers)    │               │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘               │
└───────────────────────────────┬──────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────────────────┐
│                    1C:Enterprise (Consumer)                                   │
│                                                                               │
│  ┌─────────────────────────────────────────────────────────────────────────┐ │
│  │  РЗ: инт_МенеджерВходящихПотоков (каждые 30 сек)                        │ │
│  │  ┌───────────────────────────────────────────────────────────────────┐  │ │
│  │  │  Проверяет активные подписки → Запускает ФЗ консьюмеров           │  │ │
│  │  └───────────────────────────────────────────────────────────────────┘  │ │
│  └─────────────────────────────────────────────────────────────────────────┘ │
│                                │                                              │
│                   ┌────────────┴────────────┐                                │
│                   ▼                          ▼                                │
│  ┌─────────────────────────┐    ┌─────────────────────────┐                  │
│  │  Асинхронная обработка  │    │  Синхронная обработка   │                  │
│  │  (запись в очередь)     │    │  (немедленно + DLQ)     │                  │
│  └─────────────────────────┘    └─────────────────────────┘                  │
└──────────────────────────────────────────────────────────────────────────────┘

Цикл жизни консьюмера

  1. Регламентное задание инт_МенеджерВходящихПотоков запускается каждые 30 секунд
  2. Получает список активных входящих потоков с подписками Kafka
  3. Группирует подписки по эндпоинтам (брокерам)
  4. Для каждой группы проверяет наличие активного фонового задания
  5. Если ФЗ нет или завершено — запускает новый консьюмер
  6. Регистрирует ФЗ в регистре инт_АктивныеКонсьюмеры
bsl
// Упрощённый алгоритм менеджера
Процедура ЗапуститьКонсьюмерыАсинхронныхПодписок()
    
    МассивПодписок = ПолучитьАктивныеПодписки();
    ПодпискиПоЭндпоинтам = СгруппироватьПодпискиПоЭндпоинтам(МассивПодписок);
    
    Для Каждого КлючЗначение Из ПодпискиПоЭндпоинтам Цикл
        // Проверяем наличие активного ФЗ
        Если НЕ ЕстьАктивноеЗадание(КлючЗначение.Значение) Тогда
            ЗапуститьКонсьюмер(КлючЗначение.Значение);
        КонецЕсли;
    КонецЦикла;
    
КонецПроцедуры

Настройка подписчика Kafka

Справочник инт_ПодписчикиKafka

Для настройки консьюмера создайте элемент справочника инт_ПодписчикиKafka:

РеквизитТипОписание
НаименованиеСтрокаПонятное имя подписчика
АктивенБулевоВключён ли консьюмер
ЭндпоинтСправочникСсылка.инт_ЭндпоинтыБрокеры Kafka (bootstrap servers)
ИмяТопикаСтрока(255)Имя топика для подписки
ИмяDLQТопикаСтрока(255)Топик для "мёртвых" сообщений
КоличествоПопытокОтправкиЧислоМакс. попыток при ошибке
ПаузаМеждуПопыткамиОбработкиЧислоИнтервал повтора (сек.)

Привязка к входящему потоку

Подписчик добавляется в табличную часть ПодпискиНаВходящиеСообщения справочника инт_ПотокиДанных:

┌────────────────────────────────────────────────────────────────────┐
│  Поток данных: "Заказы из интернет-магазина"                       │
├────────────────────────────────────────────────────────────────────┤
│  Направление: Входящий                                             │
│  Обработчик: ОбработатьЗаказИзМагазина()                          │
│  Валидация: Включена (схема: orders-v1)                           │
│  Асинхронная обработка: Да                                         │
├────────────────────────────────────────────────────────────────────┤
│  Подписки на входящие сообщения:                                   │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │ Подписка                        │ Тип                        │  │
│  ├──────────────────────────────────┼────────────────────────────│  │
│  │ Kafka: orders-production        │ инт_ПодписчикиKafka        │  │
│  └──────────────────────────────────┴────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘

Режимы обработки сообщений

Асинхронная обработка

При АсинхроннаяОбработка = Истина в потоке данных:

  1. Консьюмер читает сообщение из Kafka
  2. Десериализует JSON и выполняет валидацию (если включена)
  3. Записывает сообщение в регистр инт_ОчередьВходящихСообщений
  4. Фиксирует offset в Kafka
  5. Обработка выполняется отдельным регламентным заданием

Преимущества:

  • Высокая пропускная способность
  • Отказоустойчивость — сообщения не теряются при ошибках
  • Возможность повторной обработки

Синхронная обработка

При АсинхроннаяОбработка = Ложь:

  1. Консьюмер читает сообщение
  2. Выполняет валидацию
  3. Немедленно вызывает обработчик потока
  4. При ошибке — отправляет в DLQ (Dead Letter Queue)
  5. Фиксирует offset

Преимущества:

  • Минимальная задержка
  • Гарантия порядка обработки
  • Простота отладки

Consumer Group Strategy

Формирование идентификатора группы

Consumer Group ID генерируется автоматически по формуле:

{GUID_ИБ}_{Суффикс}

Где:

  • GUID_ИБ — уникальный идентификатор информационной базы
  • Суффикс — "consumer" или код подписчика

Пример: a1b2c3d4-e5f6-7890-abcd-ef1234567890_consumer

Одна группа на эндпоинт

Все подписки одного эндпоинта (брокера) обрабатываются одним консьюмером. Это обеспечивает:

  • Эффективное использование соединений
  • Корректный rebalancing партиций
  • Простое масштабирование
┌─────────────────────────────────────────────────────────────────┐
│  Эндпоинт: kafka-prod.example.com:9092                          │
├─────────────────────────────────────────────────────────────────┤
│  Consumer Group: a1b2c3d4_consumer                              │
│  ┌──────────────────────┐  ┌──────────────────────┐             │
│  │  Подписка: orders    │  │  Подписка: inventory │             │
│  │  Топик: orders.v1    │  │  Топик: inventory.v1 │             │
│  └──────────────────────┘  └──────────────────────┘             │
│                    │                    │                        │
│                    └────────┬───────────┘                        │
│                             ▼                                    │
│  ┌──────────────────────────────────────────────────┐           │
│  │  Одно ФЗ консьюмера для обоих топиков            │           │
│  └──────────────────────────────────────────────────┘           │
└─────────────────────────────────────────────────────────────────┘

Dead Letter Queue (DLQ)

Назначение

DLQ — специальный топик для сообщений, которые не удалось обработать. Это предотвращает блокировку очереди из-за "битых" сообщений.

Настройка

Укажите имя DLQ-топика в реквизите ИмяDLQТопика подписчика:

ИмяDLQТопика: orders.v1.dlq

Формат DLQ-сообщения

json
{
  "original_topic": "orders.v1",
  "original_partition": 0,
  "original_offset": 12345,
  "original_payload": "{...исходные данные...}",
  "error_message": "Ошибка валидации: поле 'amount' обязательно",
  "failed_at": "2026-02-02T14:30:00Z"
}

Логика отправки в DLQ

bsl
// При любой ошибке (десериализация, валидация, обработка)
Если НЕ РезультатОбработки.Успех Тогда
    ОтправитьВDLQПриОшибке(
        Подписка,
        ДанныеСообщения,
        ТопикСообщения,
        СмещениеСообщения,
        РазделСообщения,
        РезультатОбработки.ОписаниеОшибки);
КонецЕсли;

Важно

Если DLQ-топик не настроен, сообщение будет потеряно при ошибке обработки. Всегда настраивайте DLQ для критичных потоков!

Цикл чтения сообщений

Основной цикл

bsl
Пока РазрешеноСлушать Цикл
    
    // 1. Читаем сообщение (с таймаутом)
    РезультатЧтения = Компонента.ПрочитатьСообщение();
    
    Если НЕ РезультатЧтения Тогда
        // Проверяем флаг активности подписок
        РазрешеноСлушать = ПроверитьАктивностьПодписок();
        Продолжить;
    КонецЕсли;
    
    // 2. Получаем метаданные сообщения
    ДанныеСообщения = Компонента.ПолучитьДанныеСообщения();
    ТопикСообщения = Компонента.ПолучитьТопикСообщения();
    СмещениеСообщения = Компонента.ПолучитьСмещениеСообщения();
    РазделСообщения = Компонента.ПолучитьРазделСообщения();
    
    // 3. Находим поток по топику
    СтруктураПодписки = СоответствиеТопикПодписка.Получить(ТопикСообщения);
    
    // 4. Обрабатываем сообщение
    РезультатОбработки = ОбработатьВходящееСообщение(
        ДанныеСообщения,
        СтруктураПодписки.ПотокДанных,
        СтруктураПодписки.Подписка);
    
    // 5. При ошибке — отправляем в DLQ
    Если НЕ РезультатОбработки.Успех Тогда
        ОтправитьВDLQПриОшибке(...);
    КонецЕсли;
    
    // 6. Фиксируем offset (даже при ошибке!)
    НовоеСмещение = Число(СмещениеСообщения) + 1;
    Компонента.ЗафиксироватьСмещение(ТопикСообщения, НовоеСмещение, РазделСообщения);
    
КонецЦикла;

Обработка входящего сообщения

bsl
Функция ОбработатьВходящееСообщение(ДанныеСообщения, ПотокДанных, Подписка)
    
    // 1. Десериализация JSON
    Попытка
        МодельДанных = инт_КоннекторHTTP.JsonВОбъект(ДанныеСообщения);
    Исключение
        Возврат НовыйРезультат(Ложь, "Ошибка десериализации JSON");
    КонецПопытки;
    
    // 2. Валидация по схеме (если включена в потоке)
    Если ПотокДанных.Валидация Тогда
        Попытка
            Справочники.инт_ПотокиДанных.ВалидироватьСообщениеПоПотоку(МодельДанных, ПотокДанных);
        Исключение
            Возврат НовыйРезультат(Ложь, "Ошибка валидации: " + ОписаниеОшибки());
        КонецПопытки;
    КонецЕсли;
    
    // 3. Отправка на обработку
    // Для асинхронных — запись в очередь
    // Для синхронных — немедленная обработка
    инт_ОбработкаВходящихПотоков.ОбработкаВходящегоСообщенияПоПотоку(
        МодельДанных,
        ПотокДанных,
        Новый УникальныйИдентификатор);
    
    Возврат НовыйРезультат(Истина, "");
    
КонецФункции

Мониторинг консьюмеров

Регистр инт_АктивныеКонсьюмеры

Хранит информацию о запущенных консьюмерах:

ИзмерениеТипОписание
ПодпискаОпределяемыйТип.инт_ПодпискаСсылка на подписчик
ИдентификаторФоновогоЗаданияУникальныйИдентификаторID фонового задания
РесурсТипОписание
ВремяЗапускаДатаВремяКогда запущен консьюмер
ПоследняяАктивностьДатаВремяПоследний heartbeat
СтатусАктивностиСтрока"Активен", "Kafka"

Программное управление

bsl
// Остановка всех консьюмеров
инт_МенеджерВходящихПотоков.ОстановитьВсеКонсьюмеры();

// Остановка консьюмеров конкретной подписки
инт_МенеджерВходящихПотоков.ОстановитьКонсьюмерыПодписки(ПодписчикKafka);

// Получение списка активных консьюмеров
АктивныеКонсьюмеры = РегистрыСведений.инт_АктивныеКонсьюмеры.ПолучитьАктивныеКонсьюмеры();

Журнал регистрации

Все события консьюмеров логируются с событием ПодсистемаИнтеграции.КонсьюмерОчередей:

ПодсистемаИнтеграции.КонсьюмерОчередей       — основные события
ПодсистемаИнтеграции.КонсьюмерОчередей.DLQ   — отправка в DLQ

Настройки Kafka

Параметры консьюмера

Компонента simpleKafka1C использует библиотеку librdkafka. При создании консьюмера устанавливаются параметры:

ПараметрЗначениеОписание
group.id{GUID_ИБ}_consumerИдентификатор consumer group
auto.offset.resetearliestНачинать с начала при новой группе
enable.auto.commitfalseРучная фиксация offset

Глобальные константы

КонстантаТипОписание
инт_ИдентификаторКлиентаKafkaСтрокаclient.id для логов
инт_КаталогЛоговKafkaСтрокаПуть для логов librdkafka
инт_УровеньОтладкиKafkaСтрокаКатегории отладки (debug)
инт_ИнтервалСтатистикиKafkaЧислоИнтервал сбора статистики (мс)

Уровни отладки

Для диагностики проблем можно включить детальное логирование:

bsl
// Для продюсера
инт_РаботаСКафка.ПолучитьУровеньОтладкиПродюсера()
// "broker,topic,msg"

// Для консьюмера
инт_РаботаСКафка.ПолучитьУровеньОтладкиКонсьюмера()
// "consumer,cgrp,topic,fetch"

// Полная отладка (осторожно — много логов!)
инт_РаботаСКафка.ПолучитьУровеньОтладкиПолный()
// "generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,..."

Требования

Внешняя компонента

Для работы требуется внешняя компонента SimpleKafka1C (на базе librdkafka):

  • Общий макет: инт_simple_kafka
  • Тип: Native
  • Подключение: НеИзолированно

Права доступа

Консьюмеры работают в фоновых заданиях. Убедитесь, что:

  • Роль имеет права на регистры инт_ОчередьВходящихСообщений, инт_АктивныеКонсьюмеры
  • Разрешено подключение внешних компонент
  • Есть сетевой доступ к брокерам Kafka

Рекомендации

Всегда настраивайте DLQ

Без DLQ "битые" сообщения будут молча отбрасываться. Создайте отдельный топик .dlq для каждого основного топика.

Используйте асинхронную обработку

Для высоконагруженных потоков включайте АсинхроннаяОбработка = Истина. Это обеспечивает устойчивость к временным сбоям.

Мониторьте lag

Следите за отставанием consumer group через Kafka UI или метрики. Большой lag означает, что обработка не успевает за поступлением.

Не останавливайте регламентное задание

Если остановить инт_МенеджерВходящихПотоков, консьюмеры продолжат работать до завершения, но не перезапустятся при падении.

Проверяйте сетевую доступность

При недоступности брокеров консьюмер завершится с ошибкой. Менеджер перезапустит его через 30 секунд.

Примеры

Минимальная настройка

  1. Создайте эндпоинт с адресом брокера:

    Наименование: Kafka Production
    Адрес ресурса: kafka-broker.example.com:9092
  2. Создайте подписчика Kafka:

    Наименование: Заказы из магазина
    Эндпоинт: Kafka Production
    ИмяТопика: shop.orders.v1
    ИмяDLQТопика: shop.orders.v1.dlq
    Активен: Да
  3. Создайте входящий поток данных:

    Наименование: Заказы из интернет-магазина
    Направление: Входящий
    Обработчик: МойМодуль.ОбработатьЗаказ
  4. Добавьте подписчика в табличную часть ПодпискиНаВходящиеСообщения потока

  5. Убедитесь, что регламентное задание инт_МенеджерВходящихПотоков активно


Связанные разделы

Документация подсистемы интеграции