TCP Server in Zig - Part 5a - Poll

Oct 15, 2024

One of the reasons we introduced multithreading was to get around the fact that our read and, to a lesser extent, accept and write, block. In our initial single-threaded implementation, rather than pushing our server to its limits, we spent a lot of time idle, waiting for data to come in. Multithreading helped to unblock the main thread so that new connections could be accepted - as long as we had enough workers to handle them - while processing existing connections. But threads are relatively heavyweight constructs and it isn't particularly efficient to spawn them and then have them blocked waiting for data.

There are two complementary parts to improving our design: non-blocking I/O and event-notification.

It's possible to put a socket in non-blocking mode. When enabled, functions which normally block, such as accept, read or write, will return error.WouldBlock (or EAGAIN in C) rather than blocking. As we're about to see, it's hard to take advantage of this on its own, but we're looking at it first, to get a feel for it. Consider what happens if we go to one of our earlier single-threaded implementations and enable non-blocking sockets (two lines have been changed, both commented):

const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    // ADDED: ` | posix.SOCK.NONBLOCK`
    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    var buf: [128]u8 = undefined;
    while (true) {
        // Replaced: `0` with `posix.SOCK.NONBLOCK`
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
            std.debug.print("error accept: {}\n", .{err});
            continue;
        };
        defer posix.close(socket);

        const stream = std.net.Stream{.handle = socket};
        const read = try stream.read(&buf);
        if (read == 0) {
            continue;
        }
        try stream.writeAll(buf[0..read]);
    }
}

When we create our listening socket, using posix.socket, we now set the SOCK.NONBLOCK flag. Similarly, when we accept we now pass that same flag, SOCK.NONBLOCK, as our fourth parameter. The first usage puts our listening socket in non-blocking mode (we'll see what that accomplishes shortly). The second usage puts a newly connected socket in non-blocking mode. This second usage is a special case. There are actually two accept system calls: accept and accept4. The first one, accept, only takes 3 parameters whereas accept4 takes a 4th parameter. That 4th parameter is for any flags, like SOCK.NONBLOCK, that we want to apply to the connected socket. If Zig's posix.accept detects that accept4 is available, it uses it, otherwise it calls accept and then calls fcntl to set the appropriate flags. accept4 does what Part 3 talked about (minimizing system calls) by combining accept with fcntl.
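
To make the accept-then-fcntl fallback a little more concrete, here's a rough sketch of what setting the flag after the fact involves. The setNonBlocking helper is hypothetical (it isn't part of our server or of Zig's standard library), it assumes a Zig 0.13-era std.posix, and a pipe stands in for a freshly accepted socket:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: roughly what has to happen when accept4 isn't
// available and the non-blocking flag must be set on an existing descriptor.
fn setNonBlocking(fd: posix.fd_t) !void {
    // read the current file status flags...
    var flags = try posix.fcntl(fd, posix.F.GETFL, 0);
    // ...turn on the O_NONBLOCK bit...
    flags |= 1 << @bitOffsetOf(posix.O, "NONBLOCK");
    // ...and write them back. That's two extra system calls per connection.
    _ = try posix.fcntl(fd, posix.F.SETFL, flags);
}

pub fn main() !void {
    // a pipe stands in for a newly connected socket
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    try setNonBlocking(fds[0]);

    // nothing has been written, so a non-blocking read fails instead of waiting
    var buf: [16]u8 = undefined;
    _ = posix.read(fds[0], &buf) catch |err| {
        std.debug.print("read: {}\n", .{err});
        return;
    };
}
```

With accept4, the kernel applies the flag as part of accepting the connection, so neither fcntl call is needed.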

If you try to run the above code, you should get a rapid and endless stream of:

error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock

We said that SOCK.NONBLOCK makes functions like accept return an error instead of blocking. Given that our accept loop looks like:

while (true) {
    const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
        std.debug.print("error accept: {}\n", .{err});
        continue;
    };
    // ...
}

The output makes sense. There's no obvious solution either. We could sleep whenever accept returns error.WouldBlock, but that would be worse than leaving the socket in blocking mode - at least in blocking mode we're woken up as soon as there's a connection waiting to be accepted.

If we keep our listening socket in blocking mode, but put connected sockets in non-blocking mode, we'll face the same problems. When our read returns error.WouldBlock what can we realistically do? All of our implementations so far depend on one thread processing one connection. We need to fundamentally rethink our approach.

We need to break out of the one-thread-per-connection pattern, which is something we can only do now that we've discovered non-blocking I/O. We probably need to start organizing our code a little better, but you can start thinking about having an array of sockets that we can loop through and try to read from. As an incomplete prototype, something like:

for (sockets) |s| {
    const n = posix.read(s, &buf) catch |err| {
        switch (err) {
            error.WouldBlock => {},
            else => {
                posix.close(s);
                // TODO: remove the socket from our array
            },
        }
        // go to the next socket, this one isn't ready yet
        continue;
    };

    if (n == 0) {
        posix.close(s);
        // TODO: remove the socket from our array
        continue;
    }

    process(buf[0..n]);
}

This is possible because posix.read won't block when no data is available. If error.WouldBlock is returned, we can skip to the next socket in our list. There are at least two major problems with this prototype. The first is that this will result in a tight loop when no socket is ready. We previously talked about the importance of minimizing system calls, but the above code would result in a massive number of calls to read as we constantly poll each socket, hoping that one has data.

A larger issue is the need to associate state with each connection. This was trivial in our thread-per-connection model, but gets more complicated now that we need to track multiple connections. For example, if we consider that a read is likely to return less than or more than a single "message" (see Part 2), it seems unlikely that we can share a buffer for all sockets. This is something that we started solving when we introduced a Client. Our above prototype can be improved if we think in terms of clients rather than sockets:

for (clients) |*client| {
    while (true) {
        const message = client.readMessage() catch {
            client.close();
            // TODO: remove client from our clients array
            break;
        } orelse break; // no message (either not enough bytes, or WouldBlock)

        client.process(message) catch {
            client.close();
            // TODO: remove client from our clients array
            break;
        };
    }
}

We're getting further and further away from working code, but the key change here is that a Client encapsulates the state necessary for reading and [eventually] writing messages. Our fictional readMessage could fail, but it could also return null to indicate that there isn't enough data to form a complete message yet - maybe because we don't have all the bytes, or maybe because posix.read returned error.WouldBlock. If you're wondering why we have an inner-loop, recall that we're not just concerned about reading less than a whole message, but also about reading more than a single message. It's possible that readMessage, which would eventually call posix.read, fills our buffer with multiple messages. We need to process them all.
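
To see why the inner loop matters, here's a small standalone sketch of pulling every complete message out of a buffer. It assumes a length-prefixed framing (a 4-byte little-endian length followed by the payload); both that framing and the nextMessage helper are assumptions made for illustration, not the Reader from previous parts:

```zig
const std = @import("std");

// Hypothetical helper: returns the next complete message in buf starting at
// pos.*, advancing pos.* past it, or null if only part of a message is there.
fn nextMessage(buf: []const u8, pos: *usize) ?[]const u8 {
    const unprocessed = buf[pos.*..];
    // the length prefix itself hasn't fully arrived
    if (unprocessed.len < 4) return null;

    const len = std.mem.readInt(u32, unprocessed[0..4], .little);
    const total = 4 + @as(usize, len);
    // we know the length, but the payload hasn't fully arrived
    if (unprocessed.len < total) return null;

    pos.* += total;
    return unprocessed[4..total];
}

pub fn main() void {
    // what a single read might give us: two whole messages and the start of a third
    const data = [_]u8{ 2, 0, 0, 0, 'h', 'i' } ++
        [_]u8{ 3, 0, 0, 0, 'y', 'o', 'u' } ++
        [_]u8{ 9, 0, 0, 0, 'p', 'a' };

    var pos: usize = 0;
    // the inner loop: keep going until there's no complete message left
    while (nextMessage(&data, &pos)) |msg| {
        std.debug.print("got: {s}\n", .{msg});
    }
    std.debug.print("{d} bytes are waiting for the next read\n", .{data.len - pos});
}
```

If we only called nextMessage once per read, the second message would sit in the buffer until the client happened to send more data.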

So far, this is admittedly very abstract, but recall that in Part 3 we created a stateful message-aware Reader. Our client.readMessage can be a wrapper around reader.readMessage which translates error.WouldBlock into null:

fn readMessage(self: *Client) !?[]u8 {
    return self.reader.readMessage() catch |err| switch (err) {
        error.WouldBlock => return null,
        else => return err,
    };
}

While this seems promising, constantly looping through our clients, hoping that one or more has data, isn't going to be efficient.

The poll system call lets us register file descriptors (like sockets) with the operating system and be notified when certain events, like reading or writing, can be done without blocking. For example, we can give poll an array of sockets and it'll block until one is ready to be read. It's a simple API, a single function, but it's a change in how we think about serving clients. We're going to start by looking at a basic implementation which is unconcerned with things like message boundaries. Our goal, for now, is to get familiar with poll's API and this new way of working with sockets.

const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // Our server can support 4095 clients. Wait, shouldn't that be 4096? No
    // One of the polling slots (the first one) is reserved for our listening
    // socket.

    var polls: [4096]posix.pollfd = undefined;
    polls[0] = .{
        .fd = listener,
        .events = posix.POLL.IN,
        .revents = 0,
    };
    var poll_count: usize = 1;

    while (true) {
        // polls is the total number of connections we can monitor, but
        // polls[0..poll_count] is what we're actually monitoring: the listening
        // socket plus the currently connected clients (poll_count starts at 1
        // to account for the listening socket, and the upper bound is exclusive)
        var active = polls[0..poll_count];

        // 2nd argument is the timeout, -1 is infinity
        _ = try posix.poll(active, -1);

        // active[0] is _always_ the listening socket. When this socket is ready
        // we can accept. Putting it outside the following while loop means that
        // we don't have to check if this is the listening socket on each
        // iteration
        if (active[0].revents != 0) {
            // The listening socket is ready, accept!
            // Notice that we pass SOCK.NONBLOCK to accept, placing the new client
            // socket in non-blocking mode. Also, for now, for simplicity,
            // we're not capturing the client address (the two null arguments).
            const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);

            // Add this new client socket to our polls array for monitoring
            polls[poll_count] = .{
                .fd = socket,

                // This will be SET by posix.poll to tell us what event is ready
                // (or it will stay 0 if this socket isn't ready)
                .revents = 0,

                // We want to be notified about the POLL.IN event
                // (i.e. can read without blocking)
                .events = posix.POLL.IN,
            };

            // increment the number of active connections we're monitoring
            // this can overflow our 4096 polls array. TODO: fix that!
            poll_count += 1;
        }

        var i: usize = 1;
        while (i < active.len) {
            const polled = active[i];

            const revents = polled.revents;
            if (revents == 0) {
                // This socket isn't ready, go to the next one
                i += 1;
                continue;
            }

            var closed = false;

            // the socket is ready to be read
            if (revents & posix.POLL.IN == posix.POLL.IN) {
                var buf: [4096]u8 = undefined;
                const read = posix.read(polled.fd, &buf) catch 0;
                if (read == 0) {
                    // probably closed on the other side
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{polled.fd, buf[0..read]});
                }
            }

            // either the read failed, or we're being notified through poll
            // that the socket is closed
            if (closed or (revents & posix.POLL.HUP == posix.POLL.HUP)) {
                posix.close(polled.fd);

                // We use a simple trick to remove it: we swap it with the last
                // item in our array, then "shrink" our array by 1
                const last_index = active.len - 1;
                active[i] = active[last_index];
                active = active[0..last_index];
                poll_count -= 1;

                // don't increment `i` because we swapped out the removed item
                // and shrank the array
            } else {
                // not closed, go to the next socket
                i += 1;
            }
        }
    }
}

The above is a working example. You can run it and connect up to 4095 clients - any more and it'll crash, something we can and will eventually fix. The above code is documented, but there are a number of things to go over. The posix.pollfd structure has three fields:

  1. fd - The file descriptor that we're polling,
  2. events - A bitmask of the events we care about, for now that's only POLL.IN but later, when we look at writing, it'll be POLL.IN | POLL.OUT,
  3. revents - The ready events set by the poll system call. This tells us which, if any, events are ready.

The first thing we do is set up a pollfd for our listening socket, registering our interest in POLL.IN. For a listening socket POLL.IN indicates that we can accept without blocking. We set and keep this at index zero throughout the lifetime of our program. When posix.poll returns, it means that at least one of the monitored file descriptors is ready. We iterate through them all, looking for any where revents != 0. We special-case our listening socket, always at active[0] - we need to call accept and process the new connection. As an optimization, we've moved that out of our loop. This means we don't have to check if (i == 0) for each iteration of our loop.
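
To get a feel for events and revents in isolation, here's a minimal sketch that polls a pipe rather than a socket. The pipe and the pollReadable helper are assumptions made to keep the example self-contained; they aren't part of our server:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: polls a single descriptor for readability without
// waiting (a timeout of 0) and returns the revents that poll filled in.
fn pollReadable(fd: posix.fd_t) !i16 {
    var fds = [_]posix.pollfd{.{
        .fd = fd,
        .events = posix.POLL.IN, // what we're interested in
        .revents = 0, // set by poll
    }};
    _ = try posix.poll(&fds, 0);
    return fds[0].revents;
}

pub fn main() !void {
    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);
    defer posix.close(pipe[1]);

    // nothing has been written yet, so revents stays 0
    std.debug.print("revents before write: {d}\n", .{try pollReadable(pipe[0])});

    _ = try posix.write(pipe[1], "hello");

    // now the read end can be read without blocking, so POLL.IN is set
    const revents = try pollReadable(pipe[0]);
    std.debug.print("POLL.IN set after write: {}\n", .{revents & posix.POLL.IN == posix.POLL.IN});
}
```

Because revents is a bitmask, we test for a specific flag with a bitwise AND rather than comparing the whole value.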

We could have used an std.ArrayList(posix.pollfd) to make our life of adding and removing entries a little easier, but, at least for the purpose of learning, I prefer the explicit code to handle new connections and disconnects.

You might be wondering why we check if revents has the POLL.HUP flag, even though we didn't register our interest in POLL.HUP. POLL.HUP is always monitored even if we don't explicitly ask for it. We check POLL.IN first, but we could check POLL.HUP first instead. For example, an HTTP server might prefer checking POLL.HUP and removing disconnected clients, ignoring any pending data they've sent. When POLL.IN is set, you might be wondering if read can fail and/or return zero bytes. The simple answer is: I'm not sure. It would certainly be possible for read to return error.WouldBlock if another thread called read on the same socket, draining it. And, we could get zero bytes if the supplied buffer passed into read was zero-length. In this single-threaded example, with a fixed-length buffer, neither of those cases is possible. But I think it's better to be safe than sorry, and we should always assume read can return zero bytes or an error.
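
One way to observe this is with a pipe standing in for a socket: close the writing side, then poll the reading side while only asking for POLL.IN. This is a sketch under those assumptions (observeClosedPeer and Observed are made-up names, and the behavior shown is what I'd expect on Linux), not a statement about every platform or every kind of descriptor:

```zig
const std = @import("std");
const posix = std.posix;

const Observed = struct { hup: bool, bytes_read: usize };

// Hypothetical helper: closes the writing side of a pipe, then reports what
// poll and read tell us about the reading side.
fn observeClosedPeer() !Observed {
    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);

    // the "peer" goes away
    posix.close(pipe[1]);

    var fds = [_]posix.pollfd{.{
        .fd = pipe[0],
        .events = posix.POLL.IN, // note: we never ask for POLL.HUP
        .revents = 0,
    }};
    _ = try posix.poll(&fds, 0);

    // with the other side closed and nothing buffered, read returns 0
    var buf: [16]u8 = undefined;
    const n = try posix.read(pipe[0], &buf);

    return .{
        .hup = fds[0].revents & posix.POLL.HUP == posix.POLL.HUP,
        .bytes_read = n,
    };
}

pub fn main() !void {
    const observed = try observeClosedPeer();
    std.debug.print("POLL.HUP reported: {}\n", .{observed.hup});
    std.debug.print("read returned: {d}\n", .{observed.bytes_read});
}
```

This is why our loop treats a zero-byte read the same way it treats POLL.HUP: in both cases we close the socket and remove it.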

The poll call unblocks when at least one of the monitored file descriptors is ready. But the only way to know which are ready is to iterate through all of them, checking if revents != 0. Even with a modest upper limit of 4K clients, this isn't ideal. There isn't much we can do about it. However, poll does return the number of entries that have a non-zero revents. So while our code will still be O(N), we can at least stop iterating once we've processed the number of ready sockets:

while (true) {
    // poll_count already includes the listening socket
    var active = polls[0..poll_count];

    // we no longer ignore the return value
    var pending = try posix.poll(active, -1);

    if (active[0].revents != 0) {
        // previous code to accept the socket and add it to polls is unchanged
        // (but I removed it to keep this snippet small)

        // add this:
        pending -= 1;
    }

    // Next two lines are changed
    var i: usize = 1;
    while (pending > 0) {
        const polled = active[i];

        const revent = polled.revents;
        if (revent == 0) {
            i += 1;
            continue;
        }

        // add this
        // we've processed one of the pending notifications
        pending -= 1;

        // the rest is the same
        // ....
    }
}

Both epoll and kqueue, platform-specific alternatives to poll, elegantly solve this problem. We'll cover both of those APIs in the following parts.

When the listening socket is ready to accept a new connection, that is when polls[0].revents != 0, we call accept once. But because our socket is in non-blocking mode, we could call it until we receive an error.WouldBlock error:

if (active[0].revents != 0) {
    while (true) {
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => break,
            else => return err,
        };
        polls[poll_count] = .{
            .fd = socket,
            .revents = 0,
            .events = posix.POLL.IN,
        };
        poll_count += 1;
    }
}

poll tells us the listening socket can accept without blocking (i.e. that it's ready). But it doesn't tell us how many pending connections there are waiting to be accepted. Both approaches, looping and not looping, work because poll is always level-triggered. This means that poll continues to notify us so long as the socket is ready. This is different than edge-triggered which only notifies us when the state changes (we'll learn more about this in future parts).

Put differently, if there are four connections waiting to be accepted when poll returns, but we only accept one, the next call to poll will re-notify us that our listening socket is ready because of the still-pending three connections.
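
The same level-triggered behavior can be seen with unread bytes instead of pending connections. In this sketch a pipe stands in for a socket and isReadable is a hypothetical helper; as long as any unread data remains, every call to poll reports the descriptor as ready:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: true if fd can be read right now (poll with a timeout of 0)
fn isReadable(fd: posix.fd_t) !bool {
    var fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
    const ready = try posix.poll(&fds, 0);
    return ready == 1;
}

pub fn main() !void {
    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);
    defer posix.close(pipe[1]);

    _ = try posix.write(pipe[1], "abcd");

    // polling twice without reading: we're notified both times
    std.debug.print("1st poll: {}\n", .{try isReadable(pipe[0])});
    std.debug.print("2nd poll: {}\n", .{try isReadable(pipe[0])});

    // read only half of what's available: still ready
    var buf: [2]u8 = undefined;
    _ = try posix.read(pipe[0], &buf);
    std.debug.print("after partial read: {}\n", .{try isReadable(pipe[0])});

    // drain the rest: no longer ready
    _ = try posix.read(pipe[0], &buf);
    std.debug.print("after draining: {}\n", .{try isReadable(pipe[0])});
}
```

An edge-triggered API would only have notified us once, when the data first arrived.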

In the above code, when a socket is ready, we simply print whatever bytes we were able to read. But as we've discussed a few times, TCP deals with streams of bytes and isn't "message"-aware. We need to add more state so that we can handle a read returning only part of a message or more than a single message. We already saw a pseudo-implementation of this above, when we looked at non-blocking I/O. To use that same approach, we have a minor problem: poll doesn't allow us to associate arbitrary data with the pollfd, so we need to create and manage this association ourselves.

I think it's time that we introduced a Server struct to help us keep our code tidy. First its fields, init and deinit functions. We'll see a full Client implementation shortly, but for now, it's just a thin wrapper around Reader from previous parts.

const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

// We're going to start logging errors to a scoped logger
const log = std.log.scoped(.tcp_demo);

const Server = struct {
    // Our Clients need an allocator to create their read buffers
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    // polls[0] is always our listening socket
    polls: []posix.pollfd,

    // list of clients, only client[0..connected] are valid
    clients: []Client,

    // This is always polls[1..] and it's used so that we can manipulate
    // clients and client_polls together. Necessary because polls[0] is the
    // listening socket, and we don't ever touch that.
    client_polls: []posix.pollfd,

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
        };
    }

    fn deinit(self: *Server) void {
        // TODO: Close connected sockets? We'll talk about shutdowns in
        // a future part.
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
    }
};

The first client that connects will obviously be at clients[0], but it'll be at polls[1] because our listening socket is always at polls[0]. This isn't a requirement, it's merely our own convention. But given that we want to poll both our listening socket and connected sockets, it's an efficient way to do things. However, it means that clients[N] corresponds to polls[N+1]. This +1 offset is error prone and annoying, so we also create a client_polls which will be set to polls[1..]. Now when clients connect and disconnect, we just touch clients and client_polls which both share the same offset.
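
This works because a slice is only a view: polls[1..] doesn't copy anything, it points into the same memory. A tiny sketch (the clientPolls helper and the made-up descriptor value 42 are for illustration only):

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: the client view over a polls slice whose first entry
// is reserved for the listening socket.
fn clientPolls(polls: []posix.pollfd) []posix.pollfd {
    return polls[1..];
}

pub fn main() void {
    var polls: [4]posix.pollfd = undefined;
    // -1 is a descriptor that poll ignores, used here as a placeholder
    for (&polls) |*p| p.* = .{ .fd = -1, .events = 0, .revents = 0 };

    const client_polls = clientPolls(&polls);

    // writing to client_polls[0] is writing to polls[1]
    client_polls[0].fd = 42;
    std.debug.print("polls[1].fd = {d}\n", .{polls[1].fd});
    std.debug.print("client_polls.len = {d}\n", .{client_polls.len});
}
```

So index N means the same client in both clients and client_polls, and poll still sees the change through polls.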

Next we can add a run function. This is almost the same code as our first example with poll, but I've extracted some of the functionality into separate functions (which we'll see next):

fn run(self: *Server, address: std.net.Address) !void {
    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // One of the polling slots (the first one) is reserved for our listening
    // socket.
    self.polls[0] = .{
        .fd = listener,
        .revents = 0,
        .events = posix.POLL.IN,
    };

    while (true) {
        // Oops, still have a +1 here, since we want to poll all our connected
        // clients, plus our listening socket
        _ = try posix.poll(self.polls[0..self.connected + 1], -1);

        if (self.polls[0].revents != 0) {
            // listening socket is ready
            self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
        }

        var i: usize = 0;
        while (i < self.connected) {
            const revents = self.client_polls[i].revents;
            if (revents == 0) {
                // this socket isn't ready, move on to the next one
                i += 1;
                continue;
            }

            // const: the pointer itself is never reassigned
            const client = &self.clients[i];
            if (revents & posix.POLL.IN == posix.POLL.IN) {
                // this socket is ready to be read
                while (true) {
                    const msg = client.readMessage() catch {
                        // we don't increment `i` when we remove the client
                        // because removeClient does a swap and puts the last
                        // client at position i
                        self.removeClient(i);
                        break;
                    } orelse {
                        // no more messages, but this client is still connected
                        i += 1;
                        break;
                    };
                    std.debug.print("got: {s}\n", .{msg});
                }
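            } else {
                // revents is set, but not POLL.IN (e.g. POLL.HUP or POLL.ERR).
                // Without this branch `i` would never advance and we'd loop
                // forever on this client, so we drop it.
                self.removeClient(i);
            }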
        }
    }
}

The goal here is that the client at clients[N] is being monitored by polls[N+1]. But, to avoid that nasty +1, we use client_polls, which is a slice of polls: polls[1..]. This means that when client_polls[N] is ready, we can access the corresponding client at clients[N].

I'm not sure I love the idea of extracting accept and removeClient into their own functions. After all, they're only called from a single place and I like being able to read code without having to chase after it. But, ask me another day and I'll give you another answer:

fn accept(self: *Server, listener: posix.socket_t) !void {
    while (true) {
        // we'll continue to accept until we get error.WouldBlock
        // or until our program crashes because we overflow self.clients and self.polls
        // (we really should fix that!)

        var address: net.Address = undefined;
        var address_len: posix.socklen_t = @sizeOf(net.Address);
        const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => return,
            else => return err,
        };

        const client = Client.init(self.allocator, socket, address) catch |err| {
            posix.close(socket);
            log.err("failed to initialize client: {}", .{err});
            return;
        };

        const connected = self.connected;
        self.clients[connected] = client;
        self.client_polls[connected] = .{
            .fd = socket,
            .revents = 0,
            .events = posix.POLL.IN,
        };
        self.connected = connected + 1;
    }
}

fn removeClient(self: *Server, at: usize) void {
    const client = self.clients[at];
    posix.close(client.socket);
    client.deinit(self.allocator);

    // Swap the client we're removing with the last one
    // So that when we set connected -= 1, it'll effectively "remove"
    // the client from our slices.
    const last_index = self.connected - 1;
    self.clients[at] = self.clients[last_index];
    self.client_polls[at] = self.client_polls[last_index];

    self.connected = last_index;
}
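
The swap-and-shrink removal used by removeClient can be tried out on its own. This is a sketch with a hypothetical generic swapRemove helper and made-up data; the point is that both parallel arrays have to be updated with the same index, and that the order of the remaining items isn't preserved:

```zig
const std = @import("std");

// Hypothetical generic version of the removal in removeClient: overwrite the
// removed slot with the last live item and return the new live count.
fn swapRemove(comptime T: type, items: []T, count: usize, at: usize) usize {
    std.debug.assert(at < count);
    const last_index = count - 1;
    items[at] = items[last_index];
    return last_index;
}

pub fn main() void {
    // parallel arrays: names[i] belongs with fds[i]
    var names = [_][]const u8{ "a", "b", "c", "d" };
    var fds = [_]i32{ 10, 11, 12, 13 };
    var connected: usize = 4;

    // remove index 1 from both so that they stay in step
    _ = swapRemove([]const u8, &names, connected, 1);
    connected = swapRemove(i32, &fds, connected, 1);

    // "d"/13 now sits where "b"/11 used to be
    for (names[0..connected], fds[0..connected]) |name, fd| {
        std.debug.print("{s} -> {d}\n", .{ name, fd });
    }
}
```

Removal is O(1) because nothing is shifted, which is also why the caller must not advance its index after removing: the slot now holds a different, unvisited item.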

It seems like a lot more code, but the approach is the same as our initial example that used poll. Rather than having two slices, clients and client_polls, which are linked by index, we could have used a std.AutoHashMap(posix.socket_t, Client). Then the code would have looked something like:

for (self.polls[1..]) |p| {
    const revents = p.revents;
    if (revents == 0) {
        // same as before
        continue;
    }
    const client = self.clients.getPtr(p.fd) orelse unreachable;
    // Like before, we now have a *Client for the socket which is ready
}

Other parts of the code would also have been simplified - hashmaps have a way of doing that. But it isn't how I'd do it - because it would probably be less efficient - and this series isn't about taking the easy path.
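
For comparison, here's a small standalone sketch of the hashmap bookkeeping. The Client here is a stand-in with a single counter, onReadable is a hypothetical function, and the descriptor value 7 is made up; only the AutoHashMap calls (put, getPtr, remove) are the real std API:

```zig
const std = @import("std");
const posix = std.posix;

// Stand-in for our real Client, only here so that the map has a value type
const Client = struct { messages: usize = 0 };
const ClientMap = std.AutoHashMap(posix.socket_t, Client);

// Hypothetical handler: looks up the client for a ready socket and updates
// its state in place. Returns false if we don't know this socket.
fn onReadable(clients: *ClientMap, fd: posix.socket_t) bool {
    const client = clients.getPtr(fd) orelse return false;
    client.messages += 1;
    return true;
}

pub fn main() !void {
    var clients = ClientMap.init(std.heap.page_allocator);
    defer clients.deinit();

    // on accept: associate state with the new socket
    try clients.put(7, .{});

    // on POLL.IN: find the state from the pollfd's fd
    _ = onReadable(&clients, 7);

    // on disconnect: no swapping needed, just remove the entry
    _ = clients.remove(7);
    std.debug.print("still known after removal: {}\n", .{onReadable(&clients, 7)});
}
```

The convenience comes at the cost of hashing the descriptor on every ready event, where the index-linked slices need no lookup at all.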

Although the poll system call is simple we had to make substantial changes to our code, and our mindset, to accommodate this different way of interacting with sockets. State becomes both more critical and challenging to maintain compared to previous approaches where we had a thread-per-connection.

The next part will continue to expand on the above, adding writes, timeouts, connection limits and looking at how we can combine what we've learnt here with our previous experience with multi-threading. Future parts will then look at platform-specific alternatives to poll: epoll for Linux and kqueue for BSD/macOS. These are not only faster and more scalable than poll but also offer additional features. However, fundamentally, most of what we're learning here will translate directly to those APIs. Although it's relatively simple to build a system that supports both platform-specific APIs, poll has the significant benefit of being cross-platform. If you don't need the extra performance/scale or features of the platform-specific APIs, I suggest you stick with poll.

As a quick aside, there's also the select system call which is older and more limited than poll. Unless you're targeting a very old platform, you should always use poll instead of select. But you will see "select" mentioned/referenced now and again, so it's good to at least be aware of it.

const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    try server.run(address);
}

const Server = struct {
    // creates our polls and clients slices and is passed to Client.init
    // for it to create our read buffer.
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    // polls[0] is always our listening socket
    polls: []posix.pollfd,


    // list of clients, only client[0..connected] are valid
    clients: []Client,

    // This is always polls[1..] and it's used so that we can manipulate
    // clients and client_polls together. Necessary because polls[0] is the
    // listening socket, and we don't ever touch that.
    client_polls: []posix.pollfd,

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
        };
    }

    fn deinit(self: *Server) void {
        // TODO: Close connected sockets? We'll talk about shutdowns in
        // a future part.
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);

        // One of the polling slots (the first one) is reserved for our listening
        // socket.
        self.polls[0] = .{
            .fd = listener,
            .revents = 0,
            .events = posix.POLL.IN,
        };

        while (true) {
            // Oops, still have a +1 here, since we want to poll all our connected
            // clients, plus our listening socket
            _ = try posix.poll(self.polls[0..self.connected + 1], -1);

            if (self.polls[0].revents != 0) {
                // listening socket is ready
                self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
            }

            var i: usize = 0;
            while (i < self.connected) {
                const revents = self.client_polls[i].revents;
                if (revents == 0) {
                    // this socket isn't ready, move on to the next one
                    i += 1;
                    continue;
                }

                // const: the pointer itself is never reassigned
                const client = &self.clients[i];
                if (revents & posix.POLL.IN == posix.POLL.IN) {
                    // this socket is ready to be read
                    while (true) {
                        const msg = client.readMessage() catch {
                            // we don't increment `i` when we remove the client
                            // because removeClient does a swap and puts the last
                            // client at position i
                            self.removeClient(i);
                            break;
                        } orelse {
                            // no more messages, but this client is still connected
                            i += 1;
                            break;
                        };

                        std.debug.print("got: {s}\n", .{msg});
                    }
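                } else {
                    // revents is set, but not POLL.IN (e.g. POLL.HUP or POLL.ERR).
                    // Without this branch `i` would never advance and we'd loop
                    // forever on this client, so we drop it.
                    self.removeClient(i);
                }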
            }
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        while (true) {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

            const client = Client.init(self.allocator, socket, address) catch |err| {
                posix.close(socket);
                log.err("failed to initialize client: {}", .{err});
                return;
            };

            const connected = self.connected;
            self.clients[connected] = client;
            self.client_polls[connected] = .{
                .fd = socket,
                .revents = 0,
                .events = posix.POLL.IN,
            };
            self.connected = connected + 1;
        }
    }

    fn removeClient(self: *Server, at: usize) void {
        var client = self.clients[at];

        posix.close(client.socket);
        client.deinit(self.allocator);

        // Move the last client into the slot of the one we're removing, so
        // that when we set connected -= 1, it'll effectively "remove" the
        // client from our slices.
        const last_index = self.connected - 1;
        self.clients[at] = self.clients[last_index];
        self.client_polls[at] = self.client_polls[last_index];

        self.connected = last_index;
    }
};


const Client = struct {
    reader: Reader,
    socket: posix.socket_t,
    address: std.net.Address,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        return .{
            .reader = reader,
            .socket = socket,
            .address = address,
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        var buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
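            // ensureSpace measures free room from `start`, which already
            // includes the unprocessed bytes, so ask for the full 4-byte prefix.
            // Asking only for the missing bytes could leave buf[pos..] empty.
            self.ensureSpace(4) catch unreachable;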
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // the length of our message + the length of our prefix; widen to usize
        // first so a huge length prefix can't overflow the u32 addition
        const total_len = @as(usize, message_len) + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};
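To exercise this server, a client has to send messages in the format our Reader expects: a 4-byte little-endian length prefix followed by the payload. Here's a minimal sketch of that framing. Note that `frame` and `parse` are hypothetical helpers written for illustration; they aren't part of the server above, and `parse` only mirrors the prefix check in `bufferedMessage` without any of the buffer management.

```zig
const std = @import("std");

// Hypothetical helper: writes `msg` into `out` as a 4-byte little-endian
// length prefix followed by the payload, and returns the framed bytes.
fn frame(out: []u8, msg: []const u8) ![]u8 {
    if (msg.len > std.math.maxInt(u32)) return error.MessageTooLarge;
    const total_len = msg.len + 4;
    if (out.len < total_len) return error.BufferTooSmall;

    const len: u32 = @intCast(msg.len);
    std.mem.writeInt(u32, out[0..4], len, .little);
    @memcpy(out[4..total_len], msg);
    return out[0..total_len];
}

// Hypothetical counterpart to the prefix check in bufferedMessage: returns
// the payload if `data` holds a complete frame, or null if we need more bytes.
fn parse(data: []const u8) ?[]const u8 {
    if (data.len < 4) return null;
    const message_len: usize = std.mem.readInt(u32, data[0..4], .little);
    const total_len = message_len + 4;
    if (data.len < total_len) return null;
    return data[4..total_len];
}

test "frame and parse round-trip" {
    var buf: [64]u8 = undefined;
    const framed = try frame(&buf, "hello");

    // 4 bytes of prefix + 5 bytes of payload
    try std.testing.expectEqual(@as(usize, 9), framed.len);
    try std.testing.expectEqualSlices(u8, &[_]u8{ 5, 0, 0, 0 }, framed[0..4]);
    try std.testing.expectEqualStrings("hello", parse(framed).?);

    // a partial frame isn't a message yet
    try std.testing.expect(parse(framed[0..6]) == null);
}
```

Saved to its own file, this can be run with `zig test`. The partial-frame case is the one that matters for non-blocking sockets: a single read can return only part of a message, which is why our Reader keeps the unprocessed bytes around between calls.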