breaking.bucket

import math
from dataclasses import dataclass, field
from typing import Tuple

from breaking.clock import Clock, MonotonicClock


@dataclass
class TokenBucket:
    """
    Impose an upper bound on a number of events over a time interval.

    Use cases:

     - Rate limiting the number of requests for clients.
     - Circuit breaking after a certain number of exceptions have happened.

    ### How this works

    I will slowly build up to how token buckets work and why they are
    nice by showing other ways you could implement this (and
    limitations of those approaches).

    There are many related data structures / algorithms in this space
    with different trade-offs. What follows is a brief comparison
    between them.

    #### Fixed window counters

    Divide time in a series of buckets, and for each bucket keep track
    of the number of events that have happened.

    Example implementation using a `Dict[Time, Count]`:

    ```
    # An event happens at time `1612604808`
    { 1612604808: 1 }

    # Another event happens within the same second.
    { 1612604808: 2 }

    # Time moves forward into the next second.

    # A new event happens. Remove the counter for the previous
    # second and start anew.
    { 1612604808: 2, 1612604809: 1 }
      ^              ^ Limit checks now use this counter.
      Previous key. Will be deleted.
    ```

    Based on these counters, you can implement your rate limiting logic.
    Conceptually, this is very simple. However, there is a major limitation.

    **You can only consider event rates for the current
    bucket**. (Otherwise you would need to loop over the entire dict,
    which is inefficient.) This introduces counting inaccuracies
    when events arrive just before and just after a bucket rolls over.

    Consider:

    ```
    # We'd like to limit the number of requests to 10/second.

    # At `1612604808.95` (near the end of second `1612604808`), 10
    # requests come in.
    { 1612604808: 10 }

    # At `1612604809.05` (the beginning of second `1612604809`), 10
    # more requests come in.
    { 1612604808: 10, 1612604809: 10 }
      ^               ^ Used for comparison
      Not used anymore
    ```

    You conceptually had a rate limit of 10 requests / second. However,
    around a second boundary you can actually allow up to 20 requests
    within a fraction of a second.

    #### Sliding window counter

    This is an approach to fix the inaccuracies above.

     - Store all event times in a sorted set, with millisecond accuracy.
     - When new events come in, add their time values to the set.
     - Clean up all old requests from the set every second (this is fast
       because the set is sorted).

    This is more accurate, but trades memory for accuracy: every event
    inside the window is stored. Depending on the limits that you set,
    this could be a dealbreaker.

    #### Token buckets

    Token buckets are a way to fix the accuracy and the memory usage
    problems with the implementations above.

    Each bucket starts out full of tokens.

    ```
    |---| <- capacity_current, capacity_max
    |   |
    |   |
    |___|
    ```

    When events happen, the current capacity is decremented:

    ```
    |   | <- capacity_max
    |   |
    |---| <- capacity_current
    |___|
    ```

    When the bucket runs out of tokens, no new events are allowed:

    ```
    |   | <- capacity_max
    |   |
    |   |
    |___| <- capacity_current
    ```

    Over time, the capacity is restored at `restore_rate_hz`, and
    new events are allowed again:

    ```
    |   | <- capacity_max
    |---| <- capacity_current
    |   |
    |___|
    ```

    By keeping track of the `last_restore` time, we can perform this
    restoration calculation every time we try to take from the bucket.

    We get the full accuracy offered by the sorted set, but with lower
    memory requirements. Nice.

    (This class also has a `fill()` operation, which is helpful when
    implementing circuit breakers.)

    #### Leaky Buckets

    **Token buckets and leaky buckets have different goals and use cases.**
    But since they appear very similar on the surface, I'm mentioning them
    for completeness.

    Both leaky and token buckets contain a backpressure mechanism, but
    they have different goals:

     - Leaky buckets turn bursty streams of events into a stream of
       constant rate.
     - Token buckets make sure no more than `X` events happen over a
       certain time window. The resulting stream has an upper bound on
       the number of elements, but is still allowed to be bursty.

    Or more concretely:

     - Leaky buckets buffer requests and output them at a steady rate. Limits
       are imposed when the buffer is full.
     - Token buckets allow requests immediately. Limits are imposed when more
       than X requests happen in a period of time.

    Here is a table which compares the two:

    | bucket type             | token                  | leaky              |
    |-------------------------|------------------------|--------------------|
    | goal                    | impose upper bound     | control burstiness |
    | requests happen         | immediately            | at leak rate       |
    | backpressure when       | empty                  | full               |
    | analogous datastructure | replenishing semaphore | bounded queue      |

    """

    capacity_max: int = field()
    capacity_current: int = field(init=False)

    restore_rate_hz: float = field()
    """How much capacity should be restored every second?"""
    last_restore: float = field(init=False)
    """Time the last capacity restore took place."""

    clock: Clock = field(default_factory=MonotonicClock)
    """Clock implementation to use.

    This is configurable to facilitate testing the behavior of this
    class without waiting for actual time to pass. See `breaking.clock`
    for further details."""

    def __post_init__(self) -> None:
        self.last_restore = self.clock.seconds_since_epoch()
        self.capacity_current = self.capacity_max

        if self.capacity_max < 1:
            raise ValueError("capacity_max must be >= 1")

        if self.restore_rate_hz < 1:
            raise ValueError("restore_rate_hz must be >= 1")

        if math.isnan(self.restore_rate_hz):
            raise ValueError("restore_rate_hz cannot be NaN")

        if math.isinf(self.restore_rate_hz):
            raise ValueError("restore_rate_hz cannot be Inf")

    def has_tokens_left(self, n: int = 1) -> bool:
        """
        Does the bucket have capacity for `n` more items?
        """
        assert n >= 1, "`n` must be >= 1"

        self._refill_capacity()
        return self.capacity_current - n >= 0

    def fill(self, n: int = 1) -> int:
        """
        Put `n` items into the bucket.

        You can use this method to "return capacity" to the bucket.
        The bucket will never fill beyond `capacity_max`.

        Returns:
            The amount of tokens that didn't fit in the bucket anymore.
            If this value is `> 0`, then the bucket didn't have capacity
            for that number of items.
        """
        assert n >= 1, "`n` must be >= 1"
        self._refill_capacity()
        new_capacity, clipped = clip(
            val=self.capacity_current + n, lower=0, upper=self.capacity_max
        )
        self.capacity_current = new_capacity
        return clipped

    def take(self, n: int = 1) -> int:
        """
        Take `n` items from the bucket.

        You can use this method to "drain capacity" from the bucket. The
        bucket capacity will never drop below `0`.

        Returns:
            The amount of tokens that could not be taken from the bucket.
            If this value is `> 0`, then the bucket didn't have capacity
            for that number of items.
        """
        assert n >= 1, "`n` must be >= 1"
        self._refill_capacity()
        new_capacity, clipped = clip(
            val=self.capacity_current - n, lower=0, upper=self.capacity_max
        )
        self.capacity_current = new_capacity
        return clipped

    def _refill_capacity(self) -> None:
        """
        Update the current capacity based on the restore rate.
        """
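        now = self.clock.seconds_since_epoch()
        seconds_since_last_restore = now - self.last_restore

        # Whole tokens earned since the last restore. `int()` truncates, and
        # `last_restore` is reset below, so any fractional token is discarded.
        capacity_to_restore = min(
            int(seconds_since_last_restore * self.restore_rate_hz),
            self.capacity_max,
        )
        # Never restore beyond the bucket's maximum capacity.
        self.capacity_current = min(
            self.capacity_current + capacity_to_restore, self.capacity_max
        )
        self.last_restore = now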


def clip(*, val: int, lower: int, upper: int) -> Tuple[int, int]:
    """
    Clip `val` between `lower` and `upper`.

    Returns:
        `(clipped_val, amount_clipped)`
    """
    assert lower < upper
    clipped_val = max(lower, min(val, upper))
    amount_clipped = abs(val - clipped_val)
    return clipped_val, amount_clipped
@dataclass
class TokenBucket:

Impose an upper bound on a number of events over a time interval.

Use cases:

  • Rate limiting the number of requests for clients.
  • Circuit breaking after a certain number of exceptions have happened.

How this works

I will slowly build up to how token buckets work and why they are nice by showing other ways you could implement this (and limitations of those approaches).

There are many related data structures / algorithms in this space with different trade-offs. What follows is a brief comparison between them.

Fixed window counters

Divide time in a series of buckets, and for each bucket keep track of the number of events that have happened.

Example implementation using a Dict[Time, Count]:

# An event happens at time `1612604808`
{ 1612604808: 1 }

# Another event happens within the same second.
{ 1612604808: 2 }

# Time moves forward into the next second.

# A new event happens. Remove the counter for the previous
# second and start anew.
{ 1612604808: 2, 1612604809: 1 }
  ^              ^ Limit checks now use this counter.
  Previous key. Will be deleted.

Based on these counters, you can implement your rate limiting logic. Conceptually, this is very simple. However, there is a major limitation.

You can only consider event rates for the current bucket. (Otherwise you would need to loop over the entire dict, which is inefficient.) This introduces counting inaccuracies when events arrive just before and just after a bucket rolls over.

Consider:

# We'd like to limit the number of requests to 10/second.

# At `1612604808.95` (near the end of second `1612604808`), 10
# requests come in.
{ 1612604808: 10 }

# At `1612604809.05` (the beginning of second `1612604809`), 10
# more requests come in.
{ 1612604808: 10, 1612604809: 10 }
  ^               ^ Used for comparison
  Not used anymore

You conceptually had a rate limit of 10 requests / second. However, around a second boundary you can actually allow up to 20 requests within a fraction of a second.
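The boundary problem can be reproduced with a small sketch. `FixedWindowCounter` and its `allow` method are hypothetical names invented for this illustration; they are not part of `breaking`. The sketch assumes one counter per whole second and takes the current time as an argument so the result is deterministic.

```python
from collections import defaultdict
from typing import DefaultDict


class FixedWindowCounter:
    """Hypothetical fixed window limiter: one counter per whole second."""

    def __init__(self, limit_per_second: int) -> None:
        self.limit = limit_per_second
        self.counts: DefaultDict[int, int] = defaultdict(int)

    def allow(self, now: float) -> bool:
        window = int(now)  # key of the current one-second bucket
        # Drop counters of earlier windows; only the current one is checked.
        for key in [k for k in self.counts if k < window]:
            del self.counts[key]
        if self.counts[window] >= self.limit:
            return False
        self.counts[window] += 1
        return True


limiter = FixedWindowCounter(limit_per_second=10)
late = [limiter.allow(1612604808.95) for _ in range(10)]
early = [limiter.allow(1612604809.05) for _ in range(10)]
allowed_in_100ms = sum(late) + sum(early)  # 20, twice the intended limit
```

All 20 calls are allowed even though they fall within 100 ms of each other, because each batch is checked against a different counter.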

Sliding window counter

This is an approach to fix the inaccuracies above.

  • Store all event times in a sorted set, with millisecond accuracy.
  • When new events come in, add their time values to the set.
  • Clean up all old requests from the set every second (this is fast because the set is sorted).

This is more accurate, but trades memory for accuracy: every event inside the window is stored. Depending on the limits that you set, this could be a dealbreaker.
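A minimal sketch of the idea, under two simplifying assumptions: timestamps arrive in non-decreasing order, so a `collections.deque` can stand in for the sorted set, and old entries are evicted on every call instead of once per second. `SlidingWindowLog` is a hypothetical name, not part of `breaking`.

```python
from collections import deque
from typing import Deque


class SlidingWindowLog:
    """Hypothetical sliding window limiter that stores every event time."""

    def __init__(self, limit: int, window_seconds: float = 1.0) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.events: Deque[float] = deque()  # timestamps, oldest first

    def allow(self, now: float) -> bool:
        # Evict timestamps that have fallen out of the window.
        while self.events and self.events[0] <= now - self.window_seconds:
            self.events.popleft()
        if len(self.events) >= self.limit:
            return False
        self.events.append(now)
        return True


log = SlidingWindowLog(limit=10)
late = [log.allow(1612604808.95) for _ in range(10)]   # all allowed
early = [log.allow(1612604809.05) for _ in range(10)]  # all rejected
stored_timestamps = len(log.events)  # memory grows with the limit
```

The second batch is rejected because the first ten events are still inside the one-second window. The cost is one stored timestamp per allowed event.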

Token buckets

Token buckets are a way to fix the accuracy and the memory usage problems with the implementations above.

Each bucket starts out full of tokens.

|---| <- capacity_current, capacity_max
|   |
|   |
|___|

When events happen, the current capacity is decremented:

|   | <- capacity_max
|   |
|---| <- capacity_current
|___|

When the bucket runs out of tokens, no new events are allowed:

|   | <- capacity_max
|   |
|   |
|___| <- capacity_current

Over time, the capacity is restored at restore_rate_hz, and new events are allowed again:

|   | <- capacity_max
|---| <- capacity_current
|   |
|___|

By keeping track of the last_restore time, we can perform this restoration calculation every time we try to take from the bucket.
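The restoration arithmetic can be written as a pure function. This is a sketch that mirrors the calculation in `_refill_capacity`; the function name `restored_capacity` and its parameters are made up for this example.

```python
def restored_capacity(
    current: int,
    elapsed_seconds: float,
    restore_rate_hz: float,
    capacity_max: int,
) -> int:
    """Capacity after `elapsed_seconds` of restoring at `restore_rate_hz`."""
    # Only whole tokens are restored; the fractional part is truncated.
    restored = int(elapsed_seconds * restore_rate_hz)
    # The bucket never holds more than `capacity_max` tokens.
    return min(current + restored, capacity_max)


half_second = restored_capacity(0, 0.5, 10.0, 10)   # 5
long_idle = restored_capacity(5, 60.0, 10.0, 10)    # 10 (capped)
too_soon = restored_capacity(0, 0.05, 10.0, 10)     # 0 (less than one token)
```

Only two numbers need to be stored per bucket (the current capacity and the last restore time), no matter how many events pass through it.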

We get the full accuracy offered by the sorted set, but with lower memory requirements. Nice.

(This class also has a fill() operation, which is helpful when implementing circuit breakers.)

Leaky Buckets

Token buckets and leaky buckets have different goals and use cases. But since they appear very similar on the surface, I'm mentioning them for completeness.

Both leaky and token buckets contain a backpressure mechanism, but they have different goals:

  • Leaky buckets turn bursty streams of events into a stream of constant rate.
  • Token buckets make sure no more than X events happen over a certain time window. The resulting stream has an upper bound on the number of elements, but is still allowed to be bursty.

Or more concretely:

  • Leaky buckets buffer requests and output them at a steady rate. Limits are imposed when the buffer is full.
  • Token buckets allow requests immediately. Limits are imposed when more than X requests happen in a period of time.

Here is a table which compares the two:

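| bucket type             | token                  | leaky              |
|-------------------------|------------------------|--------------------|
| goal                    | impose upper bound     | control burstiness |
| requests happen         | immediately            | at leak rate       |
| backpressure when       | empty                  | full               |
| analogous datastructure | replenishing semaphore | bounded queue      |
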
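For contrast, a leaky bucket can be sketched as a bounded queue. `LeakyBucket`, `offer` and `leak` are hypothetical names and not part of `breaking`; the sketch assumes some external timer calls `leak()` once per leak interval.

```python
from collections import deque
from typing import Deque, Optional


class LeakyBucket:
    """Hypothetical leaky bucket: a bounded FIFO drained one item per tick."""

    def __init__(self, buffer_size: int) -> None:
        self.buffer_size = buffer_size
        self.queue: Deque[str] = deque()

    def offer(self, request: str) -> bool:
        # Backpressure happens when the buffer is full.
        if len(self.queue) >= self.buffer_size:
            return False
        self.queue.append(request)
        return True

    def leak(self) -> Optional[str]:
        # Assumed to be called by a timer once per leak interval, which is
        # what turns a bursty input into a steady output.
        return self.queue.popleft() if self.queue else None


bucket = LeakyBucket(buffer_size=3)
accepted = [bucket.offer(f"request-{i}") for i in range(5)]  # burst of 5
drained = [bucket.leak() for _ in range(4)]                  # 4 timer ticks
```

The burst is buffered rather than executed: three requests are accepted, two are rejected because the buffer is full, and the accepted ones leave one per tick in arrival order.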
TokenBucket( self, capacity_max: int, restore_rate_hz: float, clock: breaking.clock.Clock = <factory> )
capacity_max: int
capacity_current: int
restore_rate_hz: float

How much capacity should be restored every second?

last_restore: float

Time the last capacity restore took place.

clock: breaking.clock.Clock

Clock implementation to use.

This is configurable to facilitate testing the behavior of this class without waiting for actual time to pass. See breaking.clock for further details.

def has_tokens_left(self, n: int = 1) -> bool:

Does the bucket have capacity for n more items?

def fill(self, n: int = 1) -> int:

Put n items into the bucket.

You can use this method to "return capacity" to the bucket. The bucket will never fill beyond capacity_max.

Returns: The amount of tokens that didn't fit in the bucket anymore. If this value is > 0, then the bucket didn't have capacity for that number of items.

def take(self, n: int = 1) -> int:

Take n items from the bucket.

You can use this method to "drain capacity" from the bucket. The bucket capacity will never drop below 0.

Returns: The amount of tokens that could not be taken from the bucket. If this value is > 0, then the bucket didn't have capacity for that number of items.

def clip(*, val: int, lower: int, upper: int) -> Tuple[int, int]:

Clip val between lower and upper.

Returns: (clipped_val, amount_clipped)
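A short worked example of how `take()` and `fill()` use `clip`. The function body is copied from the module source so the snippet runs standalone; the bucket sizes are illustrative values only.

```python
from typing import Tuple


def clip(*, val: int, lower: int, upper: int) -> Tuple[int, int]:
    # Copied from the module source so this example is self-contained.
    assert lower < upper
    clipped_val = max(lower, min(val, upper))
    amount_clipped = abs(val - clipped_val)
    return clipped_val, amount_clipped


# take(5) from a bucket holding 3 of 10 tokens: 2 tokens could not be taken.
after_take = clip(val=3 - 5, lower=0, upper=10)  # (0, 2)

# fill(4) into a bucket holding 8 of 10 tokens: 2 tokens did not fit.
after_fill = clip(val=8 + 4, lower=0, upper=10)  # (10, 2)

# A value already within bounds is returned unchanged, with nothing clipped.
in_range = clip(val=7, lower=0, upper=10)  # (7, 0)
```

The second element of the tuple is what `take()` and `fill()` return to the caller.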