main
1from __future__ import annotations
2
3import asyncio
4import functools
5from typing import TypeVar, Callable, Awaitable
6from typing_extensions import ParamSpec
7
8import anyio
9import sniffio
10import anyio.to_thread
11
12T_Retval = TypeVar("T_Retval")
13T_ParamSpec = ParamSpec("T_ParamSpec")
14
15
16async def to_thread(
17 func: Callable[T_ParamSpec, T_Retval], /, *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
18) -> T_Retval:
19 if sniffio.current_async_library() == "asyncio":
20 return await asyncio.to_thread(func, *args, **kwargs)
21
22 return await anyio.to_thread.run_sync(
23 functools.partial(func, *args, **kwargs),
24 )
25
26
27# inspired by `asyncer`, https://github.com/tiangolo/asyncer
28def asyncify(function: Callable[T_ParamSpec, T_Retval]) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
29 """
30 Take a blocking function and create an async one that receives the same
31 positional and keyword arguments.
32
33 Usage:
34
35 ```python
36 def blocking_func(arg1, arg2, kwarg1=None):
37 # blocking code
38 return result
39
40
41 result = asyncify(blocking_function)(arg1, arg2, kwarg1=value1)
42 ```
43
44 ## Arguments
45
46 `function`: a blocking regular callable (e.g. a function)
47
48 ## Return
49
50 An async function that takes the same positional and keyword arguments as the
51 original one, that when called runs the same original function in a thread worker
52 and returns the result.
53 """
54
55 async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
56 return await to_thread(function, *args, **kwargs)
57
58 return wrapper