Skip to content

Очереди сообщений

Подсистема использует очереди сообщений для надёжной асинхронной доставки данных.

Принцип работы

┌─────────────────┐                    ┌─────────────────┐
│  Регистрация    │                    │  Обработка      │
│  сообщения      │───────────────────▶│  фоновым        │
│  (синхронно)    │   Очередь          │  заданием       │
└─────────────────┘                    └─────────────────┘

Преимущества:

  • Операция регистрации быстрая и не блокирует пользователя
  • Отказоустойчивость — сообщения не теряются при сбоях
  • Масштабируемость — несколько фоновых заданий обрабатывают очередь параллельно

Регистры очередей

инт_ОчередьИсходящихСообщений

Основная очередь для исходящих сообщений.

ИзмерениеТипОписание
ИдентификаторСообщенияУникальныйИдентификаторУникальный ID сообщения
РесурсТипОписание
ПотокДанныхСправочникСсылка.инт_ПотокиДанныхПоток обработки
ИсходныеДанныеСтрокаСериализованное представление исходных данных (ЗначениеВСтрокуВнутр)
СформированноеСообщениеХранилищеЗначенияГотовое сообщение

инт_ОчередьОтправкиИсходящихСообщений

Очередь для рассылки сообщений подписчикам (сообщение × подписчик).

ИзмерениеТипОписание
ИдентификаторСообщенияУникальныйИдентификаторID сообщения
ПодписчикСправочникСсылка.инт_ПодписчикиПолучатель

инт_ОчередьВходящихСообщений

Очередь входящих сообщений от внешних систем.

ИзмерениеТипОписание
ИдентификаторСообщенияУникальныйИдентификаторID сообщения
РесурсТипОписание
ПотокДанныхСправочникСсылка.инт_ПотокиДанныхПоток обработки
ТелоСообщенияХранилищеЗначенияJSON-данные сообщения

Регистрация сообщения

Базовый вызов

bsl
ИдентификаторСообщения = РегистрыСведений.инт_ОчередьИсходящихСообщений.ЗарегистрироватьСообщение(
    ИсходныеДанные,   // Ссылка на объект или ФиксированнаяСтруктура
    ПотокДанных,      // СправочникСсылка.инт_ПотокиДанных
    РегистрироватьДубль  // Булево, по умолчанию Ложь
);

Параметры

ПараметрТипОписание
ИсходныеДанныеСсылка/ФиксированнаяСтруктураДанные для формирования сообщения
ПотокДанныхСправочникСсылкаПоток для обработки
РегистрироватьДубльБулевоЛожь — дубли игнорируются

Возвращаемое значение

  • УникальныйИдентификатор — ID зарегистрированного сообщения
  • Неопределено — если регистрация не выполнена (поток неактивен, дубль)

Пример использования

bsl
// В модуле объекта документа
Процедура ОбработкаПроведения(Отказ, Режим)
    // ... проведение документа ...
    
    // Регистрация сообщения для отправки
    ПотокДанных = Справочники.инт_ПотокиДанных.ЭкспортЗаказов;
    РегистрыСведений.инт_ОчередьИсходящихСообщений.ЗарегистрироватьСообщение(
        Ссылка,
        ПотокДанных
    );
КонецПроцедуры

Защита от дублей

Подсистема автоматически рассчитывает хэш исходных данных и хранит его в регистре инт_ХешиОбъектов.

При РегистрироватьДубль = Ложь:

  • Если хэш не изменился — сообщение не регистрируется
  • Если хэш изменился — регистрируется новое сообщение
bsl
// Принудительная регистрация дубля
РегистрыСведений.инт_ОчередьИсходящихСообщений.ЗарегистрироватьСообщение(
    ДокументСсылка,
    ПотокДанных,
    Истина  // РегистрироватьДубль = Истина
);

Статусы сообщений

Исходящие сообщения

Перечисление инт_СтатусыИсходящихСообщений:

СтатусОписание
НовыйЗарегистрировано, ожидает формирования
ФормированиеСообщенияФормируется фоновым заданием
ОшибкаФормированияОшибка при выполнении обработчика
ГотовоКОтправкеСформировано, ожидает рассылки
ПомещеноВОчередьОтправкиДобавлено в очередь для подписчиков
ОтправленоДоставлено всем подписчикам
ОшибкаОтправкиОшибка при отправке

Статусы рассылки (по подписчикам)

Перечисление инт_СтатусыРассылкиИсходящихСообщений:

СтатусОписание
НовыйОжидает отправки
ВПроцессеОтправкиОтправляется
ОтправленоУспешно доставлено
ОшибкаОтправкиОшибка, будет повтор

Регистры статусов

инт_ТекущийСтатусИсходящихСообщений

Хранит текущий статус каждого сообщения:

bsl
// Получение текущего статуса
Статус = РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений.ПолучитьСтатус(
    ИдентификаторСообщения
);

инт_ИсторияСтатусовИсходящихСообщений

Хранит полную историю изменения статусов:

ИзмерениеОписание
ИдентификаторСообщенияID сообщения
ДатаИзмененияМетка времени
РесурсОписание
СтатусНовый статус
КомментарийТекст ошибки или примечание

Удаление сообщений

bsl
// Удаление сообщения и всей связанной информации
РегистрыСведений.инт_ОчередьИсходящихСообщений.УдалитьСообщениеИзОчередиПоИдентификатору(
    ИдентификаторСообщения
);

Удаляются записи из:

  • инт_ОчередьИсходящихСообщений
  • инт_ТекущийСтатусИсходящихСообщений
  • инт_ИсторияСтатусовИсходящихСообщений
  • инт_ОчередьОтправкиИсходящихСообщений

Регламентная очистка

Регламентное задание инт_ОчисткаУстаревшихСообщений удаляет успешно отправленные сообщения старше заданного срока.

Получение данных сообщения

bsl
// Получение данных сообщения по идентификатору
ДанныеСообщения = РегистрыСведений.инт_ОчередьИсходящихСообщений.ПолучитьДанныеОчередиПоИдентификатору(
    ИдентификаторСообщения,
    "ИсходныеДанные, ПотокДанных, СформированноеСообщение"  // Список полей через запятую
);

// Результат: Структура
// - ИсходныеДанные: Ссылка
// - ПотокДанных: СправочникСсылка
// - СформированноеСообщение: Соответствие (десериализованное)

Рекомендации

Момент регистрации

Регистрируйте сообщения в обработчиках проведения документов или подписках на события. Это гарантирует, что сообщение будет создано только для успешно записанных данных.

Мониторинг очередей

Настройте алерты на метрику pde_queue_length для раннего обнаружения проблем с обработкой очередей.

Большие объёмы

При массовой регистрации сообщений (загрузка данных) используйте пакетную обработку и контролируйте размер очереди.

Следующие шаги

Документация подсистемы интеграции