Limiting concurrency in Python asyncio: the story of async imap_unordered()

March 2023 ∙ 13 minute read

So, you're doing some async stuff, repeatedly, many times.

Like, hundreds of thousands of times.

Maybe you're scraping some data.

Maybe it's more complicated – you're calling an API, and then passing the result to another one, and then saving the result of that.

Either way, it's a good idea to not do it all at once. For one, it's not polite to the services you're calling. For another, it'll load everything in memory, all at once.

In sync code, you might use a thread pool and imap_unordered():

pool = multiprocessing.dummy.Pool(2)
for result in pool.imap_unordered(do_stuff, things_to_do):
    print(result)

Here, concurrency is limited by the fixed number of threads.
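For reference, here's a self-contained version of that snippet you can actually run; do_stuff() and things_to_do are made-up placeholders:

import time
import multiprocessing.dummy

def do_stuff(i):
    time.sleep(i)  # stand-in for real, blocking work
    return i

things_to_do = [.1, .2, .2, .1]

pool = multiprocessing.dummy.Pool(2)
for result in pool.imap_unordered(do_stuff, things_to_do):
    print(result)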

But what about async code? In this article, we'll look at a few ways of limiting concurrency in asyncio, and find out which one is best.

Tip

No, it's not Semaphore, despite what Stack Overflow may tell you.

Tip

If you're in a hurry – it's wait().

Getting started #

In order to try things out more easily, we'll start with a test harness of sorts.

Our async map_unordered() behaves pretty much like imap_unordered() – it takes a coroutine function and an iterable of arguments, and runs the resulting awaitables limit at a time:

def map_unordered(func, iterable, *, limit):
    aws = map(func, iterable)
    return limit_concurrency(aws, limit)

The actual running is done by limit_concurrency(). For now, we run them one by one (we'll get back to this later on):

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    while True:
        try:
            aw = next(aws)
        except StopIteration:
            break
        yield await aw

To simulate work being done, we just sleep():

async def do_stuff(i):
    1 / i  # raises ZeroDivisionError for i == 0
    await asyncio.sleep(i)
    return i

Putting it all together, we get a map_unordered.py LIMIT TIME... script that does stuff in parallel, printing timings as we get each result:

async def async_main(args, limit):
    start = time.monotonic()
    async for result in map_unordered(do_stuff, args, limit=limit):
        print(f"{(time.monotonic() - start):.1f}: {result}")
    print(f"{(time.monotonic() - start):.1f}: done")


def main():
    limit = int(sys.argv[1])
    args = [float(n) for n in sys.argv[2:]]
    timeout = sum(args) + 0.1
    asyncio.run(asyncio.wait_for(async_main(args, limit), timeout))


if __name__ == '__main__':
    main()

... like so:

$ python map_unordered.py 2
0.0: done
$ python map_unordered.py 2 .1 .2
0.1: 0.1
0.3: 0.2
0.3: done

Tip

If you need a refresher on lower level asyncio stuff related to waiting, check out Hynek Schlawack's excellent Waiting in asyncio.

asyncio.gather() #

In the Running Tasks Concurrently section of the asyncio docs, we find asyncio.gather(), which runs awaitables concurrently and returns their results.

We can use it to run limit-sized batches:

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    while True:
        batch = list(itertools.islice(aws, limit))
        if not batch:
            break
        for result in await asyncio.gather(*batch):
            yield result

This seems to work:

$ python map_unordered.py 2 .1 .2
0.2: 0.1
0.2: 0.2
0.2: done

... except:

$ python map_unordered.py 2 .1 .2 .2 .1
0.2: 0.1
0.2: 0.2
0.4: 0.2
0.4: 0.1
0.4: done

... those should fit in 0.3 seconds:

| sleep(.1) |       sleep(.2)       |
|       sleep(.2)       | sleep(.1) |

... but we're waiting for the entire batch to finish, even if some tasks finish earlier:

| sleep(.1) |...........|       sleep(.2)       |
|       sleep(.2)       | sleep(.1) |...........|

asyncio.Semaphore #

Screw the docs, too much to read; after some googling, the first few Stack Overflow answers all point to asyncio.Semaphore.

Like its threading counterpart, we can use it to limit how many coroutines are inside the body of a with block at the same time:

async def limit_concurrency(aws, limit):
    semaphore = asyncio.Semaphore(limit)

    async def wrapper(aw):
        async with semaphore:
            return await aw

    for result in await asyncio.gather(*map(wrapper, aws)):
        yield result

This works:

$ python map_unordered.py 2 .1 .2 .2 .1
0.3: 0.1
0.3: 0.2
0.3: 0.2
0.3: 0.1
0.3: done

... except, because gather() takes a sequence, we end up consuming the entire aws iterable before gather() is even called. Let's highlight this:

def on_iter_end(it, callback):
    for x in it:
        yield x
    callback()

    # in main()
    timeout = sum(args) + 0.1
    args = on_iter_end(args, lambda: print("iter end"))
    asyncio.run(asyncio.wait_for(async_main(args, limit), timeout))

As expected:

$ python map_unordered.py 2 .1 .2 .2 .1
iter end
0.3: 0.1
0.3: 0.2
0.3: 0.2
0.3: 0.1
0.3: done

For small iterables, this is fine, but for bigger ones, creating all the tasks upfront without running them might cause memory issues. Also, if the iterable is lazy (e.g. it comes from a paginated API), we only start work after it's all consumed in memory, instead of processing it in a streaming fashion.
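To make "lazy" concrete, here's roughly the kind of input we'd like to handle in a streaming fashion; get_page() is a stand-in for a real API client:

def get_page(page, page_size=3, total=10):
    # stand-in for a real API call; returns one page of fake items
    start = (page - 1) * page_size
    return [{"id": i} for i in range(start, min(start + page_size, total))]

def all_item_ids():
    # lazy: each page is "fetched" only after the previous one is exhausted
    page = 1
    while True:
        items = get_page(page)
        if not items:
            return
        yield from (item["id"] for item in items)
        page += 1

With the Semaphore version, gather() forces map(func, all_item_ids()) to be consumed – and every page fetched – before the first awaitable even starts running.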

asyncio.as_completed() #

At a glance, asyncio.as_completed() might do what we need – it takes an iterable of awaitables, runs them concurrently, and returns an iterator of coroutines that "can be awaited to get the earliest next result from the iterable of the remaining awaitables".

Sadly, it still consumes the iterable right away:

def as_completed(fs, *, timeout=None):
    ...  # set-up
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    ...  # actual logic

But there's another, subtler issue.

as_completed() has no limits of its own – it's up to us to limit how fast we feed it awaitables. Presumably, we could wrap the input iterable into a generator that yields awaitables only if enough results came out the other end, and waits otherwise.

However, due to historical reasons, as_completed() takes a plain-old-sync-iterator – we cannot await anything in its (sync) __next__(), and sync waiting of any kind would block (and possibly deadlock) the entire event loop.

So, no as_completed() for you.

asyncio.Queue #

Speaking of threading counterparts, how would you implement imap_unordered() if there was no Pool? Queues, of course!

And asyncio has its own Queue, which you use in pretty much the same way: start limit worker tasks that loop forever, each pulling awaitables, awaiting them, and putting the results into a queue.

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    queue = asyncio.Queue()
    ndone = 0

    async def worker():
        while True:
            try:
                aw = next(aws)
            except StopIteration:
                await queue.put((False, None))
                break

            try:
                await queue.put((True, await aw))
            except Exception as e:
                await queue.put((False, e))
                break

    worker_tasks = [asyncio.create_task(worker()) for _ in range(limit)]

    while ndone < limit or not queue.empty():
        ok, rv = await queue.get()
        if ok:
            yield rv
        elif rv:
            raise rv
        else:
            ndone += 1

The iterable is exhausted before the last "batch" starts:

$ python map_unordered.py 2 .1 .2 .3 .3 .2 .1
0.1: 0.1
0.2: 0.2
0.4: 0.3
0.5: 0.3
iter end
0.6: 0.1
0.6: 0.2
0.6: done

I was going to work up to this in a few steps, but I'll just point out three common bugs this type of code might have (that apply to threads too).

First, we could increment ndone from the worker – but then, for an empty iterable, await queue.get() hangs forever: the workers never get a chance to run before we reach the get(), and because there's no other await in between, it's not even a race condition.

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    queue = asyncio.Queue()
    ndone = 0

    async def worker():
        nonlocal ndone

        while True:
            try:
                aw = next(aws)
            except StopIteration:
                ndone += 1
                break

            await queue.put(await aw)

    worker_tasks = [asyncio.create_task(worker()) for _ in range(limit)]

    while ndone < limit or not queue.empty():
        yield await queue.get()

$ python map_unordered.py 2
iter end
Traceback (most recent call last):
  ...
asyncio.exceptions.TimeoutError

The solution is to signal the worker is done in-band, by putting a sentinel on the queue. I guess a good rule of thumb is that you want a put() for each get() without a timeout.1

Second, you have to catch all exceptions; otherwise, the worker gets killed, and get() waits forever for a sentinel that will never come.

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    queue = asyncio.Queue()
    ndone = 0

    done = object()

    async def worker():
        while True:
            try:
                aw = next(aws)
            except StopIteration:
                await queue.put(done)
                break

            await queue.put(await aw)

    worker_tasks = [asyncio.create_task(worker()) for _ in range(limit)]

    while ndone < limit or not queue.empty():
        rv = await queue.get()
        if rv is done:
            ndone += 1
            continue
        yield rv

$ python map_unordered.py 2 .1 .2 0 .2 .1
0.1: 0.1
0.2: 0.2
0.4: 0.2
iter end
0.5: 0.1
Traceback (most recent call last):
  ...
asyncio.exceptions.TimeoutError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<limit_concurrency.<locals>.worker() done, defined at map_unordered.py:20> exception=ZeroDivisionError('float division by zero')>
Traceback (most recent call last):
  ...
ZeroDivisionError: float division by zero

Finally, our input iterator is synchronous (for now), so no other task can run during next(aws). But if it were async, any number of tasks could await anext(aws) in parallel, leading to concurrency issues. The fix is the same as with threads: either protect that call with a Lock, or feed awaitables to workers through an input queue.
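For the record, the Lock variant might look something like this – a sketch patterned on the first worker above, assuming aws is an async iterator:

    aws_lock = asyncio.Lock()

    async def worker():
        while True:
            # only one worker at a time may advance the shared iterator
            async with aws_lock:
                try:
                    aw = await anext(aws)
                except StopAsyncIteration:
                    await queue.put((False, None))
                    break

            try:
                await queue.put((True, await aw))
            except Exception as e:
                await queue.put((False, e))
                break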

Anyway, no need to worry about any of that – a better solution awaits.

Aside: backpressure #

At this point, we're technically done – the queue solution does everything Pool.imap_unordered() does.

So much so that, like imap_unordered(), it lacks backpressure: when code consuming results from map_unordered() cannot keep up with the tasks producing them, the results accumulate in the internal queue, with potentially unbounded memory usage.

>>> pool = multiprocessing.dummy.Pool(1)
>>> for _ in pool.imap_unordered(print, range(4)):
...     time.sleep(.1)
...     print('got result')
...
0
1
2
3
got result
got result
got result
got result
>>> async def async_print(arg):
...     print(arg, '(async)')
...
>>> async for _ in map_unordered(async_print, range(4), limit=1):
...     await asyncio.sleep(.1)
...     print('got result')
...
0 (async)
1 (async)
2 (async)
3 (async)
got result
got result
got result
got result

To fix this, we make the queue bounded, so that workers block while the queue is full.

    queue = asyncio.Queue(limit)

>>> async for _ in map_unordered(async_print, range(5), limit=1):
...     await asyncio.sleep(.1)
...     print('got result')
...
0 (async)
1 (async)
2 (async)
got result
3 (async)
got result
4 (async)
got result
got result
got result

Alas, we can't do the same thing for Pool.imap_unordered(), because we don't have access to its queue, but that's a story for another time.

asyncio.wait() #

Pretending we're using threads works, but it's not all that idiomatic.

If only there was some sort of low level, select()-like primitive taking a set of tasks and blocking until at least one of them finishes. And of course there is – we've been purposefully avoiding it this entire time – it's asyncio.wait(), and it does exactly that.

By default, it waits until all tasks are completed, which isn't much better than gather().

But, with return_when=FIRST_COMPLETED, it waits until at least one task is completed. We can use this to keep a limit-sized set of running tasks updated with new tasks as soon as the old ones finish:

async def limit_concurrency(aws, limit):
    aws = iter(aws)
    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = next(aws)
            except StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()

We change limit_concurrency() to yield awaitables instead of results, so it's more symmetric – awaitables in, awaitables out. map_unordered() then becomes an async generator function, instead of a sync function returning an async generator. This is functionally the same, but does make it a bit more self-documenting.

async def map_unordered(func, iterable, *, limit):
    aws = map(func, iterable)
    async for task in limit_concurrency(aws, limit):
        yield await task

This implementation has all the properties that the Queue one has:

$ python map_unordered.py 2 .1 .2 .2 .1
0.1: 0.1
0.2: 0.2
0.3: 0.1
0.3: 0.2
iter end
0.3: done

... and backpressure too:

>>> async for _ in map_unordered(async_print, range(4), limit=1):
...     await asyncio.sleep(.1)
...     print('got result')
...
0 (async)
got result
1 (async)
got result
2 (async)
got result
3 (async)
got result
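Since limit_concurrency() now yields tasks rather than results, it's also useful on its own; for example, here's a quick sketch of a bounded, gather()-like helper built on top of it (not something we'll need below):

async def gather_limited(aws, limit):
    # like asyncio.gather(..., return_exceptions=True),
    # but with at most `limit` awaitables running at once;
    # results come back in completion order, not input order
    results = []
    async for task in limit_concurrency(aws, limit):
        try:
            results.append(task.result())
        except Exception as e:
            results.append(e)
    return results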

Async iterables #

OK, but what if we pass map_unordered() an asynchronous iterable? We are talking about async stuff, after all.

This opens up a whole looking-glass world of async iteration: instead of iter() you have aiter(), instead of next() you have anext(), some of them you await, some you don't... Thankfully, we can support both without making things much worse.
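Here's a tiny, self-contained illustration of the parallel protocols (aiter() and anext() are built-ins since Python 3.10):

async def numbers():
    for i in range(3):
        yield i

async def demo():
    ait = aiter(numbers())          # like iter(), not awaited
    print(await anext(ait))         # like next(), but must be awaited
    print([i async for i in ait])   # async for handles StopAsyncIteration

# asyncio.run(demo()) prints 0, then [1, 2]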

And we don't need to be particularly smart about it either; we can just feed the current code an async iterable from main(), and punch our way through the exceptions:

async def as_async_iter(it):
    for x in it:
        yield x

    # in main()
    args = on_iter_end(args, lambda: print("iter end"))
    args = as_async_iter(args)
    asyncio.run(asyncio.wait_for(async_main(args, limit), timeout))

$ python map_unordered.py 2 .1 .2 .2 .1
Traceback (most recent call last):
  ...
  File "map_unordered.py", line 9, in map_unordered
    aws = map(func, iterable)
TypeError: 'async_generator' object is not iterable

map() doesn't work with async iterables, so we use a generator expression instead.

async def map_unordered(func, iterable, *, limit):
    try:
        aws = map(func, iterable)
    except TypeError:
        aws = (func(x) async for x in iterable)

    async for task in limit_concurrency(aws, limit):
        yield await task

In true easier to ask for forgiveness than permission style, we handle the exception from map() instead of, say, checking if iterable is an instance of collections.abc.Iterable.
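For completeness, the look-before-you-leap version might look something like this (just a sketch, not what we'll end up using):

import collections.abc

async def map_unordered(func, iterable, *, limit):
    if isinstance(iterable, collections.abc.Iterable):
        aws = map(func, iterable)
    else:
        # not a sync iterable; assume it's an async one
        aws = (func(x) async for x in iterable)

    async for task in limit_concurrency(aws, limit):
        yield await task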

We could wrap aws to always be an async iterable, but limit_concurrency() is useful on its own, so it's better to support both.


$ python map_unordered.py 2 .1 .2 .2 .1
Traceback (most recent call last):
  ...
  File "map_unordered.py", line 19, in limit_concurrency
    aws = iter(aws)
TypeError: 'async_generator' object is not iterable

For async iterables, we need to use aiter():

async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

$ python map_unordered.py 2 .1 .2 .2 .1
Traceback (most recent call last):
  ...
  File "map_unordered.py", line 32, in limit_concurrency
    aw = next(aws)
TypeError: 'async_generator' object is not an iterator

... and anext():

        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

... which, unlike aiter(), has to be awaited.


Here's limit_concurrency() in all its glory:

async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()

Not as clean as before, but it gets the job done:

$ python map_unordered.py 2 .1 .2 .2 .1
0.1: 0.1
0.2: 0.2
0.3: 0.1
0.3: 0.2
iter end
0.3: done

Anyway, that's it for now. Here's the final version of the code.


Bonus: exceptions #

OK, so what about exceptions?

A lot of times, you still want to do the rest of the things, even if one fails. Also, you probably want to know which one failed – but map_unordered() yields results out of order, so how can you tell?

The most flexible solution is to let the user handle it just like they would with Pool.imap_unordered() – by decorating the original function. Here's one way of doing it:

def return_args_and_exceptions(func):
    return functools.partial(_return_args_and_exceptions, func)

async def _return_args_and_exceptions(func, *args):
    try:
        return *args, await func(*args)
    except Exception as e:
        return *args, e

    # in async_main()
    wrapped = return_args_and_exceptions(do_stuff)
    async for arg, result in map_unordered(wrapped, args, limit=limit):
        print(f"{(time.monotonic() - start):.1f}: {arg} -> {result}")

$ python map_unordered.py 2 .1 .2 0 .2 .1
0.1: 0.1 -> 0.1
0.1: 0.0 -> float division by zero
0.2: 0.2 -> 0.2
0.3: 0.2 -> 0.2
0.3: 0.1 -> 0.1
iter end
0.3: done

Bonus: better decorators? #

Finally, here's a cool thing I learned from the asyncio docs.

When writing decorators, you can use partial() to bind the decorated function to an existing wrapper, instead of always returning a new one. The result is a more descriptive representation:

>>> return_args_and_exceptions(do_stuff)
functools.partial(<function _return_args_and_exceptions at 0x10647fd80>, <function do_stuff at 0x10647d8a0>)

Compare with the traditional version:

def return_args_and_exceptions(func):
    async def wrapper(*args):
        ...
    return wrapper

>>> return_args_and_exceptions(do_stuff)
<function return_args_and_exceptions.<locals>.wrapper at 0x103993560>
  1. Does this have a fancy, academic name? Do let me know!