限流算法


date: 2025-01-23

什么是限流

  • 限流,限制流量,限制用户在给定时间内向服务器请求某些内容的次数。

  • 限制资源的使用从而控制用户被允许执行的操作数量。

  • 通常,限流用于 API、Web 服务和网络设备,以维持稳定性和性能。

为什么要限流

  • 防止滥用:限制请求以防止端点泛滥,确保数据完整性和可用性。

  • 确保公平使用:在用户之间平均分配资源,防止单个用户垄断服务资源,提高整体满意度。

  • 保持性能:防止服务器过载,减少延迟,确保高效的服务交付,增强用户体验。

  • 成本管理:控制资源使用情况,以防止意外的基础设施成本,有效地管理资源。

  • 安全性:通过限制请求率来减轻 DoS 攻击,并保护网站可用性和可靠性免受恶意过载尝试

算法

令牌桶算法

  • 通过生成令牌并将其放入桶中来控制流量。每个请求需要获取一个令牌才能被处理。令牌以固定速率生成,桶的容量有限。

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()

    def allow_request(self):
        now = time.time()
        self.tokens += (now - self.last_refill) * self.rate
        self.tokens = min(self.tokens, self.capacity)
        self.last_refill = now

        if self.tokens >= 1:
            self.tokens -= 1
            return True
        else:
            return False

漏桶算法

  • 请求以固定速率从桶中流出,桶会在接收到请求时进行填充。超出桶容量的请求会被丢弃。请求的处理速率是恒定的。

class LeakyBucket:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity  # 桶最大空间
        self.leak_rate = leak_rate  # 流出速率 (每秒)
        self.bucket_size = 0  # 当前桶空间
        self.last_updated = time.time()  # 最后更新桶时间

    def add_data(self, data_size):
        # 计算距离上次更新的时间间隔
        current_time = time.time()
        time_elapsed = current_time - self.last_updated
        self.last_updated = current_time
        
        # 根据流出速率重新计算桶剩余空间
        self.bucket_size = max(0, self.bucket_size - self.leak_rate * time_elapsed)
        
        # 添加数据到桶中
        self.bucket_size = min(self.bucket_size + data_size, self.capacity)
        
        # 判断桶是否溢出
        if self.bucket_size >= data_size:
            self.bucket_size -= data_size
            return True
        else:
            return False

# Example usage:
bucket = LeakyBucket(capacity=10, leak_rate=1)  # 配置一个桶,最大空间=10,流出速率是=1/s
data_to_send = 5  # 本次要占用的桶空间大小
if bucket.add_data(data_to_send):
    print(f"Data of size {data_to_send} sent successfully.")
else:
    print(f"Bucket overflow. Unable to send data of size {data_to_send}.")

固定窗口算法

  • 在一个固定的时间段内对请求进行计数,并根据这个计数来决定是否允许新的请求。

class FixedWindow:
    def __init__(self, window_size, max_requests):
        self.window_size = window_size
        self.max_requests = max_requests
        self.requests = 0
        self.window_start = time.time()

    def allow_request(self):
        now = time.time()
        if now - self.window_start >= self.window_size:
            self.requests = 0
            self.window_start = now

        if self.requests < self.max_requests:
            self.requests += 1
            return True
        else:
            return False

滑动窗口算法

  • 维护一个时间窗口,记录在特定时间段内的请求数量。能够动态计算过去一段时间内的请求频率,从而进行限流。

class SlidingWindow:
    def __init__(self, window_size, max_requests):
        self.window_size = window_size
        self.max_requests = max_requests
        self.requests = deque()

    def allow_request(self):
        now = time.time()
        # 弹出所有非当前窗口的请求
        while self.requests and self.requests[0] <= now - self.window_size:
            self.requests.popleft()
           
        # 如果
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        else:
            return False

限流算法的优缺点

算法
灵活性
实现复杂度
场景

令牌桶算法

突发流量友好

简单

API 限流、网络流量控制

漏桶算法

平滑流量友好

简单

视频流传输、数据上传

固定窗口算法

平滑流量友好

简单

定时任务、低频流量

滑动窗口算法

精确控制流量

中等

实时数据处理、在线游戏

扩展

  • 通过动态配置算法入参,可以实现实现灵活的限流策略。

  • 分布式中,计数器可以通过中间件实现,如redis,但也引入了中间件维护的职责。

  • 对于算法,可以引入自己业务需要的其他规则,以适配自己的需求。

参考

  • https://www.geeksforgeeks.org/rate-limiting-algorithms-system-design/#handling-bursts-and-spikes

Last updated