Skip to content

Многопоточность

Подсистема поддерживает параллельную обработку очередей через фоновые задания.

Архитектура

┌─────────────────────────────────────────────────────────────────────────────┐
│                         РЕГЛАМЕНТНОЕ ЗАДАНИЕ                                 │
│                         (запускается по расписанию)                          │
└────────────────────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────────────┐
│                         МЕНЕДЖЕР ПОТОКОВ                                     │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  1. Сборка мусора (завершённые фоновые задания)                     │   │
│  │  2. Расчёт количества потоков к созданию                            │   │
│  │  3. Захват пачки сообщений                                          │   │
│  │  4. Запуск фоновых заданий                                          │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
└────────────────────────────────────┬────────────────────────────────────────┘

                    ┌────────────────┼────────────────┐
                    ▼                ▼                ▼
           ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
           │ Фоновое      │  │ Фоновое      │  │ Фоновое      │
           │ задание #1   │  │ задание #2   │  │ задание #N   │
           │              │  │              │  │              │
           │ Пачка        │  │ Пачка        │  │ Пачка        │
           │ сообщений    │  │ сообщений    │  │ сообщений    │
           └──────────────┘  └──────────────┘  └──────────────┘

Два уровня параллелизма

1. Формирование сообщений

Регламентное задание: инт_МенеджерПотоковФормированияИсходящихСообщений

bsl
// Модуль инт_ФормированиеИсходящихСообщений
Процедура ЗапускМенеджераПотоков() Экспорт
    // 1. Удаление записей о завершённых фоновых заданиях
    РегистрыСведений.инт_МенеджерПотоковФормированияСообщений.СобратьМусор();
    
    // 2. Создание новых потоков
    КоличествоПотоковКСозданию = РегистрыСведений
        .инт_МенеджерПотоковФормированияСообщений
        .КоличествоПотоковКСозданию();
        
    Для Итератор = 1 По КоличествоПотоковКСозданию Цикл
        СоздатьПотокФормированияСообщений();
    КонецЦикла;
КонецПроцедуры

2. Отправка сообщений

Регламентное задание: инт_МенеджерПотоковРассылкиСообщений

bsl
// Модуль инт_ОтправкаИсходящихСообщений
Процедура инт_ЗапуститьМенеджерПотоковРассылкиСообщений() Экспорт
    РегистрыСведений.инт_МенеджерПотоковОтправкиСообщений.СобратьМусор();
    
    // Опционально: разделение по типам подписчиков
    РазделятьПотокиПоТипам = инт_ОтправкаИсходящихСообщенийПовтИсп
        .РазделятьПотокиОтправкиПоТипамПодписчиков();
    
    Если РазделятьПотокиПоТипам Тогда
        Для Каждого ТипПодписчика Из инт_ПодписчикиОбщий.СписокТиповПодписчиков() Цикл
            СоздатьПотокиОтправкиСообщенийПоТипу(ТипПодписчика);
        КонецЦикла;
    Иначе
        СоздатьПотокиОтправкиСообщенийПоТипу(Неопределено);
    КонецЕсли;
КонецПроцедуры

Регистры менеджеров потоков

инт_МенеджерПотоковФормированияСообщений

ИзмерениеОписание
ИдентификаторФоновогоЗаданияUUID фонового задания
РесурсОписание
СписокСообщенийМассив ID сообщений в обработке
ДатаСозданияВремя запуска

инт_МенеджерПотоковОтправкиСообщений

Аналогичная структура для потоков отправки.

Настройки параллелизма

Размер пачки

Количество сообщений, обрабатываемых одним фоновым заданием:

bsl
// Модуль инт_ФормированиеИсходящихСообщенийПовтИсп
Функция РазмерПачкиФормированияСообщений() Экспорт
    Возврат 100;  // По умолчанию
КонецФункции
bsl
// Модуль инт_ОтправкаИсходящихСообщенийПовтИсп
Функция РазмерПачкиОтправкиСообщений() Экспорт
    Возврат 50;  // По умолчанию
КонецФункции

Максимум потоков

Количество одновременных фоновых заданий ограничивается через настройки:

