Skip to content

Commit 0633120

Browse files
author
Greg Rice
committed
Changed rabbitmq module to allow multiple vhosts
1 parent ae63781 commit 0633120

1 file changed

Lines changed: 91 additions & 42 deletions

File tree

rabbit/python_modules/rabbitmq.py

Lines changed: 91 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,23 @@
55
import urllib
66
import time
77
from string import Template
8+
import itertools
89

910
global url, descriptors, last_update, vhost, username, password, url_template, result, result_dict, keyToPath
1011
INTERVAL = 20
1112
descriptors = list()
1213
username, password = "guest", "guest"
1314
stats = {}
14-
last_update = {}
15+
keyToPath = {}
16+
last_update = None
17+
#last_update = {}
1518
compiled_results = {"nodes" : None, "queues" : None, "connections" : None}
1619
#Make initial stat test time dict
17-
for stat_type in ('queues', 'connections','exchanges', 'nodes'):
18-
last_update[stat_type] = None
19-
20-
keyToPath = {}
20+
#for stat_type in ('queues', 'connections','exchanges', 'nodes'):
21+
# last_update[stat_type] = None
2122

23+
### CONFIGURATION SECTION ###
24+
STATS = ['nodes', 'queues']
2225

2326
# QUEUE METRICS #
2427
keyToPath['rmq_messages_ready'] = "%s.messages_ready"
@@ -71,69 +74,99 @@ def dig_it_up(obj,path):
7174
print "Exception"
7275
return False
7376

