查看CPU核数
from multiprocessing import cpu_count
print("CPU的核数为:{}".format(cpu_count()))
print(type(cpu_count()))
Python 进程 Process 并行
并行是指两个或者多个事件在同一时刻发生,Python 中的进程属于并行,能充分利用计算机资源,效率最高,同时执行多个任务,占用多个 CPU 资源;
Python 线程 threading 并发
并发是指两个或多个事件在同一时间间隔发生,Python 中的线程属于并发,不管计算机有多少个 CPU ,不管你开了多少个线程,同一时间多个任务会在其中一个 CPU 来回切换,只占用一个 CPU ,效率并不高;
信号量
1)概述:信号量是用来 控制线程并发数的。
2)原理:BoundedSemaphore和Semaphore管理一个内置的计数器。每当 资源释放递增时(调用acquire)计数器-1,资源消耗时递减(调用release)计数器+1。
3)调用格式: threading.BoundedSemaphore/Semaphore(value);值默认1
4)使用场景: 停车位(固定的停车位,车位全部被占用则进不来;只有车子离开其余车才能进来)
5)BoundedSemaphore和Semaphore区别: 前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过将抛出一个异常。
6)注意事项: 计数器不能小于0,当计数器为0时:acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。
案例操作
停车场只有3个停车位。来了5辆汽车需要停车这时候就使用Semaphore来控制访问量了
只能同时允许3辆车同时进入停车场,第4辆车只有等先进入的3辆车中有车出来才可进入
import threading,time,random
semaphore=threading.Semaphore(3)#同一时间只能有3个线程处于运行状态
def run (ii):
semaphore.acquire() # 获得信号量:信号量减一
print(ii,'号车可以进入')
time.sleep(random.randint(0,10)*1)
print(ii,'号停车位释放')
semaphore.release()# 释放信号量:信号量加一
for i in range(5):#创建5个线程
t=threading.Thread(target=run,args=(i,))
t.start()
BoundedSemaphore和Semaphore区别案例操作
BoundedSemaphore调用时如果计数器的值超过了初始值会抛出异常;但是Semaphore不会
import threading,time,random
semaphore=threading.BoundedSemaphore(3)#同一时间只能有3个线程处于运行状态
def run (ii):
semaphore.acquire() # 获得信号量:信号量减一
print(ii,'号车可以进入')
time.sleep(random.randint(0,10)*1)
print(ii,'号停车位释放')
#***************此处高能******************
semaphore.release()# 释放信号量:信号量加一
# 再次释放信号量:信号量加一,这时超过限定的信号量数目会报错ValueError: Semaphore released too many times
semaphore.release()
#***************高能结束******************
for i in range(5):#创建5个线程
t=threading.Thread(target=run,args=(i,))
t.start()
信号量
信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行
如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小
from threading import Thread
from threading import Semaphore
from threading import current_thread
import time
sm = Semaphore(3)
def task():
sm.acquire()
print("%s in" %current_thread().getName())
time.sleep(3)
sm.release()
print()
if __name__ == "__main__":
for i in range(1,11):
t = Thread(target=task)
t.start()
设置坑3个,先是线程1-3抢到坑,然后释放了,其他线程开始抢
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
semaphore信号量相关函数介绍
acquire() — 消耗信号量,内置计数器减一;
release() — 释放信号量,内置计数器加一;
在semaphore信号量有一个内置计数器,控制线程的数量,acquire()会消耗信号量,计数器会自动减一;release()会释放信号量,计数器会自动加一;当计数器为零时,acquire()调用被阻塞,直到release()释放信号量为止。
创建多个线程,限制同一时间最多运行5个线程,示例代码如下:
# -*- coding:utf-8 _*-
# 导入线程模块
import threading
# 导入时间模块
import time
# 添加一个计数器,最大并发线程数量5(最多同时运行5个线程)
semaphore = threading.Semaphore(5)
def foo():
semaphore.acquire() # 计数器获得锁
time.sleep(2) # 程序休眠2秒
print("当前时间:", time.ctime()) # 打印当前系统时间
semaphore.release() # 计数器释放锁
if __name__ == "__main__":
thread_list = list()
for i in range(20):
t = threading.Thread(target=foo, args=()) # 创建线程
thread_list.append(t)
t.start() # 启动线程
for t in thread_list:
t.join()
print("程序结束!")
得到如下结果
当前时间: Thu Apr 14 19:26:48 2022
当前时间: Thu Apr 14 19:26:48 2022
当前时间: Thu Apr 14 19:26:48 2022
当前时间: Thu Apr 14 19:26:48 2022
当前时间: Thu Apr 14 19:26:48 2022
当前时间: Thu Apr 14 19:26:50 2022
当前时间: Thu Apr 14 19:26:50 2022
当前时间: Thu Apr 14 19:26:50 2022
当前时间: Thu Apr 14 19:26:50 2022
当前时间: Thu Apr 14 19:26:50 2022
当前时间: Thu Apr 14 19:26:52 2022
当前时间: Thu Apr 14 19:26:52 2022
当前时间: Thu Apr 14 19:26:52 2022
当前时间: Thu Apr 14 19:26:52 2022
当前时间: Thu Apr 14 19:26:52 2022
当前时间: Thu Apr 14 19:26:54 2022
当前时间: Thu Apr 14 19:26:54 2022
当前时间: Thu Apr 14 19:26:54 2022
当前时间: Thu Apr 14 19:26:54 2022
当前时间: Thu Apr 14 19:26:54 2022
程序结束!
根据打印的日志可以看出,同一时间只有5个线程运行,间隔两秒之后,再次启动5个线程,直到20个线程全部运行结束为止;如果没有设置信号量Semapaore,创建线程直接start(),输出的时间全部都是一样的,这个问题比较简单,可以自己去实验一下!