i3pyblocks._internal.misc

Module Contents

Functions

calculate_threshold(items: i3pyblocks._internal.models.Threshold, value: float) → Optional[str]

Calculates the threshold based on the current value and a dict.

non_nullable_dict(**kwargs) → Dict

Returns a dict of the given keyword arguments, omitting every key whose value is None.

delegates(to_f, delegated_params=['args', 'kwargs'])

Delegates a function's parameter signature.

run_async(fn: Callable, executor: concurrent.futures.Executor = None) → Callable[Ellipsis, Awaitable[Any]]

Calls a sync function asynchronously.

get_aio_reader(loop: asyncio.AbstractEventLoop) → asyncio.StreamReader

i3pyblocks._internal.misc.calculate_threshold(items: i3pyblocks._internal.models.Threshold, value: float) → Optional[str]

Calculates the threshold based on the current value and a dict.
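The description above does not spell out the selection rule, so the following is only a sketch of one plausible reading, not the library's implementation. It assumes that Threshold is a mapping from a numeric lower bound to an optional label (for example a color name) and that the label of the highest bound not exceeding the value is returned. The name pick_threshold and the sample data are hypothetical.

```python
from typing import Dict, Optional

# Assumption: a threshold table maps a lower bound to an optional label.
Threshold = Dict[float, Optional[str]]


def pick_threshold(items: Threshold, value: float) -> Optional[str]:
    """Hypothetical stand-in: return the label of the highest bound <= value."""
    selected = None
    # Walk the bounds in ascending order and keep the last one that applies.
    for bound, label in sorted(items.items()):
        if value >= bound:
            selected = label
        else:
            break
    return selected


# Hypothetical table: no label below 60, a warning from 60, critical from 90.
cpu_colors = {0: None, 60: "yellow", 90: "red"}

low = pick_threshold(cpu_colors, 25.0)
mid = pick_threshold(cpu_colors, 75.0)
high = pick_threshold(cpu_colors, 95.0)
```

Under these assumptions, a value below every bound yields None, which matches the Optional[str] return type in the signature.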

i3pyblocks._internal.misc.non_nullable_dict(**kwargs) → Dict

Returns a dict of the given keyword arguments, omitting every key whose value is None.
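A minimal sketch of the behaviour described above, written as a local stand-in rather than copied from the library. The name non_nullable_dict_sketch and the sample keys are hypothetical; the sketch assumes that only None is filtered and that other falsy values such as 0 or an empty string are kept.

```python
from typing import Any, Dict


def non_nullable_dict_sketch(**kwargs: Any) -> Dict[str, Any]:
    """Hypothetical stand-in: keep only the keyword arguments that are not None."""
    return {key: value for key, value in kwargs.items() if value is not None}


# Only "color" is dropped; 0 and "" are falsy but not None, so they stay.
result = non_nullable_dict_sketch(full_text="cpu", color=None, min_width=0, name="")
```

A helper like this is convenient when building a payload in which absent fields should be left out entirely instead of being sent as null.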

i3pyblocks._internal.misc.delegates(to_f, delegated_params=['args', 'kwargs'])

Delegates a function's parameter signature.

Based on: https://www.fast.ai/2019/08/06/delegation/

Allow delegation of functions or methods using *args and **kwargs. Meant to be used as a decorator.

For example:

def foo(a, b):
    pass

@delegates(foo)
def bar(**kwargs):
    foo(**kwargs)

When inspecting the parameters from bar, we will have bar(a, b) instead of bar(**kwargs).

Parameters
  • to_f – Target function/method to delegate.

  • delegated_params – Names of the arguments to replace in the destination function. Defaults to ['args', 'kwargs'].
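To make the effect on introspection concrete, here is a simplified sketch of the delegation technique using inspect.signature. It is not the library's code: delegates_sketch is a hypothetical name, and the sketch assumes the merged parameters form a valid order (it does not resolve conflicts between the two signatures).

```python
import inspect


def delegates_sketch(to_f, delegated_params=("args", "kwargs")):
    """Hypothetical stand-in: expose to_f's parameters on the decorated function."""

    def decorator(from_f):
        from_sig = inspect.signature(from_f)
        params = dict(from_sig.parameters)
        # Drop the catch-all parameters that are being delegated.
        for name in delegated_params:
            params.pop(name, None)
        # Append the target's parameters that are not already present.
        for name, param in inspect.signature(to_f).parameters.items():
            if name not in params:
                params[name] = param
        # inspect.signature() honours __signature__ when it is set.
        from_f.__signature__ = from_sig.replace(parameters=list(params.values()))
        return from_f

    return decorator


def foo(a, b):
    return a + b


@delegates_sketch(foo)
def bar(**kwargs):
    return foo(**kwargs)


shown_signature = str(inspect.signature(bar))
call_result = bar(a=1, b=2)
```

Only the reported signature changes. The decorated function still receives its arguments through **kwargs at call time, so bar(a=1, b=2) behaves exactly as before.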

i3pyblocks._internal.misc.run_async(fn: Callable, executor: concurrent.futures.Executor = None) → Callable[Ellipsis, Awaitable[Any]]

Calls a sync function asynchronously.

Based on: https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

It wraps a sync function and runs it inside an executor, that is, in a thread or a process, and waits for its result.

It is meant to be used as a decorator:

@run_async
def sync_func(a, b):
    pass

await sync_func(1, b=2)

Or if you just want to use it directly:

await run_async(sync_func)(1, b=2)

Parameters
  • executor – Executor to be used to make the function async.
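The wrapping described above can be sketched with loop.run_in_executor from the standard library. This is an illustration under assumptions, not the library's source: run_async_sketch is a hypothetical name, and passing None as the executor falls back to the event loop's default executor, which is thread-based.

```python
import asyncio
import functools
from concurrent.futures import Executor
from typing import Any, Awaitable, Callable, Optional


def run_async_sketch(
    fn: Callable, executor: Optional[Executor] = None
) -> Callable[..., Awaitable[Any]]:
    """Hypothetical stand-in: run a sync function in an executor and await it."""

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, so bind
        # everything up front with functools.partial.
        call = functools.partial(fn, *args, **kwargs)
        return await loop.run_in_executor(executor, call)

    return wrapper


@run_async_sketch
def add(a, b):
    return a + b


async def main():
    # The blocking call runs in the executor while the loop stays responsive.
    return await add(1, b=2)


result = asyncio.run(main())
```

Unlike the snippets above, the await here sits inside a coroutine driven by asyncio.run, so the example can be executed as a plain script.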

async i3pyblocks._internal.misc.get_aio_reader(loop: asyncio.AbstractEventLoop) → asyncio.StreamReader