main
 1from __future__ import annotations
 2
 3import asyncio
 4import functools
 5from typing import TypeVar, Callable, Awaitable
 6from typing_extensions import ParamSpec
 7
 8import anyio
 9import sniffio
10import anyio.to_thread
11
12T_Retval = TypeVar("T_Retval")
13T_ParamSpec = ParamSpec("T_ParamSpec")
14
15
async def to_thread(
    func: Callable[T_ParamSpec, T_Retval], /, *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
) -> T_Retval:
    """
    Run the blocking callable `func` in a worker thread and await its result.

    When the current async library is asyncio and the interpreter provides
    `asyncio.to_thread`, the call is delegated to it; in every other case the
    call goes through `anyio.to_thread.run_sync`.

    ## Arguments

    `func`: the blocking callable to execute (positional-only)

    `*args`, `**kwargs`: forwarded to `func` unchanged

    ## Return

    The value returned by `func(*args, **kwargs)`.
    """
    if sniffio.current_async_library() == "asyncio":
        # `asyncio.to_thread` was only added in Python 3.9; on older
        # interpreters fall through to anyio instead of raising AttributeError.
        asyncio_to_thread = getattr(asyncio, "to_thread", None)
        if asyncio_to_thread is not None:
            return await asyncio_to_thread(func, *args, **kwargs)

    # Bind the arguments up front so that keyword arguments reach `func`
    # rather than being interpreted by `run_sync` itself.
    return await anyio.to_thread.run_sync(
        functools.partial(func, *args, **kwargs),
    )
25
26
27# inspired by `asyncer`, https://github.com/tiangolo/asyncer
def asyncify(function: Callable[T_ParamSpec, T_Retval]) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
    """
    Take a blocking function and create an async one that receives the same
    positional and keyword arguments.

    Usage:

    ```python
    def blocking_func(arg1, arg2, kwarg1=None):
        # blocking code
        return result


    result = await asyncify(blocking_func)(arg1, arg2, kwarg1=value1)
    ```

    ## Arguments

    `function`: a blocking regular callable (e.g. a function)

    ## Return

    An async function that takes the same positional and keyword arguments as the
    original one, that when called runs the same original function in a thread worker
    and returns the result. The returned function carries the original's metadata
    (`__name__`, `__doc__`, ...) and exposes the original as `__wrapped__`.
    """

    # Preserve the wrapped callable's metadata so introspection, logging and
    # debugging tools report the original function rather than "wrapper".
    @functools.wraps(function)
    async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        return await to_thread(function, *args, **kwargs)

    return wrapper