77+
def refreshStats(stats = ('nodes', 'queues'), vhosts = ['/']):
78+
79+
global url_template
80+
global last_update, url, compiled_results
81+
82+
now = time.time()
83+
84+
if not last_update:
85+
diff = INTERVAL
86+
else:
87+
diff = now - last_update
88+
89+
if diff >= INTERVAL or not last_update:
90+
print "Fetching Results after %d seconds" % INTERVAL
91+
last_update = now
92+
for stat in stats:
93+
for vhost in vhosts:
94+
result_dict = {}
95+
urlstring = url_template.safe_substitute(stats = stat, vhost = vhost)
96+
result = json.load(urllib.urlopen(urlstring)
97+
# Rearrange results so entry is held in a dict keyed by name - queue name, host name, etc.
98+
if group in ('queues', 'nodes', 'exchanges'):
99+
for entry in result:
100+
name = entry['name']
101+
result_dict[name] = entry
102+
compiled_results[(stat, vhost)] = result_dict
103+
104+
return compiled_results
105+
74106
def refreshGroup(group):
75107

76108

77109
global url_template
78-
urlstring = url_template.safe_substitute(stats = group)
110+
urlstring = url_template.safe_substitute(stats = group, vhost = vhost)
79111

80112
global last_update, url, compiled_results
81113

82114
now = time.time()
83-
if not last_update[group]:
115+
if not last_update[(group, vhost)]:
84116
diff = INTERVAL
85117
else:
86-
diff = now - last_update[group]
118+
diff = now - last_update[(group, vhost)]
87119

88-
if diff >= INTERVAL or not last_update[group]:
120+
if diff >= INTERVAL or not last_update[(group, vhost)]:
89121
result_dict = {}
90122
print "Fetching stats after %d seconds" % INTERVAL
91123
result = json.load(urllib.urlopen(urlstring))
92-
compiled_results[group] = result
93-
last_update[group] = now
124+
compiled_results[(group, vhost)] = result
125+
last_update[(group, vhost)] = now
94126
#Refresh dict by names. We'll probably move this elsewhere.
95127
if group in ('queues', 'nodes'):
96128
for entry in result:
97129
name_attribute = entry['name']
98130
result_dict[name_attribute] = entry
99-
compiled_results[group] = result_dict
131+
compiled_results[(group,vhost)] = result_dict
100132

101-
return compiled_results[group]
102-
103-
def getConnectionTotal(name):
104-
result = refreshGroup('connections')
105-
return result.length()
106-
107-
def getConnectionStats(name):
108-
pass
133+
return compiled_results[(group, vhost)]
109134

110135
def validatedResult(value):
111136
if not isInstance(value, bool):
112137
return float(value)
113138
else:
114139
return None
115140

116-
def list_queues():
141+
def list_queues(vhost):
117142
# Make a list of queues
118-
results = refreshGroup('queues')
143+
results = refreshGroup('queues', vhost = vhost)
119144
return results.keys()
120145

121-
def list_nodes():
122-
results = refreshGroup('nodes')
123-
return results.keys()
146+
def list_queues(vhost):
147+
global compiled_results
148+
queues = compiled_results[('queues', vhost)].keys()
149+
return queues
150+
151+
def list_nodes(vhost):
152+
global compiled_results
153+
nodes = compiled_results[('nodes', vhost)].keys()
154+
return nodes
124155

125156
def getQueueStat(name):
126157
#Split a name like "rmq_backing_queue_ack_egress_rate.access"
127158

128159
#handle queue names with . in them
129-
split_name = name.split(".")
160+
split_name, vhost = name.split("#")
161+
split_name = split_name.split(".")
130162
stat_name = split_name[0]
131163
queue_name = ".".join(split_name[1:])
132164

133-
result = refreshGroup('queues')
165+
# Run refreshStats to get the result object
166+
result = refreshStats(('queues', vhost))
134167

135168
value = dig_it_up(result, keyToPath[stat_name] % queue_name)
136-
print name, value
169+
print name, values
137170

138171
#Convert Booleans
139172
if value is True:
@@ -145,9 +178,11 @@ def getQueueStat(name):
145178

146179
def getNodeStat(name):
147180
#Split a name like "rmq_backing_queue_ack_egress_rate.access"
148-
stat_name, node_name = name.split(".")
149-
result = refreshGroup('nodes')
181+
stat_name = name.split(".")[0]
182+
node_name, vhost = name.split(".")[1].split("#")
183+
result = refreshStats(('nodes', vhost))
150184
value = dig_it_up(result, keyToPath[stat_name] % node_name)
185+
151186
print name,value
152187
#Convert Booleans
153188
if value is True:
@@ -156,24 +191,38 @@ def getNodeStat(name):
156191
value = 0
157192

158193
return float(value)
194+
195+
def product(*args, **kwds):
196+
# replacement for itertools.product
197+
# product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
198+
pools = map(tuple, args) * kwds.get('repeat', 1)
199+
result = [[]]
200+
for pool in pools:
201+
result = [x+[y] for x in result for y in pool]
202+
for prod in result:
203+
yield tuple(prod)
159204

160205
def metric_init(params):
161206
''' Create the metric definition object '''
162-
global descriptors, stats, vhost, username, password, urlstring, url_template, compiled_results
207+
global descriptors, stats, vhost, username, password, urlstring, url_template, compiled_results, STATS
163208
print 'received the following params:'
164209
#Set this globally so we can refresh stats
165210
if 'host' not in params:
166211
params['host'], params['vhost'],params['username'],params['password'] = "localhost", "/", "guest", "guest"
167-
vhost = params['vhost']
212+
213+
# Set the vhosts as a list split from params
214+
vhosts = params['vhost'].split(',')
168215
username, password = params['username'], params['password']
169216
host = params['host']
170217

171-
url = 'http://%s:%s@%s:55672/api/$stats' % (username, password, host)
218+
url = 'http://%s:%s@%s:55672/api/$stats/$vhost' % (username, password, host)
172219
url_template = Template(url)
173220
print params
174221

175-
refreshGroup("nodes")
176-
refreshGroup("queues")
222+
refreshStats(stats = STATS, vhosts = vhosts)
223+
224+
refreshGroup("nodes", vhost = vhost)
225+
refreshGroup("queues", vhost = vhost)
177226

178227
def create_desc(prop):
179228
d = {
@@ -194,9 +243,10 @@ def create_desc(prop):
194243

195244

196245
def buildQueueDescriptors():
197-
for queue in list_queues():
198-
for metric in QUEUE_METRICS:
199-
name = "%s.%s" % (metric, queue)
246+
for vhost, metric in product(vhosts, QUEUE_METRICS):
247+
queues = list_queues(vhost)
248+
for queue in queues:
249+
name = "%s.%s#%s" % (metric, queue, vhost)
200250
print name
201251
d1 = create_desc({'name': name.encode('ascii','ignore'),
202252
'call_back': getQueueStat,
@@ -210,10 +260,9 @@ def buildQueueDescriptors():
210260
descriptors.append(d1)
211261

212262
def buildNodeDescriptors():
213-
for node in list_nodes():
214-
#node = node.split('@')[0]
215-
for stat in NODE_METRICS:
216-
name = '%s.%s' % (stat, node)
263+
for vhost, metric in product(vhosts, NODE_METRICS)
264+
for node in list_nodes():
265+
name = '%s.%s#%s' % (stat, node, vhost)
217266
print name
218267
d2 = create_desc({'name': name.encode('ascii','ignore'),
219268
'call_back': getNodeStat,

0 commit comments

Comments
 (0)