multiprocessing模块支持进程间通信的两种主要形式:管道和队列。这两种方法都是实用消息传递来实现的,但队列接口有意模仿线程程序中参见的队列用法。
Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数,如果忽略此参数,则无大小限制。底层队列使用管道和锁定实现。
Queue的实例具有以下方法
JoinableQueue([maxsize])
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例p除了与Queue对象相同的方法外,还具有以下方法:
import multiprocessingdef consumer(input_q):while True:item = input_q.get()# 进行处理工作,此处替换为实际有意义的工作print(item)# 发出信号通知任务完成input_q.task_done()def producer(sequence, output_q):for item in sequence:# 将项目放入队列中output_q.put(item)# 建立进程
if __name__ == '__main__':q = multiprocessing.JoinableQueue()# 运行消费者进程consumer_p = multiprocessing.Process(target=consumer, args=(q, ))consumer_p.daemon = Trueconsumer_p.start()# 生产项目,sequence代表要发送给消费者的项目序列# 在实践中,这可能是生成器的输出或通过一些其他方式生产出来sequence = [1, 2, 3, 4, 5]producer(sequence, q)# 等待所有项目被处理q.join()
这个例子中,将消费者进程设置为后台进程,因为它永久运行,而我们希望但主程序结束时它随之终止(如果忘记这么做,程序将挂起)。这里使用了JoinableQueue,以便让生产者实际了解队列中的所有项目何时被处理完毕。join()操作保证了这一点。如果忘记这个步骤,消费者进程将在有时间完成所有工作之前被终止。
如果需要,可以在同一个队列中放置多个进程,也可以从同一个队列中获取多个进程,例如,如果要构造消费者进程池,可以编写下面这样的代码:
if __name__ == "__main__":q = multiprocessing.JoinableQueue()# 启动一些消费者进程consumer_p1 = multiprocessing.Process(target=consumer, args=(q, ))consumer_p1.daemon = Trueconsumer_p1.start()consumer_p2 = multiprocessing.Process(target=consumer, args=(q, ))consumer_p2.daemon = Trueconsumer_p2.start()# 生产项目sequence = [1, 2, 3, 4, 5]producer(sequence, q)# 等待所有项目被处理q.join()
编写这类代码时,要把队列中的每个项目都序列化,然后通过管道或套接字连接发送给进程。一般的规则是:发送数据量较少的大对象比发送大量小对象要好。
在某些应用程序中,生产者需要通知消费者,它们不在生产任何项目而应该关闭。为此,编写的代码中应该使用标志(sentinel 指示完成的特殊值)。下面这个例子使用none作为标志说明这个概念:
import multiprocessingdef consumer(input_q):while True:item = input_q.get()if item is None:break# 处理项目print(item)# 关闭print("consumer done")def producer(sequence, output_q):for item in sequence:# 把项目放入队列output_q.put(item)if __name__ == '__main__':q = multiprocessing.JoinableQueue()# 启动消费者进程consumer_p = multiprocessing.Process(target=consumer, args=(q, ))consumer_p.daemon = Trueconsumer_p.start()# 生产项目sequence = [1, 2, 3, 4, 5]producer(sequence, q)# 在队列中放置标志,发出完成信号q.put(None)# 等待所有项目被处理, 然后关闭q.join()
如果像上面这个例子中那样使用标志,一定要在队列上为每个使用者都放上标志。例如,如果有三个消费者进程在使用队列上的项目,那么生产者需要在队列中放置三个标志,才能让所有消费者都关闭。
作为使用队列的另一种形式,还可以使用管道在进程之间执行消息传递。
Pipe([duplex])
在进程之间创建一条管道,并返回元组(conn1, conn2), 其中conn1和conn2是表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex设置为false,则conn1只能用于接收,conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。
Pipe()方法返回的Connection对象的实例c具有以下方法和属性
可以使用与队列类似的方式使用管道。下面这个例子说明如何使用管道实现前面的生产者消费者问题:
import multiprocessing# 使用管道上的项目
def consumer(pipe):output_p, input_p = pipe# 关闭管道的输入端input_p.close()while True:try:item = output_p.recv()except EOFError:# 如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常break# 处理项目print(item)# 关闭print("consumer done")# 生产项目并将其放到队列上
def producer(sequence, input_p):for item in sequence:input_p.send(item)if __name__ == "__main__":(output_p, input_p) = multiprocessing.Pipe()# 启动消费者进程consumer_p = multiprocessing.Process(target=consumer, args=((output_p, input_p), ))consumer_p.start()# 关闭生产者中的输出管道output_p.close()# 生产项目sequence = [1, 2, 3, 4, 5]producer(sequence, input_p)# 关闭输入管道,表示完成input_p.close()# 等待消费者进程关闭consumer_p.join()
应当特别注意管道端口的正确管理。如果生产者或消费者中都没有使用管道的某个端点,就应将其关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭了管道的输入端。如果忘记执行这些步骤,程序可能会在消费者中的recv()操作上挂起。
管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:
import multiprocessing# 服务器处理
def adder(pipe):server_p, client_p = pipe# 关闭服务器中的客户端管道client_p.close()while True:try:x, y = server_p.recv()except EOFError:breakresult = x + yserver_p.send(result)# 关闭print("server done")if __name__ == "__main__":(server_p, client_p) = multiprocessing.Pipe()# 启动服务器进程adder_p = multiprocessing.Process(target=adder, args=((server_p, client_p), ))adder_p.start()# 关闭客户端中的服务器管道server_p.close()# 向服务器提出一些请求client_p.send((3, 4))print(client_p.recv())client_p.send(("hello", "world"))print(client_p.recv())# 完成, 关闭管道client_p.close()# 等待消费者进程关闭adder_p.join()
这个例子中,adder()函数以服务器的形式运行,等待消息到达管道的端点。收到之后,它会执行一些处理并将结果返回给管道。send()和recv()方法使用pickle模块对对象进行序列化。但对于使用远程过程调用的高级应用程序而言,应该使用进程池。
下一篇:Java-02对象传递和返回