|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | +""" |
| 4 | + ZeroMQ PUB Monitor for Ganglia |
| 5 | + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 6 | +
|
| 7 | + This is a gmond metric-gathering module which reports a cumulative |
| 8 | + count of messages published by ZeroMQ publishers. |
| 9 | +
|
| 10 | + To test, invoke with one or more pairs of (endpoint name, endpoint |
| 11 | + URI) pairs specifying ZMQ publishers to poll. For example: |
| 12 | +
|
| 13 | + $ python zpubmon.py system-events tcp://localhost:8006 |
| 14 | +
|
| 15 | + See README for more details. |
| 16 | +
|
| 17 | + :copyright: (c) 2012 by Ori Livneh <[email protected]> |
| 18 | + :license: GNU General Public Licence 2.0 or later |
| 19 | +
|
| 20 | +""" |
| 21 | +import errno |
| 22 | +import logging |
| 23 | +import sys |
| 24 | +import threading |
| 25 | +import time |
| 26 | + |
| 27 | +import zmq |
| 28 | + |
| 29 | + |
| 30 | +logging.basicConfig(format='[ZMQ] %(asctime)s %(message)s', level=logging.INFO) |
| 31 | + |
| 32 | + |
| 33 | +def zmq_pub_mon(endpoints, counter): |
| 34 | + """ |
| 35 | + Measure throughput of ZeroMQ publishers. |
| 36 | +
|
| 37 | + *endpoints* is a dict that maps human-readable endpoint names to |
| 38 | + endpoint URIs. The names are used as metric names in Ganglia and |
| 39 | + as the ZMQ_IDENTITY of the underlying socket. |
| 40 | +
|
| 41 | + """ |
| 42 | + ctx = zmq.Context.instance() |
| 43 | + poller = zmq.Poller() |
| 44 | + |
| 45 | + for name, uri in endpoints.iteritems(): |
| 46 | + logging.info('Registering %s (%s).', name, uri) |
| 47 | + sock = ctx.socket(zmq.SUB) |
| 48 | + sock.setsockopt(zmq.IDENTITY, name) |
| 49 | + sock.connect(uri) |
| 50 | + sock.setsockopt(zmq.SUBSCRIBE, '') |
| 51 | + poller.register(sock, zmq.POLLIN) |
| 52 | + |
| 53 | + while 1: |
| 54 | + try: |
| 55 | + for socket, _ in poller.poll(): |
| 56 | + socket.recv(zmq.NOBLOCK) |
| 57 | + name = socket.getsockopt(zmq.IDENTITY) |
| 58 | + counter[name] += 1 |
| 59 | + except zmq.ZMQError as e: |
| 60 | + # Calls interrupted by EINTR should be re-tried. |
| 61 | + if e.errno == errno.EINTR: |
| 62 | + continue |
| 63 | + raise |
| 64 | + |
| 65 | + |
| 66 | +def metric_init(params): |
| 67 | + """ |
| 68 | + Initialize metrics. |
| 69 | +
|
| 70 | + Gmond invokes this method with a dict of arguments specified in |
| 71 | + zpubmon.py. If *params* contains a `groups` key, its value is used |
| 72 | + as the group name in Ganglia (in lieu of the default 'ZeroMQ'). |
| 73 | + Other items are interpreted as (name: URI) pairs of ZeroMQ endpoints |
| 74 | + to monitor. |
| 75 | +
|
| 76 | + `metric_init` spawns a worker thread to monitor these endpoints and |
| 77 | + returns a list of metric descriptors. |
| 78 | +
|
| 79 | + """ |
| 80 | + groups = params.pop('groups', 'ZeroMQ') |
| 81 | + counter = {name: 0 for name in params} |
| 82 | + |
| 83 | + thread = threading.Thread(target=zmq_pub_mon, args=(params, counter)) |
| 84 | + thread.daemon = True |
| 85 | + thread.start() |
| 86 | + |
| 87 | + return [{ |
| 88 | + 'name': name, |
| 89 | + 'value_type': 'uint', |
| 90 | + 'format': '%d', |
| 91 | + 'units': 'events', |
| 92 | + 'slope': 'positive', |
| 93 | + 'time_max': 20, |
| 94 | + 'description': 'messages published', |
| 95 | + 'groups': groups, |
| 96 | + 'call_back': counter.get, |
| 97 | + } for name in params] |
| 98 | + |
| 99 | + |
| 100 | +def metric_cleanup(): |
| 101 | + """ |
| 102 | + Clean-up handler |
| 103 | +
|
| 104 | + Terminates any lingering threads. Gmond calls this function when |
| 105 | + it is shutting down. |
| 106 | +
|
| 107 | + """ |
| 108 | + logging.debug('Shutting down.') |
| 109 | + for thread in threading.enumerate(): |
| 110 | + if thread.isAlive(): |
| 111 | + thread._Thread__stop() # pylint: disable=W0212 |
| 112 | + |
| 113 | + |
| 114 | +def self_test(): |
| 115 | + """ |
| 116 | + Perform self-test. |
| 117 | +
|
| 118 | + Parses *argv* as a collection of (name, URI) pairs specifying ZeroMQ |
| 119 | + publishers to be monitored. Message counts are polled and outputted |
| 120 | + every five seconds. |
| 121 | +
|
| 122 | + """ |
| 123 | + params = dict(zip(sys.argv[1::2], sys.argv[2::2])) |
| 124 | + if not params: |
| 125 | + print 'Usage: %s NAME URI [NAME URI, ...]' % sys.argv[0] |
| 126 | + print 'Example: %s my-zmq-stream tcp://localhost:8006' % sys.argv[0] |
| 127 | + sys.exit(1) |
| 128 | + |
| 129 | + descriptors = metric_init(params) |
| 130 | + |
| 131 | + while 1: |
| 132 | + for descriptor in descriptors: |
| 133 | + name = descriptor['name'] |
| 134 | + call_back = descriptor['call_back'] |
| 135 | + logging.info('%s: %s', name, call_back(name)) |
| 136 | + time.sleep(5) |
| 137 | + |
| 138 | + |
| 139 | +if __name__ == '__main__': |
| 140 | + self_test() |
0 commit comments