Очереди сообщений
Подсистема использует очереди сообщений для надёжной асинхронной доставки данных.
Принцип работы
┌─────────────────┐ ┌─────────────────┐
│ Регистрация │ │ Обработка │
│ сообщения │───────────────────▶│ фоновым │
│ (синхронно) │ Очередь │ заданием │
└─────────────────┘ └─────────────────┘2
3
4
5
Преимущества:
- Операция регистрации быстрая и не блокирует пользователя
- Отказоустойчивость — сообщения не теряются при сбоях
- Масштабируемость — несколько фоновых заданий обрабатывают очередь параллельно
Регистры очередей
инт_ОчередьИсходящихСообщений
Основная очередь для исходящих сообщений.
| Измерение | Тип | Описание |
|---|---|---|
ИдентификаторСообщения | УникальныйИдентификатор | Уникальный ID сообщения |
| Ресурс | Тип | Описание |
|---|---|---|
ПотокДанных | СправочникСсылка.инт_ПотокиДанных | Поток обработки |
ИсходныеДанные | Строка | Сериализованное представление исходных данных (ЗначениеВСтрокуВнутр) |
СформированноеСообщение | ХранилищеЗначения | Готовое сообщение |
инт_ОчередьОтправкиИсходящихСообщений
Очередь для рассылки сообщений подписчикам (сообщение × подписчик).
| Измерение | Тип | Описание |
|---|---|---|
ИдентификаторСообщения | УникальныйИдентификатор | ID сообщения |
Подписчик | СправочникСсылка.инт_Подписчики | Получатель |
инт_ОчередьВходящихСообщений
Очередь входящих сообщений от внешних систем.
| Измерение | Тип | Описание |
|---|---|---|
ИдентификаторСообщения | УникальныйИдентификатор | ID сообщения |
| Ресурс | Тип | Описание |
|---|---|---|
ПотокДанных | СправочникСсылка.инт_ПотокиДанных | Поток обработки |
ТелоСообщения | ХранилищеЗначения | JSON-данные сообщения |
Регистрация сообщения
Базовый вызов
ИдентификаторСообщения = РегистрыСведений.инт_ОчередьИсходящихСообщений.ЗарегистрироватьСообщение(
ИсходныеДанные, // Ссылка на объект или ФиксированнаяСтруктура
ПотокДанных, // СправочникСсылка.инт_ПотокиДанных
РегистрироватьДубль // Булево, по умолчанию Ложь
);2
3
4
5
Параметры
| Параметр | Тип | Описание |
|---|---|---|
ИсходныеДанные | Ссылка/ФиксированнаяСтруктура | Данные для формирования сообщения |
ПотокДанных | СправочникСсылка | Поток для обработки |
РегистрироватьДубль | Булево | Ложь — дубли игнорируются |
Возвращаемое значение
УникальныйИдентификатор— ID зарегистрированного сообщенияНеопределено— если регистрация не выполнена (поток неактивен, дубль)
Пример использования
// В модуле объекта документа
Процедура ОбработкаПроведения(Отказ, Режим)
// ... проведение документа ...
// Регистрация сообщения для отправки
ПотокДанных = Справочники.инт_ПотокиДанных.ЭкспортЗаказов;
РегистрыСведений.инт_ОчередьИсходящихСообщений.ЗарегистрироватьСообщение(
Ссылка,
ПотокДанных
);
КонецПроцедуры2
3
4
5
6
7
8
9
10
11
Защита от дублей
Подсистема автоматически рассчитывает хэш исходных данных и хранит его в регистре инт_ХешиОбъектов.
При РегистрироватьДубль = Ложь:
- Если хэш не изменился — сообщение не регистрируется
- Если хэш изменился — регистрируется новое сообщение
// Принудительная регистрация дубля
РегистрыСведений.инт_ОчередьИсходящихСообщений.ЗарегистрироватьСообщение(
ДокументСсылка,
ПотокДанных,
Истина // РегистрироватьДубль = Истина
);2
3
4
5
6
Статусы сообщений
Исходящие сообщения
Перечисление инт_СтатусыИсходящихСообщений:
| Статус | Описание |
|---|---|
Новый | Зарегистрировано, ожидает формирования |
ФормированиеСообщения | Формируется фоновым заданием |
ОшибкаФормирования | Ошибка при выполнении обработчика |
ГотовоКОтправке | Сформировано, ожидает рассылки |
ПомещеноВОчередьОтправки | Добавлено в очередь для подписчиков |
Отправлено | Доставлено всем подписчикам |
ОшибкаОтправки | Ошибка при отправке |
Статусы рассылки (по подписчикам)
Перечисление инт_СтатусыРассылкиИсходящихСообщений:
| Статус | Описание |
|---|---|
Новый | Ожидает отправки |
ВПроцессеОтправки | Отправляется |
Отправлено | Успешно доставлено |
ОшибкаОтправки | Ошибка, будет повтор |
Регистры статусов
инт_ТекущийСтатусИсходящихСообщений
Хранит текущий статус каждого сообщения:
// Получение текущего статуса
Статус = РегистрыСведений.инт_ТекущийСтатусИсходящихСообщений.ПолучитьСтатус(
ИдентификаторСообщения
);2
3
4
инт_ИсторияСтатусовИсходящихСообщений
Хранит полную историю изменения статусов:
| Измерение | Описание |
|---|---|
ИдентификаторСообщения | ID сообщения |
ДатаИзменения | Метка времени |
| Ресурс | Описание |
|---|---|
Статус | Новый статус |
Комментарий | Текст ошибки или примечание |
Удаление сообщений
// Удаление сообщения и всей связанной информации
РегистрыСведений.инт_ОчередьИсходящихСообщений.УдалитьСообщениеИзОчередиПоИдентификатору(
ИдентификаторСообщения
);2
3
4
Удаляются записи из:
инт_ОчередьИсходящихСообщенийинт_ТекущийСтатусИсходящихСообщенийинт_ИсторияСтатусовИсходящихСообщенийинт_ОчередьОтправкиИсходящихСообщений
Регламентная очистка
Регламентное задание инт_ОчисткаУстаревшихСообщений удаляет успешно отправленные сообщения старше заданного срока.
Получение данных сообщения
// Получение данных сообщения по идентификатору
ДанныеСообщения = РегистрыСведений.инт_ОчередьИсходящихСообщений.ПолучитьДанныеОчередиПоИдентификатору(
ИдентификаторСообщения,
"ИсходныеДанные, ПотокДанных, СформированноеСообщение" // Список полей через запятую
);
// Результат: Структура
// - ИсходныеДанные: Ссылка
// - ПотокДанных: СправочникСсылка
// - СформированноеСообщение: Соответствие (десериализованное)2
3
4
5
6
7
8
9
10
Рекомендации
Момент регистрации
Регистрируйте сообщения в обработчиках проведения документов или подписках на события. Это гарантирует, что сообщение будет создано только для успешно записанных данных.
Мониторинг очередей
Настройте алерты на метрику pde_queue_length для раннего обнаружения проблем с обработкой очередей.
Большие объёмы
При массовой регистрации сообщений (загрузка данных) используйте пакетную обработку и контролируйте размер очереди.
Следующие шаги
- Многопоточность — параллельная обработка очередей
- Валидация — проверка сообщений
- Подписчики — типы подписчиков