Limiting concurrency in Python asyncio: the story of async imap_unordered()
March 2023 ∙ 13 minute read
So, you're doing some async stuff, repeatedly, many times.
Like, hundreds of thousands of times.
Maybe you're scraping some data.
Maybe it's more complicated – you're calling an API, and then passing the result to another one, and then saving the result of that.
Either way, it's a good idea to not do it all at once. For one, it's not polite to the services you're calling. For another, it'll load everything in memory, all at once.
In sync code, you might use a thread pool and imap_unordered():
pool = multiprocessing.dummy.Pool(2)
for result in pool.imap_unordered(do_stuff, things_to_do):
print(result)
Here, concurrency is limited by the fixed number of threads.
But what about async code? In this article, we'll look at a few ways of limiting concurrency in asyncio, and find out which one is best.
Tip
No, it's not Semaphore, despite what Stack Overflow may tell you.
Tip
If you're in a hurry – it's wait().
Getting started #
In order to try things out more easily, we'll start with a test harness of sorts.
Our async map_unordered() behaves pretty much like imap_unordered() – it takes a coroutine function and an iterable of arguments, and runs the resulting awaitables limit at a time:
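A sketch of what this can look like (the details of the original script may differ):

def map_unordered(func, iterable, *, limit):
    # apply func to each argument, getting an iterable of awaitables
    aws = map(func, iterable)
    # run them, limit at a time, yielding results as they become available
    return limit_concurrency(aws, limit)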
The actual running is done by limit_concurrency(). For now, we run them one by one (we'll get back to this later on):
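A minimal placeholder version (sketch):

async def limit_concurrency(aws, limit):
    # no concurrency yet: await the awaitables one at a time
    for aw in aws:
        yield await aw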
To simulate work being done, we just sleep():
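For example (a sketch; the division is an assumption of mine, so that a zero argument raises ZeroDivisionError, which comes in handy later, when we look at exceptions):

async def do_stuff(i):
    await asyncio.sleep(i)
    # i / i raises ZeroDivisionError when i is zero (useful later)
    return i / i * i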
Putting it all together, we get a map_unordered.py LIMIT TIME... script that does stuff in parallel, printing timings as we get each result:
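A possible main(), assuming the output format used in the rest of the article:

import asyncio
import sys
import time

async def main():
    limit = int(sys.argv[1])
    times = [float(arg) for arg in sys.argv[2:]]

    start = time.monotonic()
    async for result in map_unordered(do_stuff, times, limit=limit):
        print(f"{time.monotonic() - start:.1f}: {result}")
    print(f"{time.monotonic() - start:.1f}: done")

if __name__ == '__main__':
    asyncio.run(main())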
... like so:
$ python map_unordered.py 2
0.0: done
$ python map_unordered.py 2 .1 .2
0.1: 0.1
0.3: 0.2
0.3: done
Tip
If you need a refresher on lower level asyncio stuff related to waiting, check out Hynek Schlawack's excellent Waiting in asyncio.
asyncio.gather() #
In the Running Tasks Concurrently section of the asyncio docs, we find asyncio.gather(), which runs awaitables concurrently and returns their results.
We can use it to run limit-sized batches:
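One way to do it (a sketch, using itertools.islice() to cut the iterable into batches):

import itertools

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    while True:
        # take the next `limit` awaitables...
        batch = list(itertools.islice(aws, limit))
        if not batch:
            return
        # ...and run them all at the same time,
        # waiting for the entire batch to finish
        for result in await asyncio.gather(*batch):
            yield result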
This seems to work:
$ python map_unordered.py 2 .1 .2
0.2: 0.1
0.2: 0.2
0.2: done
... except:
$ python map_unordered.py 2 .1 .2 .2 .1
0.2: 0.1
0.2: 0.2
0.4: 0.2
0.4: 0.1
0.4: done
... those should fit in 0.3 seconds:
| sleep(.1) | sleep(.2) |
| sleep(.2) | sleep(.1) |
... but we're waiting for the entire batch to finish, even if some tasks finish earlier:
| sleep(.1) |...........| sleep(.2) |
| sleep(.2) | sleep(.1) |...........|
asyncio.Semaphore #
Screw the docs, too much to read; after some googling, the first few Stack Overflow answers all point to asyncio.Semaphore.
Like its threading counterpart, we can use it to limit how many times the body of a with block is entered in parallel:
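A sketch of the Semaphore version:

async def limit_concurrency(aws, limit):
    semaphore = asyncio.Semaphore(limit)

    async def limited(aw):
        # at most `limit` coroutines can be past this point at once
        async with semaphore:
            return await aw

    # note: the unpacking consumes aws entirely before gather() runs anything
    for result in await asyncio.gather(*(limited(aw) for aw in aws)):
        yield result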
This works:
$ python map_unordered.py 2 .1 .2 .2 .1
0.3: 0.1
0.3: 0.2
0.3: 0.2
0.3: 0.1
0.3: done
... except, because gather() takes a sequence, we end up consuming the entire aws iterable before gather() is even called.
Let's highlight this:
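One way to do that (a sketch; announced() is my name for it): wrap the input iterable in main() with a generator that tells us when it has been fully consumed.

async def main():
    limit = int(sys.argv[1])
    times = [float(arg) for arg in sys.argv[2:]]

    def announced(iterable):
        # announce when the iterable has been fully consumed
        yield from iterable
        print('iter end')

    start = time.monotonic()
    async for result in map_unordered(do_stuff, announced(times), limit=limit):
        print(f"{time.monotonic() - start:.1f}: {result}")
    print(f"{time.monotonic() - start:.1f}: done")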
As expected:
$ python map_unordered.py 2 .1 .2 .2 .1
iter end
0.3: 0.1
0.3: 0.2
0.3: 0.2
0.3: 0.1
0.3: done
For small iterables, this is fine, but for bigger ones, creating all the tasks upfront without running them might cause memory issues. Also, if the iterable is lazy (e.g. it comes from a paginated API), we only start work after it's all been consumed into memory, instead of processing it in a streaming fashion.
asyncio.as_completed() #
At a glance, asyncio.as_completed() might do what we need – it takes an iterable of awaitables, runs them concurrently, and returns an iterator of coroutines that "can be awaited to get the earliest next result from the iterable of the remaining awaitables".
Sadly, it still consumes the iterable right away:
def as_completed(fs, *, timeout=None):
... # set-up
todo = {ensure_future(f, loop=loop) for f in set(fs)}
... # actual logic
But there's another, subtler issue.
as_completed() has no limits of its own – it's up to us to limit how fast we feed it awaitables. Presumably, we could wrap the input iterable into a generator that yields awaitables only if enough results came out the other end, and waits otherwise.
However, due to historical reasons, as_completed() takes a plain old sync iterator – we cannot await anything in its (sync) __next__(), and sync waiting of any kind would block (and possibly deadlock) the entire event loop.
So, no as_completed() for you.
asyncio.Queue #
Speaking of threading counterparts, how would you implement imap_unordered() if there was no Pool? Queues, of course!
And asyncio has its own Queue, which you use in pretty much the same way: start limit worker tasks that loop forever, each pulling awaitables, awaiting them, and putting the results into a queue.
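A sketch of the Queue version (how exceptions are surfaced here, by passing them along as results, is a simplification; the bonus section below shows a nicer way):

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    queue = asyncio.Queue()
    sentinel = object()

    async def worker():
        for aw in aws:
            try:
                result = await aw
            except Exception as e:
                # catch everything, so one bad awaitable
                # doesn't kill the whole worker (see below)
                result = e
            await queue.put(result)
        # in-band signal that this worker is done
        await queue.put(sentinel)

    # keep references, so the tasks don't get garbage collected
    worker_tasks = [asyncio.create_task(worker()) for _ in range(limit)]

    ndone = 0
    while ndone < limit:
        result = await queue.get()
        if result is sentinel:
            ndone += 1
            continue
        yield result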
The iterable is exhausted before the last "batch" starts:
$ python map_unordered.py 2 .1 .2 .3 .3 .2 .1
0.1: 0.1
0.2: 0.2
0.4: 0.3
0.5: 0.3
iter end
0.6: 0.1
0.6: 0.2
0.6: done
I was going to work up to this in a few steps, but I'll just point out three common bugs this type of code might have (that apply to threads too).
First, we could increment ndone from the worker, but this makes await queue.get() hang forever for empty iterables, since workers never get to run by the time we get to it; because there's no other await, it's not even a race condition.
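A sketch of that bug:

async def limit_concurrency(aws, limit):
    queue = asyncio.Queue()
    ndone = 0

    async def worker():
        nonlocal ndone
        for aw in aws:
            await queue.put(await aw)
        # signal completion out-of-band, by bumping a counter
        ndone += 1

    worker_tasks = [asyncio.create_task(worker()) for _ in range(limit)]

    # BUG: for an empty iterable, the workers haven't had a chance to run yet,
    # so ndone is still 0 and the queue is empty; get() then blocks forever
    while ndone < limit or not queue.empty():
        yield await queue.get()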
$ python map_unordered.py 2
iter end
Traceback (most recent call last):
...
asyncio.exceptions.TimeoutError
The solution is to signal the worker is done in-band, by putting a sentinel on the queue. I guess a good rule of thumb is that you want a put() for each get() without a timeout. [1]
Second, you have to catch all exceptions; otherwise, the worker gets killed, and get() waits forever for a sentinel that will never come.
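A sketch of the broken worker, dropped into the limit_concurrency() above (here, do_stuff() raises ZeroDivisionError for zero arguments):

    async def worker():
        for aw in aws:
            # BUG: if this raises, the worker dies on the spot,
            # and its sentinel never makes it onto the queue
            await queue.put(await aw)
        await queue.put(sentinel)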
$ python map_unordered.py 2 .1 .2 0 .2 .1
0.1: 0.1
0.2: 0.2
0.4: 0.2
iter end
0.5: 0.1
Traceback (most recent call last):
...
asyncio.exceptions.TimeoutError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<limit_concurrency.<locals>.worker() done, defined at map_unordered.py:20> exception=ZeroDivisionError('float division by zero')>
Traceback (most recent call last):
...
ZeroDivisionError: float division by zero
Finally, our input iterator is synchronous (for now), so no other task can run during next(aws). But if it were async, any number of tasks could await anext(aws) in parallel, leading to concurrency issues. The fix is the same as with threads: either protect that call with a Lock, or feed awaitables to workers through an input queue.
Anyway, no need to worry about any of that – a better solution awaits.
Aside: backpressure #
At this point, we're technically done – the queue solution does everything Pool.imap_unordered() does.
So much so that, like imap_unordered(), it lacks backpressure: when code consuming results from map_unordered() cannot keep up with the tasks producing them, the results accumulate in the internal queue, with potentially unbounded memory usage.
>>> pool = multiprocessing.dummy.Pool(1)
>>> for _ in pool.imap_unordered(print, range(4)):
... time.sleep(.1)
... print('got result')
...
0
1
2
3
got result
got result
got result
got result
>>> async def async_print(arg):
... print(arg, '(async)')
...
>>> async for _ in map_unordered(async_print, range(4), limit=1):
... await asyncio.sleep(.1)
... print('got result')
...
0 (async)
1 (async)
2 (async)
3 (async)
got result
got result
got result
got result
To fix this, we make the queue bounded, so that workers block while the queue is full.
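In limit_concurrency(), that's a one line change (a sketch; using limit as the maximum size is a judgment call):

    # a bounded queue: put() blocks while the queue is full,
    # pausing the workers until the consumer catches up
    queue = asyncio.Queue(maxsize=limit)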
>>> async for _ in map_unordered(async_print, range(5), limit=1):
... await asyncio.sleep(.1)
... print('got result')
...
0 (async)
1 (async)
2 (async)
got result
3 (async)
got result
4 (async)
got result
got result
got result
Alas, we can't do the same thing for Pool.imap_unordered(), because we don't have access to its queue – but that's a story for another time.
asyncio.wait() #
Pretending we're using threads works, but it's not all that idiomatic.
If only there was some sort of low level, select()-like primitive taking a set of tasks and blocking until at least one of them finishes. And of course there is – we've been purposefully avoiding it this entire time – it's asyncio.wait(), and it does exactly that.
By default, it waits until all tasks are completed, which isn't much better than gather().
But, with return_when=FIRST_COMPLETED, it waits until at least one task is completed. We can use this to keep a limit-sized set of running tasks, updated with new tasks as soon as the old ones finish:
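A sketch of the wait()-based limit_concurrency():

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        # top up the running set to `limit` tasks
        while len(pending) < limit and not aws_ended:
            try:
                aw = next(aws)
            except StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        # block until at least one task finishes...
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        # ...and yield the completed tasks (not their results)
        while done:
            yield done.pop()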
We change limit_concurrency() to yield awaitables instead of results, so it's more symmetric – awaitables in, awaitables out. map_unordered() then becomes an async generator function, instead of a sync function returning an async generator. This is functionally the same, but it does make things a bit more self-documenting.
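A sketch of the corresponding map_unordered():

async def map_unordered(func, iterable, *, limit):
    aws = map(func, iterable)
    async for task in limit_concurrency(aws, limit):
        yield await task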
This implementation has all the properties that the Queue one has:
$ python map_unordered.py 2 .1 .2 .2 .1
0.1: 0.1
0.2: 0.2
0.3: 0.1
0.3: 0.2
iter end
0.3: done
... and backpressure too:
>>> async for _ in map_unordered(async_print, range(4), limit=1):
... await asyncio.sleep(.1)
... print('got result')
...
0 (async)
got result
1 (async)
got result
2 (async)
got result
3 (async)
got result
Async iterables #
OK, but what if we pass map_unordered() an asynchronous iterable? We are talking about async stuff, after all.
This opens up a whole looking-glass world of async iteration: instead of iter() you have aiter(), instead of next() you have anext(), some of them you await, some you don't... Thankfully, we can support both without making things much worse.
And we don't need to be particularly smart about it either; we can just feed the current code an async iterable from main(), and punch our way through the exceptions:
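A sketch of the change to main() (announced() becomes an async generator):

async def main():
    limit = int(sys.argv[1])
    times = [float(arg) for arg in sys.argv[2:]]

    async def announced(iterable):
        # same as before, but as an async generator
        for x in iterable:
            yield x
        print('iter end')

    start = time.monotonic()
    async for result in map_unordered(do_stuff, announced(times), limit=limit):
        print(f"{time.monotonic() - start:.1f}: {result}")
    print(f"{time.monotonic() - start:.1f}: done")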
$ python map_unordered.py 2 .1 .2 .2 .1
Traceback (most recent call last):
...
File "map_unordered.py", line 9, in map_unordered
aws = map(func, iterable)
TypeError: 'async_generator' object is not iterable
map() doesn't work with async iterables, so we use a generator expression instead.
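A sketch:

async def map_unordered(func, iterable, *, limit):
    try:
        aws = map(func, iterable)
    except TypeError:
        # map() only works with regular iterables;
        # for async ones, an async generator expression does the same job
        aws = (func(x) async for x in iterable)

    async for task in limit_concurrency(aws, limit):
        yield await task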
In true easier to ask for forgiveness than permission style, we handle the exception from map() instead of, say, checking if aws is an instance of collections.abc.Iterable. We could wrap aws to always be an async iterable, but limit_concurrency() is useful on its own, so it's better to support both.
$ python map_unordered.py 2 .1 .2 .2 .1
Traceback (most recent call last):
...
File "map_unordered.py", line 19, in limit_concurrency
aws = iter(aws)
TypeError: 'async_generator' object is not iterable
For async iterables, we need to use aiter():
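Something like this at the top of limit_concurrency() (a sketch; aiter() and anext() are built-ins starting with Python 3.10):

    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False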
$ python map_unordered.py 2 .1 .2 .2 .1
Traceback (most recent call last):
...
File "map_unordered.py", line 32, in limit_concurrency
aw = next(aws)
TypeError: 'async_generator' object is not an iterator
... and anext():
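In the inner loop, that can look like this (sketch):

            # the except clause picks the right exception type at runtime
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))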
... which, unlike aiter(), has to be awaited.
Here's limit_concurrency() in all its glory:
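(A sketch; as before, the original may differ in details.)

async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()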
Not as clean as before, but it gets the job done:
$ python map_unordered.py 2 .1 .2 .2 .1
0.1: 0.1
0.2: 0.2
0.3: 0.1
0.3: 0.2
iter end
0.3: done
Anyway, that's it for now. Here's the final version of the code.
Bonus: exceptions #
OK, so what about exceptions?
A lot of times, you still want to do the rest of the things, even if one fails. Also, you probably want to know which one failed – but the map_unordered() results are not in order, so how could you tell?
The most flexible solution is to let the user handle it just like they would with Pool.imap_unordered() – by decorating the original function. Here's one way of doing it:
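A sketch of that decorator (the _return_args_and_exceptions() / functools.partial() pairing matches the representation shown in the next section), plus the corresponding change to main():

import functools

async def _return_args_and_exceptions(func, *args):
    try:
        return *args, await func(*args)
    except Exception as e:
        return *args, e

def return_args_and_exceptions(func):
    # bind func to the existing wrapper, instead of defining a new one each time
    return functools.partial(_return_args_and_exceptions, func)

... and in main():

    wrapped = return_args_and_exceptions(do_stuff)

    start = time.monotonic()
    async for arg, result in map_unordered(wrapped, announced(times), limit=limit):
        print(f"{time.monotonic() - start:.1f}: {arg} -> {result}")
    print(f"{time.monotonic() - start:.1f}: done")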
$ python map_unordered.py 2 .1 .2 0 .2 .1
0.1: 0.1 -> 0.1
0.1: 0.0 -> float division by zero
0.2: 0.2 -> 0.2
0.3: 0.2 -> 0.2
0.3: 0.1 -> 0.1
iter end
0.3: done
Bonus: better decorators? #
Finally, here's a cool thing I learned from the asyncio docs.
When writing decorators, you can use partial() to bind the decorated function to an existing wrapper, instead of always returning a new one. The result is a more descriptive representation:
>>> return_args_and_exceptions(do_stuff)
functools.partial(<function _return_args_and_exceptions at 0x10647fd80>, <function do_stuff at 0x10647d8a0>)
Compare with the traditional version:
def return_args_and_exceptions(func):
async def wrapper(*args):
...
return wrapper
>>> return_args_and_exceptions(do_stuff)
<function return_args_and_exceptions.<locals>.wrapper at 0x103993560>
[1] Does this have a fancy, academic name? Do let me know!