Kafka-консьюмеры (Входящие потоки)
Подсистема поддерживает получение сообщений из топиков Apache Kafka через механизм консьюмеров. Это позволяет организовать входящие потоки данных из внешних систем.
Принцип работы
Архитектура
┌──────────────────────────────────────────────────────────────────────────────┐
│ Внешняя система (Producer) │
└───────────────────────────────┬──────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Apache Kafka │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Topic A │ │ Topic B │ │ Topic C │ │
│ │ (orders) │ │ (inventory) │ │ (customers) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└───────────────────────────────┬──────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ 1C:Enterprise (Consumer) │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ РЗ: инт_МенеджерВходящихПотоков (каждые 30 сек) │ │
│ │ ┌───────────────────────────────────────────────────────────────────┐ │ │
│ │ │ Проверяет активные подписки → Запускает ФЗ консьюмеров │ │ │
│ │ └───────────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┴────────────┐ │
│ ▼ ▼ │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ Асинхронная обработка │ │ Синхронная обработка │ │
│ │ (запись в очередь) │ │ (немедленно + DLQ) │ │
│ └─────────────────────────┘ └─────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘Цикл жизни консьюмера
- Регламентное задание
инт_МенеджерВходящихПотоковзапускается каждые 30 секунд - Получает список активных входящих потоков с подписками Kafka
- Группирует подписки по эндпоинтам (брокерам)
- Для каждой группы проверяет наличие активного фонового задания
- Если ФЗ нет или завершено — запускает новый консьюмер
- Регистрирует ФЗ в регистре
инт_АктивныеКонсьюмеры
// Упрощённый алгоритм менеджера
Процедура ЗапуститьКонсьюмерыАсинхронныхПодписок()
МассивПодписок = ПолучитьАктивныеПодписки();
ПодпискиПоЭндпоинтам = СгруппироватьПодпискиПоЭндпоинтам(МассивПодписок);
Для Каждого КлючЗначение Из ПодпискиПоЭндпоинтам Цикл
// Проверяем наличие активного ФЗ
Если НЕ ЕстьАктивноеЗадание(КлючЗначение.Значение) Тогда
ЗапуститьКонсьюмер(КлючЗначение.Значение);
КонецЕсли;
КонецЦикла;
КонецПроцедурыНастройка подписчика Kafka
Справочник инт_ПодписчикиKafka
Для настройки консьюмера создайте элемент справочника инт_ПодписчикиKafka:
| Реквизит | Тип | Описание |
|---|---|---|
Наименование | Строка | Понятное имя подписчика |
Активен | Булево | Включён ли консьюмер |
Эндпоинт | СправочникСсылка.инт_Эндпоинты | Брокеры Kafka (bootstrap servers) |
ИмяТопика | Строка(255) | Имя топика для подписки |
ИмяDLQТопика | Строка(255) | Топик для "мёртвых" сообщений |
КоличествоПопытокОтправки | Число | Макс. попыток при ошибке |
ПаузаМеждуПопыткамиОбработки | Число | Интервал повтора (сек.) |
Привязка к входящему потоку
Подписчик добавляется в табличную часть ПодпискиНаВходящиеСообщения справочника инт_ПотокиДанных:
┌────────────────────────────────────────────────────────────────────┐
│ Поток данных: "Заказы из интернет-магазина" │
├────────────────────────────────────────────────────────────────────┤
│ Направление: Входящий │
│ Обработчик: ОбработатьЗаказИзМагазина() │
│ Валидация: Включена (схема: orders-v1) │
│ Асинхронная обработка: Да │
├────────────────────────────────────────────────────────────────────┤
│ Подписки на входящие сообщения: │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Подписка │ Тип │ │
│ ├──────────────────────────────────┼────────────────────────────│ │
│ │ Kafka: orders-production │ инт_ПодписчикиKafka │ │
│ └──────────────────────────────────┴────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘Режимы обработки сообщений
Асинхронная обработка
При АсинхроннаяОбработка = Истина в потоке данных:
- Консьюмер читает сообщение из Kafka
- Десериализует JSON и выполняет валидацию (если включена)
- Записывает сообщение в регистр
инт_ОчередьВходящихСообщений - Фиксирует offset в Kafka
- Обработка выполняется отдельным регламентным заданием
Преимущества:
- Высокая пропускная способность
- Отказоустойчивость — сообщения не теряются при ошибках
- Возможность повторной обработки
Синхронная обработка
При АсинхроннаяОбработка = Ложь:
- Консьюмер читает сообщение
- Выполняет валидацию
- Немедленно вызывает обработчик потока
- При ошибке — отправляет в DLQ (Dead Letter Queue)
- Фиксирует offset
Преимущества:
- Минимальная задержка
- Гарантия порядка обработки
- Простота отладки
Consumer Group Strategy
Формирование идентификатора группы
Consumer Group ID генерируется автоматически по формуле:
{GUID_ИБ}_{Суффикс}Где:
GUID_ИБ— уникальный идентификатор информационной базыСуффикс— "consumer" или код подписчика
Пример: a1b2c3d4-e5f6-7890-abcd-ef1234567890_consumer
Одна группа на эндпоинт
Все подписки одного эндпоинта (брокера) обрабатываются одним консьюмером. Это обеспечивает:
- Эффективное использование соединений
- Корректный rebalancing партиций
- Простое масштабирование
┌─────────────────────────────────────────────────────────────────┐
│ Эндпоинт: kafka-prod.example.com:9092 │
├─────────────────────────────────────────────────────────────────┤
│ Consumer Group: a1b2c3d4_consumer │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ Подписка: orders │ │ Подписка: inventory │ │
│ │ Топик: orders.v1 │ │ Топик: inventory.v1 │ │
│ └──────────────────────┘ └──────────────────────┘ │
│ │ │ │
│ └────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Одно ФЗ консьюмера для обоих топиков │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘Dead Letter Queue (DLQ)
Назначение
DLQ — специальный топик для сообщений, которые не удалось обработать. Это предотвращает блокировку очереди из-за "битых" сообщений.
Настройка
Укажите имя DLQ-топика в реквизите ИмяDLQТопика подписчика:
ИмяDLQТопика: orders.v1.dlqФормат DLQ-сообщения
{
"original_topic": "orders.v1",
"original_partition": 0,
"original_offset": 12345,
"original_payload": "{...исходные данные...}",
"error_message": "Ошибка валидации: поле 'amount' обязательно",
"failed_at": "2026-02-02T14:30:00Z"
}Логика отправки в DLQ
// При любой ошибке (десериализация, валидация, обработка)
Если НЕ РезультатОбработки.Успех Тогда
ОтправитьВDLQПриОшибке(
Подписка,
ДанныеСообщения,
ТопикСообщения,
СмещениеСообщения,
РазделСообщения,
РезультатОбработки.ОписаниеОшибки);
КонецЕсли;Важно
Если DLQ-топик не настроен, сообщение будет потеряно при ошибке обработки. Всегда настраивайте DLQ для критичных потоков!
Цикл чтения сообщений
Основной цикл
Пока РазрешеноСлушать Цикл
// 1. Читаем сообщение (с таймаутом)
РезультатЧтения = Компонента.ПрочитатьСообщение();
Если НЕ РезультатЧтения Тогда
// Проверяем флаг активности подписок
РазрешеноСлушать = ПроверитьАктивностьПодписок();
Продолжить;
КонецЕсли;
// 2. Получаем метаданные сообщения
ДанныеСообщения = Компонента.ПолучитьДанныеСообщения();
ТопикСообщения = Компонента.ПолучитьТопикСообщения();
СмещениеСообщения = Компонента.ПолучитьСмещениеСообщения();
РазделСообщения = Компонента.ПолучитьРазделСообщения();
// 3. Находим поток по топику
СтруктураПодписки = СоответствиеТопикПодписка.Получить(ТопикСообщения);
// 4. Обрабатываем сообщение
РезультатОбработки = ОбработатьВходящееСообщение(
ДанныеСообщения,
СтруктураПодписки.ПотокДанных,
СтруктураПодписки.Подписка);
// 5. При ошибке — отправляем в DLQ
Если НЕ РезультатОбработки.Успех Тогда
ОтправитьВDLQПриОшибке(...);
КонецЕсли;
// 6. Фиксируем offset (даже при ошибке!)
НовоеСмещение = Число(СмещениеСообщения) + 1;
Компонента.ЗафиксироватьСмещение(ТопикСообщения, НовоеСмещение, РазделСообщения);
КонецЦикла;Обработка входящего сообщения
Функция ОбработатьВходящееСообщение(ДанныеСообщения, ПотокДанных, Подписка)
// 1. Десериализация JSON
Попытка
МодельДанных = инт_КоннекторHTTP.JsonВОбъект(ДанныеСообщения);
Исключение
Возврат НовыйРезультат(Ложь, "Ошибка десериализации JSON");
КонецПопытки;
// 2. Валидация по схеме (если включена в потоке)
Если ПотокДанных.Валидация Тогда
Попытка
Справочники.инт_ПотокиДанных.ВалидироватьСообщениеПоПотоку(МодельДанных, ПотокДанных);
Исключение
Возврат НовыйРезультат(Ложь, "Ошибка валидации: " + ОписаниеОшибки());
КонецПопытки;
КонецЕсли;
// 3. Отправка на обработку
// Для асинхронных — запись в очередь
// Для синхронных — немедленная обработка
инт_ОбработкаВходящихПотоков.ОбработкаВходящегоСообщенияПоПотоку(
МодельДанных,
ПотокДанных,
Новый УникальныйИдентификатор);
Возврат НовыйРезультат(Истина, "");
КонецФункцииМониторинг консьюмеров
Регистр инт_АктивныеКонсьюмеры
Хранит информацию о запущенных консьюмерах:
| Измерение | Тип | Описание |
|---|---|---|
Подписка | ОпределяемыйТип.инт_Подписка | Ссылка на подписчик |
ИдентификаторФоновогоЗадания | УникальныйИдентификатор | ID фонового задания |
| Ресурс | Тип | Описание |
|---|---|---|
ВремяЗапуска | ДатаВремя | Когда запущен консьюмер |
ПоследняяАктивность | ДатаВремя | Последний heartbeat |
СтатусАктивности | Строка | "Активен", "Kafka" |
Программное управление
// Остановка всех консьюмеров
инт_МенеджерВходящихПотоков.ОстановитьВсеКонсьюмеры();
// Остановка консьюмеров конкретной подписки
инт_МенеджерВходящихПотоков.ОстановитьКонсьюмерыПодписки(ПодписчикKafka);
// Получение списка активных консьюмеров
АктивныеКонсьюмеры = РегистрыСведений.инт_АктивныеКонсьюмеры.ПолучитьАктивныеКонсьюмеры();Журнал регистрации
Все события консьюмеров логируются с событием ПодсистемаИнтеграции.КонсьюмерОчередей:
ПодсистемаИнтеграции.КонсьюмерОчередей — основные события
ПодсистемаИнтеграции.КонсьюмерОчередей.DLQ — отправка в DLQНастройки Kafka
Параметры консьюмера
Компонента simpleKafka1C использует библиотеку librdkafka. При создании консьюмера устанавливаются параметры:
| Параметр | Значение | Описание |
|---|---|---|
group.id | {GUID_ИБ}_consumer | Идентификатор consumer group |
auto.offset.reset | earliest | Начинать с начала при новой группе |
enable.auto.commit | false | Ручная фиксация offset |
Глобальные константы
| Константа | Тип | Описание |
|---|---|---|
инт_ИдентификаторКлиентаKafka | Строка | client.id для логов |
инт_КаталогЛоговKafka | Строка | Путь для логов librdkafka |
инт_УровеньОтладкиKafka | Строка | Категории отладки (debug) |
инт_ИнтервалСтатистикиKafka | Число | Интервал сбора статистики (мс) |
Уровни отладки
Для диагностики проблем можно включить детальное логирование:
// Для продюсера
инт_РаботаСКафка.ПолучитьУровеньОтладкиПродюсера()
// "broker,topic,msg"
// Для консьюмера
инт_РаботаСКафка.ПолучитьУровеньОтладкиКонсьюмера()
// "consumer,cgrp,topic,fetch"
// Полная отладка (осторожно — много логов!)
инт_РаботаСКафка.ПолучитьУровеньОтладкиПолный()
// "generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,..."Требования
Внешняя компонента
Для работы требуется внешняя компонента SimpleKafka1C (на базе librdkafka):
- Общий макет:
инт_simple_kafka - Тип: Native
- Подключение: НеИзолированно
Права доступа
Консьюмеры работают в фоновых заданиях. Убедитесь, что:
- Роль имеет права на регистры
инт_ОчередьВходящихСообщений,инт_АктивныеКонсьюмеры - Разрешено подключение внешних компонент
- Есть сетевой доступ к брокерам Kafka
Рекомендации
Всегда настраивайте DLQ
Без DLQ "битые" сообщения будут молча отбрасываться. Создайте отдельный топик .dlq для каждого основного топика.
Используйте асинхронную обработку
Для высоконагруженных потоков включайте АсинхроннаяОбработка = Истина. Это обеспечивает устойчивость к временным сбоям.
Мониторьте lag
Следите за отставанием consumer group через Kafka UI или метрики. Большой lag означает, что обработка не успевает за поступлением.
Не останавливайте регламентное задание
Если остановить инт_МенеджерВходящихПотоков, консьюмеры продолжат работать до завершения, но не перезапустятся при падении.
Проверяйте сетевую доступность
При недоступности брокеров консьюмер завершится с ошибкой. Менеджер перезапустит его через 30 секунд.
Примеры
Минимальная настройка
Создайте эндпоинт с адресом брокера:
Наименование: Kafka Production Адрес ресурса: kafka-broker.example.com:9092Создайте подписчика Kafka:
Наименование: Заказы из магазина Эндпоинт: Kafka Production ИмяТопика: shop.orders.v1 ИмяDLQТопика: shop.orders.v1.dlq Активен: ДаСоздайте входящий поток данных:
Наименование: Заказы из интернет-магазина Направление: Входящий Обработчик: МойМодуль.ОбработатьЗаказДобавьте подписчика в табличную часть
ПодпискиНаВходящиеСообщенияпотокаУбедитесь, что регламентное задание
инт_МенеджерВходящихПотоковактивно
Связанные разделы
- Подписчики — все типы подписчиков
- Валидация — проверка входящих сообщений
- Очереди сообщений — хранение и обработка
- Мониторинг — метрики Prometheus