This article was last updated on <span id="expire-date"></span> days ago, the information described in the article may be outdated.
Python 并发编程
各并发技术
1. 多线程 Thread(threading)
优点:相比进程,更轻量级、占用资源少.
缺点:
- 相比进程:多线程只能并发执行,不能利用多 CPU(GIL)
- 相比协程:启动数目有限制,占用内存资源,有线程切换开销
适用于:IO 密集型计算、同时运行的任务数目要求不多
多进程普通写法
import threading
threads = [] # 线程列表
# 为每个url创建一个线程对象并加入线程列表里
# target: 函数名 args: 函数参数(元组格式)
for url in urls:
threads.append(
threading.Thread(target=craw, args=(url,))
)
# 遍历线程列表启动线程
for thread in threads:
thread.start()
# 等待线程结束再执行主线程
for thread in threads:
thread.join()
利用线程池技术
好处: 新建线程系统需要分配资源、终止线程系统需要回收资源,线程池可以重用线程,则可以减去新建/终止的开销
原理
代码实现
- map 方式提交
map
的结果和入参是顺序对应的,且 map 传入函数参数时要传入参数列表zip() 函数用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的对象,这样做的好处是节约了不少的内存。
我们可以使用
list()
转换来输出列表。元素个数与最短的列表一致.
如果各个迭代器的元素个数不一致,则返回列表长度与最短的对象相同,利用 * 号操作符,可以将元组解压为列表。
from concurrent.futures import ThreadPoolExecutor # 可以设置线程数 with ThreadPoolExecutor(max_workers=10) as pool: # map 向线程池提交任务,传入方法名,以及参数列表 htmls = pool.map(craw, urls) # htmls urls(列表) 都是可迭代对象,可以用 zip 函数将他们打包成一个元组 htmls = list(zip(urls, htmls)) # 结果和入参是顺序对应的 for url, html in htmls: print(url, len(html))
- submit 方式提交
future 模式,更强大,注意如果用
as_completed
顺序是不定的Python 字典 items() 方法以列表返回视图对象,是一个可遍历的 key/value 对。
将:
{'Name': 'Runoob', 'Age': 7}
变为:[('Age', 7), ('Name', 'Runoob')]
方法一
from concurrent.futures import ThreadPoolExecutor,as_completed with ThreadPoolExecutor() as pool: # 利用线程列表启动 futures=[ pool.submit(craw,url) for url in urls ] # 通过遍历线程列表取出结果(要等所有结果运行完才有) for future in futures: print(future.result()) # as_completed 不需要等待所有结果运行完才输出结果 # 一旦有结果运行完就会输出 for future in as_completed(futures): print(future.result())
方法二
适合需要与某一个量一一对应建立联系
from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor() as pool: futures = {} # 字典,便于一一对应 # htmls格式: ([],[],[],..) # 遍历元组的每个列表,把列表中的两个参数赋值给 url,html for url, html in htmls: # submwit 提交,传入方法名和单个参数 future = pool.submit(parse, html) # 使用字典 将 future 对象与链接一一对应 futures[future] = url # 将转化为元组的字典遍历出来 # future->线程对象 url->线程对应的url for future,url in futures.items(): # 线程对象通过 result 取出运行结果 print(url,future.result())
生产者消费者模型 ( Producer, Consumer)
多组件的
Pipeline
技术架构复杂的事情一般都不会一下子做完,而是会分很多中间步骤一步步完成
多线程数据通信的**
queue.Queue
**queue.Queue
可以用于多线程之间的、线程安全的数据通信多个线程 可以 同时 读取 同一个队列
# 1、导入类库 import queue # 2、创建Queue q = queue.Queue() # 3、添加元素 q.put(item) # 4、获取元素 item = q.get() # 5、查询状态 # 查看元素的多少 q.qsize() # 判断是否为空 q.empty() # 判断是否已满 q.full()
实例:
import threading import queue import random,time # 传入 url 队列 和 Html 队列 def do_craw(url_queue: queue.Queue, html_queue: queue.Queue): while True: # 从 Url 队列里获取一个链接 url = url_queue.get() html = blog_spider.craw(url) # 将获取到的 html 内容放入 html 队列 html_queue.put(html) # current_thead: 当前线程 .name: 获取线程的名字 print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize()) time.sleep(random.randint(1, 2)) def do_parse(html_queue: queue.Queue, fout): while True: html = html_queue.get() results = blog_spider.parse(html) for result in results: fout.write(str(result) + "\n") print(threading.current_thread().name, f"results.size", len(results), "html_queue.size=", html_queue.qsize()) time.sleep(random.randint(1, 2)) if __name__ == "__main__": url_queue = queue.Queue() # 链接队列 html_queue = queue.Queue() # Html 文件队列 # 将所有链接添加到链接队列里面 for url in urls: url_queue.put(url) # 创建链接访问线程 for idx in range(3): t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}") t.start() # 创建 HTML 解析线程 fout = open("02.data.txt", "w") for idx in range(2): # name: 设置线程的名字. t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}") t.start()
2. 多进程 (MultiProcess)
全局解释器锁 (Python 的大缺点)
- 任何时刻仅有一个线程在执行。
- 在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程
- GIL 目的:为了解决多线程之间数据完整性和状态同步问题
- GIL 带来的问题
- 即使使用了多线程,同一时刻也只有单个线程使用 CPU,导致多核 CPU 的浪费
- GIL 只会对CPU 密集型的程序产生影响
- 如果程序主要是在做I/O 操作,比如处理网络连接,那么多线程技术常常是一个明智的选择
- 规避 GIL的方法
- 规避方法 2: 使用
multiprocessing
多进程,对 CPU 密集型计算,单独启动子进程解释器去执行 - 规避方法 2: 将计算密集型的任务转移到C 语言中,因为 C 语言比 Python 快得多,注意要在 C 语言中自己释放 GIL
多进程适用于 CPU 密集型计算
多进程的 API 与 多线程的实现十分类似
语法条目 | 多线程 | 多进程 |
---|---|---|
引入模块 | from threading import Thread | from multiprocessing import Process |
新建 启动 等待结束 | t=Thread(target=func, args=(100, )) t.start() t.join() | p = Process(target=f, args=(‘bob’,)) p.start() p.join() |
数据通信 | import queue q = queue.Queue() q.put(item) item = q.get() | from multiprocessing import Queue q = Queue() q.put([42, None, ‘hello’]) item = q.get() |
线程安全加锁 | from threading import Lock lock = Lock() with lock: # do something | from multiprocessing import Lock lock = Lock() with lock: # do something |
池化技术 | from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor() as executor: # 方法 1 results = executor.map(func, [1,2,3]) # 方法 2 future = executor.submit(func, 1) result = future.result() | from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor() as executor: # 方法 1 results = executor.map(func, [1,2,3]) # 方法 2 future = executor.submit(func, 1) result = future.result() |
利用进程池技术实现多进程
from concurrent.futures import ProcessPoolExecutor
import time
def test(lover):
print("我喜欢:", lover)
time.sleep(1)
# 任何池化技术都需要写程序入口
if __name__ == "__main__":
# 喜欢的人列表
lovers = ["颖怡", "菲菲", "詹天佑"]
start_time = time.time()
# 1. 使用 map 方式提交
with ProcessPoolExecutor(max_workers=10) as pool:
# map 方式提交进程池需要传入函数名和多参数元组
pool.map(test, lovers)
# 2. 使用 submit 方式提交
with ProcessPoolExecutor() as pool:
# 利用进程列表启动
futures=[
pool.submit(test,lover)
for lover in lovers
]
end_time = time.time()
print("cost Time:", end_time-start_time)
3. 并发锁 (Concurrent Lock)
线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全(简单来说就是程序串行)
import threading
# 实例化锁对象
lock = threading.Lock()
# 上锁
lock.locked()
# 释放锁
lock.release()
用法 1: try-finally
模式
import threading
lock = threading.Lock()
lock.acquire()
try:
# do something
finally:
lock.release()
用法 2: with
模式 (推荐这种方式)
import threading
lock = threading.Lock()
with lock:
# do something
什么时候需要上锁
多人读,不需要|一读一写要加|多人写要加
只是操作共享变量部分的代码要上锁,而不是多线程中所有代码都要上锁,并不影响多线程的执行效果.
4. 异步协程
原理
同步:执行 IO 操作时,必须等待执行完成才得到返回结果。
异步:执行 IO 操作时,不必等待执行就能得到返回结果。
注意:要用在异步 IO 编程中,依赖的库必须支持异步 IO 特性.
爬虫引用中:
requests
不支持异步|需要用aiohttp
补充
1. 什么是协程、异步
举个例子:假设有 1 个洗衣房,里面有 10 台洗衣机,有一个洗衣工在负责这 10 台洗衣机。那么洗衣房就相当于 1 个进程,洗衣工就相当1 个线程。如果有 10 个洗衣工,就相当于10 个线程,1 个进程是可以开多线程的。这就是多线程!
那么协程呢?先不急。大家都知道,洗衣机洗衣服是需要等待时间的,如果 10 个洗衣工,1 人负责 1 台洗衣机,这样效率肯定会提高,但是不觉得浪费资源吗?明明 1 个人能做的事,却要 10 个人来做。只是把衣服放进去,打开开关,就没事做了,等衣服洗好再拿出来就可以了。就算很多人来洗衣服,1 个人也足以应付了,开好第一台洗衣机,在等待的时候去开第二台洗衣机,再开第三台,……直到有衣服洗好了,就回来把衣服取出来,接着再取另一台的(哪台洗好先就取哪台,所以协程是无序的)。这就是计算机的协程!洗衣机就是执行的方法。
当你程序中方法需要等待时间的话,就可以用协程,效率高,消耗资源少。
洗衣房 ==> 进程 | 洗衣工 ==> 线程 | 洗衣机 ==> 方法 (函数)
2. async
\ await
的使用
正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加 async
关键字。
async
用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是 sleep(5))消失后,也就是 5 秒到了再回来执行。
await
用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await
后面只能跟 异步函数 或有 __await__
属性的对象,因为异步程序与一般程序不同。
假设有两个异步函数 async a,async b
,a 中的某一步有 await
,当程序碰到关键字 await b()
后,异步程序挂起后去执行另一个异步 b 程序,就是从函数内部跳出去执行其他函数,当挂起条件消失后,不管 b 是否执行完,要马上从 b 程序中跳出来,回到原程序执行原来的操作。如果 await 后面跟的b 函数不是异步函数,那么操作就只能等 b 执行完再返回,无法在b 执行的过程中返回。如果要在b 执行完才返回,也就不需要用 await 关键字了,直接调用 b 函数就行。所以这就需要 await
后面跟的是异步函数了。在一个异步函数中,可以不止一次挂起,也就是可以用多个 await
。
基本实现:
- 在普通的函数前面加 async 关键字;
await
表示在这个地方等待子函数执行完成,再往下执行。(在并发操作中,把程序控制权交给主程序,让他分配其他协程执行。)await
只能在带有async
关键字的函数中运行。
# 异步io简单实现
import asyncio
import time
# 获取超级事件循环
# event:事件 loop:环
loop = asyncio.get_event_loop()
urls = [
f"https://skyxinye.xyz/#{page}"
for page in range(1, 99)
]
# 定义协程函数
async def myfunc(url):
await time.sleep(2) # 这样写不行,因为 time.sleep 不是异步函数,不能中途跳出执行另一个协程.
await asyncio.sleep(2) # 用协程函数.
# 创建 task 列表
# create:创造
tasks = [
# 在超级循环中创造多个任务,并设置等待任务完成.(如果协程函数中没有同步操作的话就不用设置)
# 任务没有运行
loop.create_task(asyncio.wait(myfunc(url)))
for url in urls
]
# 执行异步事件列表,运行直到完成,如果是协程列表就要设置等待事件完成.
loop.run_until_complete(asyncio.wait(tasks))
利用 aiohttp
模块发送网络请求
# 异步模块 aiohttp 使用
import asyncio
import aiohttp
# Http请求 协程函数
# GET
async def async_get(url):
"""
aiohttp:发送http请求
1.创建一个ClientSession对象
2.通过ClientSession对象去发送请求(get, post, delete等)
3.await 异步等待返回结果
"""
print("Get Url:", url)
async with aiohttp.ClientSession() as se:
async with se.get(url) as resp:
result = await resp.text()
print(len(result))
urls = [
f"https://skyxinye.xyz/#{page}"
for page in range(1, 99999)
]
# 超级循环
loop = asyncio.get_event_loop()
# 事件列表
tasks=[
loop.create_task(async_get(url))
for url in urls
]
# 用 wait 方法遍历事件列表.协程列表需要设置等待运行完成
loop.run_until_complete(asyncio.wait(tasks))
"""
异步协程不用事件列表的另一种实现方法
aiohttp:发送POST请求
"""
# POST
async def main():
data = {'key1': 'value1', 'key2': 'value2'}
url = 'http://httpbin.org/post'
async with aiohttp.ClientSession() as session:
async with session.post(url, data=data) as res:
print(res.status)
print(await res.text())
loop = asyncio.get_event_loop()
# 如果是单个协程就不需要设置 asyncio.wait()
task = loop.create_task(main())
loop.run_until_complete(task)
利用 aiofile
异步操作文件
import aiofiles
import asyncio
async def write():
async with aiofiles.open("test.txt","w",encoding="utf8") as fp:
await fp.write("异步写入文件")
print("文件写入成功")
async def read():
async with aiofiles.open("test.txt","r",encoding="utf8") as fp:
content = await fp.read()
print(content)
async def read2_demo():
async with aiofiles.open("text.txt","r",encoding="utf-8") as fp:
# 读取每行
async for line in fp:
print(line)
if __name__=="__main__":
asyncio.run(write())
Author: WhaleFall
Permalink: https://www.whaleluo.top/python/python-concurrent/
文章默认使用 CC BY-NC-SA 4.0 协议进行许可,使用时请注意遵守协议。
Comments