线程
,最小的执行单元;进程
,最小的资源分配单元,至少由一个线程组成;Unix
/Linux
操作系统的 fork()
可以把当前进程(父进程)复制一份(子进程),然后分别在父子进程内返回,其中子进程返回的是 0,父进程返回的是子进程 ID,子进程通过 getppid()
获取父进程 ID;
os
模块封装的系统调用包含 fork
;
import osprint(f'Process {os.getpid()} start...')
pid = os.fork()
if pid = 0:print(f'child process {os.getpid()}, parent is {os.getppid()}')
else:print(f'process {os.getpid()} created a child process {pid}')
Process 1798 start...
child process 1799, parent is 1798
process 1798 created a child process 1799
跨平台的多进程模板;
from multiprocessing import Processimport os
import timedef proc(name):# time.sleep(5)print(f'run child process {name} ({os.getpid()})')if __name__ == "__main__":print(f'parent process {os.getpid()}')# 创建 Process 实例p = Process(target=proc, args=('test', ))print('child process will start.')# 启动 p 子进程p.start()# 阻塞,等待 p 子进程结束后才继续往下执行p.join()print('child process end.')
parent process 4872
child process will start.
run child process test (14360)
child process end.
子进程结束后主进程才会退出;
进程池,用于批量启动子进程;
from multiprocessing import Pool
import os, time, randomdef proc(name):print(f'run task {name} ({os.getpid()})')start = time.time()time.sleep(random.random() * 3)end = time.time()print(f'task {name} runs: {end-start}')if __name__ == "__main__":print(f'parent process {os.getpid()}')# 正在运行的子进程个数p = Pool(4)for i in range(5):p.apply_async(proc, args=(i, ))print('waiting for all subprocesses done...')# close 后不能继续添加新的 processp.close()# 等待所有子进程执行完毕,必须在 close 后调用p.join()print('all subprocess done.')
parent process 14360
waiting for all subprocesses done...
run task 0 (10800)
run task 1 (7996)
run task 2 (4084)
run task 3 (3492)
task 0 runs: 1.7698190212249756
run task 4 (10800)
task 1 runs: 1.9010038375854492
task 2 runs: 2.1044681072235107
task 3 runs: 2.431309700012207
task 4 runs: 2.9220056533813477
all subprocess done.
创建外部进程,并控制其输入输出,如调用命令nslookup
,并与之交互;
# 调用 snlookup 子进程
p = subprocess.call(['nslookup', 'www.python.org'])# 启动 nslookup 子进程,并设置其输入输出流
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 向 p 子进程输入命令,并接收输出
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
# 打印输出
print(output.decode('gbk'))
Python 的multiprocessing
模块提供了Queue
、Pipes
等交换数据的方式;
from multiprocessing import Process, Queue
import os, time, randomdef write(q):print(f'process to write: {os.getpid()}')for value in ['A', 'B', 'C']:print(f'put {value} to queue...')q.put(value)time.sleep(random.random())def read(q):print(f'process to read: {os.getpid()}')while True:value = q.get(True)print(f'get {value} from queue.')if __name__ == "__main__":q = Queue()pw = Process(target=write, args=(q, ))pr = Process(target=read, args=(q, ))pw.start()pr.start()pw.join()# 写完即可关闭读进程pr.terminate()
process to write: 9472
put A to queue...
process to read: 10084
get A from queue.
put B to queue...
get B from queue.
put C to queue...
get C from queue.
Python 标准库提供了两个线程模块:_thread
(低级模块)、threading
(高级模块,_thread
的封装);
import time, threadingdef loop():print(f'thread {threading.current_thread().name} is running...')n = 0while n < 5:n = n + 1print(f'thread {threading.current_thread().name} >>> {n}')time.sleep(1)print(f'thread {threading.current_thread().name} ended.')print(f'thread {threading.current_thread().name} is running...')
# 传入一个函数以创建 Thread 实例
t = threading.Thread(target=loop, name='LoopThread')
# 启动子线程
t.start()
# 阻塞等待子线程执行结束
t.join()
print(f'thread {threading.current_thread().name} ended.')
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
多进程中,同一个变量会被拷贝到每个进程中,互不影响;
多线程中,所有变量由所有线程共享;
高级语言的一条语句在 CPU 执行时可能是若干条语句,当多个线程同时操作同一个变量时,变量在缓存状态下可能已被修改,此时执行结果无法满足"一致性"(类比事务
的一致性
);
import threading
balance = 0
lock = threading.Lock()def run_thread(n):for i in range(10000):# 获取锁lock.acquire()try:change_it(n)finally:# 释放锁lock.release()
锁
,可以避免上述问题,同一时刻最多只有一个线程持有同一个锁,只有成功获得锁,才能继续执行代码,其他尝试获取该锁的线程会被阻塞,知道获得锁的线程释放该锁;坏处
阻止多线程的并发执行;
死锁
存在多个锁,多个线程试图获取对方持有的锁,会导致多个线程同时挂起,只能靠操作系统强制终止;
Global Interpreter Lock
全局解释器锁,任何 Python 线程在执行前必须先获得GIL
,每执行 100 条字节码自动释放GIL
,这导致多线程在同一时间实际永远不能利用多核 CPU,只能交替执行;
Python 中多线程并发不能利用多核 CPU;
多线程环境下,全局变量必须加锁,相较使用局部变量会更好;
局部变量需要在函数调用建以参数传递,十分麻烦,此时可引入ThreadLocal
变量;
ThreadLocal
,变量虽是全局变量,但每个线程只能读取自己线程独立副本,互不干扰;import threading# 创建全局ThreadLocal对象:
local_school = threading.local()def process_student():# 获取当前线程关联的student:std = local_school.studentprint('Hello, %s (in %s)' % (std, threading.current_thread().name))def process_thread(name):# 绑定ThreadLocal的student:local_school.student = nameprocess_student()t1 = threading.Thread(target=process_thread, args=('Alice', ), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob', ), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
多进程
子线程
多线程的并行实际是单核快速在多线程间切换的效果,当线程多到一定程度,线程切换会消耗系统大量资源,使效率急剧下降;
计算密集型
消耗都在 CPU,最高效率利用 CPU,同时进行的任务数应等于 CPU 核数;
Python 的运行效率较低,不适合计算型程序,最好使用 C 语言;
IO 密集型
消耗在等待 IO 上,CPU 消耗少,任务越多,CPU 效率越高;
Python 与 C 语言在 IO 密集型程序上的运行效率表现相差不大,Python 开发效率往往更高;
操作系统提供的异步 IO 支持,事件驱动模型;
Python 中单线程的异步编程模型称为协程
;
Process 可以分布到多台机器上(分布式),Thread 最多只能分布在同一机器的多个 CPU;
Python 的multiprocessing
模块的子模块managers
对分布式做了支持;
# task_master.pyimport random, queue
from multiprocessing.managers import BaseManager# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()def get_task_queue():return task_queuedef get_result_queue():return result_queue# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):passif __name__ == "__main__":# 把两个Queue都注册到网络上, callable参数关联了Queue对象:QueueManager.register('get_task_queue', callable=get_task_queue)QueueManager.register('get_result_queue', callable=get_result_queue)# 绑定端口5000, 设置验证码'abc':manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')# 启动Queue:manager.start()# 获得通过网络访问的Queue对象:task = manager.get_task_queue()result = manager.get_result_queue()# 放几个任务进去:for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)# 从result队列读取结果:print('Try get results...')for i in range(10):r = result.get(timeout=100)print('Result: %s' % r)# 关闭:manager.shutdown()print('master exit.')
import time, queue
from multiprocessing.managers import BaseManager# 创建类似的QueueManager:
class QueueManager(BaseManager):passif __name__ == "__main__":# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')# 连接到服务器,也就是运行task_master.py的机器:server_addr = '127.0.0.1'print('Connect to server %s...' % server_addr)# 端口和验证码注意保持与task_master.py设置的完全一致:manager = QueueManager(address=(server_addr, 5000), authkey=b'abc')# 从网络连接:manager.connect()# 获取Queue的对象:task = manager.get_task_queue()result = manager.get_result_queue()# 从task队列取任务,并把结果写入result队列:for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n * n)time.sleep(1)result.put(r)except queue.Queue.Empty:print('task queue is empty.')# 处理结束:print('worker exit.')
通过managers
模块把Queue
暴露到网络以便其他机器的进程访问,worker
中并没有保存Queue
,Queue
存储在master
中;
Queue
是用来传递任务和接收结果的,任务的描述数据要尽量小,如果要传输体量较大的数据,可通过共享磁盘等,让worker
自行读取;
上一篇:「Python 基础」I/O 编程、正则表达式
专栏:《Python 基础》
PS:感谢每一位志同道合者的阅读,欢迎关注、评论、赞!