homedark

TCP Server in Zig - Part 8 - Epoll & Kqueue

Oct 30, 2024

Now that we're more familiar with epoll and kqueue individually, it's time to bring everything together. We'll begin by looking at the possible interaction between evented I/O and threads and then look at writing a basic cross-platform abstraction over the platform-specific epoll and kqueue.

We began our journey with a limited single-threaded loop, and then replaced it with a multithreaded version. However, when we looked at using evented I/O, first via poll and then epoll and kqueue, all of our examples reverted to being single-threaded. This might make you believe that evented I/O and multithreading are mutually exclusive ideas, but they aren't. The main reason we've only discussed evented I/O in the context of a single thread is that we had enough new things to learn without the added complexity of multithreading. Another reason is that there are different ways to weave evented I/O and multithreading together, and which you pick will largely depend on your specific requirement.

In our last complete implementation, we iterated through sockets which were ready, read and echo'd back messages. Basically, we did:

if (filter == system.EVFILT.READ) {
    // this socket is ready to be read
    while (true) {
        const msg = client.readMessage() catch {
            self.closeClient(client);
            break;
        } orelse break;   // no more messages

        client.writeMessage(msg) catch {
            self.closeClient(client);
            break;
        };
    }
}

In a real application, we'll probably want to "process" the incoming message. That processing might involve external I/O or heavy computation. If we do that in the same thread which is waiting for readiness and reading and writing messages, we're going to have similar scaling challenges as our simple single-threaded server. What we might want to do instead is to launch a thread (probably via a thread-pool), to process the message:

if (filter == system.EVFILT.READ) {
    while (true) {
        const msg = client.readMessage() catch {
            self.closeClient(client);
            break;
        } orelse break;

        // process the client message in a separate thread
        try thread_pool.spawn(Client.process, .{client, msg});
    }
}

As a slight alternative, we could even move the call to client.readMessage to the thread:

if (filter == system.EVFILT.READ) {
    try thread_pool.spawn(Client.process, .{client, msg});
}

Adding a thread pool seems easy but we've introduced bugs. While it's more obvious in the first example, both approaches can result in processing concurrent messages for the same client. Neither our Client nor Reader are thread safe. There are a number of possible solutions. We could make Client thread-safe. Making Reader thread-safe is going to be much more difficult though - we need a distinct buffer and position information per message. Even in the first example, where it seems like Reader might not need to be thread-safe, we have bugs. Remember that msg references client.reader.buf. If we want msg to remain valid through the thread's processing of the message, we'll have to clone it - a pretty big compromise.

We could enhance our thread-pool so that a given client is always passed to the same thread. This serializes the processing of all messages for a client - it's similar to how an actor works in Erlang / Elixir. But that has its own consequences - system resources might not be utilized, and a "heavy" client can adversely impact other clients which are assigned to the same thread.

Another option is to use EPOLL.ONESHOT and EV.DISPATCH to limit the number concurrent messages we have per client to one. The downside with this approach is the extra system call needed to re-arm the notification once we're ready to process the next message.

Each solution has trade-offs. If you expect few clients (a couple hundred) and processing is lightweight, you might not even need multithreading. If messages are infrequent, then the cost of re-arming a disabled notification might be insignificant. If you're able to customize a thread pool for your needs, maybe assigning each client a "weight" or "group" to better distribute load, then you might be able to use your thread pool to serialize message processing.

This series won't explore any of these solutions in greater detail - they're too application-specific and aren't directly related to network programming. But, both epoll and kqueue have two additional features we should talk about.

Both epoll and kevent are thread-safe. Specifically, if you're blocked on a call to epoll_wait or kevent you can make changes to the notification list from another thread using epoll_ctl or kevent. So if we did use EPOLL.ONESHOT or EV.DISPATCH and processed the message in a separate thread, we could re-arm the notification within that thread. Re-arming will always require that extra system call, but being able to do it from the thread provides an easy way to ensure we only process one message per client at a time.

