Running async code from sync in Python asyncio

July 2023 ∙ 10 minute read

So, you're doing some sync stuff.

But you also need to do some async stuff, without making everything async.

Maybe the sync stuff is an existing application.

Maybe you still want to use your favorite sync library.

Or maybe you need just a little async, without having to pay the full price.

Of course, you can run a coroutine with asyncio.run(), and blocking sync code from a coroutine with asyncio.to_thread(), but the former isn't granular enough, and the latter only helps if your program is already async at the top.
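
To make the trade-off concrete (a minimal sketch – do_async_stuff() and some_blocking_function() stand in for your own code):

import asyncio

async def do_async_stuff(): ...

def some_blocking_function(): ...

# asyncio.run() executes one coroutine in a fresh, transient event loop
asyncio.run(do_async_stuff())

# asyncio.to_thread() runs blocking sync code *from* async code,
# but the top of the program still has to be async
async def main():
    await asyncio.to_thread(some_blocking_function)

asyncio.run(main())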

As always, there must be a better way.


Maybe something like this?

async def do_stuff(i):
    return i * 2

async def generate_stuff(i):
    for n in range(i):
        yield n

runner = ThreadRunner()
print(runner.run(do_stuff(2)))  # 4
print(*runner.wrap_iter(generate_stuff(4)))  # 0 1 2 3

Now, it would take a long time to fully explore how you might build up to such a thing and why you would actually need it... so that's exactly what we're going to do.

But I need to do the thing more than once #

To make our example more life-like, we'll make some requests using aiohttp.

async def do_stuff(session):
    async with session.get('https://death.andgravity.com/') as response:
        loop = asyncio.get_running_loop()
        print('got', response.status, 'in loop', id(loop))

We create the session separately, since we might want to reuse it later on.

async def main():
    async with aiohttp.ClientSession() as session:
        await do_stuff(session)

asyncio.run() allows you to execute a coroutine in a transient event loop; it's meant to be the entry point of your program, and "should ideally only be called once" – but you can call it multiple times:

asyncio.run(main())
asyncio.run(main())

$ python bridge.py
got 200 in loop 4399589904
got 200 in loop 4386034640

But I want a long-lived event loop #

Of course, creating and cleaning up a loop for each call isn't that efficient, and we might have loop-bound resources that need to live across calls.

With a bit of diving into asyncio guts, we can manage our own long-lived loop:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
    loop.run_until_complete(main())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.run_until_complete(loop.shutdown_default_executor())
    loop.close()

These are low-level APIs, so a bit of care is needed to use them correctly:

  • We could have used get_event_loop() to both create and set the loop, but this behavior was deprecated in Python 3.10.
  • loop.close() already schedules executor shutdown, but does not wait for it to finish; for predictable behavior, it's best to wait for cleanup tasks explicitly.

Anyway, it works – note the loop id:

$ python bridge.py
got 200 in loop 4509807312
got 200 in loop 4509807312

Thankfully, starting with Python 3.11, asyncio.Runner makes this much easier:

with asyncio.Runner() as runner:
    runner.run(main())
    runner.run(main())

But I want a long-lived session #

Reusing the loop is not enough, though.

We're throwing the Session away after each request, and with it the associated connection pool, which means each request creates a new connection. Reusing connections can make repeated requests much faster – most "client" libraries (e.g. aiobotocore) keep a connection pool around for this reason.

Creating the session outside an async function and passing it in works:

with asyncio.Runner() as runner:
    session = aiohttp.ClientSession()
    runner.run(do_stuff(session))
    runner.run(do_stuff(session))

...but with a bunch of warnings:

$ python bridge.py
.../bridge.py:16: DeprecationWarning: The object should be created within an async function
  session = aiohttp.ClientSession()
got 200 in loop 4570601360
got 200 in loop 4570601360
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10ff01c90>

The first one's easy – the constructor wants to be called in an async function, so we write one to call it in (to pass in constructor arguments, we can use a partial()):

async def call_async(callable):
    return callable()
with asyncio.Runner() as runner:
    session = runner.run(call_async(aiohttp.ClientSession))
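
For example, using the partial() mentioned above (the timeout argument here is only an illustration):

import functools

make_session = functools.partial(
    aiohttp.ClientSession,
    timeout=aiohttp.ClientTimeout(total=10),
)

with asyncio.Runner() as runner:
    session = runner.run(call_async(make_session))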

The second one happens because we're expected to await session.close() after we're done with it. But we didn't do that before either – we used the (async) with statement, the idiomatic way for objects to clean up after themselves. Since async with is syntactic sugar for calling the asynchronous context manager protocol methods in a specific way, we can simulate it by doing just that:

with asyncio.Runner() as runner:
    session = runner.run(call_async(aiohttp.ClientSession))
    runner.run(session.__aenter__())
    try:
        runner.run(do_stuff(session))
        runner.run(do_stuff(session))
    finally:
        runner.run(session.__aexit__(None, None, None))

But I want to do something else during the async stuff #

OK, we have a long-lived event loop and a long-lived session.

But while we're doing async stuff, we can't do anything else, because we're busy running the event loop; conversely, outside of run(), all async stuff is frozen.

So let's use the other way of doing more than one thing at a time, and call run() from another thread. First, more logging:

async def do_stuff(session):
    async with session.get('https://death.andgravity.com/') as response:
        loop = asyncio.get_running_loop()
        thread = threading.current_thread()
        print('got', response.status, 'in loop', id(loop), 'in', thread.name)

$ python bridge.py
got 200 in loop 4538933264 in MainThread
got 200 in loop 4538933264 in MainThread

Next up, the thread stuff – in actual code, the "do something else" part would happen between thread.start() and thread.join().

def do_stuff_in_threads(run, session, n=1):
    threads = [
        threading.Thread(target=run, args=(do_stuff(session),))
        for _ in range(n)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
with asyncio.Runner() as runner:
    session = runner.run(call_async(aiohttp.ClientSession))
    runner.run(session.__aenter__())
    try:
        do_stuff_in_threads(runner.run, session)
    finally:
        runner.run(session.__aexit__(None, None, None))

Hey, it works!

$ python bridge.py
got 200 in loop 4448024592 in Thread-1 (run)

But I want to do async stuff from multiple threads #

OK, but does it work from two threads? This can be the case in an existing application. Or we can deliberately use threads most of the time, because they work with normal code, and only use async as an implementation detail. Or, we just have one other thread, like above, but then accidentally call run() from the main thread.

Glad you asked!

        do_stuff_in_threads(runner.run, session, n=2)

$ python bridge.py
Exception in thread Thread-2 (run):
Traceback (most recent call last):
  ...
  File ".../asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File ".../asyncio/base_events.py", line 629, in run_until_complete
    self._check_running()
  File ".../asyncio/base_events.py", line 588, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
got 200 in loop 4544447440 in Thread-1 (run)
got 200 in loop 4544447440 in Thread-1 (run)

Not only did one thread crash, both coroutines ran in the other thread. Worse, sometimes only one runs. In a long-running program with infrequent uses of run(), we might not even get a hard failure, but subtle, mysterious bugs. 👻

And why not? The docs have a Concurrency and Multithreading section that warns:

Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback.

...and that's exactly what we've been doing. But the warning comes with a solution:

To schedule a coroutine object from a different OS thread, the run_coroutine_threadsafe() function should be used.[1]

So, we want a loop running continuously in another thread, not just when we call run():

runner = asyncio.Runner()
loop_created = threading.Event()

def run_forever():
    with runner:
        loop = runner.get_loop()
        asyncio.set_event_loop(loop)
        loop_created.set()
        loop.run_forever()

loop_thread = threading.Thread(target=run_forever, name='LoopThread')
loop_thread.start()
loop_created.wait()
loop = runner.get_loop()

def run(coro):
    return asyncio.run_coroutine_threadsafe(coro, loop).result()

We have to wait for the loop to be created in its own thread, otherwise the main thread can race ahead, create the loop first with its get_loop() call, and then deadlock when run_coroutine_threadsafe() is passed a loop that isn't running yet.

We can go ahead and plug in the new run(), winding things down after we're done:

try:
    session = run(call_async(aiohttp.ClientSession))
    run(session.__aenter__())
    try:
        do_stuff_in_threads(run, session, n=2)
    finally:
        run(session.__aexit__(None, None, None))
finally:
    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join()

This time, it really works:

$ python bridge.py
got 200 in loop 4444839248 in LoopThread
got 200 in loop 4444839248 in LoopThread

Getting that in the right order every time looks tricky, though, so we should probably do something about it. Just like asyncio.Runner, we'll provide a higher-level API, and since it already does similar work, we get to reuse its API design for free:

class ThreadRunner:

    def __init__(self, *args, **kwargs):
        self._runner = asyncio.Runner(*args, **kwargs)
        self._thread = None

    def __enter__(self):
        self._lazy_init()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        loop = self.get_loop()
        loop.call_soon_threadsafe(loop.stop)
        self._thread.join()

    def get_loop(self):
        self._lazy_init()
        return self._runner.get_loop()

    def run(self, coro):
        loop = self.get_loop()
        return asyncio.run_coroutine_threadsafe(coro, loop).result()

    def _lazy_init(self):
        if self._thread:
            return

        loop_created = threading.Event()

        def run_forever():
            with self._runner as runner:
                loop = runner.get_loop()
                asyncio.set_event_loop(loop)
                loop_created.set()
                loop.run_forever()

        self._thread = threading.Thread(
            target=run_forever, name='LoopThread', daemon=True
        )
        self._thread.start()
        loop_created.wait()

This means using it looks just the same, which is pretty neat:

with ThreadRunner() as runner:
    session = runner.run(call_async(aiohttp.ClientSession))
    runner.run(session.__aenter__())
    try:
        do_stuff_in_threads(runner.run, session, n=2)
    finally:
        runner.run(session.__aexit__(None, None, None))

But I don't want to manage async context managers by hand #

If I'm being honest, this whole "simulate async with by calling the protocol methods" is getting kinda tedious. But now we have a high-level API, so we can extend it with a sync context manager to take care of it for us.

    @contextlib.contextmanager
    def wrap_context(self, cm=None, factory=None):
        if cm is None:
            cm = self.run(call_async(factory))
        aenter = type(cm).__aenter__
        aexit = type(cm).__aexit__
        value = self.run(aenter(cm))
        try:
            yield value
        except:
            if not self.run(aexit(cm, *sys.exc_info())):
                raise
        else:
            self.run(aexit(cm, None, None, None))

This is the same code as before, except we're going by the book with what gets called when; see Brett Cannon's Unravelling the async with statement for details.

You use it like so:

with ThreadRunner() as runner:
    with runner.wrap_context(factory=aiohttp.ClientSession) as session:
        do_stuff_in_threads(runner.run, session, n=2)

That's good, but we can do better. Here, we don't really need granular control over the lifetime of the session, we just want it to live as long as the event loop, so it would be nice to give users a shorthand for that.[2] We can use an ExitStack to enter context managers on the user's behalf and exit them when the runner is closed.

    def __init__(self, *args, **kwargs):
        self._runner = asyncio.Runner(*args, **kwargs)
        self._thread = None
        self._stack = contextlib.ExitStack()
    def enter_context(self, cm=None, factory=None):
        cm = self.wrap_context(cm=cm, factory=factory)
        return self._stack.enter_context(cm)
    def close(self):
        try:
            self._stack.close()
        finally:
            loop = self.get_loop()
            loop.call_soon_threadsafe(loop.stop)
            self._thread.join()

Now, we just need to do:

with ThreadRunner() as runner:
    session = runner.enter_context(factory=aiohttp.ClientSession)
    do_stuff_in_threads(runner.run, session, n=2)

But I want to use async iterables #

One final thing: how would you consume an asynchronous iterable like the content of an aiohttp response? In async code, it would look something like this:

async with session.get('https://death.andgravity.com/') as response:
    lines = [line async for line in response.content]
print('got', len(lines), 'lines')

Like async with, async for has its own protocol we can wrap; for slightly better error messages, we'll use the aiter() and anext() built-ins instead of calling the corresponding special methods directly:

    def wrap_iter(self, it):
        it = aiter(it)
        while True:
            try:
                yield self.run(anext(it))
            except StopAsyncIteration:
                break

You use it like so:

with ThreadRunner() as runner:
    session = runner.enter_context(factory=aiohttp.ClientSession)
    coroutine = session.get('https://death.andgravity.com/')
    with runner.wrap_context(coroutine) as response:
        lines = [line for line in runner.wrap_iter(response.content)]
    print('got', len(lines), 'lines')

$ python bridge.py
got 624 lines

And with this, we're done!

Here's the final version of the code.

So, should you do this? #

I guess?

I used something like this on a "production" project without any problems.

Performance might be an issue if you run() many small tasks, but the overhead for bigger tasks should be negligible, especially compared with the cost of doing I/O (for example, returning an entire HTTP response is likely fine, wrapping an iterator of lines might not be).
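
For example, instead of paying one run() round-trip per line via wrap_iter(), we could read the whole body in a single coroutine – read_all_lines() below is a hypothetical helper, not something we've built so far:

async def read_all_lines(response):
    # one run() for the entire body, instead of one per line
    return [line async for line in response.content]

with ThreadRunner() as runner:
    session = runner.enter_context(factory=aiohttp.ClientSession)
    with runner.wrap_context(session.get('https://death.andgravity.com/')) as response:
        lines = runner.run(read_all_lines(response))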

In my case, I was forced to use an async library that didn't have a sync counterpart, and the benefit of retaining a mostly sync code base was overwhelming.


  1. Which itself comes with another warning: to handle signals and to execute subprocesses, the event loop must be run in the main thread; this is a sacrifice we're willing to make.

  2. It could be argued that it is not ThreadRunner's job to do this, but remember it exists to make it easy to use async code from sync code, and this use case is quite common; also, there is precedent for this kind of API, and something something copy, something something steal.