Многопоточность
Подсистема поддерживает параллельную обработку очередей через фоновые задания.
Архитектура
┌─────────────────────────────────────────────────────────────────────────────┐
│ РЕГЛАМЕНТНОЕ ЗАДАНИЕ │
│ (запускается по расписанию) │
└────────────────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ МЕНЕДЖЕР ПОТОКОВ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 1. Сборка мусора (завершённые фоновые задания) │ │
│ │ 2. Расчёт количества потоков к созданию │ │
│ │ 3. Захват пачки сообщений │ │
│ │ 4. Запуск фоновых заданий │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────┬────────────────────────────────────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Фоновое │ │ Фоновое │ │ Фоновое │
│ задание #1 │ │ задание #2 │ │ задание #N │
│ │ │ │ │ │
│ Пачка │ │ Пачка │ │ Пачка │
│ сообщений │ │ сообщений │ │ сообщений │
└──────────────┘ └──────────────┘ └──────────────┘Два уровня параллелизма
1. Формирование сообщений
Регламентное задание: инт_МенеджерПотоковФормированияИсходящихСообщений
// Модуль инт_ФормированиеИсходящихСообщений
Процедура ЗапускМенеджераПотоков() Экспорт
// 1. Удаление записей о завершённых фоновых заданиях
РегистрыСведений.инт_МенеджерПотоковФормированияСообщений.СобратьМусор();
// 2. Создание новых потоков
КоличествоПотоковКСозданию = РегистрыСведений
.инт_МенеджерПотоковФормированияСообщений
.КоличествоПотоковКСозданию();
Для Итератор = 1 По КоличествоПотоковКСозданию Цикл
СоздатьПотокФормированияСообщений();
КонецЦикла;
КонецПроцедуры2. Отправка сообщений
Регламентное задание: инт_МенеджерПотоковРассылкиСообщений
// Модуль инт_ОтправкаИсходящихСообщений
Процедура инт_ЗапуститьМенеджерПотоковРассылкиСообщений() Экспорт
РегистрыСведений.инт_МенеджерПотоковОтправкиСообщений.СобратьМусор();
// Опционально: разделение по типам подписчиков
РазделятьПотокиПоТипам = инт_ОтправкаИсходящихСообщенийПовтИсп
.РазделятьПотокиОтправкиПоТипамПодписчиков();
Если РазделятьПотокиПоТипам Тогда
Для Каждого ТипПодписчика Из инт_ПодписчикиОбщий.СписокТиповПодписчиков() Цикл
СоздатьПотокиОтправкиСообщенийПоТипу(ТипПодписчика);
КонецЦикла;
Иначе
СоздатьПотокиОтправкиСообщенийПоТипу(Неопределено);
КонецЕсли;
КонецПроцедурыРегистры менеджеров потоков
инт_МенеджерПотоковФормированияСообщений
| Измерение | Описание |
|---|---|
ИдентификаторФоновогоЗадания | UUID фонового задания |
| Ресурс | Описание |
|---|---|
СписокСообщений | Массив ID сообщений в обработке |
ДатаСоздания | Время запуска |
инт_МенеджерПотоковОтправкиСообщений
Аналогичная структура для потоков отправки.
Настройки параллелизма
Размер пачки
Количество сообщений, обрабатываемых одним фоновым заданием:
// Модуль инт_ФормированиеИсходящихСообщенийПовтИсп
Функция РазмерПачкиФормированияСообщений() Экспорт
Возврат 100; // По умолчанию
КонецФункции// Модуль инт_ОтправкаИсходящихСообщенийПовтИсп
Функция РазмерПачкиОтправкиСообщений() Экспорт
Возврат 50; // По умолчанию
КонецФункцииМаксимум потоков
Количество одновременных фоновых заданий ограничивается через настройки:
// Модуль менеджера регистра
Функция КоличествоПотоковКСозданию() Экспорт
МаксимумПотоков = ПолучитьМаксимальноеКоличествоПотоков();
ТекущихПотоков = ПолучитьКоличествоАктивныхПотоков();
Возврат Макс(0, МаксимумПотоков - ТекущихПотоков);
КонецФункцииРазделение по типам подписчиков
Для изоляции медленных каналов можно разделить потоки отправки по типам подписчиков:
// Модуль инт_ОтправкаИсходящихСообщенийПовтИсп
Функция РазделятьПотокиОтправкиПоТипамПодписчиков() Экспорт
Возврат Истина; // HTTP, RabbitMQ, Kafka обрабатываются отдельно
КонецФункции┌─────────────────────────────────────────────────────────────────┐
│ Очередь отправки │
└───────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Потоки HTTP │ │ Потоки MQ │ │ Потоки Kafka │
│ (быстро) │ │ (умеренно) │ │ (медленно) │
└───────────────┘ └───────────────┘ └───────────────┘Захват сообщений
Механизм исключает обработку одного сообщения несколькими потоками:
// 1. Получить пачку сообщений со статусом "Новый"
МассивСообщений = РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений
.ПолучитьИдентификаторыСообщенийКФормированию(КоличествоСообщенийВПачке);
// 2. Сразу сменить статус на "В обработке" (в транзакции)
НачатьТранзакцию();
РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений.ЗаписатьСтатусСообщений(
МассивСообщений,
Перечисления.инт_СтатусыИсходящихСообщений.ФормированиеСообщения
);
// 3. Запустить фоновое задание
Поток = ФоновыеЗадания.Выполнить(...);
// 4. Зарегистрировать поток в менеджере
РегистрыСведений.инт_МенеджерПотоковФормированияСообщений
.ЗарегистрироватьПоток(Поток.УникальныйИдентификатор, МассивСообщений);
ЗафиксироватьТранзакцию();Сборка мусора
После завершения фонового задания запись из менеджера удаляется:
Процедура СобратьМусор() Экспорт
// Получить ID завершённых фоновых заданий
// Удалить соответствующие записи из регистра менеджера
КонецПроцедурыОбработка ошибок
При ошибке в фоновом задании:
- Сообщение получает статус
ОшибкаФормированияилиОшибкаОтправки - Текст ошибки записывается в историю статусов
- Сообщение остаётся в очереди для повторной обработки
Попытка
// Обработка сообщения
Исключение
РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений.ЗаписатьСтатусСообщения(
ИдентификаторСообщения,
Перечисления.инт_СтатусыИсходящихСообщений.ОшибкаФормирования,
ПодробноеПредставлениеОшибки(ИнформацияОбОшибке())
);
КонецПопытки;Рекомендации по настройке
Размер пачки
| Сценарий | Рекомендация |
|---|---|
| Быстрая обработка (простые JSON) | 100-500 сообщений |
| Сложная логика обработчика | 10-50 сообщений |
| Медленные внешние системы | 10-20 сообщений |
Количество потоков
| Сценарий | Рекомендация |
|---|---|
| Формирование | 2-4 потока |
| Отправка HTTP | 4-8 потоков |
| Отправка в MQ | 2-4 потока |
Ресурсы сервера
Каждое фоновое задание потребляет память и соединение с базой. Не превышайте разумных лимитов.
Мониторинг
Отслеживайте метрику pde_queue_length. Рост очереди означает, что потоки не справляются с нагрузкой.
Регламентные задания
| Задание | Расписание | Назначение |
|---|---|---|
инт_МенеджерПотоковФормированияИсходящихСообщений | Каждые 30 сек | Формирование сообщений |
инт_МенеджерПотоковРассылкиСообщений | Каждые 30 сек | Отправка сообщений |
инт_ОбработкаВходящейОчереди | Каждые 60 сек | Обработка входящих |
инт_ОчисткаУстаревшихСообщений | Раз в сутки | Удаление старых записей |
Следующие шаги
- Очереди сообщений — детали работы очередей
- Валидация — проверка сообщений
- Потоки данных — архитектура потоков