@@ -61,13 +61,124 @@ def start_thread():
6161 thread2 .start ()
6262
6363
64+ class Message ():
65+ """class"""
66+ def __init__ (self ):
67+ self .s_pid = 0
68+ self .s_pn = ""
69+ self .d_pid = 0
70+ self .d_pn = ""
71+ self .method = "PUT"
72+ self .command = 0
73+ self .payload = None
74+
75+ def get_payload (self ):
76+ return self .payload
77+
78+ def get_source (self ):
79+ return self .s_pn , self .s_pid
80+
81+ def get_destination (self ):
82+ return self .d_pn , self .d_pid
83+
84+ def get_method (self ):
85+ return self .method
86+
87+ def get_command (self ):
88+ return self .command
89+
90+
91+ class MessageProcess ():
92+ """process message"""
93+
94+ def __init__ (self , process_name , process_id , queue_in , queue_out , call_back ):
95+ """init"""
96+ self .process_id = process_id
97+ self .process_name = process_name
98+ self .msg_queue_in = queue_in
99+ self .msg_queue_out = queue_out
100+ self .msg_in_count = 0
101+ self .msg_out_count = 0
102+ # 与本进程相连的其他进程,只能和列表中的进程相通信
103+ self .process_link_info = {}
104+
105+ # 回调函数,在调用获取消息接口后会调用回调函数进行处理
106+ self .call_back = call_back
107+
108+ self .message_send = Message ()
109+ self .message_receive = Message ()
110+
111+ def add_process_link (self , process_name , process_id , queue_in , queue_out ):
112+ """add process link"""
113+
114+ if process_name in self .process_link_info :
115+ return False , "process_name in link"
116+
117+ for pro in self .process_link_info :
118+ if self .process_link_info [pro ]['process_id' ] == process_id :
119+ return False , "process_id in link"
120+
121+ new_link = None
122+ new_link [process_name ] = {
123+ 'process_id' : process_id ,
124+ 'queue_in' : queue_in ,
125+ 'queue_out' : queue_out ,
126+ 'msg_send_count' : 0 ,
127+ 'msg_receive_count' : 0
128+ }
129+
130+ self .process_link_info [process_name ] = new_link
131+ return True , "process link add success"
132+
133+ def del_process_link (self , * agr ):
134+
135+ if isinstance (type (agr ), type (1 )) is True :
136+ for pro in self .process_link_info :
137+ if agr == self .process_link_info [pro ]['process_id' ]:
138+ del self .process_link_info [pro ]
139+ if isinstance (type (agr ), type ("" )) is True :
140+ if agr in self .process_link_info :
141+ del self .process_link_info [agr ]
142+ return True
143+
144+ def send_msg (self , process_name , method , command , data ):
145+ """send msg"""
146+ for pro in self .process_link_info :
147+ if process_name == self .process_link_info :
148+ self .message_send .d_pid = self .process_link_info [pro ]
149+ self .message_send .d_pn = process_name
150+ self .message_send .s_pid = self .process_id
151+ self .message_send .s_pn = self .process_name
152+ self .message_send .method = method
153+ self .message_send .command = command
154+ self .message_send .payload = data
155+
156+ # 发送数据
157+ self .process_link_info [pro ]['msg_in' ].put (self .message_send )
158+ self .msg_out_count += 1
159+ self .process_link_info [pro ]['msg_send' ] += 1
160+ else :
161+ return False , "process not in link"
162+
163+ def get_msg (self ):
164+ if self .msg_queue_in .empty () is True :
165+ return False , "there is no message"
166+ else :
167+ new_msg = self .msg_queue_in .get ()
168+ self .message_receive .method = new_msg ['method' ]
169+ self .message_receive .s_pn = new_msg ['s_pn' ]
170+ self .message_receive .s_pid = new_msg ['s_pid' ]
171+ self .message_receive .d_pn = new_msg ['d_pn' ]
172+ self .message_receive .d_pid = new_msg ['d_pid' ]
173+ self .message_receive .command = new_msg ['command' ]
174+ self .message_receive .payload = new_msg ['payload' ]
175+
176+
64177def my_process (process_name , queue_up , queue_down ):
65- """my process"""
66178
67179 pid = os .getpid ()
68180 print "process_name:%s, process_id:%d" % (process_name , pid )
69181 queue_up .put (pid )
70-
71182 while True :
72183 # 进程接收和处理从其他进程发过来的消息
73184 if queue_down .empty () is True :
@@ -101,7 +212,7 @@ def my_process(process_name, queue_up, queue_down):
101212 # b = my_thread()
102213
103214 msg_channal = {}
104- for count in range (0 , 5 , 1 ):
215+ for count in range (0 , 1 , 1 ):
105216 # 创建进程和通信通道
106217 msg_queue = {}
107218 queue_up = Queue ()
0 commit comments