Reference

final class ExecPolicy[source]

Function execution policy for a scheduled task.

CANCEL = 'CANCEL'

Cancel the previous call immediately if it is still running and restart the task strictly on each interval_s.

WAIT = 'WAIT'

Wait for the previous call to finish.

This policy still has a safe wait timeout; it is simply larger than the task's interval_s. See kaiju_scheduler.scheduler.Scheduler.wait_task_timeout_safe_mod. On scheduler exit this task will still be cancelled immediately.

SHIELD = 'SHIELD'

Shield the task from cancellation.

This will not allow the scheduler to cancel the task even on exit. The scheduler will try to wait until the task finishes.

Use this policy with care, because it can lead to stalled tasks. It can still be useful when the task performs a sensitive operation that must not be interrupted.

Systems such as Docker or web servers also have their own graceful shutdown timeouts. This policy does not, for obvious reasons, shield the task from being cancelled due to SIGTERM.
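The CANCEL policy's restart-on-interval behaviour can be sketched with plain asyncio. This is an illustrative reimplementation of the documented semantics, not the library's code:

```python
import asyncio


async def run_with_cancel_policy(func, interval_s: float, ticks: int) -> None:
    # CANCEL semantics sketch: on every tick, cancel the previous call if it
    # is still running and start a fresh one strictly on the interval.
    current = None
    for _ in range(ticks):
        if current is not None and not current.done():
            current.cancel()
        current = asyncio.create_task(func())
        await asyncio.sleep(interval_s)
    if current is not None and not current.done():
        current.cancel()
        try:
            await current
        except asyncio.CancelledError:
            pass


calls = []


async def slow_call():
    calls.append("start")
    try:
        await asyncio.sleep(10)  # always longer than the interval
        calls.append("done")
    except asyncio.CancelledError:
        calls.append("cancelled")
        raise


asyncio.run(run_with_cancel_policy(slow_call, 0.02, 3))
```

Each tick starts a fresh call, and the slow call never gets a chance to append "done".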

class RetryError[source]

Error recognized by retry() as retryable.

__init__(*args, **kwargs)
async retry(func: Callable[[...], Awaitable[Any]], retries: int, args: Iterable[Any] = tuple(), kws: Mapping[str, Any] = MappingProxyType({}), *, interval_s: float = 1.0, timeout_s: float = 120.0, catch_exceptions: Tuple[Type[BaseException], ...] = (TimeoutError, IOError, ConnectionError, RetryError), logger: Logger | None = None)[source]

Retry a function call.

Parameters:
  • func – async callable

  • retries – number of retries

  • args – positional arguments

  • kws – keyword arguments

  • interval_s – interval in seconds between retries

  • timeout_s – total timeout in seconds for all retries

  • catch_exceptions – catch certain exception types and retry when they happen

  • logger – optional logger

Returns:

returns the function result

class Scheduler[source]

Schedule and execute local asynchronous functions periodically.

wait_task_timeout_safe_mod: ClassVar[float] = 4.0

Timeout modifier for WAIT tasks (to prevent them waiting forever).

logger: Logger | None = None

Optional logger instance.

loop: AbstractEventLoop | None = None

Asyncio loop to use with the scheduler.

tasks: List[ScheduledTask]

List of registered tasks.

__init__(logger: Logger | None = None, loop: AbstractEventLoop | None = None) None
schedule_task(func: Callable, interval_s: float, args: tuple = tuple(), kws: Mapping = MappingProxyType({}), *, policy: ExecPolicy = ExecPolicy.CANCEL, max_timeout_s: float = 0, retries: int = 0, retry_interval_s: float = 0, name: str | None = None) ScheduledTask[source]

Schedule a periodic task.

Parameters:
  • func – asynchronous function

  • args – input positional arguments

  • kws – input kw arguments

  • interval_s – schedule interval_s in seconds

  • policy – task execution policy

  • max_timeout_s – optional max timeout in seconds; for the CANCEL policy the lower of max_timeout_s and interval_s is used (by default interval_s for cancelled tasks and interval_s * 4 for waited tasks); max_timeout_s is ignored for the SHIELD policy

  • retries – number of retries if any, see retry() for more info

  • retry_interval_s – interval_s between retries, see retry() for more info

  • name – optional custom task name, which will be shown in the app server's list of tasks

Returns:

an instance of scheduled task

async start()[source]

Initialize the loop and enable all tasks.

async stop()[source]

Close and cancel all tasks.

Note that it will not clear the tasks list.

json_repr() Dict[str, Any][source]

Get JSON compatible object state info.

final class ScheduledTask[source]

Scheduled task.

__init__(scheduler: Scheduler, name: str, method: Callable[[...], Awaitable[Any]], args: tuple, kws: Mapping, interval_s: float, policy: ExecPolicy, max_timeout_s: float, retries: int, retry_interval_s: float)[source]

Initialize.

name
method
args
kws
interval_s
max_timeout_s
retries
retry_interval_s
result
enable() None[source]

Enable and schedule next run.

disable() None[source]

Disable the task for future execution.

This will not cancel the current execution if it’s already running.

suspend() _TaskSuspendCtx[source]

Temporarily suspend execution of a task within a context block.
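The suspend() flow can be sketched with a plain context manager. The real _TaskSuspendCtx belongs to the library and may do more (such as waiting for the current run to finish); this sketch only shows the enable/disable bookkeeping:

```python
import contextlib


class TaskSketch:
    # Minimal illustrative stand-in for a scheduled task's enabled state.
    def __init__(self) -> None:
        self.enabled = True

    def enable(self) -> None:
        self.enabled = True

    def disable(self) -> None:
        self.enabled = False

    @contextlib.contextmanager
    def suspend(self):
        # Disable the task for the duration of the block and re-enable it on
        # exit, even if the block raises.
        was_enabled = self.enabled
        self.disable()
        try:
            yield self
        finally:
            if was_enabled:
                self.enable()


task = TaskSketch()
with task.suspend():
    inside = task.enabled
after = task.enabled
```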

async wait() None[source]

Wait until the current run has finished.

json_repr() Dict[str, Any][source]

Get JSON compatible object state info.

run() None[source]
class Server[source]

Simple asyncio server for internal function calls.

Provides a mechanism for request limiting, timeouts and retries.

max_parallel_tasks: int = 256

Maximum number of parallel calls.

logger: Logger | None = None

Optional logger instance.

full: bool = False

Server is full and cannot accept new requests.

closed: bool = True

Server is closed and cannot accept new requests.

server_not_full: Final[Event]

The event is set when the server is not full and can accept new requests.

server_empty: Final[Event]

The event is set when the server is empty and can now shut down.

call_nowait(func: Callable[[...], Awaitable], args: Iterable[Any] = tuple(), kws: Mapping[str, Any] = MappingProxyType({}), *, request_timeout_s: float = 300, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task[source]

Create a new task and return immediately.

Note

The method is designed so the task does not raise errors. Instead, it returns them in its result and passes them to the callback function if one was provided.

Parameters:
  • func – The function to call

  • args – Positional arguments

  • kws – Keyword arguments

  • request_timeout_s – total max request time, the request will return asyncio.TimeoutError once the timeout is reached

  • callback – The callback function which will be called with the result

  • retries – How many times to retry the call

  • retry_interval_s – How long to wait before retries

  • task_name – custom asyncio task name

Returns:

an asyncio task wrapper around the request

Raises:
  • asyncio.QueueFull – if the server is full; check the full flag or wait on the server_not_full event before submitting a request

  • ServerClosed – if the server is closed and cannot accept a request
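The "task doesn't raise errors" contract can be sketched with plain asyncio. This is an illustrative reimplementation of the documented behaviour (args and kws are omitted for brevity; the wrapper name is hypothetical):

```python
import asyncio


def call_nowait_sketch(func, *, request_timeout_s: float = 300.0,
                       callback=None) -> "asyncio.Task":
    # Sketch of the documented contract: the task never raises; exceptions
    # (including timeouts) become the task's result and are also passed to
    # the optional callback.
    async def _wrapper():
        try:
            result = await asyncio.wait_for(func(), request_timeout_s)
        except BaseException as exc:
            result = exc
        if callback is not None:
            await callback(result)
        return result

    return asyncio.get_running_loop().create_task(_wrapper())


async def main():
    async def boom():
        raise ValueError("failure")

    task = call_nowait_sketch(boom)
    return await task  # does not raise; returns the exception instance


outcome = asyncio.run(main())
```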

call_many_nowait(batch: Collection[Tuple[Callable[[...], Awaitable], Iterable[Any], Mapping[str, Any]]], *, request_timeout_s: float = 300, abort_batch_on_error: bool = False, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task[source]

Create a new task batch and return immediately.

When the batch is executed, its requests run in order, which allows request chaining.

Note

The method is designed so the task does not raise errors. Instead, it returns them in its result and passes them to the callback function if one was provided.

Parameters:
  • batch – batch of requests i.e. a collection of (func, args, kws) tuples

  • request_timeout_s – total max request time for the whole batch; once the timeout is reached, each remaining request in the batch will return asyncio.TimeoutError

  • abort_batch_on_error – abort the whole batch on the first exception; all subsequent requests in the batch will return an Aborted exception

  • callback – The callback function which will be called with the result

  • retries – How many times to retry the call

  • retry_interval_s – How long to wait before retries

  • task_name – custom asyncio task name

Returns:

an asyncio task wrapper around the request

Raises:
  • asyncio.QueueFull – if the server is full; check the full flag or wait on the server_not_full event before submitting a request

  • ServerClosed – if the server is closed and cannot accept a request
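The in-order execution and abort_batch_on_error semantics can be sketched with plain asyncio. This is an illustrative reimplementation, and the Aborted class below is only a stand-in for the library's own error type:

```python
import asyncio


class Aborted(Exception):
    """Illustrative stand-in for the library's Aborted error."""


async def run_batch_sketch(batch, *, abort_batch_on_error: bool = False):
    # Requests run strictly in order; after a failure with
    # abort_batch_on_error=True, remaining requests return Aborted.
    results, failed = [], False
    for func, args, kws in batch:
        if failed:
            results.append(Aborted())
            continue
        try:
            results.append(await func(*args, **kws))
        except Exception as exc:
            results.append(exc)
            failed = abort_batch_on_error
    return results


async def ok(x):
    return x


async def bad():
    raise IOError("boom")


batch = [(ok, (1,), {}), (bad, (), {}), (ok, (2,), {})]
results = asyncio.run(run_batch_sketch(batch, abort_batch_on_error=True))
```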

async call(func: Callable[[...], Awaitable], args: Iterable[Any] = tuple(), kws: Mapping[str, Any] = MappingProxyType({}), *, request_timeout_s: float = 300, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task[source]

Same as call_nowait() but waits for a free server slot instead of raising an asyncio.QueueFull error.

async call_many(batch: Collection[Tuple[Callable[[...], Awaitable], Iterable[Any], Mapping[str, Any]]], *, request_timeout_s: float = 300, abort_batch_on_error: bool = False, callback: Callable[[...], Awaitable] | None = None, retries: int = 0, retry_interval_s: float = 0, task_name: str | None = None) Task[source]

Same as call_many_nowait() but waits for a free server slot instead of raising an asyncio.QueueFull error.
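The difference between the waiting and non-waiting variants can be sketched with a semaphore standing in for the server's max_parallel_tasks counter. This is an illustrative sketch, not the library's implementation:

```python
import asyncio


async def call_sketch(sem: asyncio.Semaphore, func, *args):
    # call() vs call_nowait(): instead of raising asyncio.QueueFull when the
    # server is at capacity, wait until a slot frees up.
    async with sem:
        return await func(*args)


async def main():
    sem = asyncio.Semaphore(2)  # stands in for max_parallel_tasks

    async def job(i):
        await asyncio.sleep(0.01)
        return i

    # Five calls against two slots: the extra calls simply wait their turn.
    return await asyncio.gather(*(call_sketch(sem, job, i) for i in range(5)))


values = asyncio.run(main())
```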

async start()[source]
__init__(max_parallel_tasks: int = 256, logger: Logger | None = None) None
async stop()[source]
json_repr()[source]
timeout(time_sec: float, /)[source]

Execute async callables within a timeout.

async with timeout(5):
    await do_something_asynchronous()