Zig: Freeing resources referenced in multiple threads
May 21, 2024
As you learn Zig, you'll see examples of memory being allocated and through the use of defer
, freed. Often, these allocations and deallocations are wrapped in init
and deinit
functions. But whatever specific implementation is used, the point is to show a common pattern which is suitable in simple cases. It isn't too much of a leap to take such examples and apply them to more complicated scenarios where allocation and deallocation might happen in different parts of the code.
In Zig's HashMap - Part 2 we saw a common example of more complicated memory management. It is up to us to decide and enforce the lifetime of a hashmap's keys and values. Even here simple cases can be complicated. If we want tie the lifetime of a value to the hashmap, we can store the value directly in the hashmap. But care must be taken since the address of these values can change as the hashmap changes (i.e. as the hashmap grows any references to its values will become invalid). This example, taken from that blog post, crashes since first
becomes an invalid reference as lookup
grows:
const std = @import("std");
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
var lookup = std.AutoHashMap(usize, usize).init(allocator);
defer lookup.deinit();
try lookup.put(100, 100);
const first = lookup.getPtr(100).?;
for (0..50) |i| {
try lookup.put(i, i);
}
first.* = 200;
}
As systems grow more complex and data lives longer, managing memory tends to get more complicated. One case where this is particularly true is in multithreaded programming. Let's consider a simple cache; we begin with just two fields:
const Cache = struct {
allocator: Allocator,
lookup: std.StringHashMapUnmanaged([]const u8),
pub fn init(allocator: Allocator) Cache {
return .{
.lookup = .{},
.allocator = allocator,
};
}
}
A reasonable first attempt at creating a put
method might go something like:
pub fn put (self: *Cache, key: []const u8, value: []const u8) !void {
const allocator = self.allocator;
const gop = try self.lookup.getOrPut(allocator, key);
if (gop.found_existing) {
allocator.free(gop.key_ptr.*);
allocator.free(gop.value_ptr.*);
}
gop.key_ptr.* = try allocator.dupe(u8, key);
gop.value_ptr.* = try allocator.dupe(u8, value);
}
If we were to implement get
and deinit
, we'd have a skeleton. Now, to make this thread-safe, we'd need to synchronize access to lookup
, probably with a std.Thread.Mutex
or std.Thread.RwLock
. But there's another thread-safety issue in put
, one specific to manual memory management that won't be fixed with just a mutex. Can you spot it?
Imagine that our cache is populated with data when two threads interact with it thusly: one calls cache.get("goku")
while another calls cache.put("goku", "9000!")
. A mutex would serialize access to lookup
, but that wouldn't stop thread 2 from freeing the value now referenced by thread 1. This isn't strictly a multithreading problem. References to a value or a hashmap entry (or whatever), can always be referenced from multiple places. But in a single threaded context, there's almost always a clear lifetime and owner for data. More importantly, there's a simple execution model. Look at it this way: in a single threaded context, if we have:
if (cache.get("goku")) |power| {
...
}
We know that cache
can't be mutated while we're in the if
block (unless we explicit do something to mutate it). Because of this, power
cannot be invalidated or leaked. But in a multithreaded context, while our if
block might not mutate the cache, another thread could. The simple execution model is gone.
I've written at length about having a deep understanding of your data's lifetime and ownership. That leads to clear decisions about where data must be freed: the memory must be freed when it's replaced or when we deinitialize the cache. In a multithreaded context, in addition to that insight, we also need a more generic perspective: the memory must be freed when the last reference is no longer needed.
What does that mean in practice? First, we'll add a new type:
const Entry = struct {
rc: usize,
value: []const u8,
allocator: Allocator,
}
Our lookup
will now be an std.StringHashMapUnmanaged(*Entry)
. The rc
field is a reference counter. Whenever we get an entry, we'll increment rc
by 1. Whenever we're done with an entry, we'll decrement rc
by 1. When rc
reaches 0, the entry can be freed. In other words, we don't track the lifetime of a value (i.e., until it's replaced or until the cache is deinitialized), but rather its usage. Let's implement that now:
const Entry = struct {
rc: usize,
value: []const u8,
allocator: Allocator,
pub fn acquire(self: *Entry) void {
_ = @atomicRmw(usize, &self.rc, .Add, 1, .monotonic);
}
pub fn release(self: *Entry) void {
if (@atomicRmw(usize, &self.rc, .Sub, 1, .monotonic) == 1) {
const allocator = self.allocator;
allocator.free(self.value);
allocator.destroy(self);
}
}
};
@atomicRmw
atomically modifies the value and returns the previous value. That's why we free our value and destroy the entry in release
when it returns 1
: if the "previous value" was 1, then the current value (after we subtracted 1) is 0, meaning there are no more references to our entry.
Now our cache's get
methods. We previously skipped this because, in our simple implementation, the read path didn't participate in memory management. This is no longer true:
pub fn get(self: *Cache, key: []const u8) ?*Entry {
self.lock.lockShared();
defer self.lock.unlockShared();
var entry = self.lookup.get(key) orelse return null;
entry.acquire();
return entry;
}
Without telling you, we've added a std.Thread.RwLock
to our cache which we see here for the first time. We also acquire
the entry, increasing it's rc
counter by 1. We return the full *Entry
because the calling code is responsible for calling release()
:
if (cache.get("goku")) |power| {
defer power.release();
...
}
If you were trying to really optimize this code, you might be tempted to write get
like so:
pub fn get(self: *Cache, key: []const u8) ?*Entry {
self.lock.lockShared();
const entry = self.lookup.get(key);
self.lock.unlockShared();
var e = entry orelse return null;
e.acquire();
return e;
}
Notice that we've narrowed our lock and no longer hold it over the call to acquire
. This seems correct since acquire
atomically increments rc
, right?. But this version is wrong in the same way as or original naive implementation was. Consider entry
just after the lock is released; rc
hasn't been incremented yet. Or put differently, our reference hasn't been registered yet. This opens the small window where another thread could release
its own reference to this entry, possibly causing the entry to clean itself up.
Finally, we have our enhanced put
:
pub fn put(self: *Cache, key: []const u8, value: []const u8) !void {
const allocator = self.allocator;
const entry = try allocator.create(Entry);
errdefer allocator.destroy(entry);
entry.* = .{
.rc = 1,
.allocator = allocator,
.value = try allocator.dupe(u8, value),
};
errdefer entry.release();
var existing: ?*Entry = null;
{
self.lock.lock();
defer self.lock.unlock();
const gop = try self.lookup.getOrPut(allocator, key);
if (gop.found_existing) {
existing = gop.value_ptr.*;
} else {
gop.key_ptr.* = try allocator.dupe(u8, key);
}
gop.value_ptr.* = entry;
}
if (existing) |e| {
e.release();
}
}
There's a lot more going on here the before. First, when we create the entry, we set rc
to 1. This is because the reference that our hashmap has to the entry is like any other. This is a bit more obvious when we consider the second point: to replace an entry, we don't free it, we release()
it. Because the entry might still be referenced by another thread, all we can do is unregister our interest. If we call e.release()
and happen to be the last reference to the entry, then yes the entry will freed. This highlights why our hashmap stores a *Entry
and why, in the above put
, we store entry
on the heap using allocator.create
: the hashmap doesn't own or manage the entry. The entry
isn't owned by anyone and can be shared and referenced by any thread or any part of the code. The reference count is needed, not because we care about how many references there are, but because we want to detect when there are no references.
Not related specifically to multithreaded, but we don't store a copy of the key
with the Entry
. This is an optimization, once we have an entry for a key, say "goku", there's really no need to re-dupe that key. This is because the key, unlike our value, is immutable. In this case, I'd say that the owner of the key is the cache itself. I chose this implementation because it's both more efficient and because it highlights how these different techniques end up living side by side and the impact that has on the complexity of the systems.
Another optimization I've made is to narrow the scope of our write lock. You might be surprised to see that existing.release()
is not called under lock, despite making the case that entry.acquire()
in get
had to be. The entire point of this exercise is to make sure that rc
doesn't become 0
while we still have live references. When we get a new reference we need to make sure no thread decrements rc
before we can increment it. If it does, and if rc
becomes 0, our reference will become invalid. Once we have our reference and incremented rc
to represent our interest, assuming every acquire()
is paired with a release()
, rc
can never reach 0 prematurely and our reference is always valid until we call release()
. After release()
is called, the reference should not be used. But the reality of our implementation means that it'll either be freed as part of the call to release()
or at some indeterminate time in the future.
Reference counting isn't always something you'll need to rely on when sharing data across threads. Some workloads maintain clear ownership and lifetimes across threads. On the flip side, while we looked at referencing counting in the context of a cache, this is a pattern that can be used for other types of shared data. Further, while we looked at a thread-safe implementation of reference counting (known as ARC, Atomically Reference Counted), reference counting in a single thread context can be useful in some situations as well. In these cases we don't need our lock and can simply increment and decrement rc
using +=1
and -=1
.
I wrote a thread-safe, expiration-aware, LRU cache for Zig and this is the solution I ended up adopting to deal with entries being removed or replaced while still being referenced from other threads. I've been meaning to blog about it for a while. More recently, I came across the problem again, in another piece of code. I figured if I keep running into this then others might too.