Skip to content

Commit ab6308d

Browse files
committed
Merge pull request ganglia#99 from atdt/master
Add module for ZeroMQ PUB streams.
2 parents dcd01ec + a595f6d commit ab6308d

3 files changed

Lines changed: 204 additions & 0 deletions

File tree

zeromq_pub/README.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
ZeroMQ PUB Monitor for Ganglia
2+
==============================
3+
4+
This is a gmond metric-gathering module which reports a cumulative count
5+
of messages published by ZeroMQ publishers.
6+
7+
Endpoints are specified as configuration parameters in zpubmon.pyconf.
8+
Each configuration key name is used as an endpoint name and its
9+
corresponding value as the endpoint's URI. The configuration param
10+
`groups` is special-cased: if present, its value specifies the group
11+
name for the metrics generated by the module. Otherwise the default
12+
group name ('ZeroMQ') is used.
13+
14+
To test, invoke with one or more pairs of (endpoint name, endpoint URI)
15+
pairs specifying ZMQ publishers to poll. For example::
16+
17+
$ python zpubmon.py system-events tcp://localhost:8006
18+
19+
Message counts will be logged to the console every five seconds.
20+
21+
For more information about configuring Python modules for gmond, see the
22+
`official documentation <http://sourceforge.net/apps/trac/ganglia/wiki
23+
/ganglia_gmond_python_modules>`_.
24+
25+
Copyright (c) 2012 by Ori Livneh <[email protected]>
26+
27+
Licensed under the GNU General Public Licence, version 2.0 or later.

zeromq_pub/conf.d/zpubmon.pyconf

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Sample configuration for zpubmon Ganglia module
2+
3+
modules {
4+
module {
5+
name = "zpubmon"
6+
language = "python"
7+
param groups {
8+
value = "ChangeMe"
9+
}
10+
param server-generated-raw {
11+
value = "tcp://127.0.0.1:8421"
12+
}
13+
param client-generated-raw {
14+
value = "tcp://127.0.0.1:8422"
15+
}
16+
param client-generated-valid {
17+
value = "tcp://127.0.0.1:8484"
18+
}
19+
}
20+
}
21+
22+
collection_group {
23+
collect_every = 10
24+
time_threshold = 60
25+
metric {
26+
name = "server-generated-raw"
27+
title = "Raw server-generated events"
28+
}
29+
metric {
30+
name = "client-generated-raw"
31+
title = "Raw client-generated events"
32+
}
33+
metric {
34+
name = "client-generated-valid"
35+
title = "Valid client-generated events"
36+
}
37+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
"""
4+
ZeroMQ PUB Monitor for Ganglia
5+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
6+
7+
This is a gmond metric-gathering module which reports a cumulative
8+
count of messages published by ZeroMQ publishers.
9+
10+
To test, invoke with one or more pairs of (endpoint name, endpoint
11+
URI) pairs specifying ZMQ publishers to poll. For example:
12+
13+
$ python zpubmon.py system-events tcp://localhost:8006
14+
15+
See README for more details.
16+
17+
:copyright: (c) 2012 by Ori Livneh <[email protected]>
18+
:license: GNU General Public Licence 2.0 or later
19+
20+
"""
21+
import errno
22+
import logging
23+
import sys
24+
import threading
25+
import time
26+
27+
import zmq
28+
29+
30+
logging.basicConfig(format='[ZMQ] %(asctime)s %(message)s', level=logging.INFO)
31+
32+
33+
def zmq_pub_mon(endpoints, counter):
34+
"""
35+
Measure throughput of ZeroMQ publishers.
36+
37+
*endpoints* is a dict that maps human-readable endpoint names to
38+
endpoint URIs. The names are used as metric names in Ganglia and
39+
as the ZMQ_IDENTITY of the underlying socket.
40+
41+
"""
42+
ctx = zmq.Context.instance()
43+
poller = zmq.Poller()
44+
45+
for name, uri in endpoints.iteritems():
46+
logging.info('Registering %s (%s).', name, uri)
47+
sock = ctx.socket(zmq.SUB)
48+
sock.setsockopt(zmq.IDENTITY, name)
49+
sock.connect(uri)
50+
sock.setsockopt(zmq.SUBSCRIBE, '')
51+
poller.register(sock, zmq.POLLIN)
52+
53+
while 1:
54+
try:
55+
for socket, _ in poller.poll():
56+
socket.recv(zmq.NOBLOCK)
57+
name = socket.getsockopt(zmq.IDENTITY)
58+
counter[name] += 1
59+
except zmq.ZMQError as e:
60+
# Calls interrupted by EINTR should be re-tried.
61+
if e.errno == errno.EINTR:
62+
continue
63+
raise
64+
65+
66+
def metric_init(params):
67+
"""
68+
Initialize metrics.
69+
70+
Gmond invokes this method with a dict of arguments specified in
71+
zpubmon.py. If *params* contains a `groups` key, its value is used
72+
as the group name in Ganglia (in lieu of the default 'ZeroMQ').
73+
Other items are interpreted as (name: URI) pairs of ZeroMQ endpoints
74+
to monitor.
75+
76+
`metric_init` spawns a worker thread to monitor these endpoints and
77+
returns a list of metric descriptors.
78+
79+
"""
80+
groups = params.pop('groups', 'ZeroMQ')
81+
counter = {name: 0 for name in params}
82+
83+
thread = threading.Thread(target=zmq_pub_mon, args=(params, counter))
84+
thread.daemon = True
85+
thread.start()
86+
87+
return [{
88+
'name': name,
89+
'value_type': 'uint',
90+
'format': '%d',
91+
'units': 'events',
92+
'slope': 'positive',
93+
'time_max': 20,
94+
'description': 'messages published',
95+
'groups': groups,
96+
'call_back': counter.get,
97+
} for name in params]
98+
99+
100+
def metric_cleanup():
101+
"""
102+
Clean-up handler
103+
104+
Terminates any lingering threads. Gmond calls this function when
105+
it is shutting down.
106+
107+
"""
108+
logging.debug('Shutting down.')
109+
for thread in threading.enumerate():
110+
if thread.isAlive():
111+
thread._Thread__stop() # pylint: disable=W0212
112+
113+
114+
def self_test():
115+
"""
116+
Perform self-test.
117+
118+
Parses *argv* as a collection of (name, URI) pairs specifying ZeroMQ
119+
publishers to be monitored. Message counts are polled and outputted
120+
every five seconds.
121+
122+
"""
123+
params = dict(zip(sys.argv[1::2], sys.argv[2::2]))
124+
if not params:
125+
print 'Usage: %s NAME URI [NAME URI, ...]' % sys.argv[0]
126+
print 'Example: %s my-zmq-stream tcp://localhost:8006' % sys.argv[0]
127+
sys.exit(1)
128+
129+
descriptors = metric_init(params)
130+
131+
while 1:
132+
for descriptor in descriptors:
133+
name = descriptor['name']
134+
call_back = descriptor['call_back']
135+
logging.info('%s: %s', name, call_back(name))
136+
time.sleep(5)
137+
138+
139+
if __name__ == '__main__':
140+
self_test()

0 commit comments

Comments
 (0)