使用令牌桶算法进行速率限制 速率限制令牌桶实现

2025-06-10

使用令牌桶算法进行速率限制

速率限制

令牌桶

执行

速率限制

几乎所有使用过 Web 服务器的人都知道“速率限制”这个术语。我认为它是我们经常使用却很少考虑的话题之一。但我认为,如果你曾经使用过速率限制,那么了解它的工作原理会很有帮助。

您可能知道,“速率限制”本质上是监控传入的请求,并确保它们不超过阈值。简单来说,我们定义每个时间单位的请求速率,并丢弃发送数据包数量超过我们速率定义的数据包的流。

你可能注意到了,当达到阈值时,我使用了“丢弃”这个词。但情况并非总是如此。在很多情况下,我们会将数据包放入队列中,以保持数据包的输出速率恒定。(这样数据包就不会被丢弃。)

我们将要讨论的内容不需要队列,我敢说这可能是实现速率限制的最简单方法。

实现速率限制(流量整形)最著名的方法是:

  1. 令牌桶
  2. 漏水桶
  3. (r,t)流量整形

漏桶与令牌桶有些类似,但现在我们不关心恒定的输出率,令牌桶是我们将要讨论的唯一算法。

令牌桶

令牌桶的类比非常简单。它就是一个桶和里面的令牌。让我们一步一步来讨论。

  1. 在心里想象一个水桶。
  2. 以恒定的速率向桶中填充令牌。
  3. 当数据包到达时,检查桶中是否有令牌。
  4. 如果桶里还有剩余令牌,则从桶里取出一个令牌并转发数据包。如果桶是空的,则直接丢弃数据包。

就是这样。是不是很简单?

令牌桶

执行

闲话少叙。我们来用 Python 写一个简单的令牌桶节流器。

我们首先定义一个在实例化时有 4 个参数的类。

  1. tokens:每个时间单位内添加到桶中的令牌数量。
  2. time_unit:令牌在此时间范围内添加。
  3. forward_callback:当数据包被转发时调用此函数。
  4. drop_callback:当应该丢弃数据包时调用此函数。

我们还会预先将指定的令牌填充到桶中。这样,当节流器首次启动时,就可以应对突发的数据包。

last_check是我们之前处理数据包的时间戳。初始化时,我们设置了创建 bucket 的时间。



import time


class TokenBucket:

    def __init__(self, tokens, time_unit, forward_callback, drop_callback):
        self.tokens = tokens
        self.time_unit = time_unit
        self.forward_callback = forward_callback
        self.drop_callback = drop_callback
        self.bucket = tokens
        self.last_check = time.time()


Enter fullscreen mode Exit fullscreen mode

然后,我们定义handle()魔法发生的地方。



def handle(self, packet): #1
    current = time.time() #2
    time_passed = current - self.last_check #3
    self.last_check = current #4

    self.bucket = self.bucket + \
        time_passed * (self.tokens / self.time_unit) #5

    if (self.bucket > self.tokens): #6
        self.bucket = self.tokens

    if (self.bucket < 1): #7
        self.drop_callback(packet)
    else:
        self.bucket = self.bucket - 1 #8
        self.forward_callback(packet)


Enter fullscreen mode Exit fullscreen mode
  1. handle仅接受 1 个参数:数据包。
  2. 当前时间戳被放入current
  3. 从现在到上次通话之间的时间handle被放入time_passed
  4. 我们将其更新last_check为当前时间戳。
  5. 这是最重要的部分。通过将 乘以time_passed我们的速率(即tokens/ time_unit),我们可以得出需要向桶中添加多少个令牌。
  6. 如果存储桶中的令牌数多于默认值,则重置存储桶。
  7. 如果桶中没有足够的令牌,则丢弃该数据包。
  8. 否则,删除一个令牌并转发数据包。

就这样。这是最终的工作版本:



import time


class TokenBucket:

    def __init__(self, tokens, time_unit, forward_callback, drop_callback):
        self.tokens = tokens
        self.time_unit = time_unit
        self.forward_callback = forward_callback
        self.drop_callback = drop_callback
        self.bucket = tokens
        self.last_check = time.time()

    def handle(self, packet):
        current = time.time()
        time_passed = current - self.last_check
        self.last_check = current

        self.bucket = self.bucket + \
            time_passed * (self.tokens / self.time_unit)

        if (self.bucket > self.tokens):
            self.bucket = self.tokens

        if (self.bucket < 1):
            self.drop_callback(packet)
        else:
            self.bucket = self.bucket - 1
            self.forward_callback(packet)


def forward(packet):
    print("Packet Forwarded: " + str(packet))


def drop(packet):
    print("Packet Dropped: " + str(packet))


throttle = TokenBucket(1, 1, forward, drop)

packet = 0

while True:
    time.sleep(0.2)
    throttle.handle(packet)
    packet += 1


Enter fullscreen mode Exit fullscreen mode

你应该得到类似这样的结果:



Packet Forwarded: 0
Packet Dropped: 1
Packet Dropped: 2
Packet Dropped: 3
Packet Dropped: 4
Packet Forwarded: 5
Packet Dropped: 6
Packet Dropped: 7
Packet Dropped: 8
Packet Dropped: 9
Packet Forwarded: 10
Packet Dropped: 11
Packet Dropped: 12
Packet Dropped: 13
Packet Dropped: 14
Packet Forwarded: 15


Enter fullscreen mode Exit fullscreen mode

正如您在代码中看到的,我们的速率是每秒 1 个数据包。我们每 0.2 秒发送一个数据包。当我们第一次发送数据包时,它会很快清空存储桶,而存储桶再次填满需要一些时间,这就是为什么每次成功转发之间都会丢弃 4 个数据包。

一般来说,速率限制和流量监管是一个非常广泛的主题,因此,如果您喜欢刚刚阅读的内容,那么网上有很多关于这个主题的资料,您可以使用它们来更深入地了解流量监管。

感谢您抽出时间。

鏂囩珷鏉ユ簮锛�https://dev.to/satrobit/rate-limiting-using-the-token-bucket-algorithm-3cjh
PREV
DEV.to Widget 现在有暗黑主题!还有粉色主题和海洋主题。查看 dev-widget v1.1.0 的新功能 🌻🌑🌸 使用 Codepen 演示:链接:
NEXT
😎 使用 Node JS 从头开始​​构建 REST Api,无需任何框架🔥 启动并运行服务器🥳 让我们构建基于 CRUD 的 REST API