-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulti_thread.py
More file actions
253 lines (213 loc) · 8.07 KB
/
multi_thread.py
File metadata and controls
253 lines (213 loc) · 8.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# coding: utf-8
# file name: multi_thread.py
# python version: 2.7
# author: wu ming ming
# description: study of multi thread
import threading
from multiprocessing import Process, Queue
import time
import os
# """
# When you use python manage.py runserver Django start two processes, one for the actual development server and other
# to reload your application when the code change
#
# You can test it importing os inside your AppConfig class and print the process id inside the ready function like so:
#
# import os
#
# class SomeAppConfig(AppConfig):
# name = 'some_app'
#
# def ready(self):
# print(os.getpid())
#
# You will see it prints two different processes
#
# You can also start the server without the reload option, and you will see only one process running (and your code
# print("Redefined ready method in some_app") will only be executed once):
#
# python manage.py runserver --noreload
#
# https://stackoverflow.com/questions/37441564/redefinition-of-appconfig-ready
# """
# 为线程定义一个函数
def print_time(thread_name, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print '%s: %s' % (thread_name, time.ctime(time.time()))
class myThread(threading.Thread):
def __init__(self, thread_id, name, counter):
threading.Thread.__init__(self)
# super(myThread, self).__init__(self)
self.thread_id = thread_id
self.name = name
self.counter = counter
# lock.release()
def run(self):
# 从写run函数,增加自己的代码到run中
print 'Starting' + self.name
print_time(self.name, self.counter)
print 'Exiting' + self.name
def start_thread():
thread1 = myThread(1, 'Thread_1', 1)
thread2 = myThread(2, 'Thread_2', 2)
thread1.start()
thread2.start()
class Message():
"""class"""
def __init__(self):
self.s_pid = 0
self.s_pn = ""
self.d_pid = 0
self.d_pn = ""
self.method = "PUT"
self.command = 0
self.payload = None
def get_payload(self):
return self.payload
def get_source(self):
return self.s_pn, self.s_pid
def get_destination(self):
return self.d_pn, self.d_pid
def get_method(self):
return self.method
def get_command(self):
return self.command
class MessageProcess():
"""process message"""
def __init__(self, process_name, process_id, queue_in, queue_out, call_back):
"""init"""
self.process_id = process_id
self.process_name = process_name
self.msg_queue_in = queue_in
self.msg_queue_out = queue_out
self.msg_in_count = 0
self.msg_out_count = 0
# 与本进程相连的其他进程,只能和列表中的进程相通信
self.process_link_info = {}
# 回调函数,在调用获取消息接口后会调用回调函数进行处理
self.call_back = call_back
self.message_send = Message()
self.message_receive = Message()
def add_process_link(self, process_name, process_id, queue_in, queue_out):
"""add process link"""
if process_name in self.process_link_info:
return False, "process_name in link"
for pro in self.process_link_info:
if self.process_link_info[pro]['process_id'] == process_id:
return False, "process_id in link"
new_link = None
new_link[process_name] = {
'process_id': process_id,
'queue_in': queue_in,
'queue_out': queue_out,
'msg_send_count': 0,
'msg_receive_count': 0
}
self.process_link_info[process_name] = new_link
return True, "process link add success"
def del_process_link(self, *agr):
if isinstance(type(agr), type(1)) is True:
for pro in self.process_link_info:
if agr == self.process_link_info[pro]['process_id']:
del self.process_link_info[pro]
if isinstance(type(agr), type("")) is True:
if agr in self.process_link_info:
del self.process_link_info[agr]
return True
def send_msg(self, process_name, method, command, data):
"""send msg"""
for pro in self.process_link_info:
if process_name == self.process_link_info:
self.message_send.d_pid = self.process_link_info[pro]
self.message_send.d_pn = process_name
self.message_send.s_pid = self.process_id
self.message_send.s_pn = self.process_name
self.message_send.method = method
self.message_send.command = command
self.message_send.payload = data
# 发送数据
self.process_link_info[pro]['msg_in'].put(self.message_send)
self.msg_out_count += 1
self.process_link_info[pro]['msg_send'] += 1
else:
return False, "process not in link"
def get_msg(self):
if self.msg_queue_in.empty() is True:
return False, "there is no message"
else:
new_msg = self.msg_queue_in.get()
self.message_receive.method = new_msg['method']
self.message_receive.s_pn = new_msg['s_pn']
self.message_receive.s_pid = new_msg['s_pid']
self.message_receive.d_pn = new_msg['d_pn']
self.message_receive.d_pid = new_msg['d_pid']
self.message_receive.command = new_msg['command']
self.message_receive.payload = new_msg['payload']
def my_process(process_name, queue_up, queue_down):
pid = os.getpid()
print "process_name:%s, process_id:%d" % (process_name, pid)
queue_up.put(pid)
while True:
# 进程接收和处理从其他进程发过来的消息
if queue_down.empty() is True:
# print "receive not msg"
pass
else:
msg_receive = queue_down.get()
print "process pid=%d receive msg: %s" % (pid, msg_receive)
queue_up.put_nowait(msg_receive)
print "put msg back"
time.sleep(0.5)
if __name__ == '__main__':
# 使用简单线程接口创建线程
# print 'multi_progress test function'
# try:
# thread.start_new_thread(print_time, ('Thread-1', 2, ))
# thread.start_new_thread(print_time, ('Thread-2', 4, ))
# except:
# print 'Error: unable to start thread'
#
# while 1:
# pass
# 使用线程类创建和管理线程
# print threading.current_thread
# start_thread()
# start_thread()
# a = my_thread()
# b = my_thread()
msg_channal = {}
for count in range(0, 1, 1):
# 创建进程和通信通道
msg_queue = {}
queue_up = Queue()
queue_down = Queue()
msg_queue['queue_up'] = queue_up
msg_queue['queue_down'] = queue_down
print "queue_up_id:%d, queue_down_id:%d" % (id(queue_up), id(queue_down))
new_process = Process(target=my_process, args=('process_' + str(count), queue_up, queue_down))
new_process.start()
# time.sleep(1)
new_process_pid = queue_up.get()
print "process_%d pid:%d" % (count, new_process_pid)
msg_queue['pid'] = new_process_pid
msg_queue['handle'] = new_process
msg_channal['process_' + str(count)] = msg_queue
time.sleep(2)
for channal_key in msg_channal.keys():
print msg_channal[channal_key]
while True:
for channal_key in msg_channal.keys():
# 和进程通信
if msg_channal[channal_key]['handle'].is_alive() is False:
print "process pid=%d is not alive" % (msg_channal[channal_key]['pid'])
continue
msg_channal[channal_key]['queue_down'].put("msg for queue_%d" % (msg_channal[channal_key]['pid']))
time.sleep(0.5)
queue_channal = msg_channal[channal_key]['queue_up']
msg = queue_channal.get()
print "main process receive msg process pid=%d: %s" % (msg_channal[channal_key]['pid'], msg)
# time.sleep(1)
pass