-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy paththread_queue.py
More file actions
126 lines (102 loc) · 3.16 KB
/
thread_queue.py
File metadata and controls
126 lines (102 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# _*_ coding:utf-8 _*_
"""
"""
import Queue
import threading
import logging
import random
import time
from basic_thread import join_all_others_thread
logging.basicConfig(level=logging.DEBUG,
format='%(levelname)s %(asctime)s %(threadName)s %(message)s',
datefmt='%Y-%m-%d %I:%M:%S')
lst_que = Queue.Queue()
def produce_item():
return threading.currentThread().name, random.randint(0, 10)
pass
def producer(num):
for i in xrange(num):
item = produce_item()
lst_que.put(item)
logging.info('produce item : ' + str(item))
time.sleep(0.5)
def consume():
while True:
try:
# non-block if lst_queue is empty then, it will raise Empty error
item = lst_que.get(False)
if item:
logging.debug('consume item: ' + str(item))
time.sleep(0.5)
except Queue.Empty, e:
# if lst_que is empty then do the following code snippet
logging.warn('queue empty ' + str(e) + 'now sleep 1 S')
time.sleep(1)
def create_mul_thread(thread_num, prefix_name, target_name):
"""
A template of creating and starting n thread, do the same task.
:param thread_num: the num of thread
:param prefix_name:
:param target_name:
:return:
"""
for i in xrange(thread_num):
t_name = prefix_name + str(i)
produce_num = random.randint(10, 100)
if prefix_name == 'consume--':
t = threading.Thread(name=t_name, target=target_name)
else:
t = threading.Thread(name=t_name, target=target_name, args=(produce_num, ))
t.start()
def create_mul_thread_producer(num):
for i in xrange(num):
t_name = 'producer--' + str(i)
produce_num = random.randint(10, 100)
t = threading.Thread(name=t_name, target=producer, args=(produce_num, ))
t.start()
def test_consume_produce_queue():
produce_num, consume_num = 2, 3
# create 2 producer thread
create_mul_thread(produce_num, 'producer--', producer)
# create 3 consumer thread
create_mul_thread(consume_num, 'consume--', consume)
pass
def consume_echo():
logging.info('set gpu mode, load caffe net')
while True:
item = lst_que.get(True)
logging.info('recognize %s', item)
def get_input_text():
while True:
text = raw_input("please input a sentence")
lst_que.put(text, True)
if 'exit' == text:
break
def create_echo_cp():
"""
create consume_echo, get_input_text thread
"""
c1 = threading.Thread(name="c1", target=consume_echo)
c2 = threading.Thread(name="c2", target=consume_echo)
p1 = threading.Thread(name="p1", target=get_input_text)
c1.start()
c2.start()
p1.start()
join_all_others_thread()
def get_que_len():
global lst_que
lst_que.put('abc')
print lst_que.qsize()
lst_que.put('abc')
lst_que.put('abc')
print lst_que.qsize()
lst_que.put('abc')
lst_que.put('abc')
print lst_que.qsize()
if __name__ == '__main__':
# test_consume_produce_queue()
# create_echo_cp()
s = 'abc'
s = s + '123'
print s
pass