-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_handler.py
More file actions
134 lines (111 loc) · 4.96 KB
/
test_handler.py
File metadata and controls
134 lines (111 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
Test the stackify.handler module
"""
import unittest
from mock import patch, Mock
try:
import Queue as queue
except ImportError:
import queue
from stackify.handler import StackifyHandler, StackifyListener
from stackify.transport.application import ApiConfiguration
import logging
class TestHandler(unittest.TestCase):
    '''
    Test the StackifyHandler class
    '''

    def test_queue_full(self):
        '''The queue should evict when full, keeping only the latest item'''
        q = queue.Queue(1)
        handler = StackifyHandler(queue_=q, listener=Mock())

        # don't print warnings on overflow, so mute stackify logger.
        # The previous propagate value is restored after the test so the
        # muted logger does not leak into other tests in the same process.
        stackify_logger = logging.getLogger('stackify')
        self.addCleanup(
            setattr, stackify_logger, 'propagate', stackify_logger.propagate)
        stackify_logger.propagate = False

        # The queue holds a single item, so each enqueue after the first
        # has to push out what was there before.
        handler.enqueue('test1')
        handler.enqueue('test2')
        handler.enqueue('test3')

        self.assertEqual(q.qsize(), 1)
        # get_nowait() raises queue.Empty instead of blocking forever if the
        # queue is unexpectedly empty, so a regression fails fast.
        self.assertEqual(q.get_nowait(), 'test3')
class TestListener(unittest.TestCase):
    '''
    Test the StackifyListener class

    Note on the mock arguments: stacked ``@patch`` decorators are applied
    bottom-up, so the decorator closest to the test function supplies the
    first mock argument and the topmost decorator supplies the last one.
    '''

    def setUp(self):
        '''Build the ApiConfiguration shared by the tests and mute the logger'''
        self.config = ApiConfiguration(
            application='test_appname',
            environment='test_environment',
            api_key='test_apikey',
            api_url='test_apiurl',
        )
        # don't print warnings on http crashes, so mute stackify logger.
        # The previous propagate value is restored after each test so the
        # muted logger does not leak into other tests in the same process.
        stackify_logger = logging.getLogger('stackify')
        self.addCleanup(
            setattr, stackify_logger, 'propagate', stackify_logger.propagate)
        stackify_logger.propagate = False

    @patch('stackify.transport.default.DefaultTransport.create_message')
    @patch('stackify.transport.default.http.HTTPClient.POST')
    def test_not_identified(self, post, logmsg):
        '''The HTTPClient identifies automatically if needed'''
        listener = StackifyListener(queue_=Mock(), config=self.config)
        listener.handle(Mock())
        listener.send_group()
        self.assertTrue(listener.transport._transport.identified)

    @patch('stackify.transport.default.DefaultTransport.create_message')
    @patch('stackify.transport.default.DefaultTransport.create_group_message')
    @patch('stackify.transport.default.http.HTTPClient.POST')
    def test_send_group_if_needed(self, post, logmsggroup, logmsg):
        '''The listener sends groups of messages'''
        listener = StackifyListener(queue_=Mock(), max_batch=3, config=self.config)
        listener.transport._transport.identified = True

        # Below max_batch nothing is posted; messages accumulate.
        listener.handle(1)
        self.assertFalse(post.called)
        listener.handle(2)
        self.assertFalse(post.called)
        self.assertEqual(len(listener.messages), 2)

        # The third message reaches max_batch: the group is posted and the
        # pending list is emptied.
        listener.handle(3)
        self.assertTrue(post.called)
        self.assertEqual(len(listener.messages), 0)

        # A fourth message starts a new batch without another post.
        listener.handle(4)
        self.assertEqual(post.call_count, 1)
        self.assertEqual(len(listener.messages), 1)

    @patch('stackify.transport.default.DefaultTransport.create_message')
    @patch('stackify.handler.StackifyListener.send_group')
    def test_clear_queue_shutdown(self, send_group, logmsg):
        '''The listener sends the leftover messages on the queue when shutting down'''
        listener = StackifyListener(queue_=Mock(), max_batch=3, config=self.config)
        listener.transport._transport.identified = True
        # Replace the worker thread with a mock so stop() can be called
        # without a real thread having been started.
        listener._thread = Mock()

        listener.handle(1)
        listener.handle(2)
        self.assertFalse(send_group.called)

        listener.stop()
        self.assertTrue(send_group.called)

    @patch('stackify.transport.default.DefaultTransport.create_message')
    @patch('stackify.transport.default.DefaultTransport.create_group_message')
    @patch('stackify.transport.default.http.HTTPClient.send_log_group')
    def test_send_group_crash(self, send_log_group, logmsggroup, logmsg):
        '''The listener drops the batch when sending the group fails'''
        listener = StackifyListener(queue_=Mock(), max_batch=3, config=self.config)
        listener.transport._transport.identified = True
        send_log_group.side_effect = Exception

        listener.handle(1)
        listener.handle(2)
        listener.handle(3)
        # The failed batch is not kept around for another attempt.
        self.assertEqual(len(listener.messages), 0)

        listener.handle(4)
        self.assertEqual(len(listener.messages), 1)
        self.assertEqual(send_log_group.call_count, 1)

    @patch('stackify.transport.default.DefaultTransport.create_message')
    @patch('stackify.transport.default.DefaultTransport.create_group_message')
    @patch('stackify.transport.default.http.HTTPClient.send_log_group')
    def test_create_message_crash(self, send_log_group, logmsggroup, logmsg):
        '''Messages whose creation raises are neither stored nor sent'''
        listener = StackifyListener(queue_=Mock(), max_batch=3, config=self.config)
        listener.transport._transport.identified = True
        logmsg.side_effect = Exception

        listener.handle(1)
        listener.handle(2)
        listener.handle(3)
        self.assertEqual(len(listener.messages), 0)

        listener.handle(4)
        self.assertEqual(len(listener.messages), 0)  # messages not created
        self.assertEqual(logmsg.call_count, 4)  # we called the function 4 times
        self.assertEqual(send_log_group.call_count, 0)  # since we have exceptions
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()