breaking

View Source
from breaking.breaker import CircuitBreaker, TooManyErrors
from breaking.bucket import TokenBucket
from breaking.clock import Clock, MockClock, MonotonicClock

__all__ = [
    "CircuitBreaker",
    "Clock",
    "MonotonicClock",
    "TooManyErrors",
    "MockClock",
    "TokenBucket",
]
class CircuitBreaker:
View Source
class CircuitBreaker:
    """
    Stop executing code after too many exceptions have occurred.

    Allows a maximum number of `error_threshold` errors over a time window of
    `time_window_secs`. After this threshold has been exceeded, we disallow any
    further requests until enough time has passed.

    See `breaking.bucket.TokenBucket` for further details on how requests are
    replenished.

    If `exceptions_types` is passed, the CircuitBreaker will only count
    exceptions of the given types. All these exceptions are re-raised.

    This class is a ContextManager, so you can use it in a `with` statement.
    """

    def __init__(
        self,
        error_threshold: int,
        time_window_secs: int,
        exceptions_types: Tuple[Type[Exception], ...] = (Exception,),
        clock: Optional[Clock] = None,
    ):
        """Create a circuit breaker.

        Args:
            error_threshold: Maximum number of counted errors allowed within
                the time window before the breaker starts blocking.
            time_window_secs: Length of the time window, in seconds.
            exceptions_types: Only exceptions of these types are counted as
                failures. Defaults to all `Exception` subclasses.
            clock: Time source to use. Defaults to `MonotonicClock`; pass a
                `breaking.clock.MockClock` in tests.
        """
        # Normalize to a tuple so it can be fed to `issubclass()` directly,
        # even if a caller passed a list.
        self._exception_types = tuple(exceptions_types)

        # Restoring `error_threshold` tokens every `time_window_secs` models
        # "at most `error_threshold` failures per window".
        restore_rate_hz = error_threshold / time_window_secs

        if clock is None:
            clock = MonotonicClock()

        self._bucket = TokenBucket(
            capacity_max=error_threshold,
            restore_rate_hz=restore_rate_hz,
            clock=clock,
        )

    def __enter__(self) -> None:
        """Raise `TooManyErrors` if the breaker is currently blocking."""
        if self.is_blocking_execution():
            raise TooManyErrors("Not performing request. Too many failures")

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        """Exit the context manager.

        If this method returns `True`, the runtime will ignore any raised
        exceptions in the body of the contextmanager. We don't use this
        feature, so this method returns `None` and exceptions propagate.
        """
        if exc_type is None:
            return

        # Check whether the raised exception is part of the configured
        # exceptions that the user wants to count. Record at most ONE failure
        # per raised exception — looping over the configured types would
        # count the same exception several times when it is a subclass of
        # more than one of them.
        if issubclass(exc_type, self._exception_types):
            self.record_failure()

    def is_allowing_execution(self) -> bool:
        """
        Check if the circuit breaker is allowing execution.
        """
        return self._bucket.has_tokens_left()

    def is_blocking_execution(self) -> bool:
        """
        Check if the circuit breaker is blocking execution.
        """
        return not self.is_allowing_execution()

    def record_failure(self) -> None:
        """
        Record a failed call in the state of this circuit breaker.
        """
        self._bucket.take()

Stop executing code after too many exceptions have occurred.

Allows a maximum number of error_threshold errors over a time window of time_window_secs. After this threshold has been exceeded, we disallow any further requests until enough time has passed.

See breaking.bucket.TokenBucket for further details on how requests are replenished.

If exceptions_types is passed, the CircuitBreaker will only count exceptions of the given types. All these exceptions are re-raised.

This class is a ContextManager, so you can use it in a with statement.

CircuitBreaker( self, error_threshold: int, time_window_secs: int, exceptions_types: Tuple[Type[Exception], ...] = (<class 'Exception'>,), clock: Optional[breaking.clock.Clock] = None )
View Source
    def __init__(
        self,
        error_threshold: int,
        time_window_secs: int,
        exceptions_types: Tuple[Type[Exception], ...] = (Exception,),
        clock: Optional[Clock] = None,
    ):
        self._exception_types = exceptions_types

        restore_rate_hz = error_threshold / time_window_secs

        if clock is None:
            clock = MonotonicClock()

        self._bucket = TokenBucket(
            capacity_max=error_threshold,
            restore_rate_hz=restore_rate_hz,
            clock=clock,
        )
def is_allowing_execution(self) -> bool:
View Source
    def is_allowing_execution(self) -> bool:
        """
        Check if the circuit breaker is allowing execution.
        """
        return self._bucket.has_tokens_left()

