This is not interview advice: a priority-expiry LRU cache without heaps or trees in Python
January 2024 ∙ 24 minute read ∙
It's not your fault I got nerd-sniped, but that doesn't matter.
Hi, I'm Adrian, and today we're implementing a least recently used cache with priorities and expiry, using only the Python standard library.
This is a bIG TEch CoDINg InTerVIEW problem, so we'll work hard to stay away from the correct™ data structures – no heaps, no binary search trees – but end up with a decent solution anyway!
Requirements #
So you're at an interview and have to implement a priority-expiry LRU cache.
Maybe you get more details, but maybe the problem is deliberately vague; either way, we can reconstruct the requirements from the name alone.
A cache is something that stores data for future reuse, usually the result of a slow computation or retrieval. Each piece of data has an associated key used to retrieve it. Most caches are bounded in some way, for example by limiting the number of items.
The other words have to do with eviction – how and when items are removed.
- Each item has a maximum age – items that go past that are expired, so we don't return them. It stands to reason this does not depend on their priority or how full the cache is.
- Each item has a priority – when the cache fills up, we evict items with lower priority before those with higher priority.
- All other things being equal, we evict the least recently used items first.
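To make the precedence concrete, here's a small standalone sketch (my own illustration, with a hypothetical `eviction_key` helper; the cache we build below never actually sorts all items like this):

```python
# Eviction order as a sort key: expired items first, then lowest
# priority, then least recently used. False sorts before True, so
# `not expired` puts expired items at the front.
def eviction_key(item, now):
    expired = item['expires'] <= now
    return (not expired, item['priority'], item['last_used'])

items = [
    {'key': 'a', 'expires': 5,  'priority': 1, 'last_used': 3},
    {'key': 'b', 'expires': 99, 'priority': 0, 'last_used': 2},
    {'key': 'c', 'expires': 99, 'priority': 0, 'last_used': 1},
]
order = sorted(items, key=lambda i: eviction_key(i, now=10))
print([i['key'] for i in order])  # ['a', 'c', 'b']
```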
The problem may specify an API; we can reconstruct that from first principles too. Since the cache is basically a key-value store, we can get away with two methods:
- set(key, value, maxage, priority)
- get(key) -> value or None
The problem may also suggest:
- delete(key) – allows users to invalidate items for reasons external to the cache; not strictly necessary, but we'll end up with it as part of refactoring
- evict(now) – not strictly necessary either, but hints eviction is a separate bit of logic, and may come in handy for testing
Types deserve their own discussion:
- key – usually, the key is a string, but we can relax this to any hashable value
- value – for an in-memory cache, any kind of object is fine
- maxage and priority – a number should do for these; a float is more general, but an integer may allow a simpler solution; limits on these are important too, as we'll see soon enough
Tip
Your interviewer may be expecting you to uncover some of these details through clarifying questions. Be sure to think out loud and state your assumptions.
A minimal plausible solution #
I'm sure there are a lot of smart people out there that can Think Really Hard™ and just come up with a solution, but I'm not one of them, so we'll take an iterative, problem-solution approach to this.
Since right now we don't have any solution, we start with the simplest thing that could possibly work^{1} – a basic cache with no fancy eviction and no priorities; we can then write some tests against that, to know if we break anything going forward.
Tip
If during an interview you don't know what to do and choose to work up from the naive solution, make it very clear that's what you're doing. Your interviewer may give you hints to help you skip that.
A class holds all the things we need and gives us something to stick the API on:
class Cache:

    def __init__(self, maxsize, time=time.monotonic):
        self.maxsize = maxsize
        self.time = time
        self.cache = {}
And this is our first algorithmic choice: a dict (backed by a hash table) provides average O(1) search / insert / delete.
Tip
Given the problem we're solving, and the context we're solving it in, we have to talk about time complexity. Ned Batchelder's BigO: How Code Slows as Data Grows provides an excellent introduction (text and video available).
set() leads to more choices:
def set(self, key, value, *, maxage=10, priority=0):
    now = self.time()
    if key in self.cache:
        self.cache.pop(key)
    elif len(self.cache) >= self.maxsize:
        self.evict(now)
    expires = now + maxage
    item = Item(key, value, expires, priority)
    self.cache[key] = item
First, we evict items only if there's no more room left. (There are other ways of doing this; for example, evicting expired items periodically minimizes memory usage.)
Second, if the key is already in the cache, we remove and insert it again, instead of updating things in place. This way, there's only one code path for setting items, which will make it a lot easier to keep multiple data structures in sync later on.
We use a named tuple to store the parameters associated with a key:
class Item(NamedTuple):
    key: object
    value: object
    expires: int
    priority: int
For now, we just evict an arbitrary item; in a happy little coincidence, dicts preserve insertion order, so when iterating over the cache, the oldest key is first.
def evict(self, now):
    if not self.cache:
        return
    key = next(iter(self.cache))
    del self.cache[key]
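The insertion-order property is easy to demonstrate on its own (a standalone illustration, not part of the cache):

```python
# Dicts preserve insertion order (guaranteed since Python 3.7), so the
# first key yielded by iteration is always the oldest one inserted.
d = {}
d['a'] = 1
d['b'] = 2
d['a'] = 3            # updating a key does NOT move it to the end
oldest = next(iter(d))
print(oldest)         # 'a'
del d['a']
d['a'] = 4            # removing and re-inserting does move it to the end
print(next(iter(d)))  # 'b'
```

This is also why set() removes and re-inserts existing keys instead of updating them in place.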
Finally, get() is trivial:
def get(self, key):
    item = self.cache.get(key)
    if not item:
        return None
    if self.time() >= item.expires:
        return None
    return item.value
With everything in place, here's the first test:
def test_basic():
    cache = Cache(2, FakeTime())
    assert cache.get('a') == None
    cache.set('a', 'A')
    assert cache.get('a') == 'A'
    cache.set('b', 'B')
    assert cache.get('a') == 'A'
    assert cache.get('b') == 'B'
    cache.set('c', 'C')
    assert cache.get('a') == None
    assert cache.get('b') == 'B'
    assert cache.get('c') == 'C'
To make things predictable, we inject a fake time implementation:
class FakeTime:

    def __init__(self, now=0):
        self.now = now

    def __call__(self):
        return self.now
Problem: expired items should go first... #
Following from the requirements, there's an order in which items get kicked out: first expired (lowest expiry time), then lowest priority, and only then least recently used. So, we need a data structure that can efficiently remove the smallest element.
Turns out, there's an abstract data type for that called a priority queue;^{2} for now, we'll honor its abstract nature and not bother with an implementation.
self.cache = {}
self.expires = PriorityQueue()
Since we need the item with the lowest expiry time, we need a way to get back to the item; an (expires, key) tuple should do fine – since tuples compare lexicographically, it'll be like comparing by expires alone, but with key along for the ride. In set(), we add:
self.cache[key] = item
self.expires.push((expires, key))
You may be tempted (like I was) to say "hey, the item's already a tuple, if we make expires the first field, we can use the item itself", but let's delay optimizations until we have and understand a full solution – make it work, make it right, make it fast.
Still in set(), if the key is already in the cache, we also remove it from the expires queue, so it can be added back with the new expiry time:
if key in self.cache:
    item = self.cache.pop(key)
    self.expires.remove((item.expires, key))
    del item
elif len(self.cache) >= self.maxsize:
    self.evict(now)
Moving on to evicting things; for this, we need two operations: first, peek at the item that expires next to see if it's expired; then, if it is, pop it from the queue. (Another choice: we only need to evict one item, but we evict all expired ones.)
def evict(self, now):
    if not self.cache:
        return
    initial_size = len(self.cache)
    while self.cache:
        expires, key = self.expires.peek()
        if expires > now:
            break
        self.expires.pop()
        del self.cache[key]
    if len(self.cache) == initial_size:
        _, key = self.expires.pop()
        del self.cache[key]
If there are no expired items, we still have to make room for the one item; since we're not handling priorities yet, we'll evict the item that expires next a little early.
Problem: name PriorityQueue is not defined #
OK, to get the code working again, we need a PriorityQueue class. It doesn't need to be fast, we can deal with that after we finish everything else; for now, let's just keep our elements in a plain list.
class PriorityQueue:

    def __init__(self):
        self.data = []
The easiest way to get the smallest value is to keep the list sorted; the downside is that push() is now O(n log n) – although, because the list is always almost sorted, it can be as good as O(n) depending on the implementation.
def push(self, item):
    self.data.append(item)
    self.data.sort()
This makes peek() and pop() trivial; still, pop() is O(n), because it shifts all the items left by one position.
def peek(self):
    return self.data[0]

def pop(self):
    rv = self.data[0]
    self.data[:1] = []
    return rv
remove() is just as simple, and just as O(n), because it first needs to find the item, and then shift the ones after it to cover the gap.
def remove(self, item):
    self.data.remove(item)
We didn't use the "is empty" operation, but it should be O(1) regardless of implementation, so let's throw it in anyway:
def __bool__(self):
    return bool(self.data)
OK, let's wrap up with a quick test:
def test_priority_queue():
    pq = PriorityQueue()
    pq.push(1)
    pq.push(3)
    pq.push(2)
    assert pq
    assert pq.peek() == 1
    assert pq.pop() == 1
    assert pq.peek() == 2
    assert pq.remove(3) is None
    assert pq
    assert pq.peek() == 2
    with pytest.raises(ValueError):
        pq.remove(3)
    assert pq.pop() == 2
    assert not pq
    with pytest.raises(IndexError):
        pq.peek()
    with pytest.raises(IndexError):
        pq.pop()
Now the existing tests pass, and we can add more – first, that expired items are evicted (note how we're moving the time forward):
def test_expires():
    cache = Cache(2, FakeTime())
    cache.set('a', 'A', maxage=10)
    cache.set('b', 'B', maxage=20)
    assert cache.get('a') == 'A'
    assert cache.get('b') == 'B'
    cache.time.now = 15
    assert cache.get('a') == None
    assert cache.get('b') == 'B'
    cache.set('c', 'C')
    assert cache.get('a') == None
    assert cache.get('b') == 'B'
    assert cache.get('c') == 'C'
Second, that setting an existing item changes its expire time:
def test_update_expires():
    cache = Cache(2, FakeTime())
    cache.set('a', 'A', maxage=10)
    cache.set('b', 'B', maxage=10)
    cache.time.now = 5
    cache.set('a', 'X', maxage=4)
    cache.set('b', 'Y', maxage=6)
    assert cache.get('a') == 'X'
    assert cache.get('b') == 'Y'
    cache.time.now = 10
    assert cache.get('a') == None
    assert cache.get('b') == 'Y'
Problem: ...low priority items second #
Next up, kick out items by priority – shouldn't be too hard, right?
In __init__(), add another priority queue for priorities:
self.cache = {}
self.expires = PriorityQueue()
self.priorities = PriorityQueue()
In set(), add new items to the priorities queue:
self.cache[key] = item
self.expires.push((expires, key))
self.priorities.push((priority, key))
...and remove already-cached items:
if key in self.cache:
    item = self.cache.pop(key)
    self.expires.remove((item.expires, key))
    self.priorities.remove((item.priority, key))
    del item
In evict(), remove expired items from the priorities queue:
while self.cache:
    expires, key = self.expires.peek()
    if expires > now:
        break
    self.expires.pop()
    item = self.cache.pop(key)
    self.priorities.remove((item.priority, key))
...and finally, if none are expired, remove the one with the lowest priority:
if len(self.cache) == initial_size:
    _, key = self.priorities.pop()
    item = self.cache.pop(key)
    self.expires.remove((item.expires, key))
Add one test for eviction by priority:
def test_priorities():
    cache = Cache(2, FakeTime())
    cache.set('a', 'A', priority=1)
    cache.set('b', 'B', priority=0)
    assert cache.get('a') == 'A'
    assert cache.get('b') == 'B'
    cache.set('c', 'C')
    assert cache.get('a') == 'A'
    assert cache.get('b') == None
    assert cache.get('c') == 'C'
...and one for updating the priority of existing items:
def test_update_priorities():
    cache = Cache(2, FakeTime())
    cache.set('a', 'A', priority=1)
    cache.set('b', 'B', priority=0)
    cache.set('b', 'Y', priority=2)
    cache.set('c', 'C')
    assert cache.get('a') == None
    assert cache.get('b') == 'Y'
    assert cache.get('c') == 'C'
Problem: we're deleting items in three places #
I said we'll postpone performance optimizations until we have a complete solution, but I have a different kind of optimization in mind – for readability.
We're deleting items in three slightly different ways, careful to keep three data structures in sync each time; it would be nice to do it only once. While a bit premature, through the magic of having written the article already, I'm sure it will pay off.
def delete(self, key):
    *_, expires, priority = self.cache.pop(key)
    self.expires.remove((expires, key))
    self.priorities.remove((priority, key))
Sure, eviction is twice as slow, but the complexity stays the same – the constant factor in O(2n) gets removed, leaving us with O(n). If needed, we can go back to the unpacked version once we have a reasonably efficient implementation (that's what tests are for).
Deleting already-cached items is shortened to:
if key in self.cache:
    self.delete(key)
Idem for the core eviction logic:
while self.cache:
    expires, key = self.expires.peek()
    if expires > now:
        break
    self.delete(key)

if len(self.cache) == initial_size:
    _, key = self.priorities.peek()
    self.delete(key)
Neat!
Problem: ...least recently used items last #
So, how does one implement a least recently used cache?
We could google it... or, we could look at an existing implementation.
functools.lru_cache() #
Standard library functools.lru_cache() comes to mind first; let's have a look.
Tip
You can read the code of stdlib modules by following the Source code: link at the top of each documentation page.
lru_cache() delegates to _lru_cache_wrapper(), which sets up a bunch of variables to be used by nested functions.^{3} Among the variables are a cache dict and a doubly linked list where nodes are [prev, next, key, value] lists.^{4}
And that's the answer – a doubly linked list allows tracking item use in O(1): each time a node is used, remove it from its current position and plop it at the "recently used" end; whatever's at the other end will be the least recently used item.
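Here's a minimal sketch of that trick (my own illustration, not CPython's actual code), with nodes as [prev, next, key] lists and a sentinel root node:

```python
# "Touching" a node in a doubly linked list is O(1): unlink it from its
# current position, relink it just before the sentinel root (the MRU
# end). The node after the root is then always the least recently used.
PREV, NEXT, KEY = 0, 1, 2

def make_list():
    root = []                    # sentinel: root[NEXT] is LRU, root[PREV] is MRU
    root[:] = [root, root, None]
    return root

def push_mru(root, node):        # link node at the most-recently-used end
    last = root[PREV]
    node[PREV], node[NEXT] = last, root
    last[NEXT] = root[PREV] = node

def touch(root, node):           # O(1): unlink, then relink at the MRU end
    node[PREV][NEXT] = node[NEXT]
    node[NEXT][PREV] = node[PREV]
    push_mru(root, node)

root = make_list()
a = [None, None, 'a']; push_mru(root, a)
b = [None, None, 'b']; push_mru(root, b)
touch(root, a)                   # 'a' was just used
print(root[NEXT][KEY])           # least recently used: 'b'
```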
Note that, unlike lru_cache(), we need one doubly linked list for each priority.
But, before making Item mutable and giving it prev/next links, let's dive deeper.
OrderedDict #
If you search the docs for "LRU", the next result after lru_cache() is OrderedDict.
Some differences from dict still remain: [...] The OrderedDict algorithm can handle frequent reordering operations better than dict. [...] this makes it suitable for implementing various kinds of LRU caches.
Specifically:
OrderedDict has a move_to_end() method to efficiently reposition an element to an endpoint.

Since dicts preserve insertion order, you can use d[k] = d.pop(k) to move items to the end... What makes move_to_end() better, then?
What makes move_to_end() better, then?
The comment at the top of the OrderedDict implementation sheds some light: alongside an inherited dict, it maintains a doubly linked list of keys, which is what makes reordering cheap. Indeed, move_to_end() does exactly what we described above – this is good news, it means we don't have to do it ourselves!
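A quick standalone demo of OrderedDict as an LRU order tracker (illustration only; the values don't matter, only the key order does):

```python
from collections import OrderedDict

# Keys in insertion order: oldest first, most recently used last.
lru = OrderedDict.fromkeys(['a', 'b', 'c'])
lru.move_to_end('a')    # 'a' was just used
print(next(iter(lru)))  # least recently used: 'b'
print(list(lru))        # ['b', 'c', 'a']
```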
So, we need one OrderedDict (read: doubly linked list) for each priority, but we still need to keep track of the lowest priority:
self.cache = {}
self.expires = PriorityQueue()
self.priority_buckets = {}
self.priority_order = PriorityQueue()
Handling priorities in set() gets a bit more complicated:
priority_bucket = self.priority_buckets.get(priority)
if not priority_bucket:
    priority_bucket = self.priority_buckets[priority] = OrderedDict()
    self.priority_order.push(priority)
priority_bucket[key] = None
But now we can finally evict the least recently used item:
if len(self.cache) == initial_size:
    priority = self.priority_order.peek()
    priority_bucket = self.priority_buckets.get(priority)
    key = next(iter(priority_bucket))
    self.delete(key)
In delete(), we're careful to get rid of empty buckets:^{5}
priority_bucket = self.priority_buckets[priority]
del priority_bucket[key]
if not priority_bucket:
    del self.priority_buckets[priority]
    self.priority_order.remove(priority)
Existing tests pass again, and we can add a new (still failing) one:
def test_lru():
    cache = Cache(2, FakeTime())
    cache.set('a', 'A')
    cache.set('b', 'B')
    assert cache.get('a') == 'A'
    cache.set('c', 'C')
    assert cache.get('a') == 'A'
    assert cache.get('b') == None
    assert cache.get('c') == 'C'
All that's needed to make it pass is to call move_to_end() in get():
self.priority_buckets[item.priority].move_to_end(key)
return item.value
Problem: our priority queue is slow #
OK, we have a complete solution, it's time to deal with the priority queue implementation. Let's do a quick recap of the methods we need and why:
- push() – to add items
- peek() – to get items / buckets with the lowest expiry time / priority
- remove() – to delete items
- pop() – not used, but would be without the delete() refactoring
We make two related observations: first, there's no remove operation on the priority queue Wikipedia page; second, even if we unpack delete(), we only get to pop() an item/bucket from one of the queues, and still have to remove() it from the other.
And this is what makes the problem tricky – we need to maintain not one, but two independent priority queues.
Note
We'll now go through a few data structures in quick succession, which may be a bit overwhelming without preparation. Keep in mind we don't care how they work (not yet, at least), we're just shopping around based on specs.
heapq #
If you search the docs for "priority queue", you'll find heapq, which:
[...] provides an implementation of the heap queue algorithm, also known as the priority queue algorithm.
Reading on, we find extensive notes on implementing priority queues; particularly interesting are using (priority, item) tuples (already doing this!) and removing entries:
Removing the entry or changing its priority is more difficult because it would break the heap structure invariants. So, a possible solution is to mark the entry as removed and add a new entry with the revised priority.
This workaround is needed because while removing the ith element can be done in O(log n), finding its index is O(n). To summarize, we have:
|          | sort | heapq    |
|----------|------|----------|
| push()   | O(n) | O(log n) |
| peek()   | O(1) | O(1)     |
| pop()    | O(n) | O(log n) |
| remove() | O(n) | O(n)     |
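To make the mark-as-removed workaround concrete, here's a toy sketch of my own (a hypothetical `LazyHeap`; it assumes entries are unique, which (priority, key) tuples are in our case):

```python
import heapq

# Instead of removing an entry (finding it is O(n)), remember it in a
# "dead" set and skip dead entries lazily when they surface at the top.
class LazyHeap:
    def __init__(self):
        self.heap = []
        self.dead = set()

    def push(self, entry):
        heapq.heappush(self.heap, entry)      # O(log n)

    def remove(self, entry):
        self.dead.add(entry)                  # O(1), deferred

    def pop(self):
        while True:
            entry = heapq.heappop(self.heap)  # O(log n)
            if entry in self.dead:
                self.dead.discard(entry)
                continue
            return entry

h = LazyHeap()
for e in [(3, 'c'), (1, 'a'), (2, 'b')]:
    h.push(e)
h.remove((1, 'a'))
print(h.pop())  # (2, 'b') -- the dead (1, 'a') entry is skipped
```

The cost is memory: dead entries accumulate until they reach the top of the heap.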
Still, with a few mitigating assumptions, it could work:
- we can assume priorities are static, so buckets never get removed
- to-be-removed expiry times will get popped sooner or later anyway; we can assume that most evictions are due to expired items, and that items being evicted due to low priority (i.e. when the cache is full) and item updates are rare (both cause to-be-removed entries to accumulate in the expiry queue)
bisect #
One way of finding an element in better than O(n) is bisect, which:
[...] provides support for maintaining a list in sorted order without having to sort the list after each insertion.
This may provide an improvement to our naive implementation; sadly, reading further to Performance Notes we find that:
The insort() functions are O(n) because the logarithmic search step is dominated by the linear time insertion step.
While in general that's better than just about any sort, we happen to be hitting the best case of our sort implementation, which has the same complexity. (Nevertheless, shifting elements is likely cheaper than the same number of comparisons.)
|          | sort | heapq    | bisect |
|----------|------|----------|--------|
| push()   | O(n) | O(log n) | O(n)   |
| peek()   | O(1) | O(1)     | O(1)   |
| pop()    | O(n) | O(log n) | O(n)   |
| remove() | O(n) | O(n)     | O(n)   |
Further down in the docs, there's a see also box:
Sorted Collections is a high performance module that uses bisect to manage sorted collections of data.
Not in stdlib, moving along... ¯\_(ツ)_/¯
pop() optimization #
There's an unrelated improvement that applies to both the naive solution and bisect. With a sorted list, pop() is O(n) because it shifts all elements after the first; if the order was reversed, we'd pop() from the end, which is O(1). So:
|          | sort  | heapq    | bisect |
|----------|-------|----------|--------|
| push()   | O(n)  | O(log n) | O(n)   |
| peek()   | O(1)  | O(1)     | O(1)   |
| pop()    | O(1)* | O(log n) | O(1)*  |
| remove() | O(n)  | O(n)     | O(n)   |
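A tiny standalone demonstration of the reversed-list trick:

```python
# Keeping the sorted list reversed (largest first) turns "pop the
# smallest element" into an O(1) pop from the end, with no shifting.
data = sorted([5, 1, 3], reverse=True)  # [5, 3, 1]
smallest = data.pop()                   # O(1)
print(smallest)  # 1
print(data)      # [5, 3]
```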
Binary search trees #
OK, I'm out of ideas, and there's nothing else in stdlib that can help.
We can restate the problem as follows: we need a sorted data structure that can do better than O(n) for push() / remove().
We've already peeked at the Wikipedia priority queue page, so let's keep reading – skipping past the naive implementations, to the usual implementation, we find that:
To improve performance, priority queues are typically based on a heap, [...]
Looked into that, didn't work; next:
Alternatively, when a self-balancing binary search tree is used, insertion and removal also take O(log n) time [...]
There, that's what we're looking for! (And likely what your interviewer is, too.)
|          | sort  | heapq    | bisect | BST      |
|----------|-------|----------|--------|----------|
| push()   | O(n)  | O(log n) | O(n)   | O(log n) |
| peek()   | O(1)  | O(1)     | O(1)   | O(log n) |
| pop()    | O(1)* | O(log n) | O(1)*  | O(log n) |
| remove() | O(n)  | O(log n) | O(n)   | O(log n) |
But there's no self-balancing BST in the standard library, and I sure as hell am not implementing one right now – I still have flashbacks from when I tried to do a red-black tree and two hours later it still had bugs (I mean, look at the length of this explanation!).
After a bit of googling we find, among others, bintrees, a mature library that provides all sorts of binary search trees... except:
Bintrees Development Stopped
Use sortedcontainers instead: https://pypi.python.org/pypi/sortedcontainers
Sounds familiar, doesn't it?
Sorted Containers #
Let's go back to that Sorted Collections library bisect was recommending:
Depends on the Sorted Containers module.
(￢‿￢ )
I remember, I remember now... I'm not salty because the red-black tree took two hours to implement. I'm salty because after all that time, I found Sorted Containers, a pure Python library that is faster in practice than fancy self-balancing binary search trees implemented in C!
It has extensive benchmarks to prove it, and simulated workload benchmarks for our own use case, priority queues – so yeah, while the interview answer is "selfbalancing BSTs", the actual answer is Sorted Containers.
How does it work? There's an extensive explanation too:^{6}
The Sorted Containers internal implementation is based on a couple observations. The first is that Python's list is fast, really fast. [...] The second is that bisect.insort^{7} is fast. This is somewhat counter-intuitive since it involves shifting a series of items in a list. But modern processors do this really well. A lot of time has been spent optimizing memcopy/memmove-like operations both in hardware and software.
But using only one list and bisect.insort would produce sluggish behavior for lengths exceeding ten thousand. So the implementation of Sorted List uses a list of lists to store elements. [...]
There's also a comparison with trees, which I'll summarize: fewer memory allocations, better cache locality, lower memory overhead, and faster iteration.
I think that gives you a decent idea of how and why it works, enough that with a bit of tinkering you might be able to implement it yourself.^{8}
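To get a feel for the list-of-lists idea, here's a toy sketch of my own (nowhere near the real implementation, which keeps a positional index and rebalances much more carefully):

```python
import bisect

# Keep elements in a list of small sorted sublists, so each
# bisect.insort only shifts a bounded number of elements.
LOAD = 4

def insert(lists, value):
    if not lists:
        lists.append([value])
        return
    # find the sublist this value belongs in, by each sublist's maximum
    # (recomputed here for simplicity; the real thing keeps an index)
    i = bisect.bisect_left([sub[-1] for sub in lists], value)
    i = min(i, len(lists) - 1)
    bisect.insort(lists[i], value)
    if len(lists[i]) > LOAD:  # split oversized sublists in two
        half = len(lists[i]) // 2
        lists[i], rest = lists[i][:half], lists[i][half:]
        lists.insert(i + 1, rest)

lists = []
for v in [9, 2, 7, 4, 1, 8, 3, 6, 5]:
    insert(lists, v)
print([v for sub in lists for v in sub])  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```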
Problem: Sorted Containers is not in stdlib #
But Sorted Containers is not in the standard library either, and we don't want to implement it ourselves. We did learn something from it, though:
|          | sort  | heapq    | bisect | bisect <10k | BST      |
|----------|-------|----------|--------|-------------|----------|
| push()   | O(n)  | O(log n) | O(n)   | O(log n)    | O(log n) |
| peek()   | O(1)  | O(1)     | O(1)   | O(1)        | O(log n) |
| pop()    | O(1)* | O(log n) | O(1)*  | O(1)*       | O(log n) |
| remove() | O(n)  | O(log n) | O(n)   | O(log n)    | O(log n) |
We still need to make some assumptions, though:
- Do we really need more than 10k priorities? Likely no, let's just cap them at 10k.
- Do we really need more than 10k expiry times? Maybe? – with 1 second granularity we can represent only up to 2.7 hours; 10 seconds takes us up to 27 hours, which may just work.

OK, one more and we're done. The other issue, aside from the maximum time, is that the granularity is too low, especially for short times – rounding 1 to 10 seconds is much worse than rounding 2001 to 2010 seconds. Which begs the question –

- Does it really matter if items expire in 2010 seconds instead of 2001? Likely no, but we need a way to round small values with higher granularity than big ones.
Logarithmic time #
How about 1, 2, 4, 8, ...? Rounding up to powers of 2 gets us decreasing granularity, but time doesn't actually start at zero. We fix this by rounding up to multiples of powers of 2 instead; let's get an intuition of how it works:
ceil((2000 + 1) / 1) * 1 = 2001
ceil((2000 + 2) / 2) * 2 = 2002
ceil((2000 + 3) / 4) * 4 = 2004
ceil((2000 + 4) / 4) * 4 = 2004
ceil((2000 + 15) / 16) * 16 = 2016
ceil((2000 + 16) / 16) * 16 = 2016
ceil((2000 + 17) / 32) * 32 = 2048
So far so good, how about after some time has passed?
ceil((2013 + 1) / 1) * 1 = 2014
ceil((2013 + 2) / 2) * 2 = 2016
ceil((2013 + 3) / 4) * 4 = 2016
ceil((2013 + 4) / 4) * 4 = 2020
ceil((2013 + 15) / 16) * 16 = 2032
ceil((2013 + 16) / 16) * 16 = 2032
ceil((2013 + 17) / 32) * 32 = 2048
The beauty of aligned powers is that for a relatively constant number of expiry times, the number of buckets remains roughly the same over time – as closely packed buckets are removed from the beginning, new ones fill the gaps between the sparser ones towards the end.
OK, let's put it into code:
def log_bucket(now, maxage):
    next_power = 2 ** math.ceil(math.log2(maxage))
    expires = now + maxage
    bucket = math.ceil(expires / next_power) * next_power
    return bucket
>>> [log_bucket(0, i) for i in [1, 2, 3, 4, 15, 16, 17]]
[1, 2, 4, 4, 16, 16, 32]
>>> [log_bucket(2000, i) for i in [1, 2, 3, 4, 15, 16, 17]]
[2001, 2002, 2004, 2004, 2016, 2016, 2048]
>>> [log_bucket(2013, i) for i in [1, 2, 3, 4, 15, 16, 17]]
[2014, 2016, 2016, 2020, 2032, 2032, 2048]
Looking good!
There are two sources of error – first from rounding maxage, worst when it's one more than a power of 2, and second from rounding the expiry time, also worst when it's one more than a power of two. Together, they approach 200% of maxage:
>>> log_bucket(0, 17)  # (32 - 17) / 17 ~= 88%
32
>>> log_bucket(0, 33)  # (64 - 33) / 33 ~= 94%
64
>>> log_bucket(16, 17)  # (64 - 33) / 17 ~= 182%
64
>>> log_bucket(32, 33)  # (128 - 65) / 33 ~= 191%
128
200% error is quite a lot; before we set out to fix it, let's confirm our reasoning.
def error(now, maxage, *args):
    """log_bucket() error."""
    bucket = log_bucket(now, maxage, *args)
    return (bucket - now) / maxage - 1

def max_error(now, max_maxage, *args):
    """Worst log_bucket() error for all maxages up to max_maxage."""
    return max(
        error(now, maxage, *args)
        for maxage in range(1, max_maxage)
    )

def max_error_random(n, *args):
    """Worst log_bucket() error for random inputs, out of n tries."""
    max_now = int(time.time()) * 2
    max_maxage = 3600 * 24 * 31
    rand = functools.partial(random.randint, 1)
    return max(
        error(rand(max_now), rand(max_maxage), *args)
        for _ in range(n)
    )
>>> max_error(0, 10_000)
0.9997558891736849
>>> max_error(2000, 10_000)
1.9527896995708156
>>> max_error_random(10_000_000)
1.9995498725910554
Looks confirmed enough to me.
So, how do we make the error smaller? Instead of rounding to the next power of 2, we round to the next half of a power of 2, or next quarter, or next eighth...
def log_bucket(now, maxage, shift=0):
    next_power = 2 ** max(0, math.ceil(math.log2(maxage) - shift))
    expires = now + maxage
    bucket = math.ceil(expires / next_power) * next_power
    return bucket
It seems to be working:
>>> for s in range(5):
...     print([log_bucket(0, i, s) for i in [1, 2, 3, 4, 15, 16, 17]])
...
[1, 2, 4, 4, 16, 16, 32]
[1, 2, 4, 4, 16, 16, 32]
[1, 2, 3, 4, 16, 16, 24]
[1, 2, 3, 4, 16, 16, 20]
[1, 2, 3, 4, 15, 16, 18]
>>> for s in range(10):
...     e = max_error_random(1_000_000, s)
...     print(f'{s} {e:6.1%}')
...
0 199.8%
1 99.9%
2 50.0%
3 25.0%
4 12.5%
5 6.2%
6 3.1%
7 1.6%
8 0.8%
9 0.4%
With shift=7, the error is less than two percent; I wonder how many buckets that is...
def max_buckets(max_maxage, *args):
    """Number of buckets to cover all maxages up to max_maxage."""
    now = time.time()
    buckets = {
        log_bucket(now, maxage, *args)
        for maxage in range(1, max_maxage)
    }
    return len(buckets)
>>> max_buckets(3600 * 24, 7)
729
>>> max_buckets(3600 * 24 * 31, 7)
1047
>>> max_buckets(3600 * 24 * 365, 7)
1279
A bit over a thousand buckets for the whole year, not bad!
Before we can use any of that, we need to convert expiry times to buckets; that looks a lot like the priority buckets code, the only notable part being eviction.
__init__():
self.cache = {}
self.expires_buckets = {}
self.expires_order = PriorityQueue()
self.priority_buckets = {}
self.priority_order = PriorityQueue()
set():
expires_bucket = self.expires_buckets.get(expires)
if not expires_bucket:
    expires_bucket = self.expires_buckets[expires] = set()
    self.expires_order.push(expires)
expires_bucket.add(key)
delete():
expires_bucket = self.expires_buckets[expires]
expires_bucket.remove(key)
if not expires_bucket:
    del self.expires_buckets[expires]
    self.expires_order.remove(expires)
evict():
while self.cache:
    expires = self.expires_order.peek()
    if expires > now:
        break
    expires_bucket = self.expires_buckets[expires]
    for key in list(expires_bucket):
        self.delete(key)
And now we use log_bucket(). Since we're at it, why not have unlimited priorities too? A hammer is a hammer and everything is a nail, after all.
expires = log_bucket(now, maxage, shift=7)
priority = log_bucket(0, priority+1, shift=7)
item = Item(key, value, expires, priority)
bisect, redux #
Time to fix that priority queue.
We use insort() to add priorities and operator.neg() to keep the list reversed:^{9}
def push(self, item):
    bisect.insort(self.data, item, key=operator.neg)
We update peek() and pop() to handle the reverse order:
def peek(self):
    return self.data[-1]

def pop(self):
    return self.data.pop()
Finally, for remove() we adapt the index() recipe from Searching Sorted Lists:
def remove(self, item):
    i = bisect.bisect_left(self.data, -item, key=operator.neg)
    if i != len(self.data) and self.data[i] == item:
        del self.data[i]
        return
    raise ValueError
And that's it, we're done!
Here's the cache in its full glory (click to expand):
class Cache:

    def __init__(self, maxsize, time=time.monotonic):
        self.maxsize = maxsize
        self.time = time
        self.cache = {}
        self.expires_buckets = {}
        self.expires_order = PriorityQueue()
        self.priority_buckets = {}
        self.priority_order = PriorityQueue()

    def get(self, key):
        item = self.cache.get(key)
        if not item:
            return None
        if self.time() >= item.expires:
            return None
        self.priority_buckets[item.priority].move_to_end(key)
        return item.value

    def set(self, key, value, *, maxage=10, priority=0):
        now = self.time()
        if key in self.cache:
            self.delete(key)
        elif len(self.cache) >= self.maxsize:
            self.evict(now)
        expires = log_bucket(now, maxage, shift=7)
        priority = log_bucket(0, priority+1, shift=7)
        item = Item(key, value, expires, priority)
        self.cache[key] = item
        expires_bucket = self.expires_buckets.get(expires)
        if not expires_bucket:
            expires_bucket = self.expires_buckets[expires] = set()
            self.expires_order.push(expires)
        expires_bucket.add(key)
        priority_bucket = self.priority_buckets.get(priority)
        if not priority_bucket:
            priority_bucket = self.priority_buckets[priority] = OrderedDict()
            self.priority_order.push(priority)
        priority_bucket[key] = None

    def evict(self, now):
        if not self.cache:
            return
        initial_size = len(self.cache)
        while self.cache:
            expires = self.expires_order.peek()
            if expires > now:
                break
            expires_bucket = self.expires_buckets[expires]
            for key in list(expires_bucket):
                self.delete(key)
        if len(self.cache) == initial_size:
            priority = self.priority_order.peek()
            priority_bucket = self.priority_buckets.get(priority)
            key = next(iter(priority_bucket))
            self.delete(key)

    def delete(self, key):
        *_, expires, priority = self.cache.pop(key)
        expires_bucket = self.expires_buckets[expires]
        expires_bucket.remove(key)
        if not expires_bucket:
            del self.expires_buckets[expires]
            self.expires_order.remove(expires)
        priority_bucket = self.priority_buckets[priority]
        del priority_bucket[key]
        if not priority_bucket:
            del self.priority_buckets[priority]
            self.priority_order.remove(priority)


class Item(NamedTuple):
    key: object
    value: object
    expires: int
    priority: int


class PriorityQueue:

    def __init__(self):
        self.data = []

    def push(self, item):
        bisect.insort(self.data, item, key=operator.neg)

    def peek(self):
        return self.data[-1]

    def pop(self):
        return self.data.pop()

    def remove(self, item):
        i = bisect.bisect_left(self.data, -item, key=operator.neg)
        if i != len(self.data) and self.data[i] == item:
            del self.data[i]
            return
        raise ValueError

    def __bool__(self):
        return bool(self.data)


def log_bucket(now, maxage, shift=0):
    next_power = 2 ** max(0, math.ceil(math.log2(maxage) - shift))
    expires = now + maxage
    bucket = math.ceil(expires / next_power) * next_power
    return bucket
(The entire file, with tests and everything.)
Conclusion #
Anyone expecting you to implement this in under an hour is delusional. Explaining what you would use and why should be enough for reasonable interviewers, although that may prove difficult if you haven't solved this kind of problem before.
Bullshit interviews aside, it is useful to have basic knowledge of time complexity. Again, can't recommend BigO: How Code Slows as Data Grows enough.
But, what big O notation says and what happens in practice can differ quite a lot. Be sure to measure, and be sure to think of limits – sometimes, the n in O(n) is or can be made small enough you don't have to do the theoretically correct thing.
You don't need to know how to implement all the data structures, that's what (software) libraries and Wikipedia are for (and for that matter, book libraries too). However, it is useful to have an idea of what's available and when to use it.
Good libraries educate – the Python standard library docs already cover a lot of the practical knowledge we needed, and so did Sorted Containers. But, that won't show up in the API reference you see in your IDE; you have to read the actual documentation.
Learned something new today? Share this with others, it really helps!
Note the subtitle: if you're not sure what to do yet. ^{[return]}
This early on, the name doesn't really matter, but we'll go with the correct, descriptive one; in the first draft of the code, it was called MagicDS. ✨ ^{[return]}
You have to admit this is at least a bit weird; what you're looking at is an object in a trench coat, at least if you think closures and objects are equivalent. ^{[return]}
Another way of getting an "object" on the cheap. ^{[return]}
If we assume a relatively small number of buckets that will be reused soon enough, this isn't strictly necessary. I'm partly doing it to release the memory held by the dict, since dicts are resized only when items are added. ^{[return]}
There's even a PyCon talk with the same explanation, if you prefer that. ^{[return]}
bisect itself has a fast C implementation, so I guess technically it's not pure Python. But given that stdlib is already there, does that count? ^{[return]}
If the implementation is easy to explain, it may be a good idea. ^{[return]}
This limits priorities to values that can be negated, so tuples won't work anymore. We could use a "reversed view" wrapper if we really cared about that. ^{[return]}