forked from rabbitinaction/sourcecode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalert_consumer.py
More file actions
109 lines (83 loc) · 3.63 KB
/
alert_consumer.py
File metadata and controls
109 lines (83 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
###############################################
# RabbitMQ in Action
# Chapter 4.2.2 - Alerting Server Consumer
#
# Requires: pika >= 0.9.5
#
# Author: Jason J. W. Williams
# (C)2011
###############################################
import json, smtplib
import pika
def send_mail(recipients, subject, message, sender="alerts@ourcompany.com"):
    """E-mail generator for received alerts.

    Builds a minimal set of mail headers, connects to the SMTP server at
    mail.ourcompany.com:25 and sends the alert text to every recipient.

    Args:
        recipients: List of envelope recipient addresses handed to
            ``SMTP.sendmail``.
        subject: Text placed in the ``Subject:`` header.
        message: Alert payload; it is converted with ``str()`` and used as
            the mail body.
        sender: Address used for both the ``From:`` header and the SMTP
            envelope sender.
            NOTE(review): the sender address is not visible in this file;
            the default is a placeholder on the mail host's domain --
            confirm before deploying.

    Exceptions raised by ``smtplib`` while connecting or sending are not
    caught here; they propagate to the caller.
    """
    # The To: and Date: header values are intentionally left blank, exactly
    # as in the header template this function has always used.
    headers = ("From: %s\r\nTo: \r\nDate: \r\n" +
               "Subject: %s\r\n\r\n") % (sender, subject)

    smtp_server = smtplib.SMTP()
    smtp_server.connect("mail.ourcompany.com", 25)
    try:
        smtp_server.sendmail(sender,
                             recipients,
                             headers + str(message))
    finally:
        # Release the socket even when the send fails.
        smtp_server.close()
#/(asc.5) Notify Processors
def critical_notify(channel, method, header, body):
    """Sends CRITICAL alerts to administrators via e-mail.

    Consumer callback for the "critical" queue.

    Args:
        channel: Channel the message arrived on; used to acknowledge it.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        header: Message properties (unused).
        body: JSON-encoded alert payload.

    The message is acknowledged only after the e-mail has been handed to the
    SMTP server, so a failure in decoding or sending leaves it unacknowledged.
    """
    # NOTE(review): the recipient list was not defined anywhere in the
    # visible file, which made this callback fail with NameError. The
    # address below is a placeholder -- confirm the real recipients.
    EMAIL_RECIPS = ["ops.team@ourcompany.com"]

    #/(asc.6) Decode our message from JSON
    message = json.loads(body)

    #/(asc.7) Transmit e-mail to SMTP server
    send_mail(EMAIL_RECIPS, "CRITICAL ALERT", message)
    # Format first, then print: valid on both Python 2 and Python 3.
    print(("Sent alert via e-mail! Alert Text: %s " +
           "Recipients: %s") % (str(message), str(EMAIL_RECIPS)))

    #/(asc.8) Acknowledge the message
    channel.basic_ack(delivery_tag=method.delivery_tag)
def rate_limit_notify(channel, method, header, body):
    """Sends the message to the administrators via e-mail.

    Consumer callback for the "rate_limit" queue.

    Args:
        channel: Channel the message arrived on; used to acknowledge it.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        header: Message properties (unused).
        body: JSON-encoded alert payload.

    The message is acknowledged only after the e-mail has been handed to the
    SMTP server, so a failure in decoding or sending leaves it unacknowledged.
    """
    # NOTE(review): the recipient list was not defined anywhere in the
    # visible file, which made this callback fail with NameError. The
    # address below is a placeholder -- confirm the real recipients.
    EMAIL_RECIPS = ["api.team@ourcompany.com"]

    #/(asc.9) Decode our message from JSON
    message = json.loads(body)

    #/(asc.10) Transmit e-mail to SMTP server
    send_mail(EMAIL_RECIPS, "RATE LIMIT ALERT!", message)
    # Format first, then print: valid on both Python 2 and Python 3.
    print(("Sent alert via e-mail! Alert Text: %s " +
           "Recipients: %s") % (str(message), str(EMAIL_RECIPS)))

    #/(asc.11) Acknowledge the message
    channel.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == "__main__":
    # Script entry point: connect to the broker, declare the "alerts" topic
    # exchange with its two queues, register the alert callbacks and then
    # block consuming messages.
    #
    # NOTE(review): the pika calls below keep the argument names and order
    # of the pika 0.9.x API named in the file header; later pika releases
    # changed some of these signatures -- confirm against the installed
    # version.

    #/(asc.0) Broker settings
    AMQP_SERVER = "localhost"
    AMQP_USER = "alert_user"
    AMQP_PASS = "alertme"
    AMQP_VHOST = "/"
    AMQP_EXCHANGE = "alerts"

    #/(asc.1) Establish connection to broker
    creds_broker = pika.PlainCredentials(AMQP_USER, AMQP_PASS)
    conn_params = pika.ConnectionParameters(AMQP_SERVER,
                                            virtual_host=AMQP_VHOST,
                                            credentials=creds_broker)
    conn_broker = pika.BlockingConnection(conn_params)

    channel = conn_broker.channel()

    #/(asc.2) Declare the Exchange
    channel.exchange_declare(exchange=AMQP_EXCHANGE,
                             type="topic",
                             auto_delete=False)

    #/(asc.3) Build the queues and bindings for our topics
    # Bind against AMQP_EXCHANGE so the exchange name lives in one place.
    channel.queue_declare(queue="critical", auto_delete=False)
    channel.queue_bind(queue="critical",
                       exchange=AMQP_EXCHANGE,
                       routing_key="critical.*")

    channel.queue_declare(queue="rate_limit", auto_delete=False)
    channel.queue_bind(queue="rate_limit",
                       exchange=AMQP_EXCHANGE,
                       routing_key="*.rate_limit")

    #/(asc.4) Make our alert processors
    # no_ack=False: each callback acknowledges its message explicitly.
    channel.basic_consume(critical_notify,
                          queue="critical",
                          no_ack=False,
                          consumer_tag="critical")

    channel.basic_consume(rate_limit_notify,
                          queue="rate_limit",
                          no_ack=False,
                          consumer_tag="rate_limit")

    # Single-argument call form prints the same text on Python 2 and 3.
    print("Ready for alerts!")
    channel.start_consuming()