Пакет stompx предоставляет высокоуровневую обёртку над STOMP-протоколом, реализуя удобные инструменты для создания потребителей и издателей сообщений, с поддержкой middleware, логирования, повторных попыток и управления группой потребителей и издателей.
Конфигурация stompx-клиента.
Fields:
Массив потребителей.
Массив издателей.
Methods:
Создаёт конфигурацию клиента с указанными опциями.
Функция, применяющая опции к Config.
Functions:
Добавляет клиенты потребителей в конфигурацию клиента.
Добавляет клиенты издателей в конфигурацию клиента.
Конфигурация потребителя сообщений.
Fields:
Адрес брокера (обязательное).
Имя очереди (обязательное).
Количество обработчиков (по умолчанию 1).
Количество предзагруженных сообщений.
Имя пользователя.
Пароль.
Дополнительные заголовки подключения.
Конфигурация издателя сообщений.
Fields:
Адрес брокера (обязательное).
Имя очереди (обязательное).
Имя пользователя.
Пароль.
Дополнительные заголовки подключения.
Клиент, включающий в себя группу потребителей и издателей, способный обновлять соединения и перезапускаться при изменении конфигурации.
Methods:
Создать новый клиент с логгером.
Обновить конфигурацию, синхронно инициализировать клиент с гарантией готовности всех компонентов:
- Блокировка и ожидание первой успешно установленной сессии.
- Запуск всех потребителей, инициализация всех издателей.
- Вернет первую возникшую ошибку во время открытия первой сессии или
nil.
Обновить конфигурацию и перезапустить подключения:
- Останавливает старые соединения,
- Инициализирует новые потребители/издатели,
- Запускает обработку сообщений потребителями.
Завершает все активные подключения.
Наблюдатель за событиями жизненного цикла потребителя (ошибки, запуск, остановка).
Methods:
Создаёт наблюдателя за событиями с указанным логером.
Логирует ошибку потребителя.
Логируют процесс остановки.
Логирует начало потребления сообщений.
DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config
Создаёт конфигурацию потребителя с поддержкой логирования, middleware и подключения по заданным параметрам.
Создаёт издателя сообщений с middleware и настройками подключения.
Создаёт обработчик результата с логированием и восстановлением сервиса при панике.
Добавляет заголовок persistent=true ко всем исходящим сообщениям.
Логирует отправку сообщений.
Добавляет Request-Id в заголовки сообщений.
Повторяет публикацию при ошибках с использованием заданного Retrier.
Логирует входящие сообщения.
Извлекает или генерирует Request-Id и сохраняет его в контексте запроса.
package main
import (
"context"
"log"
"github.com/txix-open/isp-kit/app"
"github.com/txix-open/isp-kit/shutdown"
"github.com/txix-open/isp-kit/log"
"github.com/txix-open/isp-kit/requestid"
"github.com/txix-open/isp-kit/stompx"
"github.com/txix-open/isp-kit/stompx/consumer"
"github.com/txix-open/isp-kit/stompx/publisher"
)
func main() {
logger := log.New()
// Создаём обработчик сообщений
handler := stompx.NewResultHandler(logger, stompx.HandlerFunc(func(ctx context.Context, msg []byte) error {
logger.Info("message received", log.StringType("body", string(msg)))
return nil
}))
// Конфигурация потребителя
consumerCfg := stompx.ConsumerConfig{
Address: "tcp://localhost:61613",
Queue: "/queue/example",
Username: "admin",
Password: "admin",
}
publisherCfg := stompx.PublisherConfig{
Address: "tcp://localhost:61613",
Queue: "/queue/example",
Username: "admin",
Password: "admin",
}
consumerCli := consumer.NewWatcher(stompx.DefaultConsumer(consumerCfg, handler, logger))
publisherCli := stompx.DefaultPublisher(publisherCfg)
// Создаём конфигурацию
config := stompx.NewConfig(
stompx.WithConsumers(consumerCli),
stompx.WithPublishers(publisherCli))
// Создаём клиент
сli := stompx.NewClient(logger)
// Обработка завершения приложения
shutdown.On(func() {
logger.Info("shutting down...")
_ = сli.Close()
logger.Info("shutdown completed")
})
// Запускаем клиент
сli.UpgradeAndServe(context.Background(), config)
...
}