Check if the circuit breaker is allowing execution.

def is_blocking_execution(self) -> bool:
View Source
    def is_blocking_execution(self) -> bool:
        """
        Check if the circuit breaker is blocking execution.
        """
        return not self.is_allowing_execution()

Check if the circuit breaker is blocking execution.

def record_failure(self) -> None:
View Source
    def record_failure(self) -> None:
        """
        Record a failed call in the state of this circuit breaker.
        """
        self._bucket.take()

Record a failed call in the state of this circuit breaker.

class Clock(typing.Protocol):
View Source
class Clock(Protocol):
    """Interface that all clocks must conform to.

    You will get a `TypeError` if you try to instantiate this class. This is
    a `typing_extensions.Protocol`, which you can think of as an abstract base
    class: concrete clocks only need to provide the method below.
    """

    def seconds_since_epoch(self) -> float:
        """Return the amount of seconds since clock epoch."""
        ...

Interface that all clocks must conform to.

You will get a TypeError if you try to instantiate this class. This is a typing_extensions.Protocol, which you can think of as an abstract base class.

Clock(*args, **kwargs)
View Source
def _no_init(self, *args, **kwargs):
    if type(self)._is_protocol:
        raise TypeError('Protocols cannot be instantiated')
def seconds_since_epoch(self) -> float:
View Source
    def seconds_since_epoch(self) -> float:
        """Return the amount of seconds since clock epoch."""

Return the amount of seconds since clock epoch.

class MonotonicClock:
View Source
class MonotonicClock:
    """Clock based on `time.monotonic()`.

    Monotonic time cannot go backwards, which makes it suitable for
    measuring elapsed intervals.
    """

    def seconds_since_epoch(self) -> float:
        """Returns `time.monotonic()`"""
        return time.monotonic()

Clock based on time.monotonic()

MonotonicClock()
def seconds_since_epoch(self) -> float:
View Source
    def seconds_since_epoch(self) -> float:
        """Returns `time.monotonic()`"""
        return time.monotonic()

Returns time.monotonic()

class TooManyErrors(builtins.Exception):
View Source
class TooManyErrors(Exception):
    """Raised by `CircuitBreaker` when too many errors have occurred."""

Raised by CircuitBreaker when too many errors have occurred.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class MockClock:
View Source
class MockClock:
    """Clock that must be manually advanced for use in tests."""

    def __init__(self) -> None:
        # Current clock reading in seconds; starts at epoch (0.0).
        self.time = 0.0

    def seconds_since_epoch(self) -> float:
        """Return the stored time value.

        This value does not increase by itself. You have to manually call
        `MockClock.advance_by()` in order to move this clock forward in time.
        """
        return self.time

    def advance_by(self, n: float) -> None:
        """Advance the clock by `n` seconds.

        Raises:
            ValueError: If `n` is negative. A clock cannot go backwards.
        """
        # A plain `assert` would be stripped under `python -O`, silently
        # allowing the clock to move backwards; raise explicitly instead.
        if n < 0:
            raise ValueError("Clock cannot go backwards")
        self.time += n

Clock that must be manually advanced for use in tests.

MockClock()
View Source
    def __init__(self) -> None:
        self.time = 0.0
def seconds_since_epoch(self) -> float:
View Source
    def seconds_since_epoch(self) -> float:
        """Return the stored time value.

        This value does not increase by itself. You have to manually call
        `MockClock.advance_by()` in order to move this clock forward in time.
        """
        return self.time

Return the stored time value.

This value does not increase by itself. You have to manually call MockClock.advance_by() in order to move this clock forward in time.

def advance_by(self, n: float) -> None:
View Source
    def advance_by(self, n: float) -> None:
        """Advance the clock by `n` seconds."""
        assert not n < 0, "Clock cannot go backwards"
        self.time += n

Advance the clock by n seconds.