bsl
// Модуль менеджера регистра
Функция КоличествоПотоковКСозданию() Экспорт
    МаксимумПотоков = ПолучитьМаксимальноеКоличествоПотоков();
    ТекущихПотоков = ПолучитьКоличествоАктивныхПотоков();
    
    Возврат Макс(0, МаксимумПотоков - ТекущихПотоков);
КонецФункции

Разделение по типам подписчиков

Для изоляции медленных каналов можно разделить потоки отправки по типам подписчиков:

bsl
// Модуль инт_ОтправкаИсходящихСообщенийПовтИсп
Функция РазделятьПотокиОтправкиПоТипамПодписчиков() Экспорт
    Возврат Истина;  // HTTP, RabbitMQ, Kafka обрабатываются отдельно
КонецФункции
┌─────────────────────────────────────────────────────────────────┐
│                      Очередь отправки                           │
└───────────────────────────┬─────────────────────────────────────┘

        ┌───────────────────┼───────────────────┐
        ▼                   ▼                   ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ Потоки HTTP   │   │ Потоки MQ     │   │ Потоки Kafka  │
│ (быстро)      │   │ (умеренно)    │   │ (медленно)    │
└───────────────┘   └───────────────┘   └───────────────┘

Захват сообщений

Механизм исключает обработку одного сообщения несколькими потоками:

bsl
// 1. Получить пачку сообщений со статусом "Новый"
МассивСообщений = РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений
    .ПолучитьИдентификаторыСообщенийКФормированию(КоличествоСообщенийВПачке);

// 2. Сразу сменить статус на "В обработке" (в транзакции)
НачатьТранзакцию();
    РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений.ЗаписатьСтатусСообщений(
        МассивСообщений, 
        Перечисления.инт_СтатусыИсходящихСообщений.ФормированиеСообщения
    );
    
    // 3. Запустить фоновое задание
    Поток = ФоновыеЗадания.Выполнить(...);
    
    // 4. Зарегистрировать поток в менеджере
    РегистрыСведений.инт_МенеджерПотоковФормированияСообщений
        .ЗарегистрироватьПоток(Поток.УникальныйИдентификатор, МассивСообщений);
ЗафиксироватьТранзакцию();

Сборка мусора

После завершения фонового задания запись из менеджера удаляется:

bsl
Процедура СобратьМусор() Экспорт
    // Получить ID завершённых фоновых заданий
    // Удалить соответствующие записи из регистра менеджера
КонецПроцедуры

Обработка ошибок

При ошибке в фоновом задании:

  1. Сообщение получает статус ОшибкаФормирования или ОшибкаОтправки
  2. Текст ошибки записывается в историю статусов
  3. Сообщение остаётся в очереди для повторной обработки
bsl
Попытка
    // Обработка сообщения
Исключение
    РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений.ЗаписатьСтатусСообщения(
        ИдентификаторСообщения, 
        Перечисления.инт_СтатусыИсходящихСообщений.ОшибкаФормирования,
        ПодробноеПредставлениеОшибки(ИнформацияОбОшибке())
    );
КонецПопытки;

Рекомендации по настройке

Размер пачки

СценарийРекомендация
Быстрая обработка (простые JSON)100-500 сообщений
Сложная логика обработчика10-50 сообщений
Медленные внешние системы10-20 сообщений

Количество потоков

СценарийРекомендация
Формирование2-4 потока
Отправка HTTP4-8 потоков
Отправка в MQ2-4 потока

Ресурсы сервера

Каждое фоновое задание потребляет память и соединение с базой. Не превышайте разумных лимитов.

Мониторинг

Отслеживайте метрику pde_queue_length. Рост очереди означает, что потоки не справляются с нагрузкой.

Регламентные задания

ЗаданиеРасписаниеНазначение
инт_МенеджерПотоковФормированияИсходящихСообщенийКаждые 30 секФормирование сообщений
инт_МенеджерПотоковРассылкиСообщенийКаждые 30 секОтправка сообщений
инт_ОбработкаВходящейОчередиКаждые 60 секОбработка входящих
инт_ОчисткаУстаревшихСообщенийРаз в суткиУдаление старых записей

Следующие шаги

Документация подсистемы интеграции