|
| 1 | +========== |
| 2 | +分布式进程 |
| 3 | +========== |
1 | 4 |
|
| 5 | +在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。 |
| 6 | + |
| 7 | +Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 |
| 8 | + |
| 9 | +举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现? |
| 10 | + |
| 11 | +原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。 |
| 12 | + |
| 13 | +我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:: |
| 14 | + |
| 15 | + # task_master.py |
| 16 | + |
| 17 | + import random, time, queue |
| 18 | + from multiprocessing.managers import BaseManager |
| 19 | + |
| 20 | + # 发送任务的队列: |
| 21 | + task_queue = queue.Queue() |
| 22 | + # 接收结果的队列: |
| 23 | + result_queue = queue.Queue() |
| 24 | + |
| 25 | + # 从BaseManager继承的QueueManager: |
| 26 | + class QueueManager(BaseManager): |
| 27 | + pass |
| 28 | + |
| 29 | + # 把两个Queue都注册到网络上, callable参数关联了Queue对象: |
| 30 | + QueueManager.register('get_task_queue', callable=lambda: task_queue) |
| 31 | + QueueManager.register('get_result_queue', callable=lambda: result_queue) |
| 32 | + # 绑定端口5000, 设置验证码'abc': |
| 33 | + manager = QueueManager(address=('', 5000), authkey=b'abc') |
| 34 | + # 启动Queue: |
| 35 | + manager.start() |
| 36 | + # 获得通过网络访问的Queue对象: |
| 37 | + task = manager.get_task_queue() |
| 38 | + result = manager.get_result_queue() |
| 39 | + # 放几个任务进去: |
| 40 | + for i in range(10): |
| 41 | + n = random.randint(0, 10000) |
| 42 | + print('Put task %d...' % n) |
| 43 | + task.put(n) |
| 44 | + # 从result队列读取结果: |
| 45 | + print('Try get results...') |
| 46 | + for i in range(10): |
| 47 | + r = result.get(timeout=10) |
| 48 | + print('Result: %s' % r) |
| 49 | + # 关闭: |
| 50 | + manager.shutdown() |
| 51 | + print('master exit.') |
| 52 | + |
| 53 | +请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。 |
| 54 | + |
| 55 | +然后,在另一台机器上启动任务进程(本机上启动也可以):: |
| 56 | + |
| 57 | + # task_worker.py |
| 58 | + |
| 59 | + import time, sys, queue |
| 60 | + from multiprocessing.managers import BaseManager |
| 61 | + |
| 62 | + # 创建类似的QueueManager: |
| 63 | + class QueueManager(BaseManager): |
| 64 | + pass |
| 65 | + |
| 66 | + # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: |
| 67 | + QueueManager.register('get_task_queue') |
| 68 | + QueueManager.register('get_result_queue') |
| 69 | + |
| 70 | + # 连接到服务器,也就是运行task_master.py的机器: |
| 71 | + server_addr = '127.0.0.1' |
| 72 | + print('Connect to server %s...' % server_addr) |
| 73 | + # 端口和验证码注意保持与task_master.py设置的完全一致: |
| 74 | + m = QueueManager(address=(server_addr, 5000), authkey=b'abc') |
| 75 | + # 从网络连接: |
| 76 | + m.connect() |
| 77 | + # 获取Queue的对象: |
| 78 | + task = m.get_task_queue() |
| 79 | + result = m.get_result_queue() |
| 80 | + # 从task队列取任务,并把结果写入result队列: |
| 81 | + for i in range(10): |
| 82 | + try: |
| 83 | + n = task.get(timeout=1) |
| 84 | + print('run task %d * %d...' % (n, n)) |
| 85 | + r = '%d * %d = %d' % (n, n, n*n) |
| 86 | + time.sleep(1) |
| 87 | + result.put(r) |
| 88 | + except Queue.Empty: |
| 89 | + print('task queue is empty.') |
| 90 | + # 处理结束: |
| 91 | + print('worker exit.') |
| 92 | + |
| 93 | +任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。 |
| 94 | + |
| 95 | +现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:: |
| 96 | + |
| 97 | + $ python3 task_master.py |
| 98 | + Put task 3411... |
| 99 | + Put task 1605... |
| 100 | + Put task 1398... |
| 101 | + Put task 4729... |
| 102 | + Put task 5300... |
| 103 | + Put task 7471... |
| 104 | + Put task 68... |
| 105 | + Put task 4219... |
| 106 | + Put task 339... |
| 107 | + Put task 7866... |
| 108 | + Try get results... |
| 109 | + |
| 110 | +task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:: |
| 111 | + |
| 112 | + $ python3 task_worker.py |
| 113 | + Connect to server 127.0.0.1... |
| 114 | + run task 3411 * 3411... |
| 115 | + run task 1605 * 1605... |
| 116 | + run task 1398 * 1398... |
| 117 | + run task 4729 * 4729... |
| 118 | + run task 5300 * 5300... |
| 119 | + run task 7471 * 7471... |
| 120 | + run task 68 * 68... |
| 121 | + run task 4219 * 4219... |
| 122 | + run task 339 * 339... |
| 123 | + run task 7866 * 7866... |
| 124 | + worker exit. |
| 125 | + |
| 126 | +task_worker.py进程结束,在task_master.py进程中会继续打印出结果:: |
| 127 | + |
| 128 | + Result: 3411 * 3411 = 11634921 |
| 129 | + Result: 1605 * 1605 = 2576025 |
| 130 | + Result: 1398 * 1398 = 1954404 |
| 131 | + Result: 4729 * 4729 = 22363441 |
| 132 | + Result: 5300 * 5300 = 28090000 |
| 133 | + Result: 7471 * 7471 = 55815841 |
| 134 | + Result: 68 * 68 = 4624 |
| 135 | + Result: 4219 * 4219 = 17799961 |
| 136 | + Result: 339 * 339 = 114921 |
| 137 | + Result: 7866 * 7866 = 61873956 |
| 138 | + |
| 139 | +这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。 |
| 140 | + |
| 141 | +Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中: |
| 142 | + |
| 143 | +task_master_worker |
| 144 | + |
| 145 | +而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。 |
| 146 | + |
| 147 | +authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。 |
| 148 | + |
| 149 | +小结 |
| 150 | +------------ |
| 151 | + |
| 152 | +Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。 |
| 153 | + |
| 154 | +注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。 |
0 commit comments