@dataclass
class TokenBucket:
View Source
@dataclass
class TokenBucket:
    """
    Impose an upper bound on a number of events over a time interval.

    Use cases:

     - Rate limiting the number of requests for clients.
     - Circuit breaking after a certain number of exceptions have happened.

    ### How this works

    I will slowly build up to how token buckets work and why they are
    nice by showing other ways you could implement this (and
    limitations of those approaches).

    There are many related datastructures / algorithms in this space
    with different trade-offs. What follows is a brief comparison
    between them.

    #### Fixed window counters

    Divide time in a series of buckets, and for each bucket keep track
    of the number of events that have happened.

    Example implementation using a `Dict[Time, Count]`:

    ```
    # An event happens at time `1612604808`
    { 1612604808: 1 }

    # Another event happens within the same second.
    { 1612604808: 2 }

    # Time moves forward into the next second.

    # A new event happens. Remove the counter for the previous
    # second and start anew.
    { 1612604808: 2, 1612604809: 1 }
      ^              ^ Limit checks now use this counter.
      Previous key. Will be deleted.
    ```

    Based on these counters, you can implement your rate limiting logic.
    Conceptually, this is very simple. However, there is a major limitation.

    **You can only consider event rates for the current
    bucket**. (Otherwise you would need to loop over the entire dict,
    which is inefficient). This introduces inaccuracies in counting
    when events happen when your buckets roll over.

    Consider:

    ```
    # We'd like to limit the number of requests to 10/second.

    # At `1612604808.95` (near the end of second `1612604808`), 10
    # requests come in.
    { 1612604808: 10 }

    # At `1612604809.05` (the beginning of second `1612604809`), 11
    # more requests come in.
    { 1612604808: 10, 1612604809: 10 }
      ^               ^ Used for comparison
      Not used anymore
    ```

    You conceptually had a rate limit of 10 requests / second. However
    at the second boundaries, you can actually allow up to 20 requests.

    #### Sliding window counter

    This is an approach to fix the inaccuracies above.

     - Store all event times in a sorted set, with millisecond accuracy.
     - When new events come in, add their time values to the new set.
     - Clean up all old requests from the set every second (this is fast
       because the set is sorted).

    This is more accurate, but trades accuracy for memory. Depending
    on the limits that you set, this could be a dealbreaker.

    #### Token buckets

    Token buckets are a way to fix the accuracy and the memory usage
    problems with the implementations above.

    Each bucket starts out full of tokens.

    ```
    |---| <- capacity_current, capacity_max
    |   |
    |   |
    |___|
    ```

    When events happen, the current capacity is decremented:

    ```
    |   | <- capacity_max
    |   |
    |---| <- capacity_current
    |___|
    ```

    When the bucket runs out of tokens, no new events are allowed:

    ```
    |   | <- capacity_max
    |   |
    |   |
    |___| <- capacity_current
    ```

    Over time, the capacity is restored at `restore_rate_hz`, and
    new events are allowed again:

    ```
    |   | <- capacity_max
    |---| <- capacity_current
    |   |
    |___|
    ```

    By keeping track of the `last_restore` time, we can perform this
    restoration calculation every time we try to take from the bucket.

    We get the full accuracy offered by the sorted set, but with less
    memory requirements. Nice.

    (This class also has a `fill()` operation, which is helpful when
    implementing circuit breakers.)

    #### Leaky Buckets

    **Token buckets and leaky buckets have different goals and usecases.**
    But since they appear very similar on the surface, I'm mentioning them
    for completeness.

    Both leaky and token buckets contain a backpressure mechanism, but
    they have different goals:

     - Leaky buckets turn bursty streams of events into a stream of
       constant rate.
     - Token buckets make sure no more than `X` events happen over a
       certain time window. The resulting stream has an upper bound on
       the number of elements, but is still allowed to be bursty.

    Or more concretely:

     - Leaky buckets buffer requests and output them at a steady rate. Limits
       are imposed when the buffer is full.
     - Token buckets allow requests immediately. Limits are imposed when more
       than X requests happen in a period of time.

    Here is a table which compares the two:

    | bucket type             | token                  | leaky              |
    |-------------------------|------------------------|--------------------|
    | goal                    | impose upper bound     | control burstiness |
    | requests happen         | immediately            | at leak rate       |
    | backpressure when       | empty                  | full               |
    | analogous datastructure | replenishing semaphore | bounded queue      |

    """

    capacity_max: int = field()
    capacity_current: int = field(init=False)

    restore_rate_hz: float = field()
    """How much capacity should be restored every second?"""
    last_restore: float = field(init=False)
    """Time the last capacity restore took place."""

    clock: Clock = field(default_factory=MonotonicClock)
    """Clock implementation to use.

    This is configurable because to facilitate testing the behavior of
    this class without waiting for actual time to pass. See `breaking.clock`
    for further details."""

    def __post_init__(self) -> None:
        self.last_restore = self.clock.seconds_since_epoch()
        self.capacity_current = self.capacity_max

        if self.capacity_max < 1:
            raise ValueError("capacity_max must be >= 1")

        # Sub-1Hz rates are perfectly valid ("5 tokens per minute" is 1/12
        # Hz, and `CircuitBreaker` produces such rates whenever
        # error_threshold < time_window_secs). Only non-positive rates are
        # meaningless.
        if self.restore_rate_hz <= 0:
            raise ValueError("restore_rate_hz must be > 0")

        # NaN compares False against everything, so it slips past the check
        # above; reject it explicitly.
        if math.isnan(self.restore_rate_hz):
            raise ValueError("restore_rate_hz cannot be NaN")

        if math.isinf(self.restore_rate_hz):
            raise ValueError("restore_rate_hz cannot be Inf")

    def has_tokens_left(self, n: int = 1) -> bool:
        """
        Does the bucket have capacity for `n` more items?
        """
        assert n >= 1, "`n` must be >= 1"

        self._refill_capacity()
        return self.capacity_current >= n

    def fill(self, n: int = 1) -> int:
        """
        Put `n` items into the bucket.

        You can use this method to "return capacity" to the bucket.
        The bucket will never fill beyond `capacity_max`.

        Returns:
            The amount of tokens that didn't fit in the bucket anymore.
            If this value is `> 0`, then the bucket didn't have capacity
            for that number of items.
        """
        assert n >= 1, "`n` must be >= 1"
        self._refill_capacity()
        # `clip` (project helper) is assumed to return the clamped value and
        # the amount that was clipped off — consistent with how `take` uses it.
        new_capacity, clipped = clip(
            val=self.capacity_current + n, lower=0, upper=self.capacity_max
        )
        self.capacity_current = new_capacity
        return clipped

    def take(self, n: int = 1) -> int:
        """
        Take `n` items from the bucket.

        You can use this method to "drain capacity" from the bucket. The
        bucket capacity will never drop below `0`.

        Returns:
            The amount of tokens that could not be taken from the bucket.
            If this value is `> 0`, then the bucket didn't have capacity
            for that number of items.
        """
        assert n >= 1, "`n` must be >= 1"
        self._refill_capacity()
        new_capacity, clipped = clip(
            val=self.capacity_current - n, lower=0, upper=self.capacity_max
        )
        self.capacity_current = new_capacity
        return clipped

    def _refill_capacity(self) -> None:
        """
        Update the current capacity based on the restore rate.

        Only whole tokens are restored. Crucially, `last_restore` is advanced
        only by the amount of time that was actually converted into tokens,
        so fractional progress carries over between calls. (Unconditionally
        setting `last_restore = now` would discard that progress: polling
        the bucket more often than once per token period would then never
        restore anything at all.)
        """
        now = self.clock.seconds_since_epoch()
        elapsed = now - self.last_restore

        tokens_to_restore = int(elapsed * self.restore_rate_hz)
        if tokens_to_restore > 0:
            self.capacity_current = min(
                self.capacity_current + tokens_to_restore, self.capacity_max
            )
            self.last_restore += tokens_to_restore / self.restore_rate_hz

        if self.capacity_current >= self.capacity_max:
            # A full bucket accrues no credit: forget any leftover elapsed
            # time so it cannot be converted into a burst allowance later.
            self.last_restore = now

