1 Token Bucket 机制
Token Bucket 机制(令牌桶算法):
TokenBucket 是一个常用的速率限制算法,用于控制请求速率,确保请求以平稳的速率发送。它的工作原理如下:
初始化:
令牌桶以一定的速率被填充(例如,每秒增加 query_per_second 个令牌)。
桶中令牌的最大数量是有限的,一旦达到上限,令牌不再增加。
获取令牌:
当发送请求时,需要从桶中取出一个令牌。
如果桶中有足够的令牌,请求立即发送。
如果桶中没有足够的令牌,get_token 方法将阻塞,直到有可用的令牌为止
优点:
- 精细控制:可以精细地控制请求的速率,适合需要平稳控制请求频率的情况。
- 适合多线程:适合多线程或高并发环境,因为令牌桶机制可以在各个线程之间共享速率限制。
缺点:
- 实现复杂度:实现起来相对复杂,需要手动管理令牌生成和消耗的逻辑。
- 处理瞬时错误:对于瞬时网络错误或 API 服务暂时不可用的情况,令牌桶机制不如重试机制直接有效。
适用场景:
- 高并发请求控制:需要对高并发请求进行速率限制。
- 持续负载下的速率控制:需要在持续负载下平稳地控制请求速率,例如防止服务器过载。
import time
import threadingclass TokenBucket:def __init__(self, rate, capacity):self.rate = rate # 每秒令牌增加的速率self.capacity = capacity # 桶的最大容量self.tokens = capacity # 当前桶中的令牌数量self.last_check = time.monotonic() # 上次检查时间self.lock = threading.Lock() # 用于多线程环境下的互斥锁def get_token(self):with self.lock:current_time = time.monotonic()elapsed = current_time - self.last_check# 增加令牌数量,考虑速率和时间间隔self.tokens += elapsed * self.rateif self.tokens > self.capacity:self.tokens = self.capacity # 不超过最大容量self.last_check = current_timeif self.tokens >= 1:self.tokens -= 1 # 消耗一个令牌return Trueelse:# 计算需要等待的时间wait_time = (1 - self.tokens) / self.ratetime.sleep(wait_time)self.tokens = 0 # 减少到0,继续下一次请求return True# 使用示例
bucket = TokenBucket(1, 5) # 每秒增加1个令牌,桶容量为5def make_request():if bucket.get_token():print("Request made at", time.time())# 模拟连续的请求
for _ in range(10):make_request()
rate: 每秒增加的令牌数。
capacity: 桶的最大容量。
tokens: 当前桶中的令牌数量,初始化为 capacity。
last_check: 上次检查的时间,用于计算时间间隔。
lock: 互斥锁,确保多线程环境下的安全访问。
get_token 方法:
计算当前时间与上次检查时间之间的间隔,更新令牌数量。
如果桶中有足够的令牌,消耗一个令牌并返回 True。
如果没有足够的令牌,计算需要等待的时间并阻塞,直到可以获取令牌
3 重试机制
重试机制(结合 requests
库和指数退避策略):
- 使用
requests
库的Session
对象发送 HTTP 请求,并结合指数退避策略进行重试。 - 在遇到网络错误或请求失败时,通过逐渐增加等待时间来重试请求。
- 主要优点是处理网络故障和暂时的 API 限制,在重试失败后会有较长的等待时间,有助于避免持续的请求失败。
优点:
- 简单易用:易于实现,尤其是结合
requests
库的Session
对象,可以轻松处理 HTTP 请求。 - 自动处理错误:能够自动处理瞬时错误和暂时的 API 服务不可用的情况,通过指数退避策略避免持续失败。
缺点:
- 缺乏精细控制:不能像令牌桶机制那样精细地控制请求速率,适合处理短期的网络问题或 API 限制。
- 等待时间增加:在高失败率情况下,指数退避策略可能导致较长时间的等待。
适用场景:
- 处理不稳定的网络连接:适合在网络连接不稳定时自动重试请求。
- API 服务不稳定:适合在 API 服务偶尔不可用或负载过高时自动重试请求。
import requests
import timedef make_request_with_retry(apiurl, query, headers, max_retries=10):with requests.Session() as session:backoff_time = 1 # 初始等待时间(秒)retry_count = 0while retry_count < max_retries:try:response = session.post(apiurl, json=query, headers=headers, timeout=120)# 打印或记录响应print(f"Response: {response.text}")response.raise_for_status() # 检查 HTTP 错误return response.json()["bo"]["result"]except requests.RequestException as err:print(f"Error: {err}. Waiting {backoff_time} seconds before retrying...")time.sleep(backoff_time)retry_count += 1backoff_time *= 2 # 指数退避return '' # 返回空字符串作为失败处理
3 选择的建议
- 如果你的系统需要精细控制请求频率,尤其是高并发的环境,选择 Token Bucket 机制。
- 如果你主要处理的是网络不稳定或 API 服务偶尔不可用的情况,使用 重试机制 可能更简单直接。
在一些实际场景中,你可能会结合这两种机制。例如,使用 Token Bucket 来控制总体的请求速率,同时使用重试机制来处理瞬时的网络错误。这种组合方式能够提供更全面的鲁棒性和控制能力。