There isn't much more to say about this, but it's such an important behavior that I wanted to highlight it in its own section. I will point out that epoll's documentation is unambiguous about this, but I couldn't find anything definitive for kqueue. My own testing shows that this works as expected and online comments mention that kqueue is thread-safe, but it the man pages doesn't mention anything.

You might find yourself waiting to unblock the call to epoll_wait or kevent. A simple use-case would be to initiate a shutdown. A lazy solution is to rely on a short timeout and then check a condition, something like this:

// never wait for less than 1 second.
const ready_events = try self.loop.wait(1000);
if (@atomicLoad(bool, &self.shutdown, .acquire)) {
    return;
}

But, I always find this an awkward compromise between having a wastefully short timeout and a unusable long one.

One option which works on both Linux and BSD/MacOS is to create a pipe. From our point of view, this behaves like a socket - we monitor one end of the pipe and can write to the other:

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    // epoll_create1 takes flags. We aren't using any in these examples
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);

    {
        // monitor one end of the pipe
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = pipe[0]}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, pipe[0], &event);
    }

    const thread = try std.Thread.spawn(.{}, shutdown, .{pipe[1]});
    thread.detach();

    var ready_list: [16]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == pipe[0]) {
                std.debug.print("shutting down", .{});
                return;
            }
        }
    }
}

fn shutdown(signal: posix.socket_t) void {
    std.time.sleep(std.time.ns_per_s * 5);
    posix.close(signal);
}

I've removed all of the other socket code to keep the example lean. The point is to show how to create a pipe, and how we can monitor one end, while manipulating the other from a separate thread. Our thread closes the pipe, but we could just have easily written to it. However, as far as I know, it is technically possible to get a partial write to a pipe (just like a socket). So if you're writing more than one byte to the pipe from multiple threads, you'll need to synchronize the writes (although, a partial write to a pipe might be so rare that you don't have to worry about it...I don't know).

While pipe is fine for a one-time only signal like shutdown, and can be made to work for more complex cases, both epoll and kqueue have specific mechanisms that we can leverage. For epoll we can use an eventfd:

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    const signal = try posix.eventfd(0, 0);
    errdefer posix.close(signal);

    {
        // monitor the signal
        var event = linux.epoll_event{.events = linux.EPOLL.IN | linux.EPOLL.ET, .data = .{.fd = signal}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, signal, &event);
    }

    const thread = try std.Thread.spawn(.{}, notify, .{signal});
    thread.detach();

    var ready_list: [16]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == signal) {
                std.debug.print("got notification\n", .{});
            }
        }
    }
}

fn notify(signal: posix.fd_t) void {
    for (0..5) |_| {
        std.time.sleep(std.time.ns_per_s);
        const value: usize = 1;
        _ = posix.write(signal, std.mem.asBytes(&value)) catch break;
    }
}

By default, an eventfd is actually an 8-byte integer value. The value we write to it, 1 above, increments the counter. We can posix.read the eventfd to get the value and reset the counter. The reason we don't read in the above code is because we used EPOLL.ET. We looked at edge-trigger before, but as a recap, if we didn't use EPOLL.ET and we didn't read, a single write would result in endless notifications. With EPOLL.ET, we're only notified once per write, which is exactly what we want. An interesting thing about reading from an eventfd is that the read will always read 8 bytes (provided you give it an 8-byte buffer) or error - it can't under-read like a socket.

In addition to working as a signal, like we did above with EPOLL.ET, or as a counter - requiring a posix.read to get the value and reset it - an eventfd can also act like a semaphore. We created our evenfd using posix.eventfd(0, 0). The first parameter is the initial counter value. We set it to 0 because we're using it as a signaling mechanism, not a counter, so we don't care about the value. The second parameter is for flags. We could have specified linux.EFD.SEMAPHORE which would have turned our eventfd into a semaphore. In this mode read always decrements the counter by 1, and blocks if the counter is 0. If we OR'd the linux.EFD.NONBLOCK flag, then read would return error.WouldBlock rather than blocking.

You might never use the counter or semaphore behaviors of eventfd, but using it as a signal, especially when paired with EPOLL.ET is a simple way to unblock an epoll_wait from a separate thread.

