This service watches the health of a pipeline by monitoring one or more buffers in parallel. It stops or restarts designated pipeline services when the buffer queue length exceeds a threshold, or when the time since the last input or output message exceeds a specified limit.
Queue monitoring helps detect slow message processing, while ingress and egress monitoring shows how pipeline services are handling messages. In other words, the service can detect when the pipeline is processing messages slower than expected, or not processing them at all.
The watchdog parses metrics in the OpenMetrics text format exposed by the buffer service. When a metric has multiple labeled samples (e.g. different `reason` values), the maximum value across all samples is used by default; this can be customized with label filters. Transient errors, such as missing metrics or HTTP failures, are logged and the current polling cycle is skipped without crashing the service.
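To make the default aggregation concrete, here is a hand-rolled sketch of how labeled samples might be collapsed into one value per metric using `max()`. This is for illustration only (the metric text and the parser are simplified stand-ins, not the service's actual implementation):

```python
import re

# Example OpenMetrics text with two labeled samples of the same metric.
METRICS_TEXT = """\
# TYPE last_sent_message gauge
last_sent_message{reason="send_success"} 1700000100.0
last_sent_message{reason="ack_success"} 1700000050.0
"""

SAMPLE_RE = re.compile(r'^(\w+)(?:\{(.*)\})?\s+([0-9.eE+-]+)$')


def parse_metrics(text: str) -> dict:
    """Collapse labeled samples into one value per metric name with max()."""
    metrics = {}
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip comments and blank lines
        match = SAMPLE_RE.match(line)
        if not match:
            continue
        name, value = match.group(1), float(match.group(3))
        metrics[name] = max(metrics.get(name, float("-inf")), value)
    return metrics
```

With the text above, both `reason` variants collapse into a single `last_sent_message` entry holding the larger timestamp.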
The watchdog service is configured using the following environment variables:
- `CONFIG_FILE_PATH` - the path to the configuration file. Required.
- `LOGLEVEL` - the log level for the service. Default is `INFO`.
The configuration file is a YAML file with the following structure:
```yaml
watch:
  - buffer: <str>
    queue:
      action: <restart|stop>
      length: <int>
      cooldown: <int>
      polling_interval: <int>
      container:
        - labels: [<str>]
        # other labels
      label_filters: # optional
        <metric_name>:
          <label_key>: <label_value>
    egress:
      action: <restart|stop>
      idle: <int>
      cooldown: <int>
      polling_interval: <int>
      container:
        - labels: [<str>]
      label_filters: # optional
        <metric_name>:
          <label_key>: <label_value>
    ingress:
      action: <restart|stop>
      idle: <int>
      cooldown: <int>
      polling_interval: <int>
      container:
        - labels: [<str>]
        # other labels
      label_filters: # optional
        <metric_name>:
          <label_key>: <label_value>
    pyfunc: # optional
      action: <restart|stop>
      cooldown: <int>
      polling_interval: <int>
      container:
        - labels: [<str>]
      module: <str>
      class_name: <str>
      kwargs: # optional
        <key>: <value>
      label_filters: # optional
        <metric_name>:
          <label_key>: <label_value>
  # other buffers
```

Where:
- `buffer` - URL of the buffer to watch.
- `queue` - configuration for the buffer queue. Optional.
  - `action` - action to take when the queue length exceeds the `length` threshold. It can be `restart` or `stop`.
  - `length` - threshold length for the queue.
  - `cooldown` - interval in seconds to wait after applying the action.
  - `polling_interval` - interval in seconds to check the queue length.
  - `container` - list of labels to match for the action. Actions are performed on containers that match any of the label sets.
    - `labels` - one or more labels to match on the same container, i.e. the container must have all of them.
  - `label_filters` - optional mapping of metric name to label key-value pairs used to select specific metric samples. See "Label filters" below.
- `ingress` or `egress` - configuration for the input or output traffic of the buffer. Optional.
  - `action` - action to take when the time since the last input or output message exceeds the `idle` threshold. It can be `restart` or `stop`.
  - `idle` - threshold time in seconds since the last input or output message.
  - `cooldown` - interval in seconds to wait after applying the action.
  - `polling_interval` - interval in seconds between buffer traffic checks. Optional. Defaults to `idle`.
  - `container` - list of labels to match for the action. Actions are performed on containers that match any of the label sets.
    - `labels` - one or more labels to match on the same container, i.e. the container must have all of them.
  - `label_filters` - optional mapping of metric name to label key-value pairs used to select specific metric samples. See "Label filters" below.
- `pyfunc` - configuration for a custom Python trigger. Optional.
  - `action` - action to take when the trigger returns `True`. It can be `restart` or `stop`.
  - `cooldown` - interval in seconds to wait after applying the action.
  - `polling_interval` - interval in seconds between trigger calls.
  - `container` - list of labels to match for the action. Actions are performed on containers that match any of the label sets.
    - `labels` - one or more labels to match on the same container, i.e. the container must have all of them.
  - `module` - Python module path to import (e.g. `watchdog.triggers.discrepancy`).
  - `class_name` - class name within the module. The class must be callable (implement `__call__`). It is instantiated once at startup with `kwargs` and called on each polling cycle with the parsed metrics dict.
  - `kwargs` - optional keyword arguments passed to the class constructor.
  - `label_filters` - optional mapping of metric name to label key-value pairs used to select specific metric samples. See "Label filters" below.
Note: for each buffer, at least one of the `queue`, `ingress`, `egress`, or `pyfunc` sections must be present.
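All three built-in checks follow the same poll-compare-act-cooldown cycle. A minimal sketch of the queue variant, where `get_queue_length` and `apply_action` are hypothetical callables standing in for the metric fetch and the Docker restart/stop action (the real loop is asynchronous and more involved):

```python
import time


def watch_queue(get_queue_length, apply_action, *, length,
                polling_interval, cooldown, max_cycles=None):
    """Poll the queue length; fire the action when it exceeds the
    threshold, then back off for the cooldown. Returns the number of
    times the action fired (max_cycles bounds the loop for testing)."""
    fired = 0
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        cycles += 1
        try:
            current = get_queue_length()
        except Exception:
            # Transient errors skip the cycle without crashing.
            time.sleep(polling_interval)
            continue
        if current > length:
            apply_action()
            fired += 1
            time.sleep(cooldown)  # wait after applying the action
        else:
            time.sleep(polling_interval)
    return fired
```

The `cooldown` sleep after the action is what prevents the watchdog from restarting the same container on every cycle while it comes back up.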
Buffer metrics may expose multiple samples for the same metric name, distinguished by labels (e.g. `last_sent_message{reason="send_success"}` and `last_sent_message{reason="ack_success"}`). By default, the watchdog aggregates all samples for a metric using `max()`. This works well in most cases, but when you need to watch a specific label variant you can use `label_filters`.
`label_filters` is a mapping where each key is a metric name and the value is a dictionary of label key-value pairs. Only samples whose labels match all specified pairs are considered. Metrics not mentioned in `label_filters` continue to use `max()` aggregation.
Example:
```yaml
egress:
  action: restart
  cooldown: 60s
  idle: 100s
  container:
    - labels: [egress-client-label=egress-client-value]
  label_filters:
    last_sent_message:
      reason: send_success
```

In this example, only the `last_sent_message` sample with `reason="send_success"` is used for the idle check; the `ack_success` sample is ignored.
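The selection logic can be sketched as follows. This is an illustration under simplified assumptions (a toy metric text and a regex-based parser, not the service's actual code): with no filter, `max()` wins; with a filter, only matching samples count.

```python
import re

METRICS_TEXT = """\
# TYPE last_sent_message gauge
last_sent_message{reason="send_success"} 100.0
last_sent_message{reason="ack_success"} 250.0
"""

LINE_RE = re.compile(r'^(\w+)(?:\{(.*)\})?\s+([0-9.eE+-]+)$')
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def metric_value(text, name, label_filters=None):
    """Return the value for `name`: max() over all samples, or over
    only the samples matching the filter configured for that metric."""
    wanted = (label_filters or {}).get(name, {})
    values = []
    for line in text.splitlines():
        match = LINE_RE.match(line)
        if not match or match.group(1) != name:
            continue
        labels = dict(LABEL_RE.findall(match.group(2) or ""))
        # A sample qualifies only if it carries all required label pairs.
        if all(labels.get(k) == v for k, v in wanted.items()):
            values.append(float(match.group(3)))
    return max(values) if values else None
```

Without a filter this returns 250.0 (the `ack_success` sample); with `{"last_sent_message": {"reason": "send_success"}}` it returns 100.0.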
You can find an example configuration file in the samples folder.
The `pyfunc` watch type allows you to define arbitrary restart triggers as Python classes. The watchdog imports the class at startup, instantiates it with `kwargs`, and calls it on each polling cycle with the parsed buffer metrics. Metrics are fetched and parsed automatically by the watchdog; the trigger only needs to evaluate them. If the call returns `True`, the configured action is executed on the matched containers.
The trigger class contract:
```python
from typing import Dict


class MyTrigger:
    def __init__(self, **kwargs):
        # kwargs come from the config's `kwargs` section.
        ...

    def __call__(self, metrics: Dict[str, float]) -> bool:
        # Metrics are fetched automatically from the buffer.
        # Return True to trigger the action.
        ...
```

A built-in example trigger is provided in `watchdog.triggers.discrepancy`. It detects stuck modules by checking whether egress is idle while ingress is still active: frames are going in but not coming out.
```yaml
watch:
  - buffer: ${oc.env:BUFFER_URL}
    pyfunc:
      action: restart
      cooldown: 600s
      polling_interval: 10s
      container:
        - labels: [com.savant.module=detector]
      module: watchdog.triggers.discrepancy
      class_name: DiscrepancyCheck
      kwargs:
        egress_idle: 60
        ingress_idle: 30
```

The `DiscrepancyCheck` trigger accepts (via `kwargs`):
- `egress_idle` - seconds of egress idle time before considering it stalled.
- `ingress_idle` - seconds within which ingress must have been active for the trigger to fire. If ingress is also idle, the problem is upstream (no input), not a stuck module.
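The discrepancy logic described above can be sketched as a trigger class. The metric names used here (`last_sent_message`, `last_received_message`) are assumptions for illustration; check the buffer's actual metric names before relying on them, and treat this as a sketch rather than the built-in implementation:

```python
import time


class DiscrepancyCheck:
    """Fire when egress is stalled while ingress is still active."""

    def __init__(self, egress_idle: float, ingress_idle: float):
        self.egress_idle = egress_idle
        self.ingress_idle = ingress_idle

    def __call__(self, metrics: dict) -> bool:
        now = time.time()
        # Egress is stalled: nothing was sent for longer than egress_idle.
        egress_stalled = now - metrics["last_sent_message"] > self.egress_idle
        # Ingress is active: something was received within ingress_idle.
        ingress_active = now - metrics["last_received_message"] <= self.ingress_idle
        # Frames are going in but not coming out -> the module is stuck.
        return egress_stalled and ingress_active
```

If both sides are idle, the trigger stays quiet: the problem is a lack of input, which a restart would not fix.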
The configuration file supports variable interpolation: you can reference another node or an environment variable by wrapping the reference in `${}`. For example:
- `${oc.env:BUFFER_URL}` - replaced with the value of the `BUFFER_URL` environment variable.
- `${.idle}` - replaced with the value of the `idle` key in the same section.
For more information, refer to the OmegaConf documentation.
You can find the watchdog service image on:
- for x86: GitHub Packages
- for l4t: GitHub Packages
Configuration of a Docker service might be as follows:

```yaml
pipeline-watchdog:
  image: ghcr.io/insight-platform/savant-watchdog-l4t:0.5.0
  restart: unless-stopped
  network_mode: host
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
    - ./config.yml:/app/config.yml
  environment:
    - LOGLEVEL=INFO
    - CONFIG_FILE_PATH=/app/config.yml
```

The sample demonstrates how to start the watchdog service with an example pipeline to watch the buffer and restart the SDK client based on configuration and buffer state. The client randomly pauses message processing, triggering the watchdog to restart it when the egress idle threshold is exceeded.
This sample is designed to run on x86 architecture only.
```bash
docker compose -f samples/pipeline_watchdog/docker-compose.x86.yml up --build -d
```

After starting the pipeline, you can check the logs of the client container:
```bash
docker logs -f pipeline_watchdog-client-1
```

When the client stops processing messages for more than `egress.idle` seconds (see config), you will see the following logs in the client container, and the container itself will be restarted:
```
Traceback (most recent call last):
  File "/opt/savant/src/client.py", line 52, in <module>
    main()
  File "/opt/savant/src/client.py", line 37, in main
    time.sleep(sleep_duration)
KeyboardInterrupt
```
```bash
docker compose -f samples/pipeline_watchdog/docker-compose.x86.yml down
```

Run the tests from the watchdog service directory:
```bash
cd services/watchdog
pytest
```

Dependencies: `pytest`, `pytest-asyncio`, `aiohttp`, `aiodocker`, `prometheus_client`, `omegaconf`.