Inheritance over composition, sometimes

July 2025 ∙ 12 minute read

In ProcessThreadPoolExecutor: when I/O becomes CPU-bound, we built a hybrid concurrent.futures executor that runs tasks in multiple threads on all available CPUs, bypassing Python's global interpreter lock.

Here's some interesting reader feedback:

Currently, the code is complex due to subclassing and many layers of delegation. Could this solution be implemented using only functions, no classes? Intuitively I feel classes would be hell to debug.

Since a lot of advanced beginners struggle with structuring code, we'll implement the same executor using inheritance, composition, and functions only, compare the solutions, and reach some interesting conclusions. Consider this a worked example.

Note

Today we're focusing on code structure. While not required, reading the original article will give you a better idea of why the code does what it does.

Requirements

Before we delve into the code, we should have some understanding of what we're building. The original article sets out the following functional requirements:

  1. Implement the Executor interface; we want a drop-in replacement for existing concurrent.futures executors, so that user code doesn't have to change.
  2. Spread the work to one worker process per CPU, and then further to multiple threads inside each worker, to work around CPU becoming a bottleneck for I/O.

Additionally, we have two implicit non-functional requirements:

  1. Use the existing executors where possible (less code means fewer bugs).
  2. Only depend on stable, documented features; we don't want our code to break when concurrent.futures internals change.

concurrent.futures

Since we're building on top of concurrent.futures, we should also get familiar with it; the docs already provide a great introduction:

The concurrent.futures module provides a high-level interface for asynchronously executing callables. [...this] can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

Let's look at the classes in more detail.

Executor is an abstract base class¹ defined in concurrent.futures._base. It provides dummy submit() and shutdown() methods, a concrete map() method implemented in terms of submit(), and context manager methods that shutdown() the executor on exit. Notably, the documentation does not say which of these methods are concrete, instead saying that the class "should not be used directly, but through its concrete subclasses".

The first subclass, ThreadPoolExecutor, is defined in concurrent.futures.thread; it implements submit() and shutdown(), inheriting map() unchanged.

The second one, ProcessPoolExecutor, is defined in concurrent.futures.process; as an optimization, it overrides map() to chop the input iterables and pass the chunks to the superclass method with super().
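A rough way to see this structure for yourself, as a sketch: the identity checks below compare the function objects directly (the expected results in the comments reflect recent CPython versions; these are implementation details, so in principle they could change). The InlineExecutor class is hypothetical, not part of concurrent.futures; it implements only submit(), yet gets a working map() and context manager from Executor.

```python
import concurrent.futures as cf

# ThreadPoolExecutor inherits map() from Executor unchanged...
print(cf.ThreadPoolExecutor.map is cf.Executor.map)             # True
# ...while ProcessPoolExecutor overrides it to add chunking.
print(cf.ProcessPoolExecutor.map is cf.Executor.map)            # False
# The context manager methods are inherited as-is.
print(cf.ProcessPoolExecutor.__exit__ is cf.Executor.__exit__)  # True


class InlineExecutor(cf.Executor):
    """Hypothetical toy executor: runs each call right away, in the caller's thread."""

    def submit(self, fn, /, *args, **kwargs):
        future = cf.Future()
        # returns False only if the future was cancelled; a fresh one never is
        if future.set_running_or_notify_cancel():
            try:
                future.set_result(fn(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)
        return future


# map(), __enter__(), and __exit__() all come from Executor.
with InlineExecutor() as executor:
    print(list(executor.map(pow, [2, 3, 4], [2, 2, 2])))  # [4, 9, 16]
```

The toy is only useful as a demonstration: everything it runs happens synchronously, but the inherited map() neither knows nor cares.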

Three solutions

Now we're ready for code.

Inheritance

First, the original implementation,² arguably a textbook example of inheritance.

We override __init__, submit(), and shutdown(), and do some extra stuff on top of the inherited behavior, which we access through super(). We inherit the context manager methods, map(), and any public methods ProcessPoolExecutor may get in the future, assuming they use only other public methods (more on this below).

import concurrent.futures
import multiprocessing
import threading

class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self.__result_queue = multiprocessing.Queue()
        super().__init__(
            initializer=_init_process,
            initargs=(self.__result_queue, max_threads, initializer, initargs)
        )
        self.__tasks = {}
        self.__result_handler = threading.Thread(target=self.__handle_results)
        self.__result_handler.start()

    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self.__tasks[task_id] = outer

        outer.set_running_or_notify_cancel()
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)

        return outer

    def __handle_results(self):
        for task_id, ok, result in iter(self.__result_queue.get, None):
            outer = self.__tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)

    def shutdown(self, wait=True):
        super().shutdown(wait=wait)
        if self.__result_queue:
            self.__result_queue.put(None)
            if wait:
                self.__result_handler.join()
            self.__result_queue.close()
            self.__result_queue = None

Because we're subclassing a class with private, undocumented attributes, our private attributes have to start with double underscores to avoid clashes with superclass ones (such as _result_queue).
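Python's name mangling is what makes the double underscores work: inside a class body, self.__name is rewritten to self._ClassName__name, so it can't collide with a single-underscore attribute of the base class. A minimal sketch with hypothetical Base and Child classes:

```python
class Base:
    def __init__(self):
        # stands in for an undocumented attribute of the superclass
        self._result_queue = "base queue"


class Child(Base):
    def __init__(self):
        super().__init__()
        # stored as self._Child__result_queue, so Base's attribute survives
        self.__result_queue = "child queue"


child = Child()
print(child._result_queue)         # base queue
print(child._Child__result_queue)  # child queue
```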

In addition to the main class, there are some global functions used in the worker processes which remain unchanged regardless of the solution:

# this code runs in each worker process

_executor = None
_result_queue = None

def _init_process(queue, max_threads, initializer, initargs):
    global _executor, _result_queue

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    _result_queue = queue

    if initializer:
        initializer(*initargs)

def _submit(task_id, fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.task_id = task_id
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((task.task_id, False, exception))
    else:
        _result_queue.put((task.task_id, True, task.result()))

Download the entire file.
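The worker functions and __handle_results() communicate through (task_id, ok, result) tuples. To see that protocol in isolation, here's a single-process sketch: queue.Queue stands in for multiprocessing.Queue, and a hypothetical fake_worker() does what _submit() and _put_result() do, minus the inner thread pool.

```python
import concurrent.futures
import queue
import threading

result_queue = queue.Queue()  # stands in for multiprocessing.Queue
tasks = {}                    # task_id -> outer future, like self.__tasks


def handle_results():
    # iter(get, None) keeps calling get() until it returns the None sentinel
    for task_id, ok, result in iter(result_queue.get, None):
        outer = tasks.pop(task_id)
        if ok:
            outer.set_result(result)
        else:
            outer.set_exception(result)


def fake_worker(task_id, fn, *args):
    # hypothetical: run fn and report back, like _submit() + _put_result()
    try:
        result_queue.put((task_id, True, fn(*args)))
    except Exception as exc:
        result_queue.put((task_id, False, exc))


handler = threading.Thread(target=handle_results)
handler.start()

outer_ok = concurrent.futures.Future()
outer_err = concurrent.futures.Future()
for outer in (outer_ok, outer_err):
    tasks[id(outer)] = outer

fake_worker(id(outer_ok), int, "1")
fake_worker(id(outer_err), int, "x")

print(outer_ok.result())                     # 1
print(type(outer_err.exception()).__name__)  # ValueError

result_queue.put(None)  # sentinel: stop the handler thread
handler.join()
```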

Composition

OK, now let's use composition – instead of being a ProcessPoolExecutor, our ProcessThreadPoolExecutor has one. At first glance, the result is the same as before, with super() changed to self._inner:

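import concurrent.futures
import itertools
import multiprocessing
import threading
import time
from functools import partial

class ProcessThreadPoolExecutor: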

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self._result_queue = multiprocessing.Queue()
        self._inner = concurrent.futures.ProcessPoolExecutor(
            initializer=_init_process,
            initargs=(self._result_queue, max_threads, initializer, initargs)
        )
        self._tasks = {}
        self._result_handler = threading.Thread(target=self._handle_results)
        self._result_handler.start()

    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self._tasks[task_id] = outer

        outer.set_running_or_notify_cancel()
        inner = self._inner.submit(_submit, task_id, fn, *args, **kwargs)

        return outer

    def _handle_results(self):
        for task_id, ok, result in iter(self._result_queue.get, None):
            outer = self._tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)

    def shutdown(self, wait=True):
        self._inner.shutdown(wait=wait)
        if self._result_queue:
            self._result_queue.put(None)
            if wait:
                self._result_handler.join()
            self._result_queue.close()
            self._result_queue = None

Except, we need to implement the context manager protocol ourselves:

    def __enter__(self):
        # concurrent.futures._base.Executor.__enter__
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # concurrent.futures._base.Executor.__exit__
        self.shutdown(wait=True)
        return False

...and we need to copy map() from Executor, since it should use our submit():

    def _map(self, fn, *iterables, timeout=None, chunksize=1):
        # concurrent.futures._base.Executor.map

        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                fs.reverse()
                while fs:
                    if timeout is None:
                        yield _result_or_cancel(fs.pop())
                    else:
                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

...and the chunksize optimization from its ProcessPoolExecutor version:

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        # concurrent.futures.process.ProcessPoolExecutor.map

        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        # itertools.batched() needs Python 3.12+
        results = self._map(partial(_process_chunk, fn),
                            itertools.batched(zip(*iterables), chunksize),
                            timeout=timeout)
        return _chain_from_iterable_of_lists(results)

...and a bunch of private functions they use:

def _result_or_cancel(fut, timeout=None):
    # concurrent.futures._base._result_or_cancel
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        del fut

def _process_chunk(fn, chunk):
    # concurrent.futures.process._process_chunk
    return [fn(*args) for args in chunk]

def _chain_from_iterable_of_lists(iterable):
    # concurrent.futures.process._chain_from_iterable_of_lists
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

And, when the Executor interface gets new methods, we'll need to at least forward them to the inner executor, although we may have to copy those too.
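Generic forwarding with __getattr__ is a tempting shortcut, but it also shows why forwarding alone isn't always enough: a forwarded high-level method runs against the inner executor, so it bypasses our submit(). A minimal sketch with a hypothetical Wrapper around a ThreadPoolExecutor:

```python
import concurrent.futures


class Wrapper:
    """Hypothetical composition-based executor that forwards unknown attributes."""

    def __init__(self):
        self._inner = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.submitted = 0

    def submit(self, fn, /, *args, **kwargs):
        self.submitted += 1  # our extra behavior
        return self._inner.submit(fn, *args, **kwargs)

    def __getattr__(self, name):
        # only called when normal lookup fails, e.g. for map() and shutdown()
        return getattr(self._inner, name)


wrapper = Wrapper()
print(list(wrapper.map(abs, [-1, -2])))  # [1, 2]
print(wrapper.submitted)  # 0: the forwarded map() called the inner submit()
wrapper.shutdown()
```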

On the upside, no base class means we can name attributes however we want.

Download the entire file.


But this is Python, why do we need to copy stuff? In Python, methods are just functions, so we could almost get away with this:

from concurrent.futures import ProcessPoolExecutor

class ProcessThreadPoolExecutor:
    ... # __init__, submit(), and shutdown() just as before
    __enter__ = ProcessPoolExecutor.__enter__
    __exit__ = ProcessPoolExecutor.__exit__
    map = ProcessPoolExecutor.map

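Alas, it won't work – ProcessPoolExecutor.map() calls super().map(), and zero-argument super() is tied to the class the method was defined in, so calling it with a self that isn't a ProcessPoolExecutor raises TypeError. A copy defined in our own class fares no better: there, super() refers to object, the superclass of our executor, which has no map() method; that's why we had to change it to self._map() in our copy in the first place.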
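A minimal reproduction of that failure, with hypothetical Base, Parent, and Borrower classes: borrowing a method that uses zero-argument super() into an unrelated class fails as soon as super() runs.

```python
class Base:
    def greet(self):
        return "base"


class Parent(Base):
    def greet(self):
        # zero-argument super() here means super(Parent, self)
        return "parent+" + super().greet()


class Borrower:
    greet = Parent.greet  # borrow the function without inheriting from Parent


print(Parent().greet())  # parent+base
try:
    Borrower().greet()
except TypeError as exc:
    # a Borrower instance is not a Parent, so super(Parent, self) refuses it
    print(type(exc).__name__)  # TypeError
```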

Functions

Can this be done using only functions, though?

Theoretically no, since we need to implement the executor interface. Practically yes, since this is Python, where an "interface" just means having specific attributes, usually functions with specific signatures. For example, a module like this:

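import concurrent.futures
import itertools
import multiprocessing
import threading
import time
from functools import partial

def init(max_threads=None, initializer=None, initargs=()):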
    global _result_queue, _inner, _tasks, _result_handler
    _result_queue = multiprocessing.Queue()
    _inner = concurrent.futures.ProcessPoolExecutor(
        initializer=_init_process,
        initargs=(_result_queue, max_threads, initializer, initargs)
    )
    _tasks = {}
    _result_handler = threading.Thread(target=_handle_results)
    _result_handler.start()

def submit(fn, *args, **kwargs):
    outer = concurrent.futures.Future()
    task_id = id(outer)
    _tasks[task_id] = outer

    outer.set_running_or_notify_cancel()
    inner = _inner.submit(_submit, task_id, fn, *args, **kwargs)

    return outer

def _handle_results():
    for task_id, ok, result in iter(_result_queue.get, None):
        outer = _tasks.pop(task_id)
        if ok:
            outer.set_result(result)
        else:
            outer.set_exception(result)

def shutdown(wait=True):
    global _result_queue
    _inner.shutdown(wait=wait)
    if _result_queue:
        _result_queue.put(None)
        if wait:
            _result_handler.join()
        _result_queue.close()
        _result_queue = None

Like before, we need to copy map() with minor tweaks:

def _map(fn, *iterables, timeout=None, chunksize=1):
    # concurrent.futures._base.Executor.map

    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = [submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        try:
            fs.reverse()
            while fs:
                if timeout is None:
                    yield _result_or_cancel(fs.pop())
                else:
                    yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

def map(fn, *iterables, timeout=None, chunksize=1):
    # concurrent.futures.process.ProcessPoolExecutor.map

    if chunksize < 1:
        raise ValueError("chunksize must be >= 1.")

    results = _map(partial(_process_chunk, fn),
                   itertools.batched(zip(*iterables), chunksize),
                   timeout=timeout)
    return _chain_from_iterable_of_lists(results)

Behold, we can use the module itself as an executor:

>>> ptpe.init()
>>> ptpe.submit(int, '1').result()
1

Of note, everything that was an instance variable before is now a global variable; as a consequence, only one executor can exist at any given time, since there's only the one module.³ But it gets worse – calling init() a second time will clobber the state of the first executor, leading to all sorts of bugs; if we were serious, we'd prevent it somehow.

Also, some interfaces are more complicated than having the right functions; defining __enter__ and __exit__ is not enough to use a module in a with statement, since the interpreter looks them up on the class of the object, not on the object itself. We can work around this with an alternate "constructor" that returns a context manager:

import sys
from contextlib import contextmanager

@contextmanager
def init_cm(*args, **kwargs):
    init(*args, **kwargs)
    try:
        yield sys.modules[__name__]
    finally:
        shutdown()

>>> with ptpe.init_cm() as executor:
...     assert executor is ptpe
...     ptpe.submit(int, '2').result()
...
2
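A quick check of the lookup rule mentioned above: attaching __enter__ and __exit__ to a module object (here a throwaway types.ModuleType standing in for ptpe) doesn't make it usable in a with statement, because the interpreter looks them up on type(obj), the module type. Python 3.11+ raises TypeError here; older versions raise AttributeError, so the sketch catches both.

```python
import types

fake = types.ModuleType("fake_executor")  # hypothetical stand-in for ptpe
fake.__enter__ = lambda: fake
fake.__exit__ = lambda *exc_info: False

print(hasattr(fake, "__enter__"))  # True: the attribute is there...
try:
    with fake:
        pass
except (TypeError, AttributeError) as exc:
    # ...but with looks it up on type(fake), i.e. types.ModuleType
    print(type(exc).__name__, exc)
```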

Download the entire file.


Comparison

So, how do the solutions stack up? Here's a summary:

inheritance
  pros:
    • least amount of code
    • inherits new high level methods for free
  cons:
    • assumes inherited high level methods use only the public API
    • attribute names have to start with double underscores (minor)
composition
  pros:
    • attributes can have any name (minor)
  cons:
    • copies lots of code
    • must be kept in sync with the interface
functions
  pros: ?
  cons:
    • copies lots of code
    • must be kept in sync with the interface
    • only one global executor at a time
    • state is harder to discover
    • alternate "constructor" to use as context manager (minor)

I may be a bit biased, but inheritance looks like a clear winner.

Composition over inheritance

Given that favoring composition over inheritance is usually a good practice, it's worth discussing why inheritance won this time. I see three reasons:

  1. Composition helps most when you have unrelated components that need to be flexible in response to an evolving business domain; that's not the case here, so we get all the drawbacks with none of the benefits.
  2. The existing code is designed for inheritance.
  3. We have a true is-a relationship – ProcessThreadPoolExecutor really is a ProcessPoolExecutor with extra behavior, and not just part of an arbitrary hierarchy.

For a different line of reasoning involving subtyping, check out Hillel Wayne's When to prefer inheritance to composition; he offers this rule of thumb:

So, here's when you want to use inheritance: when you need to instantiate both the parent and child classes and pass them to the same functions.

Forward compatibility

The inheritance solution assumes map() and any future public ProcessPoolExecutor methods are implemented only in terms of other public methods. This assumption introduces a risk that updates may break our executor, a risk that is lowered by two things:

  1. concurrent.futures is in the standard library, which rarely rewrites existing code in major ways, and never within a minor (X.Y) version; concurrent.futures has existed in its current form since Python 3.2, released in 2011.
  2. concurrent.futures is clearly designed for inheritance, even if mainly to enable internal reuse, and not explicitly documented.

As active mitigations, we can add a basic test suite (which we should do anyway), and document the supported Python versions explicitly (which we should do anyway if we were to release this on PyPI).
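As a sketch of what such a test might look like (the SpyExecutor class and test case are hypothetical, not from the original file), we can pin down the key assumption directly: the inherited map() must go through the public submit(). To keep it fast and free of process start-up, ThreadPoolExecutor stands in for ProcessPoolExecutor here; pointing the same test at a ProcessPoolExecutor subclass would exercise the case we actually rely on, but needs picklable functions and, on platforms that spawn workers, an if __name__ == "__main__": guard.

```python
import concurrent.futures
import unittest


class SpyExecutor(concurrent.futures.ThreadPoolExecutor):
    """Hypothetical subclass that counts calls to the public submit()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.submit_calls = 0

    def submit(self, fn, /, *args, **kwargs):
        self.submit_calls += 1
        return super().submit(fn, *args, **kwargs)


class InheritedMapTest(unittest.TestCase):
    def test_map_goes_through_public_submit(self):
        with SpyExecutor(max_workers=2) as executor:
            results = list(executor.map(abs, [-1, -2, -3]))
        self.assertEqual(results, [1, 2, 3])
        # if map() ever bypassed submit(), a subclass like ours would break
        self.assertEqual(executor.submit_calls, 3)
```

Run it with python -m unittest like any other test module.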

If concurrent.futures were not in the standard library, I'd probably go with the composition version instead, although as already mentioned, this wouldn't be free from upkeep either. Another option would be to upstream ProcessThreadPoolExecutor, so that it is maintained together with the code it depends on.

Global state

The functions-only solution is probably the worst of the three, since it has all the downsides of composition, and significant limitations due to its use of global state.

We could avoid using globals by passing the state (process pool executor instance, result queue, etc.) as function arguments, but this breaks the executor interface, and makes for an awful user experience. We could group common arguments into a single object so there's only one argument to pass around; if you call that argument self, it becomes obvious that's just a class instance with extra steps.
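A small illustration of that last point, using a hypothetical counter rather than the executor: the functions-only version threads one state object through every call, and attaching the very same function to a class just makes Python pass that argument for us.

```python
import types


# Functions-only style: all state travels in one explicit argument.
def new_counter():
    return types.SimpleNamespace(count=0)


def increment(self, by=1):
    self.count += by
    return self.count


state = new_counter()
increment(state)
increment(state, 2)


# The same function attached to a class: Python now passes self for us.
class Counter:
    def __init__(self):
        self.count = 0

    increment = increment  # reuses the module-level function as a method


counter = Counter()
counter.increment()
counter.increment(2)
print(state.count, counter.count)  # 3 3
```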

Having to keep track of a bunch of related globals has enough downsides that even if you do want a module-level API, it's still worth using a class to group them, and exposing the methods of a global instance at module-level (like so); Brandon Rhodes discusses this at length in The Prebound Method Pattern.

Complexity

While the code is somewhat complex, that's mostly intrinsic to the problem itself (what runs in the main vs. worker processes, passing results around, error handling, and so on), rather than due to our use of classes, which only affects how we refer to ProcessPoolExecutor methods and how we store state.

One could argue that copying a bunch of code doesn't increase complexity, but if you factor in keeping it up to date and tested, it's not exactly free either.

One could also argue that building our executor on top of ProcessPoolExecutor is increasing complexity, and in a way that's true – for example, we have two result queues and had to deal with dead workers too, which wouldn't be the case if we wrote it from scratch; but in turn, that would come with having to understand, maintain, and test 800+ lines of low-level process management code. Sometimes, the complexity I have to care about is more important than total complexity.

Debugging

I have to come clean at this point – I use print debugging a lot 🙀 (especially if there are no tests yet, and sometimes from tests too); when that doesn't cut it, IPython's embed() usually provides enough interactivity to figure out what's going on.⁴

With the minimal test at the end of the file driving the executor, I used temporary print() calls in _submit(), _put_result(), and __handle_results() to check that data is making its way through properly; if I expected the code to change more often, I'd replace them with permanent logging calls.

In addition, there were two debugging scripts in the benchmark file that I didn't show: one to automate killing workers at the right time, and one to make sure shutdown() waits for any pending tasks.
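Since those scripts aren't shown, here is only a hypothetical sketch of the second idea: submit a few slow tasks, call shutdown(wait=True), and check that every future is done by the time it returns. ThreadPoolExecutor stands in for ProcessThreadPoolExecutor; the same check_shutdown_waits() function could be pointed at our executor.

```python
import concurrent.futures
import time


def check_shutdown_waits(executor, n_tasks=4, delay=0.1):
    """Hypothetical check: shutdown(wait=True) returns only after pending tasks finish."""
    futures = [executor.submit(time.sleep, delay) for _ in range(n_tasks)]
    executor.shutdown(wait=True)
    not_done = [f for f in futures if not f.done()]
    assert not not_done, f"{len(not_done)} task(s) still pending after shutdown()"
    return futures


futures = check_shutdown_waits(concurrent.futures.ThreadPoolExecutor(max_workers=2))
print(len(futures), "tasks finished before shutdown() returned")
```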

So, does how we wrote the code change any of this? Not really, no; all the techniques above (and using a debugger too) apply equally well. If anything, using classes makes interactive debugging easier, since it's easier to discover state via autocomplete (with functions only, you have to know to look it up on the module).

Try it out

As I've said before, try it out – it only took ~10 minutes to convert the initial solution to the other two. In part, the right code structure is a matter of feeling and taste, and both are educated by reading and writing lots of code. If you think there's a better way to do something, do it and see how it looks; it is a sort of deliberate practice.


  1. Executor is an abstract base class only by convention: it is a base class (other classes are supposed to subclass it), and it is abstract (other classes are supposed to provide concrete implementations for some methods).

    Python also allows formalizing abstract base classes using the abc module; see When to use classes in Python? When you repeat similar sets of functions for an example of this and other ways of achieving the same goal.

  2. For brevity, I'm using the version before dealing with dead workers; the final code is similar, but with a more involved __handle_results.

  3. This is almost true – we could "this is Python" our way deeper and reload the module while still keeping a reference to the old one, but that's just a roundabout, unholy way of emulating class instances.

  4. Pro tip: you can use embed() as a breakpoint() hook: PYTHONBREAKPOINT=IPython.embed python myscript.py.


This is part of a series: