|
27 | 27 | keyToPath = {} |
28 | 28 | last_update = None |
29 | 29 | #last_update = {} |
30 | | -compiled_results = {"nodes" : None, "queues" : None, "connections" : None} |
| 30 | +compiled_results = {"nodes" : None, "queues" : None, "connections" : None, "exchanges": None} |
31 | 31 | #Make initial stat test time dict |
32 | 32 | #for stat_type in ('queues', 'connections','exchanges', 'nodes'): |
33 | 33 | # last_update[stat_type] = None |
|
51 | 51 | 'rmq_backing_queue_ack_egress_rate', |
52 | 52 | 'rmq_backing_queue_ack_ingress_rate', |
53 | 53 | 'rmq_backing_queue_egress_rate', |
54 | | - 'rmq_backing_queue_ingress_rate' |
| 54 | + 'rmq_backing_queue_ingress_rate', |
| 55 | + 'rmq_exchange_publish_in_rate', |
| 56 | + 'rmq_exchange_publish_out_rate', |
55 | 57 | ] |
56 | 58 |
|
57 | 59 | QUEUE_METRICS = ['rmq_messages_ready', |
|
80 | 82 | keyToPath['rmq_running'] = "%s{0}running".format(JSON_PATH_SEPARATOR) #Boolean |
81 | 83 |
|
82 | 84 | NODE_METRICS = ['rmq_disk_free', 'rmq_mem_used', 'rmq_disk_free_alarm', 'rmq_running', 'rmq_proc_used', 'rmq_mem_proc_used', 'rmq_fd_used', 'rmq_mem_alarm', 'rmq_mem_code', 'rmq_mem_binary', 'rmq_sockets_used'] |
83 | | - |
84 | 85 |
|
| 86 | +# EXCHANGE METRICS # |
| 87 | + |
| 88 | +keyToPath['rmq_exchange_publish_in_rate'] = "%s{0}message_stats{0}publish_in_details{0}rate".format(JSON_PATH_SEPARATOR) |
| 89 | +keyToPath['rmq_exchange_publish_out_rate'] = "%s{0}message_stats{0}publish_out_details{0}rate".format(JSON_PATH_SEPARATOR) |
| 90 | + |
| 91 | +EXCHANGE_METRICS = ['rmq_exchange_publish_in_rate', 'rmq_exchange_publish_out_rate'] |
85 | 92 |
|
86 | 93 |
|
87 | 94 | def metric_cleanup(): |
@@ -146,21 +153,27 @@ def list_nodes(): |
146 | 153 | nodes = compiled_results[('nodes', '/')].keys() |
147 | 154 | return nodes |
148 | 155 |
|
| 156 | +def list_exchanges(vhost): |
| 157 | + global compiled_results |
| 158 | + exchanges = compiled_results[('exchanges', vhost)].keys() |
| 159 | + return exchanges |
| 160 | + |
| 161 | + |
149 | 162 | def getQueueStat(name): |
150 | 163 | refreshStats(stats = STATS, vhosts = vhosts) |
151 | 164 | #Split a name like "rmq_backing_queue_ack_egress_rate.access" |
152 | | - |
| 165 | + |
153 | 166 | #handle queue names with . in them |
154 | | - |
| 167 | + |
155 | 168 | log.debug(name) |
156 | 169 | stat_name, queue_name, vhost = name.split(METRIC_TOKEN_SEPARATOR) |
157 | | - |
| 170 | + |
158 | 171 | vhost = vhost.replace('-', '/') #decoding vhost from metric name |
159 | 172 | # Run refreshStats to get the result object |
160 | 173 | result = compiled_results[('queues', vhost)] |
161 | | - |
| 174 | + |
162 | 175 | value = dig_it_up(result, keyToPath[stat_name] % queue_name) |
163 | | - |
| 176 | + |
164 | 177 | if zero_rates_when_idle and stat_name in RATE_METRICS and 'idle_since' in result[queue_name].keys(): |
165 | 178 | value = 0 |
166 | 179 |
|
@@ -190,6 +203,33 @@ def getNodeStat(name): |
190 | 203 |
|
191 | 204 | return float(value) |
192 | 205 |
|
| 206 | +def getExchangeStat(name): |
| 207 | + refreshStats(stats = STATS, vhosts = vhosts) |
| 208 | + #Split a name like "rmq_backing_queue_ack_egress_rate.access" |
| 209 | + |
| 210 | + #handle queue names with . in them |
| 211 | + |
| 212 | + log.debug(name) |
| 213 | + stat_name, exchange_name, vhost = name.split(METRIC_TOKEN_SEPARATOR) |
| 214 | + |
| 215 | + vhost = vhost.replace('-', '/') #decoding vhost from metric name |
| 216 | + # Run refreshStats to get the result object |
| 217 | + result = compiled_results[('exchanges', vhost)] |
| 218 | + |
| 219 | + value = dig_it_up(result, keyToPath[stat_name] % exchange_name) |
| 220 | + |
| 221 | + if zero_rates_when_idle and stat_name in RATE_METRICS and 'idle_since' in result[exchange_name].keys(): |
| 222 | + value = 0 |
| 223 | + |
| 224 | + #Convert Booleans |
| 225 | + if value is True: |
| 226 | + value = 1 |
| 227 | + elif value is False: |
| 228 | + value = 0 |
| 229 | + |
| 230 | + return float(value) |
| 231 | + |
| 232 | + |
193 | 233 | def product(*args, **kwds): |
194 | 234 | # replacement for itertools.product |
195 | 235 | # product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy |
@@ -298,10 +338,29 @@ def buildNodeDescriptors(): |
298 | 338 | log.debug(d2) |
299 | 339 | descriptors.append(d2) |
300 | 340 |
|
| 341 | + def buildExchangeDescriptors(): |
| 342 | + for vhost, metric in product(vhosts, EXCHANGE_METRICS): |
| 343 | + exchanges = list_exchanges(vhost) |
| 344 | + for exchange in exchanges: |
| 345 | + name = "{1}{0}{2}{0}{3}".format(METRIC_TOKEN_SEPARATOR, metric, exchange, vhost.replace('/', '-')) |
| 346 | + log.debug(name) |
| 347 | + d1 = create_desc({'name': name.encode('ascii','ignore'), |
| 348 | + 'call_back': getExchangeStat, |
| 349 | + 'value_type': 'float', |
| 350 | + 'units': 'N', |
| 351 | + 'slope': 'both', |
| 352 | + 'format': '%f', |
| 353 | + 'description': 'Exchange_Metric', |
| 354 | + 'groups' : 'rabbitmq,exchange'}) |
| 355 | + log.debug(d1) |
| 356 | + descriptors.append(d1) |
| 357 | + |
301 | 358 | if 'queues' in STATS: |
302 | 359 | buildQueueDescriptors() |
303 | 360 | if 'nodes' in STATS: |
304 | 361 | buildNodeDescriptors() |
| 362 | + if 'exchanges' in STATS: |
| 363 | + buildExchangeDescriptors() |
305 | 364 | # buildTestNodeStat() |
306 | 365 |
|
307 | 366 | return descriptors |
@@ -343,7 +402,7 @@ def parse_args(argv): |
343 | 402 | action='store', dest='admin_port', default=15672, |
344 | 403 | help='') |
345 | 404 | parser.add_option('--stats', |
346 | | - action='store', dest='stats', default='nodes,queues', |
| 405 | + action='store', dest='stats', default='nodes,queues,exchanges', |
347 | 406 | help='csv of which stats to emit, choies: nodes, queues') |
348 | 407 | parser.add_option('--vhosts', |
349 | 408 | action='store', dest='vhosts', default='/', |
@@ -381,6 +440,10 @@ def main(argv): |
381 | 440 | if opts.list_only is True: |
382 | 441 | print 'nodes:' |
383 | 442 | pprint.pprint(list_nodes()) |
| 443 | + print 'exchanges:' |
| 444 | + for vhost in parameters['vhosts']: |
| 445 | + print 'vhost: %s' % vhost |
| 446 | + pprint.pprint(list_exchanges(vhost)) |
384 | 447 | print 'queues:' |
385 | 448 | for vhost in parameters['vhosts']: |
386 | 449 | print 'vhost: %s' % vhost |
|
0 commit comments