Reference¶
- final class ExecPolicy[source]¶
Function execution policy for a scheduled task.
- CANCEL = 'CANCEL'¶
Cancel the previous call immediately if it’s still going and restart strictly on interval_s.
- WAIT = 'WAIT'¶
Wait for the previous call to finish.
This policy actually has a safe wait timeout, it’s just bigger than its interval_s. See
kaiju_scheduler.scheduler.Scheduler.wait_task_timeout_safe_mod
. On scheduler exit this task will still be cancelled immediately.
- SHIELD = 'SHIELD'¶
Shield the task from cancellation.
This will not allow the scheduler to cancel the task even on exit. The scheduler will try to wait until the task finishes.
Use this policy wisely because it can lead to stall tasks. Sometimes it may be useful though if it performs some sensitive operation.
Systems such as as Docker or web servers also have their own graceful timeouts. This policy doesn’t shield the task from being cancelled due to SIGTERM for obvious reasons.
- class RetryError[source]¶
Error recognized by
retry()
as suitable for retry.- __init__(*args, **kwargs)¶
- async retry(func: Callable[[...], Awaitable[Any]], retries: int, args: Iterable[Any] = tuple(), kws: Mapping[str, Any] = MappingProxyType({}), *, interval_s: float = 1.0, timeout_s: float = 120.0, catch_exceptions: Tuple[Type[BaseException], ...] = (TimeoutError, IOError, ConnectionError, RetryError), logger: Logger | None = None)[source]¶
Retry function call
- Parameters:
func – async callable
retries – number of retries
args – positional arguments
kws – keyword arguments
interval_s – interval in seconds between retries
timeout_s – total timeout in seconds for all retries
catch_exceptions – catch certain exception types and retry when they happen
logger – optional logger
- Returns:
returns the function result
- class Scheduler[source]¶
Schedule and execute local asynchronous functions periodically.
- wait_task_timeout_safe_mod: ClassVar[float] = 4.0¶
Timeout modifier for WAIT tasks (to prevent them waiting forever).
- logger: Logger | None = None¶
Optional logger instance.
- loop: AbstractEventLoop = None¶
Asyncio loop to use with the scheduler.
- tasks: List[ScheduledTask]¶
List of registered tasks.
- __init__(logger: Logger | None = None, loop: AbstractEventLoop = None) None ¶
- schedule_task(func: Callable, interval_s: float, args: tuple = tuple(), kws: Mapping = MappingProxyType({}), *, policy: ExecPolicy = ExecPolicy.CANCEL, max_timeout_s: float = 0, retries: int = 0, retry_interval_s: float = 0, name: str | None = None) ScheduledTask [source]¶
Schedule a periodic task.
- Parameters:
func – asynchronous function
args – input positional arguments
kws – input kw arguments
interval_s – schedule interval_s in seconds
policy – task execution policy
max_timeout_s – optional max timeout in seconds, for
CANCEL
the lowest between max_timeout_s and interval_s will be used, by default interval_s is used for cancelled tasks and interval_s * 4 for waited tasks max_timeout_s is ignored forSHIELD
policiesretries – number of retries if any, see
retry()
for more inforetry_interval_s – interval_s between retries, see
retry()
for more infoname – optional custom task name, which will be shown in the app’s server list of task
- Returns:
an instance of scheduled task
- final class ScheduledTask[source]¶
Scheduled task.
- __init__(scheduler: Scheduler, name: str, method: Callable[[...], Awaitable[Any]], args: tuple, kws: Mapping, interval_s: float, policy: ExecPolicy, max_timeout_s: float, retries: int, retry_interval_s: float)[source]¶
Initialize.
- name¶
- method¶
- args¶
- kws¶
- interval_s¶
- max_timeout_s¶
- retries¶
- retry_interval_s¶
- result¶
- class Server[source]¶
Simple asyncio server for internal function calls.
Provides a mechanism for request limiting, timeouts and retries.
- max_parallel_tasks: int = 256¶
Maximum number of parallel calls.
- logger: Logger | None = None¶
Optional logger instance.
- full: bool = False¶
Server is full and cant accept connections.
- closed: bool = True¶
Server is closed and cannot accept new requests.
- server_not_full: Final[Event]¶
The event is set when the server is not full and can accept new requests.
- server_empty: Final[Event]¶
The event is set when the server is empty and can now shut down.
- call_nowait(func: Callable[[...], Awaitable], args: Iterable[Any] = tuple(), kws: Mapping[str, Any] = MappingProxyType({}), *, request_timeout_s: float = 300, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task [source]¶
Create a new task and return immediately.
Note
The method is designed so the task doesn’t raise errors. It returns them instead in its result and passes them to the callback function if it was provided.
- Parameters:
func – The function to call
args – Positional arguments
kws – Keyword arguments
request_timeout_s – total max request time, the request will return asyncio.TimeoutError once the timeout is reached
callback – The callback function which will be called with the result
retries – How many times to retry the call
retry_interval_s – How long to wait before retries
task_name – custom asyncio task name
- Returns:
an asyncio task wrapper around the request
- Raises:
asyncio.QueueFull – if the server is full, use
full
orserver_not_full
event to check before submitting a requestServerClosed – if the server is closed and cannot accept a request
- call_many_nowait(batch: Collection[Tuple[Callable[[...], Awaitable], Iterable[Any], Mapping[str, Any]]], *, request_timeout_s: float = 300, abort_batch_on_error: bool = False, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task [source]¶
Create a new task batch and return immediately.
When batch will be called its requests will be executed in order which allows request chaining.
Note
The method is designed so the task doesn’t raise errors. It returns them instead in its result and passes them to the callback function if it was provided.
- Parameters:
batch – batch of requests i.e. a collection of (func, args, kws) tuples
request_timeout_s – total max request time for the whole batch, each request in a batch after the timeout will return asyncio.TimeoutError
abort_batch_on_error – abort the whole batch on a first exception, all subsequent requests in the batch will return an
Aborted
exceptionscallback – The callback function which will be called with the result
retries – How many times to retry the call
retry_interval_s – How long to wait before retries
task_name – custom asyncio task name
- Returns:
an asyncio task wrapper around the request
- Raises:
asyncio.QueueFull – if the server is full, use
full
orserver_not_full
event to check before submitting a requestServerClosed – if the server is closed and cannot accept a request
- async call(func: Callable[[...], Awaitable], args: Iterable[Any] = tuple(), kws: Mapping[str, Any] = MappingProxyType({}), *, request_timeout_s: float = 300, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task [source]¶
Same as
call_nowait()
but would wait for the server counter instead of raising a asyncio.QueueFull error.
- async call_many(batch: Collection[Tuple[Callable[[...], Awaitable], Iterable[Any], Mapping[str, Any]]], *, request_timeout_s: float = 300, abort_batch_on_error: bool = False, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task [source]¶
Same as
call_many_nowait()
but would wait for the server counter instead of raising a asyncio.QueueFull error.
- __init__(max_parallel_tasks: int = 256, logger: Logger | None = None) None ¶