ProcessThreadPoolExecutor: when I/O becomes CPU-bound
April 2025 ∙ 14 minute read
So, you're doing some I/O bound stuff, in parallel.
Maybe you're scraping some websites – a lot of websites.
Maybe you're deleting many millions of DynamoDB items.
You've got your ThreadPoolExecutor, you've increased the number of threads and tuned connection limits... but after some point, it's just not getting any faster. You look at your Python process, and you see CPU utilization hovers above 100%.
You could split your work into batches and pass them to a ProcessPoolExecutor, which then runs your code in separate processes. But that requires yet more code, and a bunch of changes, which is no fun. And maybe your input is not that easy to split into batches.
If only we had an executor that works seamlessly across processes and threads.
Well, you're in luck, since that's exactly what we're building today!
And even better, in a couple years you won't even need it anymore.
Establishing a baseline #
To measure things, we'll use a mock that pretends to do mostly I/O, with a sprinkling of CPU-bound work thrown in – a stand-in for something like a database connection, a Requests session, or a DynamoDB client.
import time

class Client:
    io_time = 0.02
    cpu_time = 0.0008

    def method(self, arg):
        # simulate I/O
        time.sleep(self.io_time)
        # simulate CPU-bound work
        start = time.perf_counter()
        while time.perf_counter() - start < self.cpu_time:
            for i in range(100): i ** i
        return arg
We sleep() for the I/O, and do some math in a loop for the CPU stuff; it doesn't matter exactly how long each takes, as long as I/O time dominates.
Real multi-threaded clients are usually backed by a connection pool; we could simulate one using a semaphore, but it's not relevant here – we're assuming the connection pool is effectively unbounded.
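For the curious, here is a minimal sketch of what simulating a bounded connection pool with a semaphore might look like. PooledClient, pool_size and max_in_use are names made up for this illustration; the benchmarks in this article do not use any of this.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PooledClient:
    """Mock client whose I/O is gated by a fixed-size 'connection pool'."""

    def __init__(self, pool_size=4, io_time=0.01):
        self.io_time = io_time
        self._pool = threading.BoundedSemaphore(pool_size)
        self._lock = threading.Lock()
        self._in_use = 0
        self.max_in_use = 0  # high-water mark, for inspection only

    def method(self, arg):
        with self._pool:  # blocks while all "connections" are busy
            with self._lock:
                self._in_use += 1
                self.max_in_use = max(self.max_in_use, self._in_use)
            try:
                time.sleep(self.io_time)  # simulate I/O
            finally:
                with self._lock:
                    self._in_use -= 1
        return arg


pooled = PooledClient(pool_size=4)
with ThreadPoolExecutor(16) as executor:
    results = list(executor.map(pooled.method, range(32)))

print(f"peak connections in use: {pooled.max_in_use}")
```

With 16 threads and 4 "connections", the extra threads just wait on the semaphore, so adding threads past the pool size would not buy any throughput.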
Since we'll use our client from multiple processes, we set up a global instance and a function that uses it; we can then pass init_client() as an executor initializer, which also lets us pass arguments to the client when creating it.
client = None

def init_client(*args):
    global client
    client = Client(*args)

def do_stuff(*args):
    return client.method(*args)
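To see the initializer mechanics in isolation, here is a small sketch using a thread pool, which accepts the same initializer and initargs parameters as the process pool. EchoClient and its prefix argument are hypothetical, made up only so that there is a constructor argument to pass along.

```python
from concurrent.futures import ThreadPoolExecutor

client = None


class EchoClient:
    """Hypothetical stand-in for Client that takes a constructor argument."""

    def __init__(self, prefix):
        self.prefix = prefix

    def method(self, arg):
        return f"{self.prefix}{arg}"


def init_client(*args):
    global client
    client = EchoClient(*args)


def do_stuff(*args):
    return client.method(*args)


# initargs is forwarded to init_client() in every worker, before any task runs
with ThreadPoolExecutor(2, initializer=init_client, initargs=("item-",)) as executor:
    results = list(executor.map(do_stuff, range(3)))

print(results)
```

With threads, every worker sets the same global; with processes, each worker process gets its own copy of the module and thus its own client.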
Finally, we make a simple timing context manager:
from contextlib import contextmanager

@contextmanager
def timer():
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"elapsed: {end-start:1.3f}")
...and put everything together in a function that measures how long it takes to do a bunch of work using a concurrent.futures executor:
def benchmark(executor, n=10_000, timer=timer, chunksize=10):
    with executor:
        # make sure all the workers are started,
        # so we don't measure their startup time
        list(executor.map(time.sleep, [0] * 200))

        with timer():
            values = list(executor.map(do_stuff, range(n), chunksize=chunksize))

        assert values == list(range(n)), values
Threads #
So, a ThreadPoolExecutor should suffice here, since we're mostly doing I/O, right?
>>> from concurrent.futures import *
>>> from bench import *
>>> init_client()
>>> benchmark(ThreadPoolExecutor(10))
elapsed: 24.693
More threads!
>>> benchmark(ThreadPoolExecutor(20))
elapsed: 12.405
Twice the threads, twice as fast. More!
>>> benchmark(ThreadPoolExecutor(30))
elapsed: 8.718
Good, it's still scaling linearly. MORE!
>>> benchmark(ThreadPoolExecutor(40))
elapsed: 8.638
...more?
>>> benchmark(ThreadPoolExecutor(50))
elapsed: 8.458
>>> benchmark(ThreadPoolExecutor(60))
elapsed: 8.430
>>> benchmark(ThreadPoolExecutor(70))
elapsed: 8.428
Problem: CPU becomes a bottleneck #
It's time we take a closer look at what our process is doing.
I'd normally use the top command for this, but since the flags and output vary with the operating system, we'll implement our own using the excellent psutil library.
import psutil

@contextmanager
def top():
    """Print information about current and child processes.

    RES is the resident set size. USS is the unique set size.
    %CPU is the CPU utilization. nTH is the number of threads.

    """
    process = psutil.Process()
    processes = [process] + process.children(True)
    # the first cpu_percent() call only sets the reference point
    for p in processes: p.cpu_percent()

    yield

    print(f"{'PID':>7} {'RES':>7} {'USS':>7} {'%CPU':>7} {'nTH':>7}")
    for p in processes:
        try:
            m = p.memory_full_info()
        except psutil.AccessDenied:
            m = p.memory_info()
        rss = m.rss / 2**20
        uss = getattr(m, 'uss', 0) / 2**20
        cpu = p.cpu_percent()
        nth = p.num_threads()
        print(f"{p.pid:>7} {rss:6.1f}m {uss:6.1f}m {cpu:7.1f} {nth:>7}")
And because it's a context manager, we can use it as a timer:
>>> init_client()
>>> benchmark(ThreadPoolExecutor(10), timer=top)
PID RES USS %CPU nTH
51395 35.2m 28.5m 38.7 11
So, what happens if we increase the number of threads?
>>> benchmark(ThreadPoolExecutor(20), timer=top)
PID RES USS %CPU nTH
13912 16.8m 13.2m 70.7 21
>>> benchmark(ThreadPoolExecutor(30), timer=top)
PID RES USS %CPU nTH
13912 17.0m 13.4m 99.1 31
>>> benchmark(ThreadPoolExecutor(40), timer=top)
PID RES USS %CPU nTH
13912 17.3m 13.7m 100.9 41
With more threads, the compute part of our I/O bound workload increases, eventually becoming high enough to saturate one CPU – and due to the global interpreter lock, one CPU is all we can use, regardless of the number of threads.[1]
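A back-of-the-envelope model makes the plateau less mysterious. The sketch below is an idealized lower bound, not a measurement: it takes the io_time and cpu_time values from Client and the default n from benchmark(), and ignores scheduling, locking and executor overhead entirely. The function names and the cpus parameter are made up for this illustration.

```python
IO_TIME = 0.02     # seconds of simulated I/O per call (from Client)
CPU_TIME = 0.0008  # seconds of simulated CPU work per call (from Client)
N = 10_000         # calls per benchmark run


def estimate_elapsed(threads, cpus=1):
    """Rough lower bound on benchmark time; ignores all overhead."""
    # each thread handles its share of calls one after another
    latency_bound = N * (IO_TIME + CPU_TIME) / threads
    # the CPU part cannot be spread over more than `cpus` cores
    cpu_bound = N * CPU_TIME / cpus
    return max(latency_bound, cpu_bound)


def saturation_threads(cpus=1):
    """Thread count at which the CPU bound starts to dominate."""
    return cpus * (IO_TIME + CPU_TIME) / CPU_TIME


for threads in (10, 20, 30, 40):
    print(f"{threads} threads: ~{estimate_elapsed(threads):.1f}s")
print(f"one CPU saturates at ~{saturation_threads():.0f} threads")
```

Under these assumptions, 10,000 calls at 0.8 ms of CPU each cannot finish in less than about 8 seconds on one CPU, and that floor is reached at roughly 26 threads. The measured times sit somewhat above the model, as you would expect from a bound that ignores overhead, but the shape is the same: linear scaling up to about 30 threads, then nothing.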
Processes? #
I know, let's use a ProcessPoolExecutor instead!
>>> benchmark(ProcessPoolExecutor(20, initializer=init_client))
elapsed: 12.374
>>> benchmark(ProcessPoolExecutor(30, initializer=init_client))
elapsed: 8.330
>>> benchmark(ProcessPoolExecutor(40, initializer=init_client))
elapsed: 6.273
Hmmm... I guess it is a little bit better.
More? More!
>>> benchmark(ProcessPoolExecutor(60, initializer=init_client))
elapsed: 4.751
>>> benchmark(ProcessPoolExecutor(80, initializer=init_client))
elapsed: 3.785
>>> benchmark(ProcessPoolExecutor(100, initializer=init_client))
elapsed: 3.824
OK, it's better, but with diminishing returns – there's no improvement after 80 processes, and even then, it's only 2.2x faster than the best time with threads, when, in theory, it should be able to make full use of all 4 CPUs.
Also, we're not making best use of connection pooling (relevant if the client connects to many different hosts, since we now have 80 pools), nor multiplexing (relevant with protocols like HTTP/2 or newer, since we now have 80 connections).
Problem: processes use more memory #
But it gets worse!
>>> benchmark(ProcessPoolExecutor(80, initializer=init_client), timer=top)
PID RES USS %CPU nTH
2479 21.2m 15.4m 15.0 3
2480 11.2m 6.3m 0.0 1
2481 13.8m 8.5m 3.4 1
... 78 more lines ...
2560 13.8m 8.5m 4.4 1
13.8 MiB * 80 ~= 1 GiB ... that is a lot of memory.
Now, there's some nuance to be had here.
First, on most operating systems that have virtual memory, code segment pages are shared between processes – there's no point in having 80 copies of libc or the Python interpreter in memory.
The unique set size is probably a better measurement than the resident set size, since it excludes memory shared between processes.[2] So, for the macOS output above,[3] the actual usage is more like 8.5 MiB * 80 = 680 MiB.
Second, if you use the fork or forkserver start methods, processes also share memory allocated before the fork() via copy-on-write; for Python, this includes module code and variables. On Linux, the actual usage is 1.7 MiB * 80 = 136 MiB:
>>> benchmark(ProcessPoolExecutor(80, initializer=init_client), timer=top)
PID RES USS %CPU nTH
329801 17.0m 6.6m 5.1 3
329802 13.3m 1.6m 2.1 1
... 78 more lines ...
329881 13.3m 1.7m 2.0 1
However, it's important to note that's just a lower bound; memory allocated after fork() is not shared, and most real work will unavoidably allocate more memory.
Being unreasonable #
One reasonable way of dealing with this would be to split the input into batches, one per CPU, and pass them to a ProcessPoolExecutor, which in turn runs the batch items using a ThreadPoolExecutor.[4]
But that would mean we need to change our code, and that's no fun. If only there was an executor that works seamlessly across processes and threads.
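For reference, the batching approach could look roughly like the sketch below. It is only an illustration of the idea: split_batches, run_batch, run_batched and the placeholder work() function are all made-up names, and the input is assumed to be a list that fits in memory.

```python
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def split_batches(items, n):
    """Split items into at most n contiguous, roughly equal batches."""
    size, extra = divmod(len(items), n)
    batches, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        if end > start:
            batches.append(items[start:end])
        start = end
    return batches


def work(item):
    """Hypothetical placeholder for the real per-item call."""
    return item * 2


def run_batch(batch, max_threads=8):
    """Runs inside a worker process: fan one batch out to threads."""
    with ThreadPoolExecutor(max_threads) as threads:
        return list(threads.map(work, batch))


def run_batched(items, processes=None):
    """One batch per process; results come back in input order."""
    processes = processes or os.cpu_count() or 1
    with ProcessPoolExecutor(processes) as pool:
        batch_results = pool.map(run_batch, split_batches(items, processes))
        return [value for batch in batch_results for value in batch]
```

Calling run_batched(list(range(100))) from under an if __name__ == '__main__': guard should return the doubled values in order. Note how much of this is plumbing, and that nothing comes back until a whole batch is done.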
A minimal plausible solution #
In keeping with what has become tradition by now, we'll take an iterative, problem-solution approach; since we're not sure what to do yet, we start with the simplest thing that could possibly work.
We know we want a process pool executor that starts one thread pool executor per process, so let's deal with that first.
class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        super().__init__(
            initializer=_init_process,
            initargs=(max_threads, initializer, initargs)
        )
By subclassing ProcessPoolExecutor, we get the map() implementation for free.
By going with the default max_workers, we get one process per CPU (which is what we want); we can add more arguments later if needed.
In our custom process initializer, we set up a global thread pool executor, and then call the initializer provided by the user:
_executor = None

def _init_process(max_threads, initializer, initargs):
    global _executor

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    atexit.register(_executor.shutdown)

    if initializer:
        initializer(*initargs)
Likewise, submit() passes the work along to the thread pool executor:
class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    # ...
    def submit(self, fn, *args, **kwargs):
        return super().submit(_submit, fn, *args, **kwargs)

def _submit(fn, *args, **kwargs):
    return _executor.submit(fn, *args, **kwargs).result()
OK, that looks good enough; let's use it and see if it works:
def _do_stuff(n):
    print(f"doing: {n}")
    return n ** 2

if __name__ == '__main__':
    with ProcessThreadPoolExecutor() as e:
        print(list(e.map(_do_stuff, [0, 1, 2])))
$ python ptpe.py
doing: 0
doing: 1
doing: 2
[0, 1, 4]
Wait, we got it on the first try?!
Let's measure that:
>>> from bench import *
>>> from ptpe import *
>>> benchmark(ProcessThreadPoolExecutor(30, initializer=init_client), n=1000)
elapsed: 6.161
Hmmm... that's unexpectedly slow... almost as if:
>>> multiprocessing.cpu_count()
4
>>> benchmark(ProcessPoolExecutor(4, initializer=init_client), n=1000)
elapsed: 6.067
Ah, because _submit() waits for the result() in the main thread of the worker process, this is just a ProcessPoolExecutor with extra steps.
But what if we send back the future instead?
    def submit(self, fn, *args, **kwargs):
        return super().submit(_submit, fn, *args, **kwargs).result()

def _submit(fn, *args, **kwargs):
    return _executor.submit(fn, *args, **kwargs)
Alas:
$ python ptpe.py
doing: 0
doing: 1
doing: 2
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "concurrent/futures/process.py", line 210, in _sendback_result
    result_queue.put(_ResultItem(work_id, result=result,
  File "multiprocessing/queues.py", line 391, in put
    obj = _ForkingPickler.dumps(obj)
  File "multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ptpe.py", line 42, in <module>
    print(list(e.map(_do_stuff, [0, 1, 2])))
  ...
TypeError: cannot pickle '_thread.RLock' object
It may not seem like it, but this is a partial success: the work happens, we just can't get anything back. Not surprising, to be honest, it couldn't have been that easy.
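The failure is easy to reproduce without any executor at all: a future carries a condition variable (and thus a lock) for its waiters, and locks cannot be pickled. A minimal check, with the exact error message left to whatever your Python version prints:

```python
import pickle
from concurrent.futures import Future

future = Future()
try:
    # this is what the result queue attempts when a worker returns a future
    pickle.dumps(future)
except TypeError as e:
    error = e
else:
    error = None

print(f"pickling failed: {error}")
```

So whatever crosses the process boundary has to be plain data: the result itself, or the exception, but never the future.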
Getting results #
If you look carefully at the traceback, you'll find a hint of how ProcessPoolExecutor gets its own results back from workers – a queue; the module docstring even has a neat data-flow diagram:
|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
Now, we could probably use the same queue somehow, but it would involve touching a lot of (private) internals.[5] Instead, let's use a separate queue:
    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self.__result_queue = multiprocessing.Queue()
        super().__init__(
            initializer=_init_process,
            initargs=(self.__result_queue, max_threads, initializer, initargs)
        )
On the worker side, we make it globally accessible:
_executor = None
_result_queue = None

def _init_process(queue, max_threads, initializer, initargs):
    global _executor, _result_queue

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    atexit.register(_executor.shutdown)

    _result_queue = queue
    atexit.register(_result_queue.close)

    if initializer:
        initializer(*initargs)
...so we can use it from a task callback registered by _submit():
def _submit(fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((False, exception))
    else:
        _result_queue.put((True, task.result()))
Back in the main process, we handle the results in a thread:
    def __init__(self, max_threads=None, initializer=None, initargs=()):
        # ...
        self.__result_handler = threading.Thread(target=self.__handle_results)
        self.__result_handler.start()

    def __handle_results(self):
        for ok, result in iter(self.__result_queue.get, None):
            print(f"{'ok' if ok else 'error'}: {result}")
Finally, to stop the handler, we use None as a sentinel on executor shutdown:
    def shutdown(self, wait=True):
        super().shutdown(wait=wait)
        if self.__result_queue:
            self.__result_queue.put(None)
            if wait:
                self.__result_handler.join()
            self.__result_queue.close()
            self.__result_queue = None
Let's see if it works:
$ python ptpe.py
doing: 0
ok: [0]
doing: 1
ok: [1]
doing: 2
ok: [4]
Traceback (most recent call last):
  File "concurrent/futures/_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
AttributeError: 'NoneType' object has no attribute 'result'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  ...
AttributeError: 'NoneType' object has no attribute 'cancel'
Yay, the results are making it to the handler!
The error happens because instead of returning a Future, our submit() returns the result of _submit(), which is always None.
Fine, we'll make our own futures #
But submit() must return a future, so we make our own:
    def __init__(self, max_threads=None, initializer=None, initargs=()):
        # ...
        self.__tasks = {}
        # ...

    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self.__tasks[task_id] = outer

        outer.set_running_or_notify_cancel()
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)

        return outer
In order to map results to their futures, we need a unique identifier; the id() of the outer future should do, since it is unique for the object's lifetime, and the tasks dict keeps the future alive until its result arrives.
We pass the id to _submit(), then to _put_result() as an attribute on the future, and finally back in the queue with the result:
def _submit(task_id, fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.task_id = task_id
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((task.task_id, False, exception))
    else:
        _result_queue.put((task.task_id, True, task.result()))
Back in the result handler, we find the matching future, and set the result accordingly:
    def __handle_results(self):
        for task_id, ok, result in iter(self.__result_queue.get, None):
            outer = self.__tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)
And it works:
$ python ptpe.py
doing: 0
doing: 1
doing: 2
[0, 1, 4]
I mean, it really works:
>>> benchmark(ProcessThreadPoolExecutor(10, initializer=init_client))
elapsed: 6.220
>>> benchmark(ProcessThreadPoolExecutor(20, initializer=init_client))
elapsed: 3.397
>>> benchmark(ProcessThreadPoolExecutor(30, initializer=init_client))
elapsed: 2.575
>>> benchmark(ProcessThreadPoolExecutor(40, initializer=init_client))
elapsed: 2.664
3.3x is not quite the 4 CPUs my laptop has, but it's pretty close, and much better than the 2.2x we got from processes alone.
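If you want to poke at the future-matching mechanism on its own, here is a scaled-down sketch that swaps the worker processes for a plain thread pool and multiprocessing.Queue for queue.Queue, so it runs in a single process. That substitution is an assumption made purely for illustration; the module-level names mirror the executor's attributes but are otherwise made up, and the task id travels in a closure rather than as a future attribute.

```python
import queue
import threading
from concurrent.futures import Future, ThreadPoolExecutor

result_queue = queue.Queue()     # stand-in for multiprocessing.Queue
tasks = {}                       # task id -> outer future
workers = ThreadPoolExecutor(4)  # stand-in for the worker side


def put_result(task_id, inner):
    # worker side: report the outcome together with the task id
    if exception := inner.exception():
        result_queue.put((task_id, False, exception))
    else:
        result_queue.put((task_id, True, inner.result()))


def submit(fn, *args):
    outer = Future()
    task_id = id(outer)  # unique while outer is alive; tasks keeps it alive
    tasks[task_id] = outer
    outer.set_running_or_notify_cancel()
    inner = workers.submit(fn, *args)
    inner.add_done_callback(lambda inner: put_result(task_id, inner))
    return outer


def handle_results():
    # caller side: match each result to its outer future
    for task_id, ok, result in iter(result_queue.get, None):
        outer = tasks.pop(task_id)
        if ok:
            outer.set_result(result)
        else:
            outer.set_exception(result)


handler = threading.Thread(target=handle_results)
handler.start()

futures = [submit(divmod, 7, d) for d in (2, 0)]
outcomes = [f.exception() or f.result() for f in futures]
print(outcomes)

workers.shutdown()
result_queue.put(None)  # sentinel: stop the handler
handler.join()
```

The first future resolves to a result and the second to a ZeroDivisionError, both delivered through the queue rather than through the inner futures.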
Death becomes a problem #
I wonder what happens when a worker process dies.
For example, the initializer can fail:
>>> executor = ProcessPoolExecutor(initializer=divmod, initargs=(0, 0))
>>> executor.submit(int).result()
Exception in initializer:
Traceback (most recent call last):
...
ZeroDivisionError: integer division or modulo by zero
Traceback (most recent call last):
...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
...or it can die some time later, which we can help with a custom timer:[6]
@contextmanager
def terminate_child(interval=1):
    threading.Timer(interval, psutil.Process().children()[-1].terminate).start()
    yield
>>> executor = ProcessPoolExecutor(initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
[ one second later ]
Traceback (most recent call last):
...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Now let's see our executor:
>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
[ one second later ]
[ ... ]
[ still waiting ]
[ ... ]
[ hello? ]
There are actually two scenarios here:
- if map() is still submitting tasks, inner will silently fail with BrokenProcessPool
- if map() is already waiting for results, it'll keep waiting, forever
Either way, we want to fail all pending tasks with BrokenProcessPool.
For the first case, we don't need any special handling, since it's fixed by handling the second one. Nevertheless, we should still propagate inner exceptions to the outer task, otherwise we may ignore other exceptions.
    def submit(self, fn, *args, **kwargs):
        # ...
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)
        inner.task_id = task_id
        inner.add_done_callback(self.__handle_inner)

        return outer
Of note, the task may have already been handled by the second case:
    def __handle_inner(self, inner):
        task_id = inner.task_id
        if exception := inner.exception():
            if outer := self.__tasks.pop(task_id, None):
                outer.set_exception(exception)
This already fixes the case where a worker dies almost instantly:
>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=lambda: terminate_child(0))
Traceback (most recent call last):
...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
For the second case, we need to check if the executor is broken – but how? ProcessPoolExecutor._broken exists, but we decided we don't want to depend on internals. Maybe we can submit a dummy task and see if it fails:
    def __handle_results(self):
        while True:
            try:
                value = self.__result_queue.get(timeout=.1)
            except queue.Empty:
                try:
                    super().submit(int).cancel()
                except concurrent.futures.BrokenExecutor as e:
                    exc = type(e)(str(e))
                    break
                except RuntimeError as e:
                    assert 'shutdown' in str(e), e
                continue

            if not value:
                return

            task_id, ok, result = value
            if outer := self.__tasks.pop(task_id, None):
                if ok:
                    outer.set_result(result)
                else:
                    outer.set_exception(result)

        while self.__tasks:
            try:
                _, outer = self.__tasks.popitem()
            except KeyError:
                break
            outer.set_exception(exc)
We can use the Queue.get() timeout to submit the dummy task only if we're not receiving results fast enough. When the executor is broken, submit() fails, which breaks us out of the while loop where we can fail the pending tasks.
Like so:
>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
Traceback (most recent call last):
...
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
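The poll-then-probe pattern is easier to follow with the process pool taken out of the picture. In the sketch below, the dummy task is replaced by an is_broken callable and BrokenProcessPool by a made-up Broken exception; both are assumptions for illustration only, and everything runs in threads.

```python
import queue
import threading
from concurrent.futures import Future


class Broken(RuntimeError):
    """Hypothetical stand-in for BrokenProcessPool."""


def handle_results(result_queue, tasks, is_broken, poll=0.05):
    """Resolve futures from the queue; fail them all if the pool breaks."""
    while True:
        try:
            value = result_queue.get(timeout=poll)
        except queue.Empty:
            # only probe when results stop arriving
            if is_broken():
                break
            continue
        if value is None:  # shutdown sentinel
            return
        task_id, result = value
        if outer := tasks.pop(task_id, None):
            outer.set_result(result)
    # the pool is gone: nothing pending will ever complete
    while tasks:
        _, outer = tasks.popitem()
        outer.set_exception(Broken("worker died"))


result_queue = queue.Queue()
tasks = {1: Future(), 2: Future()}
first, second = tasks[1], tasks[2]
broken = threading.Event()  # set to simulate a worker dying

handler = threading.Thread(
    target=handle_results, args=(result_queue, tasks, broken.is_set)
)
handler.start()
result_queue.put((1, "done"))  # task 1 completes normally
first.result(timeout=5)
broken.set()                   # ...then the "pool" breaks
handler.join(timeout=5)
```

Task 1 gets its result, while task 2, which would otherwise wait forever, fails with Broken within roughly one poll interval of the breakage.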
So, yeah, I think we're done. Here's the final executor and benchmark code.
Some features are left as an exercise for the reader:
- providing an initializer for the ThreadPoolExecutor as well
- using other multiprocessing start methods
- shutdown()'s cancel_futures
Bonus: free threading #
You may have heard people being excited about the experimental free threading support added in Python 3.13, which allows running Python code on multiple CPUs.
And for good reason:
$ python3.13t
Python 3.13.2 experimental free-threading build
>>> from concurrent.futures import *
>>> from bench import *
>>> init_client()
>>> benchmark(ThreadPoolExecutor(30))
elapsed: 8.224
>>> benchmark(ThreadPoolExecutor(40))
elapsed: 6.193
>>> benchmark(ThreadPoolExecutor(120))
elapsed: 2.323
3.6x over the GIL version, with none of the shenanigans we've been up to!
Alas, packages with extensions need to be updated to support it:
>>> import psutil
zsh: segmentation fault python3.13t
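If you want to know which kind of interpreter you're actually running, the sketch below is one way to check. It relies on the Py_GIL_DISABLED build variable and on sys._is_gil_enabled(), which was added in Python 3.13 and is underscore-prefixed, so treat it as subject to change; the two helper names are made up.

```python
import sys
import sysconfig


def free_threading_build():
    """True if this interpreter was compiled with free-threading support."""
    return bool(sysconfig.get_config_var("Py_GIL_DISABLED"))


def gil_enabled():
    """True if the GIL is active right now (always, before Python 3.13)."""
    check = getattr(sys, "_is_gil_enabled", None)
    return True if check is None else check()


print(f"free-threading build: {free_threading_build()}")
print(f"GIL enabled: {gil_enabled()}")
```

The two can disagree: on a free-threading build, the GIL may still end up enabled at runtime, for example after importing an extension module that does not declare support for running without it.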
[1] At least, all we can use for pure-Python code. I/O always releases the global interpreter lock, and so do some extension modules.
[2] The psutil documentation for memory_full_info() explains the difference quite nicely and links to further resources, because good libraries educate.
[3] You may have to run Python as root to get the USS of child processes.
[4] And no, asyncio is not a solution, since the event loop runs in a single thread, so you'd still need to run one event loop per CPU in dedicated processes.
[5] Check out nilp0inter/threadedprocess for an idea of what that looks like.
[6] pkill -fn '[Pp]ython' would've done it too, but it gets tedious if you do it a lot, and it's a different command on Windows.