So, you're doing some sync stuff.
But you also need to do some async stuff, without making everything async.
Maybe the sync stuff is an existing application.
Maybe you still want to use your favorite sync library.
Or maybe you need just a little async, without having to pay the full price.
Of course, you can run a coroutine with asyncio.run(), and blocking sync code from a coroutine with asyncio.to_thread(), but the former isn't granular enough, and the latter doesn't solve async code being at the top.
As always, there must be a better way.
Maybe something like this?
    async def do_stuff(i):
        return i * 2

    async def generate_stuff(i):
        for n in range(i):
            yield n

    runner = ThreadRunner()
    print(runner.run(do_stuff(2)))  # 4
    print(*runner.wrap_iter(generate_stuff(4)))  # 0 1 2 3
Now, it would take a long time to fully explore how you might build up to such a thing and why you would actually need it... so that's exactly what we're going to do.
- But I need to do the thing more than once
- But I want a long-lived event loop
- But I want a long-lived session
- But I want to do something else during the async stuff
- But I want to do async stuff from multiple threads
- But I don't want to manage async context managers by hand
- But I want to use async iterables
- So, should you do this?
But I need to do the thing more than once
To make our example more life-like, we'll make some requests using aiohttp.
We create the session separately, since we might want to reuse it later on.
asyncio.run() allows you to execute a coroutine in a transient event loop; it's meant to be the entry point of your program, and "should ideally only be called once" – but you can call it multiple times:
    $ python bridge.py
    got 200 in loop 4399589904
    got 200 in loop 4386034640
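As a minimal sketch of the same idea without the network, assume a stand-in coroutine (here a variation of do_stuff() that also returns the loop it ran in, instead of making an aiohttp request). Each asyncio.run() call gets its own loop, which is closed by the time the call returns:

```python
import asyncio


async def do_stuff(i):
    # Stand-in for an HTTP request (assumption); also reports the running loop.
    await asyncio.sleep(0)
    return i * 2, asyncio.get_running_loop()


# Each asyncio.run() call creates a new event loop and closes it at the end.
result_one, loop_one = asyncio.run(do_stuff(1))
result_two, loop_two = asyncio.run(do_stuff(2))

print(result_one, result_two)  # 2 4
print(loop_one is loop_two)    # False
print(loop_one.is_closed())    # True
```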
But I want a long-lived event loop
Of course, creating and cleaning up a loop for each call isn't that efficient, and we might have loop-bound resources that need to live across calls.
With a bit of diving into asyncio guts, we can manage our own long-lived loop:
These are low-level APIs, so a bit of care is needed to use them correctly:
- We could have used get_event_loop() to both create and set the loop, but this behavior was deprecated in Python 3.10.
- loop.close() already schedules executor shutdown, but does not wait for it to finish; for predictable behavior, it's best to wait for cleanup tasks explicitly.
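The points above can be sketched roughly as follows. This is an illustration under assumptions, not necessarily the exact code behind the output below: do_stuff() is a network-free stand-in for the request, and the cleanup order is one reasonable choice.

```python
import asyncio


async def do_stuff(i):
    # Stand-in for an HTTP request (assumption); also reports the running loop.
    await asyncio.sleep(0)
    return i * 2, asyncio.get_running_loop()


# Create and set the loop explicitly instead of relying on get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result_one, loop_one = loop.run_until_complete(do_stuff(1))
    result_two, loop_two = loop.run_until_complete(do_stuff(2))
finally:
    # Wait for cleanup explicitly rather than relying on close() alone.
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.run_until_complete(loop.shutdown_default_executor())
    asyncio.set_event_loop(None)
    loop.close()

print(result_one, result_two)  # 2 4
print(loop_one is loop_two)    # True
```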
Anyway, it works – note the loop id:
    $ python bridge.py
    got 200 in loop 4509807312
    got 200 in loop 4509807312
Thankfully, starting with Python 3.11, asyncio.Runner makes this much easier:
But I want a long-lived session
Reusing the loop is not enough, though.
We're throwing the Session away after each request, and with it the associated connection pool, which means each request creates a new connection. Reusing connections can make repeated requests much faster – most "client" libraries (e.g. aiobotocore) keep a connection pool around for this reason.
Creating the session outside an async function and passing it in works:
...but with a bunch of warnings:
    $ python bridge.py
    .../bridge.py:16: DeprecationWarning: The object should be created within an async function
      session = aiohttp.ClientSession()
    got 200 in loop 4570601360
    got 200 in loop 4570601360
    Unclosed client session
    client_session: <aiohttp.client.ClientSession object at 0x10ff01c90>
The first one's easy – the constructor wants to be called in an async function, so we write one to call it in (to pass in constructor arguments, we can use a partial()):
The second one happens because we're expected to await session.close() after we're done with it.
But we didn't do that before – we used the (async) context manager protocol, the idiomatic way for objects to clean up after themselves.
Since async with is syntactic sugar over calling the asynchronous context manager protocol methods in a specific way, we can simulate it by doing that:
But I want to do something else during the async stuff
OK, we have a long-lived event loop and a long-lived session.
But when doing async stuff, we can't do anything else, because we are busy running the event loop; and as a consequence of that, outside of run() all async stuff is frozen.
So let's use the other way of doing more than one thing at a time, and call run() from another thread. First, more logging:
    $ python bridge.py
    got 200 in loop 4538933264 in MainThread
    got 200 in loop 4538933264 in MainThread
Hey, it works!
    $ python bridge.py
    got 200 in loop 4448024592 in Thread-1 (run)
But I want to do async stuff from multiple threads
OK, but does it work from two threads? This can be the case in an existing application. Or we can deliberately use threads most of the time, because they work with normal code, and only use async as an implementation detail. Or, we just have one other thread, like above, but then accidentally call run() from the main thread.
Glad you asked!
    $ python bridge.py
    Exception in thread Thread-2 (run):
    Traceback (most recent call last):
      ...
      File ".../asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File ".../asyncio/base_events.py", line 629, in run_until_complete
        self._check_running()
      File ".../asyncio/base_events.py", line 588, in _check_running
        raise RuntimeError('This event loop is already running')
    RuntimeError: This event loop is already running
    got 200 in loop 4544447440 in Thread-1 (run)
    got 200 in loop 4544447440 in Thread-1 (run)
Not only did one thread crash, both coroutines ran in the other thread. Worse, sometimes only one runs. In a long-running program with infrequent uses of run(), we might not even get a hard failure, but subtle, mysterious bugs. 👻
And why not? The docs have a Concurrency and Multithreading section that warns:
Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback.
...and that's exactly what we've been doing. But the warning comes with a solution:
So, we want a loop running continuously in another thread, not just when we call run():
We have to wait for the loop to be created in its own thread, otherwise the main thread can race ahead, create the loop first with its get_loop() call, and then deadlock when run_coroutine_threadsafe() is passed a loop that isn't running yet.
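One way to sketch such a thread is shown below. The LoopThread internals are assumptions (including creating the Runner inside the thread), and a threading.Event is used to wait until the loop exists before handing it out:

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Run an event loop continuously in a dedicated thread (illustrative sketch)."""

    def __init__(self):
        super().__init__(name='LoopThread', daemon=True)
        self._loop = None
        self._loop_created = threading.Event()

    def run(self):
        with asyncio.Runner() as runner:
            self._loop = runner.get_loop()
            self._loop_created.set()
            # Blocks until stop() is called; the Runner then cleans up the loop.
            self._loop.run_forever()

    def get_loop(self):
        # Wait for the loop thread to create the loop before returning it.
        self._loop_created.wait()
        return self._loop

    def stop(self):
        loop = self.get_loop()
        loop.call_soon_threadsafe(loop.stop)
        self.join()


async def do_stuff(i):
    await asyncio.sleep(0)
    return i * 2, threading.current_thread().name


thread = LoopThread()
thread.start()
try:
    future = asyncio.run_coroutine_threadsafe(do_stuff(2), thread.get_loop())
    result = future.result(timeout=5)
finally:
    thread.stop()

print(result)  # (4, 'LoopThread')
```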
We can go ahead and plug in the new loop thread,
winding things down after we're done:
This time, it really works:
    $ python bridge.py
    got 200 in loop 4444839248 in LoopThread
    got 200 in loop 4444839248 in LoopThread
Getting that in the right order every time looks tricky, though, so we should probably do something about it. Just like asyncio.Runner, we'll provide a higher-level API, and since it already does similar work, we get to reuse its API design for free:
This means using it looks just the same, which is pretty neat:
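A compact sketch of what such a ThreadRunner could look like follows. The internals are assumptions, and unlike asyncio.Runner this simplified version only starts its thread when used as a context manager, with no lazy initialization:

```python
import asyncio
import threading


class ThreadRunner:
    """Sketch of a Runner-like API backed by a loop running in another thread."""

    def __init__(self):
        self._loop = None
        self._loop_created = threading.Event()
        self._thread = threading.Thread(
            target=self._run_loop, name='LoopThread', daemon=True
        )

    def _run_loop(self):
        with asyncio.Runner() as runner:
            self._loop = runner.get_loop()
            self._loop_created.set()
            self._loop.run_forever()

    def __enter__(self):
        self._thread.start()
        self._loop_created.wait()
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()

    def get_loop(self):
        return self._loop

    def run(self, coro):
        # Callable from any thread except the loop thread itself.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()


async def do_stuff(i):
    await asyncio.sleep(0)
    return i * 2, threading.current_thread().name


results = []
with ThreadRunner() as runner:
    workers = [
        threading.Thread(target=lambda n=n: results.append(runner.run(do_stuff(n))))
        for n in range(3)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

print(sorted(results))  # [(0, 'LoopThread'), (2, 'LoopThread'), (4, 'LoopThread')]
```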
But I don't want to manage async context managers by hand
If I'm being honest, this whole
"simulate async with by calling the protocol methods" thing
is getting kinda tedious.
But now we have a high-level API,
so we can extend it with a sync context manager to take care of it for us.
You use it like so:
That's good, but we can do better. Here, we don't really need granular control over the lifetime of the session, we just want it to live as long as the event loop, so it would be nice to give users a shorthand for that [2]. We can use an ExitStack to enter context managers on the user's behalf and exit them when the runner is closed.
Now, we just need to do:
But I want to use async iterables
    async with session.get('https://death.andgravity.com/') as response:
        lines = [line async for line in response.content]
        print('got', len(lines), 'lines')
async for has its own protocol we can wrap;
for slightly better error messages,
we'll use the aiter() and anext() built-ins
instead of calling the corresponding special methods directly:
You use it like so:
    $ python bridge.py
    got 624 lines
And with this, we're done!
Here's the final version of the code.
So, should you do this?
I used something like this on a "production" project without any problems.
Performance might be an issue if you
run() many small tasks,
but the overhead for bigger tasks should be negligible,
especially compared with the cost of doing I/O
(for example, returning an entire HTTP response is likely fine,
wrapping an iterator of lines might not be).
In my case, I was forced to use an async library that didn't have a sync counterpart, and the benefit of retaining a mostly sync code base was overwhelming.
[1] Which itself comes with another warning: to handle signals and to execute subprocesses, the event loop must be run in the main thread; this is a sacrifice we're willing to make.
[2] It could be argued that it is not ThreadRunner's job to do this, but remember it exists to make it easy to use async code from sync code, and this use case is quite common; also, there is precedent for this kind of API, and something something copy, something something steal.