Impose an upper bound on a number of events over a time interval.

Use cases:

  • Rate limiting the number of requests for clients.
  • Circuit breaking after a certain number of exceptions have happened.

How this works

I will slowly build up to how token buckets work and why they are nice by showing other ways you could implement this (and limitations of those approaches).

There are many related datastructures / algorithms in this space with different trade-offs. What follows is a brief comparison between them.

Fixed window counters

Divide time in a series of buckets, and for each bucket keep track of the number of events that have happened.

Example implementation using a Dict[Time, Count]:

# An event happens at time `1612604808`
{ 1612604808: 1 }

# Another event happens within the same second.
{ 1612604808: 2 }

# Time moves forward into the next second.

# A new event happens. Remove the counter for the previous
# second and start anew.
{ 1612604808: 2, 1612604809: 1 }
  ^              ^ Limit checks now use this counter.
  Previous key. Will be deleted.

Based on these counters, you can implement your rate limiting logic. Conceptually, this is very simple. However, there is a major limitation.

You can only consider event rates for the current bucket. (Otherwise you would need to loop over the entire dict, which is inefficient). This introduces inaccuracies in counting when events happen when your buckets roll over.

Consider:

# We'd like to limit the number of requests to 10/second.

# At `1612604808.95` (near the end of second `1612604808`), 10
# requests come in.
{ 1612604808: 10 }

# At `1612604809.05` (the beginning of second `1612604809`), 11
# more requests come in.
{ 1612604808: 10, 1612604809: 10 }
  ^               ^ Used for comparison
  Not used anymore

You conceptually had a rate limit of 10 requests / second. However at the second boundaries, you can actually allow up to 20 requests.

Sliding window counter

