Skip to content

Commit b1a1e47

Browse files
committed
添加进程通信消息类
1 parent 875f023 commit b1a1e47

1 file changed

Lines changed: 114 additions & 3 deletions

File tree

multi_thread.py

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,124 @@ def start_thread():
6161
thread2.start()
6262

6363

64+
class Message():
65+
"""class"""
66+
def __init__(self):
67+
self.s_pid = 0
68+
self.s_pn = ""
69+
self.d_pid = 0
70+
self.d_pn = ""
71+
self.method = "PUT"
72+
self.command = 0
73+
self.payload = None
74+
75+
def get_payload(self):
76+
return self.payload
77+
78+
def get_source(self):
79+
return self.s_pn, self.s_pid
80+
81+
def get_destination(self):
82+
return self.d_pn, self.d_pid
83+
84+
def get_method(self):
85+
return self.method
86+
87+
def get_command(self):
88+
return self.command
89+
90+
91+
class MessageProcess():
92+
"""process message"""
93+
94+
def __init__(self, process_name, process_id, queue_in, queue_out, call_back):
95+
"""init"""
96+
self.process_id = process_id
97+
self.process_name = process_name
98+
self.msg_queue_in = queue_in
99+
self.msg_queue_out = queue_out
100+
self.msg_in_count = 0
101+
self.msg_out_count = 0
102+
# 与本进程相连的其他进程,只能和列表中的进程相通信
103+
self.process_link_info = {}
104+
105+
# 回调函数,在调用获取消息接口后会调用回调函数进行处理
106+
self.call_back = call_back
107+
108+
self.message_send = Message()
109+
self.message_receive = Message()
110+
111+
def add_process_link(self, process_name, process_id, queue_in, queue_out):
112+
"""add process link"""
113+
114+
if process_name in self.process_link_info:
115+
return False, "process_name in link"
116+
117+
for pro in self.process_link_info:
118+
if self.process_link_info[pro]['process_id'] == process_id:
119+
return False, "process_id in link"
120+
121+
new_link = None
122+
new_link[process_name] = {
123+
'process_id': process_id,
124+
'queue_in': queue_in,
125+
'queue_out': queue_out,
126+
'msg_send_count': 0,
127+
'msg_receive_count': 0
128+
}
129+
130+
self.process_link_info[process_name] = new_link
131+
return True, "process link add success"
132+
133+
def del_process_link(self, *agr):
134+
135+
if isinstance(type(agr), type(1)) is True:
136+
for pro in self.process_link_info:
137+
if agr == self.process_link_info[pro]['process_id']:
138+
del self.process_link_info[pro]
139+
if isinstance(type(agr), type("")) is True:
140+
if agr in self.process_link_info:
141+
del self.process_link_info[agr]
142+
return True
143+
144+
def send_msg(self, process_name, method, command, data):
145+
"""send msg"""
146+
for pro in self.process_link_info:
147+
if process_name == self.process_link_info:
148+
self.message_send.d_pid = self.process_link_info[pro]
149+
self.message_send.d_pn = process_name
150+
self.message_send.s_pid = self.process_id
151+
self.message_send.s_pn = self.process_name
152+
self.message_send.method = method
153+
self.message_send.command = command
154+
self.message_send.payload = data
155+
156+
# 发送数据
157+
self.process_link_info[pro]['msg_in'].put(self.message_send)
158+
self.msg_out_count += 1
159+
self.process_link_info[pro]['msg_send'] += 1
160+
else:
161+
return False, "process not in link"
162+
163+
def get_msg(self):
164+
if self.msg_queue_in.empty() is True:
165+
return False, "there is no message"
166+
else:
167+
new_msg = self.msg_queue_in.get()
168+
self.message_receive.method = new_msg['method']
169+
self.message_receive.s_pn = new_msg['s_pn']
170+
self.message_receive.s_pid = new_msg['s_pid']
171+
self.message_receive.d_pn = new_msg['d_pn']
172+
self.message_receive.d_pid = new_msg['d_pid']
173+
self.message_receive.command = new_msg['command']
174+
self.message_receive.payload = new_msg['payload']
175+
176+
64177
def my_process(process_name, queue_up, queue_down):
65-
"""my process"""
66178

67179
pid = os.getpid()
68180
print "process_name:%s, process_id:%d" % (process_name, pid)
69181
queue_up.put(pid)
70-
71182
while True:
72183
# 进程接收和处理从其他进程发过来的消息
73184
if queue_down.empty() is True:
@@ -101,7 +212,7 @@ def my_process(process_name, queue_up, queue_down):
101212
# b = my_thread()
102213

103214
msg_channal = {}
104-
for count in range(0, 5, 1):
215+
for count in range(0, 1, 1):
105216
# 创建进程和通信通道
106217
msg_queue = {}
107218
queue_up = Queue()

0 commit comments

Comments
 (0)