kqueue has its own special mechanism for signaling: user filters. Unlike epoll, we don't have to set anything up:

const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    const thread = try std.Thread.spawn(.{}, notify, .{kfd});
    thread.detach();

    var ready_list: [15]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_data = ready.udata;
            if (ready_data == 0) {
                std.debug.print("got notification\n", .{});
            }
        }
    }
}

fn notify(kfd: posix.fd_t) void {
    for (0..5) |_| {
        std.time.sleep(std.time.ns_per_s);
        _ = posix.kevent(kfd, &.{.{
            .ident = 1,
            .filter = posix.system.EVFILT.USER,
            .flags = posix.system.EV.ADD | posix.system.EV.CLEAR,
            .fflags = posix.system.NOTE.TRIGGER,
            .data = 0,
            .udata = 0,
        }}, &.{}, null) catch @panic("TODO");
    }
}

Instead of using a filter of EVFILT.READ or EVFILT.WRITE, as in previous example, we're using EVFILT.USER. The other important part is setting the EV.CLEAR flag to ensure that the notification only triggers once. Setting the fflags to NOTE.TRIGGER is what actually triggers the wakeup.

You might be wondering about the ident field set to 1. In our Kqueue part we mentioned that a monitor is identified by its ident and filter fields. Therefore, while we might add another monitor to something with an ident of 1 (like a file descriptor), that would have a different filter, likely EV.READ or EV.WRITE.

An EVFILT.USER supports special values for the fflags field. We're using NOTE.TRIGGER which is what activates the notification. But we can also store arbitrary data in the lower 24-bits of fflag and can manipulate the value by setting special values, like NOTE.FFAND, to bitwise-AND the existing fflags value with the one we specify. I've got no idea how / where this feature is being used, but it's there.

eventfd and EVFLT.USER are useful thread-safe mechanisms for signalling our accept loop. If you do add a thread pool to process messages, you might find yourself needing to unblock the main thread's epoll_wait or kevent from within the spawned threads. And the reason you want a thread pool is to keep your accept loop lean and free to react to socket readiness.

However, at very large scale, where you might see tens of thousands of clients trying to connect per second, even the leanest accept loop can become a bottleneck. One option is to have multiple threads accepting on the same socket. This mostly works as you'd hope, but there are a few changes we'll want to make. First, the basic code, which works but isn't as efficient as we can make:

const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    var listeners: [2]std.Thread = undefined;
    for (&listeners, 0..) |*l, id| {
        l.* = try std.Thread.spawn(.{}, accept, .{listener, id});
    }

    for (listeners) |l| {
        l.join();
    }
}
fn accept(listener: posix.socket_t, id: usize) !void {
    while (true) {
        var client_address: net.Address = undefined;
        var client_address_len: posix.socklen_t = @sizeOf(net.Address);

        const socket = posix.accept(listener, &client_address.any, &client_address_len, 0) catch |err| {
            // Rare that this happens, but in later parts we'll
            // see examples where it does.
            std.debug.print("error accept: {}\n", .{err});
            continue;
        };
        defer posix.close(socket);

        std.debug.print("[{d}] {} connected\n", .{id, client_address});

        write(socket, "Hello (and goodbye)") catch |err| {
            // This can easily happen, say if the client disconnects.
            std.debug.print("error writing: {}\n", .{err});
        };
    }
}

fn write(socket: posix.socket_t, msg: []const u8) !void {
    var pos: usize = 0;
    while (pos < msg.len) {
        const written = try posix.write(socket, msg[pos..]);
        if (written == 0) {
            return error.Closed;
        }
        pos += written;
    }
}

I won't hold it against you for not noticing, but this is almost identical to our very first implementation from part 1. The only difference is that we've extracted the accept loop into its own function and launched multiple threads to accept connections.

One issue with the above is that the O/S might not distribute new connection very well. At a small scale, this is something you might not notice. For example, when I run the above and try to connect multiple clients, I get:

