Inheritance over composition, sometimes
July 2025 ∙ 12 minute read
In ProcessThreadPoolExecutor: when I/O becomes CPU-bound, we built a hybrid concurrent.futures executor that runs tasks in multiple threads on all available CPUs, bypassing Python's global interpreter lock.
Here's some interesting reader feedback:
Currently, the code is complex due to subclassing and many layers of delegation. Could this solution be implemented using only functions, no classes? Intuitively I feel classes would be hell to debug.
Since a lot of advanced beginners struggle with structuring code, we'll implement the same executor using inheritance, composition, and functions only, compare the solutions, and reach some interesting conclusions. Consider this a worked example.
Note
Today we're focusing on code structure. While not required, reading the original article will give you a better idea of why the code does what it does.
Requirements #
Before we delve into the code, we should have some understanding of what we're building. The original article sets out the following functional requirements:
- Implement the Executor interface; we want a drop-in replacement for existing concurrent.futures executors, so that user code doesn't have to change.
- Spread the work to one worker process per CPU, and then further to multiple threads inside each worker, to work around CPU becoming a bottleneck for I/O.
Additionally, we have two implicit non-functional requirements:
- Use the existing executors where possible (less code means fewer bugs).
- Only depend on stable, documented features; we don't want our code to break when concurrent.futures internals change.
concurrent.futures #
Since we're building on top of concurrent.futures, we should also get familiar with it; the docs already provide a great introduction:
The concurrent.futures module provides a high-level interface for asynchronously executing callables. [...this] can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.
Let's look at the classes in more detail.
Executor is an abstract base class [1] defined in concurrent.futures._base. It provides dummy submit() and shutdown() methods, a concrete map() method implemented in terms of submit(), and context manager methods that shutdown() the executor on exit. Notably, the documentation does not mention the concrete methods, instead saying that the class "should not be used directly, but through its concrete subclasses".
The first subclass, ThreadPoolExecutor, is defined in concurrent.futures.thread; it implements submit() and shutdown(), inheriting map() unchanged.
The second one, ProcessPoolExecutor, is defined in concurrent.futures.process; as an optimization, it overrides map() to chop the input iterables and pass the chunks to the superclass method with super().
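To make the shared interface concrete, here's a minimal sketch (mine, not from the original article) of user code that works with either executor unchanged – exactly the property our drop-in requirement relies on:

import concurrent.futures

def run_tasks(executor_cls):
    # submit(), map(), and the context manager protocol are all part
    # of the shared Executor interface, so any subclass works here
    with executor_cls() as executor:
        print(executor.submit(pow, 2, 10).result())    # 1024
        print(list(executor.map(abs, [-1, -2, -3])))   # [1, 2, 3]

if __name__ == '__main__':  # needed by ProcessPoolExecutor on spawn platforms
    run_tasks(concurrent.futures.ThreadPoolExecutor)
    run_tasks(concurrent.futures.ProcessPoolExecutor)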
Three solutions #
Now we're ready for code.
Inheritance #
First, the original implementation, [2] arguably a textbook example of inheritance. We override __init__, submit(), and shutdown(), and do some extra stuff on top of the inherited behavior, which we access through super(). We inherit the context manager methods, map(), and any public methods ProcessPoolExecutor may get in the future, assuming they use only other public methods (more on this below).
# imports used throughout this article
import concurrent.futures
import itertools
import multiprocessing
import threading
import time
from functools import partial

class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self.__result_queue = multiprocessing.Queue()
        super().__init__(
            initializer=_init_process,
            initargs=(self.__result_queue, max_threads, initializer, initargs)
        )
        self.__tasks = {}
        self.__result_handler = threading.Thread(target=self.__handle_results)
        self.__result_handler.start()

    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self.__tasks[task_id] = outer
        outer.set_running_or_notify_cancel()
        # the inner future is discarded; results come back via the queue
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)
        return outer

    def __handle_results(self):
        # None is the sentinel shutdown() uses to stop this thread
        for task_id, ok, result in iter(self.__result_queue.get, None):
            outer = self.__tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)

    def shutdown(self, wait=True):
        super().shutdown(wait=wait)
        if self.__result_queue:
            self.__result_queue.put(None)
            if wait:
                self.__result_handler.join()
            self.__result_queue.close()
            self.__result_queue = None
Because we're subclassing a class with private, undocumented attributes, our private attributes have to start with double underscores to avoid clashes with superclass ones (such as _result_queue).
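Here's name mangling at work on a toy example (class names are hypothetical, just to show the mechanism):

class Base:
    def __init__(self):
        self._queue = 'base'    # single underscore: one shared attribute

class Child(Base):
    def __init__(self):
        super().__init__()
        self._queue = 'child'   # clobbers Base's _queue
        self.__queue = 'child'  # stored as _Child__queue, so no clash

c = Child()
print(c._queue)         # 'child' -- Base's value was overwritten
print(c._Child__queue)  # 'child' -- the mangled, collision-proof name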
In addition to the main class, there are some global functions used in the worker processes which remain unchanged regardless of the solution:
# this code runs in each worker process

_executor = None
_result_queue = None

def _init_process(queue, max_threads, initializer, initargs):
    # runs once per worker process; sets up the per-process thread pool
    global _executor, _result_queue
    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    _result_queue = queue
    if initializer:
        initializer(*initargs)

def _submit(task_id, fn, *args, **kwargs):
    # runs in the worker; hands the task off to the thread pool
    task = _executor.submit(fn, *args, **kwargs)
    task.task_id = task_id
    task.add_done_callback(_put_result)

def _put_result(task):
    # runs when a thread finishes a task; ships the outcome back
    if exception := task.exception():
        _result_queue.put((task.task_id, False, exception))
    else:
        _result_queue.put((task.task_id, True, task.result()))
Composition #
OK, now let's use composition –
instead of being a ProcessPoolExecutor,
our ProcessThreadPoolExecutor has one.
At a first glance,
the result is the same as before,
with super()
changed to self._inner
:
class ProcessThreadPoolExecutor:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self._result_queue = multiprocessing.Queue()
        self._inner = concurrent.futures.ProcessPoolExecutor(
            initializer=_init_process,
            initargs=(self._result_queue, max_threads, initializer, initargs)
        )
        self._tasks = {}
        self._result_handler = threading.Thread(target=self._handle_results)
        self._result_handler.start()

    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self._tasks[task_id] = outer
        outer.set_running_or_notify_cancel()
        inner = self._inner.submit(_submit, task_id, fn, *args, **kwargs)
        return outer

    def _handle_results(self):
        for task_id, ok, result in iter(self._result_queue.get, None):
            outer = self._tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)

    def shutdown(self, wait=True):
        self._inner.shutdown(wait=wait)
        if self._result_queue:
            self._result_queue.put(None)
            if wait:
                self._result_handler.join()
            self._result_queue.close()
            self._result_queue = None
Except, we need to implement the context manager protocol ourselves:
def __enter__(self):
    # concurrent.futures._base.Executor.__enter__
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    # concurrent.futures._base.Executor.__exit__
    self.shutdown(wait=True)
    return False
...and we need to copy map() from Executor, since it should use our submit():
def _map(self, fn, *iterables, timeout=None, chunksize=1):
    # concurrent.futures._base.Executor.map
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        try:
            fs.reverse()
            while fs:
                if timeout is None:
                    yield _result_or_cancel(fs.pop())
                else:
                    yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
        finally:
            for future in fs:
                future.cancel()

    return result_iterator()
...and the chunksize optimization from its ProcessPoolExecutor version:
def map(self, fn, *iterables, timeout=None, chunksize=1):
    # concurrent.futures.process.ProcessPoolExecutor.map
    if chunksize < 1:
        raise ValueError("chunksize must be >= 1.")
    results = self._map(partial(_process_chunk, fn),
                        itertools.batched(zip(*iterables), chunksize),
                        timeout=timeout)
    return _chain_from_iterable_of_lists(results)
...and a bunch of private functions they use:
def _result_or_cancel(fut, timeout=None):
    # concurrent.futures._base._result_or_cancel
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        del fut

def _process_chunk(fn, chunk):
    # concurrent.futures.process._process_chunk
    return [fn(*args) for args in chunk]

def _chain_from_iterable_of_lists(iterable):
    # concurrent.futures.process._chain_from_iterable_of_lists
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()
And, when the Executor interface gets new methods, we'll need to at least forward them to the inner executor, although we may have to copy those too.
On the upside, no base class means we can name attributes however we want.
But this is Python – why do we need to copy stuff? In Python, methods are just functions, so we could almost get away with this:
class ProcessThreadPoolExecutor:

    ...  # __init__, submit(), and shutdown() just as before

    __enter__ = ProcessPoolExecutor.__enter__
    __exit__ = ProcessPoolExecutor.__exit__
    map = ProcessPoolExecutor.map
Alas, it won't work – ProcessPoolExecutor's map() calls super().map(), and object, the superclass of our executor, has no such method, which is why we had to change it to self._map() in our copy in the first place.
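A toy reproduction (hypothetical class names) shows the mechanism: zero-argument super() binds relative to the class a method was textually defined in, so the borrowed method still resolves against the original class, and the call fails on our instances:

class Original:
    def greet(self):
        # compiled as super(Original, self), regardless of who borrows it
        return super().greet()

class Borrower:
    greet = Original.greet

Borrower().greet()
# TypeError: super(type, obj): obj must be an instance or subtype of type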
Functions #
Can this be done using only functions, though?
Theoretically no, since we need to implement the executor interface. Practically yes, since this is Python, where an "interface" just means having specific attributes, usually functions with specific signatures. For example, a module like this:
def init(max_threads=None, initializer=None, initargs=()):
    global _result_queue, _inner, _tasks, _result_handler
    _result_queue = multiprocessing.Queue()
    _inner = concurrent.futures.ProcessPoolExecutor(
        initializer=_init_process,
        initargs=(_result_queue, max_threads, initializer, initargs)
    )
    _tasks = {}
    _result_handler = threading.Thread(target=_handle_results)
    _result_handler.start()

def submit(fn, *args, **kwargs):
    outer = concurrent.futures.Future()
    task_id = id(outer)
    _tasks[task_id] = outer
    outer.set_running_or_notify_cancel()
    inner = _inner.submit(_submit, task_id, fn, *args, **kwargs)
    return outer

def _handle_results():
    for task_id, ok, result in iter(_result_queue.get, None):
        outer = _tasks.pop(task_id)
        if ok:
            outer.set_result(result)
        else:
            outer.set_exception(result)

def shutdown(wait=True):
    global _result_queue
    _inner.shutdown(wait=wait)
    if _result_queue:
        _result_queue.put(None)
        if wait:
            _result_handler.join()
        _result_queue.close()
        _result_queue = None
Like before, we need to copy map() with minor tweaks:
def _map(fn, *iterables, timeout=None, chunksize=1):
    # concurrent.futures._base.Executor.map
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = [submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        try:
            fs.reverse()
            while fs:
                if timeout is None:
                    yield _result_or_cancel(fs.pop())
                else:
                    yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
        finally:
            for future in fs:
                future.cancel()

    return result_iterator()

def map(fn, *iterables, timeout=None, chunksize=1):
    # concurrent.futures.process.ProcessPoolExecutor.map
    if chunksize < 1:
        raise ValueError("chunksize must be >= 1.")
    results = _map(partial(_process_chunk, fn),
                   itertools.batched(zip(*iterables), chunksize),
                   timeout=timeout)
    return _chain_from_iterable_of_lists(results)
Behold, we can use the module itself as an executor:
>>> ptpe.init()
>>> ptpe.submit(int, '1').result()
1
Of note, everything that was an instance variable before is now a global variable; as a consequence, only one executor can exist at any given time, since there's only the one module. [3] But it gets worse – calling init() a second time will clobber the state of the first executor, leading to all sorts of bugs; if we were serious, we'd prevent it somehow.
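The guard could be as simple as refusing to initialize twice; a sketch, assuming we give _inner a module-level default and reset it in shutdown():

_inner = None  # module-level default, so init() has something to check

def init(max_threads=None, initializer=None, initargs=()):
    global _result_queue, _inner, _tasks, _result_handler
    if _inner is not None:
        # a previous init() already ran; refuse to clobber its state
        raise RuntimeError('executor already initialized; call shutdown() first')
    ...  # the rest of init() as before; shutdown() must also set _inner = None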
Also, some interfaces are more complicated than having the right functions; defining __enter__ and __exit__ is not enough to use a module in a with statement, since the interpreter looks them up on the class of the object, not on the object itself. We can work around this with an alternate "constructor" that returns a context manager:
import sys
from contextlib import contextmanager

@contextmanager
def init_cm(*args, **kwargs):
    init(*args, **kwargs)
    try:
        yield sys.modules[__name__]
    finally:
        shutdown()
>>> with ptpe.init_cm() as executor:
... assert executor is ptpe
... ptpe.submit(int, '2').result()
...
2
Comparison #
So, how do the solutions stack up? Here's a summary:
| | pros | cons |
|---|---|---|
| inheritance | least code – map(), the context manager methods, and future public methods are inherited for free | private attributes need name mangling to avoid superclass clashes; assumes public methods are implemented only in terms of other public methods |
| composition | attributes can be named freely; no reliance on superclass internals | must copy map() and its private helpers, and keep the copies up to date; must implement the context manager protocol; future Executor methods need forwarding or copying |
| functions | ? | all the downsides of composition, plus global state: only one executor at a time, init() can clobber existing state, and modules can't be used in a with statement directly |
I may be a bit biased, but inheritance looks like a clear winner.
Composition over inheritance #
Given that favoring composition over inheritance is usually a good practice, it's worth discussing why inheritance won this time. I see three reasons:
- Composition helps most when you have unrelated components that need to be flexible in response to an evolving business domain; that's not the case here, so we get all the drawbacks with none of the benefits.
- The existing code is designed for inheritance.
- We have a true is-a relationship – ProcessThreadPoolExecutor really is a ProcessPoolExecutor with extra behavior, and not just part of an arbitrary hierarchy.
For a different line of reasoning involving subtyping, check out Hillel Wayne's When to prefer inheritance to composition; he offers this rule of thumb:
So, here's when you want to use inheritance: when you need to instantiate both the parent and child classes and pass them to the same functions.
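That's exactly our situation; a small sketch (process_all is a hypothetical helper, not from the article), where the same function accepts parent and child instances alike:

def process_all(executor, fn, items):
    # works with ProcessPoolExecutor and ProcessThreadPoolExecutor alike,
    # because the child can go anywhere the parent can
    with executor:
        return list(executor.map(fn, items))

process_all(concurrent.futures.ProcessPoolExecutor(), abs, [-1, -2])
process_all(ProcessThreadPoolExecutor(), abs, [-1, -2])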
Forward compatibility #
The inheritance solution assumes map() and any future public ProcessPoolExecutor methods are implemented only in terms of other public methods. This assumption introduces a risk that updates may break our executor; the risk is lowered by two things:
- concurrent.futures is in the standard library, which rarely does major rewrites of existing code, and never within a minor (X.Y) version; concurrent.futures exists in its current form since Python 3.2, released in 2011.
- concurrent.futures is clearly designed for inheritance, even if mainly to enable internal reuse, and even if this is not explicitly documented.
As active mitigations, we can add a basic test suite (which we should do anyway), and document the supported Python versions explicitly (which we should do anyway if we were to release this on PyPI).
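Such a suite doesn't need much to catch breakage from a new Python version early; a minimal pytest sketch (the test names are mine, and ptpe is a hypothetical module name):

from ptpe import ProcessThreadPoolExecutor  # hypothetical module name

def test_submit_and_map():
    with ProcessThreadPoolExecutor() as executor:
        assert executor.submit(int, '1').result() == 1
        assert list(executor.map(abs, [-1, -2, -3])) == [1, 2, 3]

def test_exceptions_propagate():
    with ProcessThreadPoolExecutor() as executor:
        future = executor.submit(int, 'not a number')
        assert isinstance(future.exception(), ValueError)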
If concurrent.futures were not in the standard library, I'd probably go with the composition version instead, although as already mentioned, this wouldn't be free from upkeep either. Another option would be to upstream ProcessThreadPoolExecutor, so that it is maintained together with the code it depends on.
Global state #
The functions-only solution is probably the worst of the three, since it has all the downsides of composition, and significant limitations due to its use of global state.
We could avoid using globals by passing the state (process pool executor instance, result queue, etc.) as function arguments, but this breaks the executor interface, and makes for an awful user experience. We could group common arguments into a single object so there's only one argument to pass around; if you call that argument self, it becomes obvious that's just a class instance with extra steps.
Having to keep track of a bunch of related globals has enough downsides that even if you do want a module-level API, it's still worth using a class to group them, and exposing the methods of a global instance at module-level (like so); Brandon Rhodes discusses this at length in The Prebound Method Pattern.
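The pattern boils down to a private module-level instance whose bound methods are re-exported as module-level functions; a sketch (names are illustrative):

# ptpe.py -- the prebound method pattern, sketched
class _ProcessThreadPoolExecutor:
    def submit(self, fn, *args, **kwargs): ...
    def shutdown(self, wait=True): ...

# the single instance, created at import time
_instance = _ProcessThreadPoolExecutor()

# callers use ptpe.submit(...) without ever seeing the class,
# but all the state still lives in one well-behaved object
submit = _instance.submit
shutdown = _instance.shutdown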
Complexity #
While the code is somewhat complex, that's mostly intrinsic to the problem itself (what runs in the main vs. worker processes, passing results around, error handling, and so on), rather than due to our use of classes, which only affects how we refer to ProcessPoolExecutor methods and how we store state.
One could argue that copying a bunch of code doesn't increase complexity, but if you factor in keeping it up to date and tested, it's not exactly free either.
One could also argue that building our executor on top of ProcessPoolExecutor is increasing complexity, and in a way that's true – for example, we have two result queues, and had to deal with dead workers too, which wouldn't be the case if we wrote it from scratch; but in turn, that would come with having to understand, maintain, and test 800+ lines of low-level process management code. Sometimes, the complexity I have to care about is more important than total complexity.
Debugging #
I have to come clean at this point – I use print debugging a lot 🙀 (especially if there are no tests yet, and sometimes from tests too); when that doesn't cut it, IPython's embed() usually provides enough interactivity to figure out what's going on. [4]
With the minimal test at the end of the file driving the executor, I used temporary print() calls in _submit(), _put_result(), and __handle_results() to check data is making its way through properly; if I expected the code to change more often, I'd replace them with permanent logging calls.
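For instance, the temporary print in _put_result() might become a permanent debug-level logging call, along these lines (a sketch):

import logging

logger = logging.getLogger(__name__)

def _put_result(task):
    if exception := task.exception():
        logger.debug('task %s failed: %r', task.task_id, exception)
        _result_queue.put((task.task_id, False, exception))
    else:
        logger.debug('task %s succeeded', task.task_id)
        _result_queue.put((task.task_id, True, task.result()))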
In addition, there were two debugging scripts in the benchmark file that I didn't show, one to automate killing workers at the right time, and one to make sure shutdown() waits for any pending tasks.
So, does how we wrote the code change any of this? Not really, no; all the techniques above (and using a debugger too) apply equally well. If anything, using classes makes interactive debugging easier, since it's easier to discover state via autocomplete (with functions only, you have to know to look it up on the module).
Try it out #
As I've said before, try it out – it only took ~10 minutes to convert the initial solution to the other two. In part, the right code structure is a matter of feeling and taste, and both are educated by reading and writing lots of code. If you think there's a better way to do something, do it and see how it looks; it is a sort of deliberate practice.
Learned something new today? Share this with others, it really helps!
[1] Executor is an abstract base class only by convention: it is a base class (other classes are supposed to subclass it), and it is abstract (other classes are supposed to provide concrete implementations for some methods). Python also allows formalizing abstract base classes using the abc module; see When to use classes in Python? When you repeat similar sets of functions for an example of this and other ways of achieving the same goal. [return]

[2] For brevity, I'm using the version before dealing with dead workers; the final code is similar, but with a more involved __handle_results(). [return]

[3] This is almost true – we could "this is Python" our way deeper and reload the module while still keeping a reference to the old one, but that's just a round-about, unholy way of emulating class instances. [return]

[4] Pro tip: you can use embed() as a breakpoint() hook: PYTHONBREAKPOINT=IPython.embed python myscript.py. [return]