-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathElasticSearchMonitor.py
More file actions
158 lines (132 loc) · 6.75 KB
/
ElasticSearchMonitor.py
File metadata and controls
158 lines (132 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Make a ElasticSearch monitor. Print an alert
# when server(s) in cluster show signs of slowdown.
#
# A few assumptions are made:
# * Normal range for requests is under 0.1 sec
# anything outside that range is a timeout/
# indication of a system slowdown.
# * Garbage Collection happens every 10 seconds
# * PEP 8 Line Length conventions are also relaxed
#
# Note: This was tested in a fresh install of ES
# on a personal VPS.
#
# TODO: (in no specific order)
# * Provide Front-End for graphs of system.
# * Save logs to SQLite DB for historical data.
# * Handle Timeout/etc Exceptions more gracefully.
import requests
import json
from collections import defaultdict
import time
ENDPOINT = 'http://localhost:9200'  # Base URL of the ES HTTP API (fresh local install default)
CLUSTER = '_cluster'  # Path segment for cluster-level endpoints (e.g. /_cluster/health)
NODES = '_nodes'  # Path segment for node-level endpoints (e.g. /_nodes/stats)
TIMEOUT = 0.1  # Request timeout in seconds; per header comment, slower than this counts as a slowdown
CURRENT_NODES = 1  # Expected node count; connection_debug flags clusters with more nodes
CPU_THRESH = 95 # Percent
MEM_THRESH = 100 # Percent
GC_THRESH = 10000 # In milliseconds
MAX_SHARDS = 10 # For now
JVM_MEM_THRESH = 95 # JVM Heap size is set automatically, 50% of system RAM
class ESMonitor(object):
    """Poll an ElasticSearch cluster's health/nodes/stats HTTP endpoints and
    print warnings when any node shows signs of slowdown.

    Written in the subset of syntax that runs on both Python 2 and 3
    (single-argument print(), dict.items()).
    """

    def __init__(self):
        # self.connect holds the most recent requests.Response
        # (or False after a timeout); the other two are unused placeholders
        # kept for backward compatibility.
        self.connect = ''
        self.payload = ''
        self.timeout = ''

    def node_list(self, payload):
        """Return a mapping of node name -> http_address for the cluster.

        payload -- raw JSON string from the /_nodes endpoint.
        """
        payload = json.loads(payload)
        n_list = defaultdict(dict)
        for node_id, node in payload['nodes'].items():
            n_list[node['name']] = node['http_address']
        return n_list

    def parse_stats(self, stats):
        """Parse /_nodes/stats JSON, print threshold warnings per node, and
        return a mapping of node name -> 'CPU %, MEM %' for every node.

        stats -- raw JSON string from the /_nodes/stats endpoint.
        """
        stats = json.loads(stats)
        s_list = defaultdict(list)
        for node_id, node in stats['nodes'].items():
            usage = node['os']
            # Record each node in the format  name -> "CPU %, MEM %".
            s_list[node['name']] = '{0}, {1}'.format(usage['cpu_percent'], usage['mem']['used_percent'])
            # If CPU usage by node is over 95% we're in trouble.
            # (Bug fix: original read usage['cpu_usage'], a nonexistent key,
            # so the warning itself raised KeyError whenever it triggered.)
            if usage['cpu_percent'] >= CPU_THRESH:
                print('WARNING! CPU usage at {0}% for {1}!!!'.format(usage['cpu_percent'], node['name']))
            # If RAM usage by node is 100% that's a problem.
            # (Bug fix: original read usage['used_percent'] instead of the
            # nested usage['mem']['used_percent'] -> KeyError when triggered.)
            if usage['mem']['used_percent'] == MEM_THRESH:
                print('WARNING! RAM usage at {0}% for {1}!!!'.format(usage['mem']['used_percent'], node['name']))
            # If Garbage Collection hasn't happened in a while (10 seconds), warn.
            for collector, info in node['jvm']['gc']['collectors'].items():
                if info['collection_time_in_millis'] >= GC_THRESH:
                    print('WARNING! JVM Garbage Collection older than 10,000 milliseconds for {0}!!!'.format(node['name']))
            # If the JVM Heap is getting too full, throw a warning.
            if node['jvm']['mem']['heap_used_percent'] >= JVM_MEM_THRESH:
                print('WARNING! JVM Heap at {0} for {1}!'.format(node['jvm']['mem']['heap_used_percent'], node['name']))
            # Request circuit-breaker: estimated size should stay under the limit.
            if node['breakers']['request']['estimated_size_in_bytes'] >= node['breakers']['request']['limit_size_in_bytes']:
                print('WARNING! Request size on {0} greater than limit! Request: {1}, Limit: {2}'.format(node['name'],
                                                                                                        node['breakers']['request']['estimated_size_in_bytes'],
                                                                                                        node['breakers']['request']['limit_size_in_bytes']))
        # Bug fix: return AFTER the loop. The original had `else: return`
        # attached to the breaker check inside the loop, so it returned after
        # the first node and never examined the rest of the cluster.
        return s_list

    def query(self, param):
        """GET a cluster endpoint chosen by keyword in `param`
        ('health...', 'nodes', or 'stats').

        Returns the requests.Response on success, False on timeout
        (an unrecognized param returns the previous response, as before).
        """
        try:
            if 'health' in param:
                self.connect = requests.get('{0}/{1}/{2}'.format(ENDPOINT, CLUSTER, param), timeout=TIMEOUT)
            elif 'nodes' in param:
                self.connect = requests.get('{0}/{1}'.format(ENDPOINT, NODES), timeout=TIMEOUT)
            elif 'stats' in param:
                self.connect = requests.get('{0}/{1}/{2}'.format(ENDPOINT, NODES, param), timeout=TIMEOUT)
            return self.connect
        except requests.exceptions.Timeout:
            # Bug fix: the original printed the exception *class* object,
            # which is not a useful diagnostic.
            print('ElasticSearch request for {0!r} timed out after {1} seconds.'.format(param, TIMEOUT))
            return False

    def connection_debug(self, payload):
        """Inspect a parsed /_cluster/health payload and print status,
        node-count, and shard-count anomalies.

        payload -- dict parsed from the health endpoint JSON.
        """
        # Capture the health request's HTTP reason before any re-query
        # below replaces self.connect. (Bug fix: original reached out to
        # the module-level globals `monitor` and `connection`, which made
        # the method unusable on any other instance.)
        reason = self.connect.reason
        if 'green' not in payload['status']:
            print('WARNING! Cluster {0} is at Status {1}. Displaying Health for all Indices.\n'.format(payload['cluster_name'],
                                                                                                       payload['status']))
            print(self.query('health?level=indices'))
        elif payload['number_of_nodes'] > CURRENT_NODES:
            print('ERROR! Cluster {0} has {1} nodes, we are expecting {2}!!!'.format(payload['cluster_name'],
                                                                                     payload['number_of_nodes'],
                                                                                     CURRENT_NODES))
        elif payload['active_shards'] > MAX_SHARDS:
            print('WARNING! Cluster {0} has {1} active shards, more than the Max allowed: {2}'.format(payload['cluster_name'],
                                                                                                      payload['active_shards'],
                                                                                                      MAX_SHARDS))
        print('Cluster Request: {0}. Cluster Health: {1}\n'.format(reason, payload['status']))
if __name__ == "__main__":
    # Entry point: do one full sweep (health, node list, stats), then poll
    # health + stats once per second until Ctrl-C.
    print('\nElasticSearch Health Monitor Started...')
    monitor = ESMonitor()
    connection = monitor.query('health')
    if connection:
        # Bug fix: parse the body only after confirming the request did not
        # time out -- the original called json.loads(connection._content)
        # before the guard, so a timeout (query() returns False) crashed
        # with AttributeError. Also use the public .content, not ._content.
        payload = json.loads(connection.content)
        monitor.connection_debug(payload)
        nodes = monitor.query('nodes')
        n_list = monitor.node_list(nodes.content)
        print([(node, info) for node, info in n_list.items()])
        stats = monitor.query('stats')
        stat_parse = monitor.parse_stats(stats.content)
        print([(stat, info) for stat, info in stat_parse.items()])
    else:
        print('TIMEOUT')
    # Run until an error or use Ctrl-c out
    while True:
        connection = monitor.query('health')
        if connection:
            print(connection.reason)
            # Bug fix: re-fetch stats each pass. The original reused the
            # `stats` response captured once before the loop (so every
            # iteration reported identical, stale numbers -- and NameError'd
            # if the initial sweep had timed out).
            stats = monitor.query('stats')
            if stats:
                stat_parse = monitor.parse_stats(stats.content)
                print([(stat, info) for stat, info in stat_parse.items()])
        else:
            print('ElasticSearch Health Timeout! See logs for cluster system stats history.')
        # Sleep for 1 second and continue looping
        time.sleep(1)
    print('ElasticSearch Health Monitor Terminated.')