Skip to content

Commit 153a7dc

Browse files
committed
Merge pull request ganglia#202 from tony-ten/master
Adding twemproxy module
2 parents 3198aaf + 5562e08 commit 153a7dc

3 files changed

Lines changed: 315 additions & 0 deletions

File tree

twemproxy/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Twemproxy Ganglia Metrics
2+
3+
## Gathers pool and servers statistics
4+
See [https://github.com/twitter/twemproxy#observability](https://github.com/twitter/twemproxy#observability)
5+
6+
By default, the module gathers stats of all active pools and its list of servers.
7+
Use the "exclude" param to skip gathering stats of a pool and its list of servers.
8+
9+
10+
## License
11+
[MIT License](http://opensource.org/licenses/mit-license.html)
12+
13+
## Contact
14+

twemproxy/conf.d/twemproxy.pyconf

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
modules {
3+
module {
4+
name = 'twemproxy'
5+
language = 'python'
6+
7+
param stats_addr {
8+
value = 'localhost'
9+
}
10+
param stats_port {
11+
value = '22222'
12+
}
13+
# Exclude pool stats (and their servers)
14+
# param exclude {
15+
# Use a comma to specify more than one pool name in the value
16+
# value = 'alpha'
17+
# }
18+
}
19+
}
20+
21+
collection_group {
22+
collect_every = 30
23+
time_threshold = 90
24+
25+
metric {
26+
name_match = "twemproxy_(.+)"
27+
}
28+
29+
}
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
2+
# The MIT License (MIT)
3+
4+
# Copyright (c) 2015 The Enthusiast Network
5+
6+
# Permission is hereby granted, free of charge, to any person obtaining a copy
7+
# of this software and associated documentation files (the "Software"), to deal
8+
# in the Software without restriction, including without limitation the rights
9+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
# copies of the Software, and to permit persons to whom the Software is
11+
# furnished to do so, subject to the following conditions:
12+
13+
# The above copyright notice and this permission notice shall be included in all
14+
# copies or substantial portions of the Software.
15+
16+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
# SOFTWARE.
23+
#
24+
# Author: Tony Baltazar <[email protected]>
25+
26+
27+
28+
import socket
29+
import sys
30+
31+
try:
32+
import json
33+
except ImportError:
34+
import simplejson as json
35+
36+
37+
38+
NAME_PREFIX = 'twemproxy_'
39+
40+
descriptors = list()
41+
Desc_Skel = {}
42+
43+
_Twemproxy_Connection = None
44+
45+
class Twemproxy():
46+
47+
def __init__(self, params, name_prefix):
48+
self.stats_addr = params["stats_addr"]
49+
self.stats_port = int(params["stats_port"])
50+
self.name_prefix = name_prefix
51+
52+
if "exclude" in params:
53+
self.exclude = [ exclude.strip() for exclude in params["exclude"].split(",") ]
54+
else:
55+
self.exclude = []
56+
57+
self.metrics = self.__twemproxy_metrics()
58+
59+
def __twemproxy_metrics(self):
60+
metric_hash = {}
61+
try:
62+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
63+
s.connect((self.stats_addr, self.stats_port))
64+
65+
file = s.makefile('r')
66+
data = file.readline()
67+
s.close()
68+
69+
response = json.loads(data)
70+
except:
71+
print("Unable to connect to %s:%i" % (self.stats_addr, self.stats_port))
72+
sys.exit(1)
73+
74+
for pool, pool_info in response.items():
75+
if pool in self.exclude:
76+
continue
77+
78+
if hasattr(pool_info, 'items'):
79+
metric_hash[str(pool)] = {}
80+
for server, server_info in pool_info.items():
81+
if hasattr(server_info, 'items'):
82+
#print("Server: ", server)
83+
metric_hash[str(server)] = {}
84+
for stat in server_info:
85+
#print("Key: ", stat, " Value: ", server_info[stat])
86+
metric_hash[server][stat] = server_info[stat]
87+
else:
88+
metric_hash[pool][server] = server_info
89+
90+
return metric_hash
91+
92+
def __refresh_metrics(self):
93+
self.metrics = self.__twemproxy_metrics()
94+
return
95+
96+
def get_pools(self):
97+
'''Returns pool names'''
98+
pools = [k for k in self.metrics.keys() if self.metrics[k].get("client_eof") != None]
99+
return pools
100+
101+
def get_nodes(self):
102+
'''Returns nodes (server) names'''
103+
nodes = [k for k in self.metrics.keys() if self.metrics[k].get("client_eof") == None]
104+
return nodes
105+
106+
# Main metric handler
107+
def get_value(self, name):
108+
'''Return callback value for the requested metric'''
109+
metric_name_value = name[len(self.name_prefix):].split('-')
110+
111+
metric_name = metric_name_value[0]
112+
metric_value = metric_name_value[1]
113+
114+
result = self.metrics[metric_name][metric_value]
115+
116+
self.__refresh_metrics()
117+
118+
return result
119+
120+
121+
def create_desc(prop):
122+
d = Desc_Skel.copy()
123+
for k, v in prop.iteritems():
124+
d[k] = v
125+
return d
126+
127+
128+
def metric_init(params):
129+
global descriptors, Desc_Skel, _Twemproxy_Connection
130+
#print("twmeproxy received the following parameters")
131+
#print(params)
132+
133+
_Twemproxy_Connection = Twemproxy(params, NAME_PREFIX)
134+
135+
Desc_Skel = {
136+
'name' : 'XXX',
137+
'call_back' : get_value,
138+
'time_max' : 60,
139+
'value_type' : 'uint',
140+
'units' : 'connections',
141+
'slope' : 'both',
142+
'format' : '%u',
143+
'description' : 'XXX',
144+
'groups' : 'twemproxy',
145+
}
146+
147+
# Pools
148+
for pool in _Twemproxy_Connection.get_pools():
149+
descriptors.append(create_desc({
150+
"name" : NAME_PREFIX + pool + "-client_eof",
151+
"description": "# eof on client connections"
152+
}))
153+
154+
descriptors.append(create_desc({
155+
"name" : NAME_PREFIX + pool + "-client_err",
156+
"description": "# errors on client connections"
157+
}))
158+
159+
descriptors.append(create_desc({
160+
"name" : NAME_PREFIX + pool + "-client_connections",
161+
"description": "# active client connections"
162+
}))
163+
164+
descriptors.append(create_desc({
165+
"name" : NAME_PREFIX + pool + "-server_ejects",
166+
"description": "# times backend server was ejected",
167+
"units" : "Count"
168+
}))
169+
170+
descriptors.append(create_desc({
171+
"name" : NAME_PREFIX + pool + "-forward_error",
172+
"description": "# times we encountered a forwarding error",
173+
"units" : "Count"
174+
}))
175+
176+
descriptors.append(create_desc({
177+
"name" : NAME_PREFIX + pool + "-fragments",
178+
"description": "# fragments created from a multi-vector request",
179+
"units" : "Count"
180+
}))
181+
182+
# Nodes (servers)
183+
for node in _Twemproxy_Connection.get_nodes():
184+
descriptors.append(create_desc({
185+
"name" : NAME_PREFIX + node + "-server_eof",
186+
"description": "# eof on server connections"
187+
}))
188+
189+
descriptors.append(create_desc({
190+
"name" : NAME_PREFIX + node + "-server_err",
191+
"description": "# errors on server connections"
192+
}))
193+
194+
descriptors.append(create_desc({
195+
"name" : NAME_PREFIX + node + "-server_timedout",
196+
"description": "# timeouts on server connections"
197+
}))
198+
199+
descriptors.append(create_desc({
200+
"name" : NAME_PREFIX + node + "-server_connections",
201+
"description": "# active server connections"
202+
}))
203+
204+
descriptors.append(create_desc({
205+
"name" : NAME_PREFIX + node + "-requests",
206+
"description": "# requests",
207+
"units" : "Requests"
208+
}))
209+
210+
descriptors.append(create_desc({
211+
"name" : NAME_PREFIX + node + "-request_bytes",
212+
"description": "total request bytes",
213+
"units" : "Bytes"
214+
}))
215+
216+
descriptors.append(create_desc({
217+
"name" : NAME_PREFIX + node + "-responses",
218+
"description": "# respones",
219+
"units" : "Count"
220+
}))
221+
222+
descriptors.append(create_desc({
223+
"name" : NAME_PREFIX + node + "-response_bytes",
224+
"description": "total response bytes",
225+
"units" : "Bytes"
226+
}))
227+
228+
descriptors.append(create_desc({
229+
"name" : NAME_PREFIX + node + "-in_queue",
230+
"description": "# requests in incoming queue",
231+
"units" : "Requests"
232+
}))
233+
234+
descriptors.append(create_desc({
235+
"name" : NAME_PREFIX + node + "-in_queue_bytes",
236+
"description": "current request bytes in incoming queue",
237+
"units" : "Bytes"
238+
}))
239+
240+
descriptors.append(create_desc({
241+
"name" : NAME_PREFIX + node + "-out_queue",
242+
"description": "# requests in outgoing queue",
243+
"units" : "Requests"
244+
}))
245+
246+
descriptors.append(create_desc({
247+
"name" : NAME_PREFIX + node + "-out_queue_bytes",
248+
"description": "current request bytes in outgoing queue",
249+
"units" : "Bytes"
250+
}))
251+
252+
return descriptors
253+
254+
255+
def get_value(name):
256+
global _Twemproxy_Connection
257+
return _Twemproxy_Connection.get_value(name)
258+
259+
def metric_cleanup():
260+
pass
261+
262+
263+
264+
if __name__ == '__main__':
265+
params = {
266+
'stats_addr': 'localhost',
267+
'stats_port': '22222'
268+
}
269+
metric_init(params)
270+
for d in descriptors:
271+
v = d['call_back'](d['name'])
272+
print 'value for %s is %u' % (d['name'], v)

0 commit comments

Comments
 (0)