This is an approach to fix the inaccuracies above.

  • Store all event times in a sorted set, with millisecond accuracy.
  • When new events come in, add their time values to the new set.
  • Clean up all old requests from the set every second (this is fast because the set is sorted).

This is more accurate, but trades accuracy for memory. Depending on the limits that you set, this could be a dealbreaker.

Token buckets

Token buckets are a way to fix the accuracy and the memory usage problems with the implementations above.

Each bucket starts out full of tokens.

|---| <- capacity_current, capacity_max
|   |
|   |
|___|

When events happen, the current capacity is decremented:

|   | <- capacity_max
|   |
|---| <- capacity_current
|___|

When the bucket runs out of tokens, no new events are allowed:

|   | <- capacity_max
|   |
|   |
|___| <- capacity_current

Over time, the capacity is restored at restore_rate_hz, and new events are allowed again:

|   | <- capacity_max
|---| <- capacity_current
|   |
|___|

By keeping track of the last_restore time, we can perform this restoration calculation every time we try to take from the bucket.

We get the full accuracy offered by the sorted set, but with less memory requirements. Nice.

(This class also has a fill() operation, which is helpful when implementing circuit breakers.)

Leaky Buckets

Token buckets and leaky buckets have different goals and usecases. But since they appear very similar on the surface, I'm mentioning them for completeness.

Both leaky and token buckets contain a backpressure mechanism, but they have different goals:

  • Leaky buckets turn bursty streams of events into a stream of constant rate.
  • Token buckets make sure no more than X events happen over a certain time window. The resulting stream has an upper bound on the number of elements, but is still allowed to be bursty.

Or more concretely:

  • Leaky buckets buffer requests and output them at a steady rate. Limits are imposed when the buffer is full.
  • Token buckets allow requests immediately. Limits are imposed when more than X requests happen in a period of time.

Here is a table which compares the two:

bucket type token leaky
goal impose upper bound control burstiness
requests happen immediately at leak rate
backpressure when empty full
analogous datastructure replenishing semaphore bounded queue
TokenBucket( self, capacity_max: int, restore_rate_hz: float, clock: breaking.clock.Clock = <factory> )
capacity_max: int
capacity_current: int
restore_rate_hz: float

How much capacity should be restored every second?

last_restore: float

Time the last capacity restore took place.

Clock implementation to use.

This is configurable because to facilitate testing the behavior of this class without waiting for actual time to pass. See breaking.clock for further details.

def has_tokens_left(self, n: int = 1) -> bool:
View Source
    def has_tokens_left(self, n: int = 1) -> bool:
        """
        Does the bucket have capacity for `n` more items?
        """
        assert n >= 1, "`n` must be >= 1"

        self._refill_capacity()
        return self.capacity_current - n >= 0

Does the bucket have capacity for n more items?

def fill(self, n: int = 1) -> int:
View Source
    def fill(self, n: int = 1) -> int:
        """
        Put `n` items into the bucket.

        You can use this method to "return capacity" to the bucket.
        The bucket will never fill beyond `capacity_max`.

        Returns:
            The amount of tokens that didn't fit in the bucket anymore.
            If this value is `> 0`, then the bucket didn't have capacity
            for that number of items.
        """
        assert n >= 1, "`n` must be >= 1"
        self._refill_capacity()
        new_capacity, clipped = clip(
            val=self.capacity_current + n, lower=0, upper=self.capacity_max
        )
        self.capacity_current = new_capacity
        return clipped

Put n items into the bucket.

You can use this method to "return capacity" to the bucket. The bucket will never fill beyond capacity_max.

Returns: The amount of tokens that didn't fit in the bucket anymore. If this value is > 0, then the bucket didn't have capacity for that number of items.

def take(self, n: int = 1) -> int:
View Source
    def take(self, n: int = 1) -> int:
        """
        Take `n` items from the bucket.

        You can use this method to "drain capacity" from the bucket. The
        bucket capacity will never drop below `0`.

        Returns:
            The amount of tokens that could not be taken from the bucket.
            If this value is `> 0`, then the bucket didn't have capacity
            for that number of items.
        """
        assert n >= 1, "`n` must be >= 1"
        self._refill_capacity()
        new_capacity, clipped = clip(
            val=self.capacity_current - n, lower=0, upper=self.capacity_max
        )
        self.capacity_current = new_capacity
        return clipped

Take n items from the bucket.

You can use this method to "drain capacity" from the bucket. The bucket capacity will never drop below 0.

Returns: The amount of tokens that could not be taken from the bucket. If this value is > 0, then the bucket didn't have capacity for that number of items.