[0] 127.0.0.1:49284 connected
[1] 127.0.0.1:49285 connected
[0] 127.0.0.1:49286 connected
[1] 127.0.0.1:49287 connected
[0] 127.0.0.1:49288 connected
[1] 127.0.0.1:49289 connected
[0] 127.0.0.1:49290 connected

So while it might not seem that we have a balancing problem, under load, we might. This is easy to fix: set the SO.REUSEPORT socket option:

// We keep the existing SO.REUSEADDR
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));

// We add SO.REUSEPORT
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));

There's a bit more to this story. SO.REUSEPORT is originally a BSD option which allows multiple processes to bind to the same address. When the option was added to Linux, load balancing was included. In the name of backwards compatibility (I assume), BSD didn't add load balancing as a feature to their SO.REUSEPORT, but rather added a different option: SO.REUSEPORT_LB. The simple solution is to see what's defined and use that:

// We keep the existing SO.REUSEADDR
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));

// Add...
if (@hasDecl(posix.SO, "REUSEPORT_LB")) {
    // if available, use SO.REUSEPORT_LB
    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEPORT_LB, &std.mem.toBytes(@as(c_int, 1)));
} else if (@hasDecl(posix.SO, "REUSEPORT")) {
    // else, if available use REUSEPORT
    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
}

Having multiple threads accepting connections doesn't mean that we can't also have a thread pool to process client messages. They're solving different problems. Here we're trying to make sure our accept-loop itself isn't a bottleneck, whereas our above thread pools are trying to make sure our event loop doesn't block while processing a client message.

As we discovered above, one of the neat features of both epoll and kqueue is that they're thread-safe. We can modify an instance from different threads, and those changes will be picked up even by a blocked epoll_wait or kevent. It's great.

But consider our multi-threaded accept loop. That simple example was using a blocking listening socket - would it still work with epoll and kqueue using non-blocking sockets? The short answer is yes. The longer answer is that when multiple threads register their interest in a socket, every thread will be notified of readiness. For example, imagine our listening socket is in non-blocking mode, and our accept function sets up an epoll instance:

fn accept(listener: posix.socket_t, id: usize) !void {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    // rest of our normal accept loop
}

We now have multiple threads monitoring the same socket. This is perfectly legal, but it isn't efficient. When a socket becomes ready, in this case when one or more client connects, all of the threads will be notified. To deal with this, we have two options. The first is to do nothing, our code already swallows error.WouldBlock on accept. We ignore this error because we accept in a loop, accepting until there are no more new connection. Now this error is going to be more frequent since multiple threads will be competing to accept connections. Like I said, it isn't efficient, but it works.

On new-ish versions of linux, we can set events to linux.EPOLL.IN | linux.EPOLL.EXCLUSIVE. This will limit the number of threads that will be notified. The documentation says that "one or more" threads will be notified. I interpret this to mean: mostly one thread will be notified, but we make no guarantees, so program defensively. And that's fine since we're already ignoring error.WouldBlock.

For BSD, I'm not aware of a solution that's as simple and effective as EPOLL.EXCLUSIVE. As far as I know, the only way to avoid this is to use EV.ONESHOT (discussed briefly in part 7) and then re-arm the notification. Here's the relevant change to KQueue.accept:

fn accept(self: *Server, listener: posix.socket_t) !void {
    const space = self.max - self.connected;
    for (0..space) |_| {
        var address: net.Address = undefined;
        var address_len: posix.socklen_t = @sizeOf(net.Address);
        const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => {
                // This was added. We have to re-arm the listener.
                try self.loop.addListener(listener);
                return;
            }
            else => return err,
        };

        // ... same as before

    } else {
        // we no longer need to remove the listener if we're full
        // since the listener was added with ONESHOT and willy only notify
        // if it's re-added, which we do above
        // Remove this entire else block.
    }
}

Whatever thread gets woken up needs to re-arm the listener after it's done accepting pending connections. As a small consolation prize for this extra system call, we no longer have to remove our listener when the server is full (because we're using EV.ONESHOT).

One of the things I like most about epoll and kqueue is that they're similar, so building an abstraction for their differences isn't too difficult. We already did most of the work in the previous parts when we extracted most of the logic to an Epoll and KQueue struct. One thing that we're still missing is an abstraction over our ready events. For example, in our epoll code, our loop looks like:

