Python 限制线程的最大数量的方法(Semaphore)

  

Python 中通过 Semaphore 对象可以限制线程的最大数量,从而控制线程的并发访问。Semaphore 是一种同步工具,用于保证多个线程间访问资源的顺序或安全性。

Semaphore在Python的Threading模块中实现。Semaphore维护了一个内部计数器,初始提供一个数量参数,来限制并发线程访问的数量。当我们希望限制一定数量的线程访问共享资源时,可以创建一个 Semaphore 对象,并将计数器初始化为限额值。每当线程获取资源时,Semaphores的计数器就减少一。当计数器值为 0 时,任何尝试获取资源的线程都将被阻塞。

具体来看,Semaphore 的使用方法如下:

创建一个Semaphore对象

import threading

# 创建一个Semaphore对象,并指定信号量的数量
semaphore = threading.Semaphore(3)  # 限制最大并发数量为3

获取锁

获取Semaphore信号量时,可以使用semaphore.acquire()方法,这个方法会判断当前的 Semaphore 对象信号量计数器是否大于0,如果大于0,则立即减少信号量计数器的值并返回True,否则线程会被阻塞。

# 获取 Semaphore,信号量-1
semaphore.acquire()
# ...
# 访问共享资源
# ...
# 释放 Semaphore,信号量+1
semaphore.release()

释放锁

释放Semaphore锁时,需要使用semaphore.release() 方法。Semaphore的计数器会增加1,使得其他线程可以获取信号量访问共享资源。

# 释放 Semaphore,信号量+1
semaphore.release()

示例1

import time
import threading

semaphore = threading.Semaphore(3) # 最大并发数量为3

def worker():
    with semaphore:
        print(f'{threading.current_thread().name} 获取到信号量')
        time.sleep(1) # 模拟执行操作
        print(f'{threading.current_thread().name} 释放信号量')

threads = []
for i in range(5):
    t = threading.Thread(target=worker, name=f'thread-{i}')
    t.start()
    threads.append(t)

for t in threads:
    t.join()

在上面的代码中,我们创建了一个Semaphore对象,设置最大并发数量为3。创建了5个线程,同时在每个线程中获取 Semaphore 对象,打印线程名和时间戳,然后睡眠1s,接着释放 Semaphore 对象,打印线程名和时间戳。

输出结果:

thread-0 获取到信号量
thread-1 获取到信号量
thread-2 获取到信号量
thread-1 释放信号量
thread-3 获取到信号量
thread-2 释放信号量
thread-0 释放信号量
thread-4 获取到信号量
thread-3 释放信号量
thread-4 释放信号量

示例2

另一个使用Semaphore来限制最大并发的例子是运用在爬虫中。爬虫是我们需要获取公共资源的应用之一。比如我们需要获取一个网站中很多商品的数据,如果我们同时向网站发出大量的请求,就容易被网站屏蔽。这个时候使用Semaphore可以很好的避免这个问题,代码可以这样实现:

import requests
import threading

class Crawler:
    SEMAPHORE = threading.Semaphore(3)

    def __init__(self, item):
        self.item = item

    def run(self):
        with self.SEMAPHORE:
            print(f'开始爬取商品{self.item}')
            # 发送请求
            response = requests.get(f'https://www.example.com/products?item={self.item}')
            # 解析数据
            data = self.parse_data(response.text)
            # 保存数据
            self.save_data_to_database(data)
            print(f'商品{self.item}爬取完毕')

    def parse_data(self, data):
         # 爬虫解析数据的具体逻辑
        pass

    def save_data_to_database(self, data):
         # 爬虫保存数据到数据库的具体逻辑
        pass

items = ['item1', 'item2', 'item3', 'item4', 'item5', 'item6']
crawlers = [Crawler(item) for item in items]

threads = [threading.Thread(target=crawler.run, args=()) for crawler in crawlers]
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

在这个例子中,我们首先创建了一个Semaphore对象并设置最大并发数量为3。然后我们创建了Crawler 类,该类代表了一个爬虫任务,随后在run方法中实现了具体的爬虫逻辑。在 run方法中首先获取Semaphore对象,获取到信号量时打印开始爬取一个商品的信息,随后发送请求,解析数据,保存数据到数据库。完成以上操作之后,释放Semaphore对象,打印结束爬取一个商品的信息。

我们创建了6个Crawler对象,分别代表了需要爬取的6个商品,最后在多线程的环境中运行爬虫。Semaphore对象可以限制最大并发数量,保证在同一时间内只有3个爬虫工作,不会让网站服务被过于频繁的访问。

相关文章