使用令牌桶算法进行速率限制
速率限制
令牌桶
执行
速率限制
几乎所有使用过 Web 服务器的人都知道“速率限制”这个术语。我认为它是我们经常使用却很少考虑的话题之一。但我认为,如果你曾经使用过速率限制,那么了解它的工作原理会很有帮助。
您可能知道,“速率限制”本质上是监控传入的请求,并确保它们不超过阈值。简单来说,我们定义每个时间单位的请求速率,并丢弃发送数据包数量超过我们速率定义的数据包的流。
你可能注意到了,当达到阈值时,我使用了“丢弃”这个词。但情况并非总是如此。在很多情况下,我们会将数据包放入队列中,以保持数据包的输出速率恒定。(这样数据包就不会被丢弃。)
我们将要讨论的内容不需要队列,我敢说这可能是实现速率限制的最简单方法。
实现速率限制(流量整形)最著名的方法是:
- 令牌桶
- 漏水桶
- (r,t)流量整形
漏桶与令牌桶有些类似,但现在我们不关心恒定的输出率,令牌桶是我们将要讨论的唯一算法。
令牌桶
令牌桶的类比非常简单。它就是一个桶和里面的令牌。让我们一步一步来讨论。
- 在心里想象一个水桶。
- 以恒定的速率向桶中填充令牌。
- 当数据包到达时,检查桶中是否有令牌。
- 如果桶里还有剩余令牌,则从桶里取出一个令牌并转发数据包。如果桶是空的,则直接丢弃数据包。
就是这样。是不是很简单?
执行
闲话少叙。我们来用 Python 写一个简单的令牌桶节流器。
我们首先定义一个在实例化时有 4 个参数的类。
- tokens:每个时间单位内添加到桶中的令牌数量。
- time_unit:令牌在此时间范围内添加。
- forward_callback:当数据包被转发时调用此函数。
- drop_callback:当应该丢弃数据包时调用此函数。
我们还会预先将指定的令牌填充到桶中。这样,当节流器首次启动时,就可以应对突发的数据包。
last_check
是我们之前处理数据包的时间戳。初始化时,我们设置了创建 bucket 的时间。
import time
class TokenBucket:
def __init__(self, tokens, time_unit, forward_callback, drop_callback):
self.tokens = tokens
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback
self.bucket = tokens
self.last_check = time.time()
然后,我们定义handle()
魔法发生的地方。
def handle(self, packet): #1
current = time.time() #2
time_passed = current - self.last_check #3
self.last_check = current #4
self.bucket = self.bucket + \
time_passed * (self.tokens / self.time_unit) #5
if (self.bucket > self.tokens): #6
self.bucket = self.tokens
if (self.bucket < 1): #7
self.drop_callback(packet)
else:
self.bucket = self.bucket - 1 #8
self.forward_callback(packet)
handle
仅接受 1 个参数:数据包。- 当前时间戳被放入
current
。 - 从现在到上次通话之间的时间
handle
被放入time_passed
。 - 我们将其更新
last_check
为当前时间戳。 - 这是最重要的部分。通过将 乘以
time_passed
我们的速率(即tokens
/time_unit
),我们可以得出需要向桶中添加多少个令牌。 - 如果存储桶中的令牌数多于默认值,则重置存储桶。
- 如果桶中没有足够的令牌,则丢弃该数据包。
- 否则,删除一个令牌并转发数据包。
就这样。这是最终的工作版本:
import time
class TokenBucket:
def __init__(self, tokens, time_unit, forward_callback, drop_callback):
self.tokens = tokens
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback
self.bucket = tokens
self.last_check = time.time()
def handle(self, packet):
current = time.time()
time_passed = current - self.last_check
self.last_check = current
self.bucket = self.bucket + \
time_passed * (self.tokens / self.time_unit)
if (self.bucket > self.tokens):
self.bucket = self.tokens
if (self.bucket < 1):
self.drop_callback(packet)
else:
self.bucket = self.bucket - 1
self.forward_callback(packet)
def forward(packet):
print("Packet Forwarded: " + str(packet))
def drop(packet):
print("Packet Dropped: " + str(packet))
throttle = TokenBucket(1, 1, forward, drop)
packet = 0
while True:
time.sleep(0.2)
throttle.handle(packet)
packet += 1
你应该得到类似这样的结果:
Packet Forwarded: 0
Packet Dropped: 1
Packet Dropped: 2
Packet Dropped: 3
Packet Dropped: 4
Packet Forwarded: 5
Packet Dropped: 6
Packet Dropped: 7
Packet Dropped: 8
Packet Dropped: 9
Packet Forwarded: 10
Packet Dropped: 11
Packet Dropped: 12
Packet Dropped: 13
Packet Dropped: 14
Packet Forwarded: 15
正如您在代码中看到的,我们的速率是每秒 1 个数据包。我们每 0.2 秒发送一个数据包。当我们第一次发送数据包时,它会很快清空存储桶,而存储桶再次填满需要一些时间,这就是为什么每次成功转发之间都会丢弃 4 个数据包。
一般来说,速率限制和流量监管是一个非常广泛的主题,因此,如果您喜欢刚刚阅读的内容,那么网上有很多关于这个主题的资料,您可以使用它们来更深入地了解流量监管。
感谢您抽出时间。
鏂囩珷鏉ユ簮锛�https://dev.to/satrobit/rate-limiting-using-the-token-bucket-algorithm-3cjh