Python >> Tutoriel Python >  >> Python

Limitation des fonctions asynchrones dans Python Asyncio

Vous pouvez le faire en implémentant l'algorithme de seau qui fuit :

import asyncio
import contextlib
import collections
import time

from types import TracebackType
from typing import Dict, Optional, Type

try:  # Python 3.7
    base = contextlib.AbstractAsyncContextManager
    _current_task = asyncio.current_task
except AttributeError:
    base = object  # type: ignore
    _current_task = asyncio.Task.current_task  # type: ignore

class AsyncLeakyBucket(base):
    """A leaky bucket rate limiter.

    Allows up to max_rate / time_period acquisitions before blocking.

    time_period is measured in seconds; the default is 60.

    """
    def __init__(
        self,
        max_rate: float,
        time_period: float = 60,
        loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        self._loop = loop
        self._max_level = max_rate
        self._rate_per_sec = max_rate / time_period
        self._level = 0.0
        self._last_check = 0.0
        # queue of waiting futures to signal capacity to
        self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict()

    def _leak(self) -> None:
        """Drip out capacity from the bucket."""
        if self._level:
            # drip out enough level for the elapsed time since
            # we last checked
            elapsed = time.time() - self._last_check
            decrement = elapsed * self._rate_per_sec
            self._level = max(self._level - decrement, 0)
        self._last_check = time.time()

    def has_capacity(self, amount: float = 1) -> bool:
        """Check if there is enough space remaining in the bucket"""
        self._leak()
        requested = self._level + amount
        # if there are tasks waiting for capacity, signal to the first
        # there there may be some now (they won't wake up until this task
        # yields with an await)
        if requested < self._max_level:
            for fut in self._waiters.values():
                if not fut.done():
                    fut.set_result(True)
                    break
        return self._level + amount <= self._max_level

    async def acquire(self, amount: float = 1) -> None:
        """Acquire space in the bucket.

        If the bucket is full, block until there is space.

        """
        if amount > self._max_level:
            raise ValueError("Can't acquire more than the bucket capacity")

        loop = self._loop or asyncio.get_event_loop()
        task = _current_task(loop)
        assert task is not None
        while not self.has_capacity(amount):
            # wait for the next drip to have left the bucket
            # add a future to the _waiters map to be notified
            # 'early' if capacity has come up
            fut = loop.create_future()
            self._waiters[task] = fut
            try:
                await asyncio.wait_for(
                    asyncio.shield(fut),
                    1 / self._rate_per_sec * amount,
                    loop=loop
                )
            except asyncio.TimeoutError:
                pass
            fut.cancel()
        self._waiters.pop(task, None)

        self._level += amount

        return None

    async def __aenter__(self) -> None:
        await self.acquire()
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType]
    ) -> None:
        return None

Notez que nous perdons de la capacité du bucket de manière opportuniste, il n'est pas nécessaire d'exécuter une tâche asynchrone distincte juste pour abaisser le niveau ; au lieu de cela, la capacité est perdue lors du test d'une capacité restante suffisante.

Notez que les tâches qui attendent de la capacité sont conservées dans un dictionnaire ordonné, et lorsqu'il peut y avoir de nouveau de la capacité disponible, la première tâche encore en attente est réveillée plus tôt.

Vous pouvez l'utiliser comme gestionnaire de contexte ; essayer d'acquérir le bucket lorsqu'il est plein bloque jusqu'à ce qu'une capacité suffisante ait été à nouveau libérée :

bucket = AsyncLeakyBucket(100)

# ...

async with bucket:
    # only reached once the bucket is no longer full

ou vous pouvez appeler le acquire() directement :

await bucket.acquire()  # blocks until there is space in the bucket

ou vous pouvez simplement tester s'il y a d'abord de l'espace :

if bucket.has_capacity():
    # reject a request due to rate limiting

Notez que vous pouvez compter certaines requêtes comme "plus lourdes" ou "plus légères" en augmentant ou en diminuant la quantité que vous "gouttez" dans le seau :

await bucket.acquire(10)
if bucket.has_capacity(0.5):

Soyez prudent avec cela cependant; lors du mélange de gros et de petits goutteurs, les petits goutteurs ont tendance à s'écouler avant les gros goutteurs lorsqu'ils sont au débit maximum ou proches de celui-ci, car il est plus probable qu'il y ait suffisamment de capacité libre pour un petit goutteur avant qu'il n'y ait de la place pour un plus grand.

Démo :

>>> import asyncio, time
>>> bucket = AsyncLeakyBucket(5, 10)
>>> async def task(id):
...     await asyncio.sleep(id * 0.01)
...     async with bucket:
...         print(f'{id:>2d}: Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task(i) for i in range(15)]
>>> result = asyncio.run(asyncio.wait(tasks))
 0: Drip!  0.00
 1: Drip!  0.02
 2: Drip!  0.02
 3: Drip!  0.03
 4: Drip!  0.04
 5: Drip!  2.05
 6: Drip!  4.06
 7: Drip!  6.06
 8: Drip!  8.06
 9: Drip! 10.07
10: Drip! 12.07
11: Drip! 14.08
12: Drip! 16.08
13: Drip! 18.08
14: Drip! 20.09

Le seau est rempli rapidement au début en rafale, ce qui permet de répartir plus uniformément le reste des tâches; toutes les 2 secondes, suffisamment de capacité est libérée pour qu'une autre tâche soit traitée.

La taille de rafale maximale est égale à la valeur de débit maximale, dans la démonstration ci-dessus qui a été définie sur 5. Si vous ne souhaitez pas autoriser les rafales, définissez le débit maximal sur 1 et la période de temps sur le temps minimum entre les gouttes :

>>> bucket = AsyncLeakyBucket(1, 1.5)  # no bursts, drip every 1.5 seconds
>>> async def task():
...     async with bucket:
...         print(f'Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task() for _ in range(5)]
>>> result = asyncio.run(asyncio.wait(tasks))
Drip!  0.00
Drip!  1.50
Drip!  3.01
Drip!  4.51
Drip!  6.02

J'ai décidé de l'emballer en tant que projet Python :https://github.com/mjpieters/aiolimiter


Une autre solution - utilisant des sémaphores délimités - par un collègue, un mentor et un ami, est la suivante :

import asyncio


class AsyncLeakyBucket(object):

    def __init__(self, max_tasks: float, time_period: float = 60, loop: asyncio.events=None):
        self._delay_time = time_period / max_tasks
        self._sem = asyncio.BoundedSemaphore(max_tasks)
        self._loop = loop or asyncio.get_event_loop()
        self._loop.create_task(self._leak_sem())

    async def _leak_sem(self):
        """
        Background task that leaks semaphore releases based on the desired rate of tasks per time_period
        """
        while True:
            await asyncio.sleep(self._delay_time)
            try:
                self._sem.release()
            except ValueError:
                pass

    async def __aenter__(self) -> None:
        await self._sem.acquire()

    async def __aexit__(self, exc_type, exc, tb) -> None:
        pass

Peut toujours être utilisé avec le même async with bucket code comme dans la réponse de @Martijn