const ready_events = self.loop.wait(next_timeout);
for (ready_events) |ready| {
    switch (ready.data.ptr) {
        0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
        else => |nptr| {
            const events = ready.events;
            const client: *Client = @ptrFromInt(nptr);

            if (events & linux.EPOLL.IN == linux.EPOLL.IN) {
             //...
            }

            //...
        },
    }
}

Details like data.ptr, ready.events and EPOLL.IN need to be moved into our Epoll struct.

Before we solve that, you might be thinking that an interface would be a good option. Alternatively, if you're more familiar with Zig, you might be thinking that a tagged union would make more sense, given that all implementation are known ahead of time.

In my opinion, a better option is to use comptime and set the implementation based on our build target:

const Loop = switch (@import("builtin").os.tag) {
    .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => KQueue,
    .linux => EPoll,
    else => @panic("platform not supported, (yet??)"),
};

Now we can replace any use of Epoll or KQueue with Loop.

Back to finalizing our abstraction. I think the best solution is to make wait return an Iterator. I'll provide the epoll implementation and leave it up to you to write it for kqueue:

// This is what both the Epoll.Iterator and your KQueue.Iterator should yield
const Event = union(enum) {
    accept: void,
    read: *Client,
    write: *Client,
};

const Epoll = struct {
    efd: posix.fd_t,
    ready_list: [128]linux.epoll_event = undefined,

    // ...
    // everything is the same, except for wait
    // ...

    // we now return an Epoll.Iterator
    fn wait(self: *Epoll, timeout: i32) Iterator {
        const ready_list = &self.ready_list;
        const count = posix.epoll_wait(self.efd, ready_list, timeout);

        return .{
            .index = 0,
            .ready_list = ready_list[0..count],
        };
    }

    const Iterator = struct {
        index: usize,
        ready_list: []linux.epoll_event,

        fn next(self: *Iterator) ?Event {
            const index = self.index;
            const ready_list = self.ready_list;
            if (index == ready_list.len) {
                return null;
            }

            self.index = index + 1;
            const ready = ready_list[index];
            switch (ready.data.ptr) {
                0 => return .{ .accept = {} },
                else => |nptr| {
                    const client: *Client = @ptrFromInt(nptr);
                    if (ready.events & linux.EPOLL.IN == linux.EPOLL.IN) {
                        return .{.read = client};
                    }
                    return .{.write = client};
                }
            }
        }
    };
};

We use a tagged union, named Event, to represent the type of readiness our system handles. This can be expanded to support other type, like shutdown. Using this tagged union not only provides a platform-agnostic target for our platform-specific iterators, but it also cleans up our accept loop:

while (true) {
    const next_timeout = self.enforceTimeout();
    var it = self.loop.wait(next_timeout);
    while (it.next()) |ready| {
        switch (ready) {
            .accept => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
            .read => |client| {
                while (true) {
                    // our readMessage Loop...
                }
            },
            .write => |client| client.write() catch self.closeClient(client),
        }
    }
}

We'll need another change to make the two structures compatible. For example, Epoll.wait can't fail, while KQueue.wait can; the above return type needs to change to !Iterator.

There are various patterns and techniques that can be weaved under, above or into the fabric of a TCP server. Hopefully this series has given you a reasonable overview as well as usable code. It probably goes without saying that a lot more could be written about these topics. I do want to highlight that both epoll and kqueue support additional features, like timers. If you're using either, read their documentation thoroughly.

If you're only expecting a handful of clients, consider the thread-per-connection approach. It isn't flashy, but it's simpler to implement and has fewer edge-cases.

Whatever number of clients you're expecting, don't trust them. Obviously a service opened to the public internet needs to be mindful of malicious actors, but even in a closed system, bugs happen. As a basic example, if you're using length-prefixed message and are allocating dynamic memory for large messages, check that the length against a MAX_LENGTH value first. Finally, micro-optimize your code. It's how you'll learn and, if you're dealing with a lot of clients, small changes add up.