TCP Server in Zig - Part 6 - Epoll

Oct 23, 2024

In the last two parts we introduced the poll system call and, with it, patterns around evented I/O. poll has the advantage of being simple to use and supported on most platforms. However, as we saw, we need to iterate through all monitored sockets to figure out which ones are ready.

Another awkwardness with poll's API is associating application-specific data with each pollfd being monitored. To know which Client is ready, we need to create a mapping between the pollfd and Client. While this isn't the end of the world, it is something that poll alternatives address.

The first alternative API that we'll look at is epoll. The bulk of what we've learnt about evented I/O is still applicable as-is. However, epoll is a Linux-specific system call. In order to support it as well as other platforms, we'll need to leverage Zig's comptime as well as some layer of abstraction. This is something we'll look at after exploring kqueue, a BSD/MacOS-specific system call.

The epoll API is three separate system calls. epoll_create1 is used to create an epoll file descriptor. You can think of this as our epoll instance. epoll_ctl is used to modify our instance. With it, we can add, modify and remove sockets that we want our instance to monitor. Finally, we have epoll_wait which blocks until one of the monitored sockets is ready or until the configured timeout is reached.

With poll we specify the sockets to monitor on the blocking call to poll itself. This means we need to maintain our own array of pollfd. With epoll, because we have distinct functions to modify and wait, this ugliness is gone.

Here's a bare-bones runnable (on Linux) example:

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // epoll_create1 takes flags. We aren't using any in these examples
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    var ready_list: [128]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = client_socket}};
                try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
            } else {
                var closed = false;
                var buf: [4096]u8 = undefined;
                const read = posix.read(ready_socket, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
                }

                if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
                    posix.close(ready_socket);
                }
            }
        }
    }
}

There are a number of interesting things in the above code, especially compared to our first runnable code that used poll. The list of epoll_events that we pass into epoll_wait is filled by epoll_wait itself. The return value tells us how many events were filled. So instead of having to loop through every monitored socket to see which is ready, we're given a list of ready sockets, i.e. ready_list[0..ready_count]. With many mostly idle connections, that's significantly more efficient.

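Another change is how much simpler it is to remove a connection. We can explicitly remove a monitor via epoll_ctl and the CTL_DEL operation. But just closing the socket is enough to remove it, provided no other file descriptor (e.g. one created by dup or inherited across fork) still refers to the same socket.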
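To see an explicit removal in isolation, here's a minimal sketch. It isn't part of our server: it assumes Linux and the same Zig version as the listing above, and it uses an eventfd as a stand-in for a client socket so that we don't need a real connection. The readyCount helper is a name made up for this example.

```zig
const std = @import("std");
const posix = std.posix;
const linux = std.os.linux;

// Adds an already-readable descriptor to a fresh epoll instance, optionally
// removes it again with CTL_DEL, and returns how many events are ready.
fn readyCount(remove_first: bool) !usize {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    // Stand-in for a client socket: an eventfd whose counter is non-zero
    // is immediately readable.
    const fd = try posix.eventfd(1, 0);
    defer posix.close(fd);

    var event = linux.epoll_event{ .events = linux.EPOLL.IN, .data = .{ .fd = fd } };
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, fd, &event);

    if (remove_first) {
        // CTL_DEL ignores the event argument, so we can pass null
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_DEL, fd, null);
    }

    var ready: [4]linux.epoll_event = undefined;
    // a timeout of 0 returns immediately instead of blocking
    return posix.epoll_wait(efd, &ready, 0);
}

pub fn main() !void {
    std.debug.print("still monitored: {d} ready\n", .{try readyCount(false)});
    std.debug.print("after CTL_DEL:   {d} ready\n", .{try readyCount(true)});
}
```

With the monitor in place we should see one ready event. After CTL_DEL the descriptor is still readable, but epoll no longer reports it.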

From the above, we can tell that epoll_ctl takes the epoll file descriptor created via epoll_create1, the operation (CTL_ADD, CTL_DEL or CTL_MOD), the file descriptor to monitor (sockets in our case) and an epoll_event. The epoll_event structure has two fields: events, which is like pollfd.events, and data. The data field is an untagged union and allows us to associate arbitrary information with an epoll_event. The value we store in data can be retrieved when iterating through the ready list. Above, we store the socket and retrieve the socket from data.fd. Storing the socket directly in data is a good start, but we can do more with this field.

One of the challenges we faced with poll was associating a pollfd with its corresponding client. We managed to get this working by keeping two slices in sync, but never bothered figuring out how to get a pollfd from a client. With epoll this is much simpler. Specifically, the epoll_event.data field lets us store the numeric representation of a pointer. Or, put differently, we can associate anything we want with an epoll_event.

The benefit of this might not be immediately obvious, but let's add a simple Client and use data to associate our epoll_event directly with a client:

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

// Added this simple client
const Client = struct {
    socket: posix.socket_t,
};

pub fn main() !void {
    // We now need an allocator,
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    const address = try std.net.Address.parseIp("0.0.0.0", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const efd = try posix.epoll_create1(0);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.ptr = 0}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    var ready_list: [128]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            // we're using ptr instead of fd
            switch (ready.data.ptr) {
                0 => {
                    const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                    errdefer posix.close(socket);

                    // We create a client
                    const client = try allocator.create(Client);
                    errdefer allocator.destroy(client);
                    client.* = .{.socket = socket};

                    var event = linux.epoll_event{
                        .events = linux.EPOLL.IN,
                        // instead of setting fd, we set ptr which is a usize
                        .data = .{.ptr = @intFromPtr(client)}
                    };

                    try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, socket, &event);
                },
                else => |nptr| {
                    var closed = false;
                    var buf: [4096]u8 = undefined;

                    // we can get the client from data.ptr by reversing
                    // @intFromPtr with a call to @ptrFromInt
                    const client: *Client = @ptrFromInt(nptr);

                    // We should use client.readMessage() when we add our Reader
                    // back into our Client.
                    const read = posix.read(client.socket, &buf) catch 0;
                    if (read == 0) {
                        closed = true;
                    } else {
                        std.debug.print("[{d}] got: {any}\n", .{client.socket, buf[0..read]});
                    }

                    if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
                        posix.close(client.socket);
                        allocator.destroy(client);
                    }
                },
            }
        }
    }
}

The data.ptr is used to store the usize-representation of our client pointer. We use Zig's built-in @intFromPtr to turn a pointer into an integer and then @ptrFromInt to reverse the process. A real hack in the above code is setting the data.ptr for our listening socket to 0. Remember that data is an untagged union. If we use data.fd = listener for our listening socket, but then data.ptr = @intFromPtr(client) for client connections, we won't be able to tell which field is active. The "right" way to fix this is to create our own tagged union:

const EventType = union(enum) {
    listener: posix.socket_t,
    recv: *Client,
};

But that's another layer of allocations and indirection. Instead, we use 0 and hope that the operating system never allocates a Client at memory address 0 (this is more than "hope": aside from embedded systems, most platforms would never allocate memory at address 0).

@intFromPtr is a built-in function which takes a pointer and gives you the usize value. Now, when a socket is ready, we can get the corresponding client by using the reverse function, @ptrFromInt on the data.ptr field to get the client:

const client: *Client = @ptrFromInt(nptr);

It might be obvious to some, but for this to work, a client has to be allocated on the heap. While we can call @intFromPtr on a pointer to a stack-allocated value, that value won't magically remain valid beyond its scope.
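The round trip is easy to try without any sockets. The following is only a sketch: the Client here is a stripped-down stand-in, and Target and decode are names made up for this example. It shows the 0 sentinel for the listener and the @intFromPtr / @ptrFromInt pair for a heap-allocated client.

```zig
const std = @import("std");

// Stripped-down stand-in for our Client
const Client = struct {
    id: u32,
};

// What the event loop wants to recover from data.ptr
const Target = union(enum) {
    listener,
    client: *Client,
};

// 0 is reserved for the listening socket; any other value is treated
// as the address of a heap-allocated Client.
fn decode(ptr: usize) Target {
    if (ptr == 0) return .listener;
    const client: *Client = @ptrFromInt(ptr);
    return .{ .client = client };
}

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    // heap-allocated, so the pointer stays valid until we destroy it
    const client = try allocator.create(Client);
    defer allocator.destroy(client);
    client.* = .{ .id = 7 };

    // this is the value we'd store in epoll_event.data.ptr
    const stored: usize = @intFromPtr(client);

    for ([_]usize{ 0, stored }) |ptr| {
        switch (decode(ptr)) {
            .listener => std.debug.print("listener is ready\n", .{}),
            .client => |c| std.debug.print("client {d} is ready\n", .{c.id}),
        }
    }
}
```

Note that decode hands back a tagged union, but only as a temporary on the stack. Nothing extra is allocated per event, which is the cost we were trying to avoid.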

Being able to associate arbitrary data with an epoll_event tightens up our code, but the real benefit is the ease with which we can modify the epoll instance from a client. The way we previously structured our poll-based server, it wasn't possible for writeMessage to be called externally (i.e. from application code) because the Client had no way to alter the server's polls list. We could have had each Client reference a *Server, but it would have been tricky to deal with the moving indexes.

With epoll we still need each Client to have the epoll instance (either directly, or via a reference to the *Server), but can use epoll_ctl to modify the monitor for the client. You'll recall that previously our write function would return a false to signal an incomplete write and true to signal a completed write. This value was used by our run function to switch to and from write-mode and read-mode. We can now bake this into write:

fn write(self: *Client) !void {
    var buf = self.to_write;
    defer self.to_write = buf;
    while (buf.len > 0) {
        const n = posix.write(self.socket, buf) catch |err| switch (err) {
            error.WouldBlock => {
                // switch to write-mode
                var event = linux.epoll_event{
                    .events = linux.EPOLL.OUT,
                    .data = .{.ptr = @intFromPtr(self)},
                };
                return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
            },
            else => return err,
        };

        if (n == 0) {
            return error.Closed;
        }
        buf = buf[n..];
    } else {
        // done writing, switch to read-mode
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = @intFromPtr(self)},
        };
        return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
    }
}

That might seem like a minor change, but it enables something you'll almost certainly need: the ability to initiate a write from the application. There is one inefficiency in the above code: on a complete write, we revert to read-mode. However, most of the time, we'll probably never have gone into write-mode to begin with. As we've said before, unless your server is very busy, your client is lagging, or you're sending very large messages, there's a reasonable chance that posix.write will succeed in a single call. In such cases, reverting to read-mode is an unnecessary and wasteful system call. I'll leave this up to you to fix, but one solution would be to add an IOMode enum with two values: read and write and track which the client is currently in. Then we can check if self.mode == .write before switching to read-mode.
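One possible shape for that fix is sketched below. To keep it runnable anywhere, the epoll instance is replaced by a FakeLoop that only counts how many CTL_MOD calls we would have made. FakeLoop, writeBlocked and writeDone are hypothetical names for this sketch, not part of our server.

```zig
const std = @import("std");

const IOMode = enum { read, write };

// Stand-in for the epoll instance: it only counts how many times we
// would have called epoll_ctl with CTL_MOD.
const FakeLoop = struct {
    mod_calls: usize = 0,

    fn modify(self: *FakeLoop, mode: IOMode) void {
        _ = mode;
        self.mod_calls += 1;
    }
};

const Client = struct {
    loop: *FakeLoop,
    // a new client starts out monitored for reads
    mode: IOMode = .read,

    // What write() would do when posix.write returns error.WouldBlock
    fn writeBlocked(self: *Client) void {
        if (self.mode == .write) return; // already waiting for writability
        self.loop.modify(.write);
        self.mode = .write;
    }

    // What write() would do once the whole message has been written
    fn writeDone(self: *Client) void {
        if (self.mode == .read) return; // never left read-mode, skip the system call
        self.loop.modify(.read);
        self.mode = .read;
    }
};

pub fn main() void {
    var loop = FakeLoop{};
    var client = Client{ .loop = &loop };

    // common case: the write completed in a single call
    client.writeDone();
    std.debug.print("after a complete write: {d} calls\n", .{loop.mod_calls});

    // slow client: we block, then finish later
    client.writeBlocked();
    client.writeDone();
    std.debug.print("after a blocked write:  {d} calls\n", .{loop.mod_calls});
}
```

In the common case no call is made at all. We only pay for two modifications when a write actually blocks.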

We mentioned that poll is always level-triggered. That is, as long as a condition holds, we'll get a notification. For example, if one call to poll notifies us that a certain socket is ready and we then call poll again, without reading the socket, we'll get another notification. This is also how epoll works by default. But we can make epoll edge-triggered on a per-socket basis.

Let's go back to our first epoll program, and change the code that handles a client's readiness:

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // epoll_create1 takes flags. We aren't using any in these examples
    const efd = try posix.epoll_create1(0);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    var ready_list: [128]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                var event = linux.epoll_event{
                    .events = linux.EPOLL.IN,
                    .data = .{.fd = client_socket}
                };
                try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
            } else {
                // THIS HAS CHANGED
                std.debug.print(".", .{});
            }
        }
    }
}

For now, only the end of the code has changed. Instead of reading the message and printing it, we print a dot (.). If you run this code (again, Linux only), connect and send a message, your screen should get filled with dots. This is because epoll is, by default, in level-triggered mode and continues to notify us of the socket's readiness.

When adding the monitor for the client socket, if we change events from linux.EPOLL.IN to linux.EPOLL.IN | linux.EPOLL.ET, i.e.:

var event = linux.epoll_event{
    // THIS IS CHANGED
    .events = linux.EPOLL.IN | linux.EPOLL.ET,
    .data = .{.fd = client_socket}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);

we make the monitor edge-triggered (ET). Now, if we connect a client and send a message, we'll get a single notification. The monitor isn't deleted or even disabled. If we send another message, we'll get a new notification. epoll is pretty smart too: a new event doesn't have to happen while epoll_wait is blocked. So there is no race condition between setting a monitor and calling epoll_wait. In our above code, if the client sends a message right after the socket is added with epoll_ctl but before epoll_wait is called, we will get the notification.
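If you'd rather compare the two modes without connecting a client, here's a small sketch. It assumes Linux and the same Zig version as the rest of this post, and it uses an eventfd as a stand-in for a client socket. The pollTwice helper is a name made up for this example. We make the descriptor readable once, never read from it, and ask epoll twice what is ready.

```zig
const std = @import("std");
const posix = std.posix;
const linux = std.os.linux;

// Monitors a descriptor with the given event mask, makes it readable once,
// then polls twice without ever consuming the pending data.
fn pollTwice(events: u32) ![2]usize {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    // stand-in for a client socket
    const fd = try posix.eventfd(0, 0);
    defer posix.close(fd);

    var event = linux.epoll_event{ .events = events, .data = .{ .fd = fd } };
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, fd, &event);

    // The "message" arrives after the monitor is set but before we wait.
    // eventfd writes must be exactly 8 bytes.
    _ = try posix.write(fd, &std.mem.toBytes(@as(u64, 1)));

    var ready: [4]linux.epoll_event = undefined;
    // a timeout of 0 returns immediately instead of blocking
    const first = posix.epoll_wait(efd, &ready, 0);
    const second = posix.epoll_wait(efd, &ready, 0);
    return [2]usize{ first, second };
}

pub fn main() !void {
    const level = try pollTwice(linux.EPOLL.IN);
    const edge = try pollTwice(linux.EPOLL.IN | linux.EPOLL.ET);
    std.debug.print("level-triggered: {d} then {d}\n", .{ level[0], level[1] });
    std.debug.print("edge-triggered:  {d} then {d}\n", .{ edge[0], edge[1] });
}
```

The level-triggered monitor should report the descriptor on both calls, while the edge-triggered one should report it only on the first. The write also happens before the first epoll_wait, and we still get notified.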

We can also use linux.EPOLL.ONESHOT which disables the notification for the socket after readiness has been signaled once, including for any new messages. epoll_ctl with the linux.EPOLL.CTL_MOD operation needs to be called to re-enable the monitor. This is obviously different from using edge-triggered since notifications are disabled even for new events. While ONESHOT might be useful to make sure that a single message per client is processed, it does have the disadvantage of requiring many more system calls: one every time we want to re-enable the monitor.
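Here's a sketch of that disable / re-enable cycle. As before, it assumes Linux, and it uses an eventfd in place of a client socket. The signal and oneshotCounts helpers are names made up for this example.

```zig
const std = @import("std");
const posix = std.posix;
const linux = std.os.linux;

// Simulates a new message arriving. eventfd writes must be exactly 8 bytes.
fn signal(fd: posix.fd_t) !void {
    _ = try posix.write(fd, &std.mem.toBytes(@as(u64, 1)));
}

// Returns the ready counts after the first event, after a second event
// while the monitor is disabled, and after re-arming with CTL_MOD.
fn oneshotCounts() ![3]usize {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    const fd = try posix.eventfd(0, 0);
    defer posix.close(fd);

    var event = linux.epoll_event{
        .events = linux.EPOLL.IN | linux.EPOLL.ONESHOT,
        .data = .{ .fd = fd },
    };
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, fd, &event);

    var ready: [4]linux.epoll_event = undefined;

    try signal(fd);
    const first = posix.epoll_wait(efd, &ready, 0);

    // a new event while the monitor is disabled goes unreported
    try signal(fd);
    const disabled = posix.epoll_wait(efd, &ready, 0);

    // re-enable the monitor; the pending data is reported again
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_MOD, fd, &event);
    const rearmed = posix.epoll_wait(efd, &ready, 0);

    return [3]usize{ first, disabled, rearmed };
}

pub fn main() !void {
    const counts = try oneshotCounts();
    std.debug.print("first: {d}, disabled: {d}, re-armed: {d}\n", .{ counts[0], counts[1], counts[2] });
}
```

The extra epoll_ctl call between the second and third wait is the per-event cost mentioned above.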

Besides the fact that epoll is Linux-specific, it's safe to think of it as a better version of poll. It's more efficient, easier to integrate and has more features. Except for the specifics of the API, everything about evented I/O is unchanged. Using @intFromPtr and @ptrFromInt can actually be useful now and again, but it's mostly used as above, when integrating with a C library that lets you associate arbitrary data (often a usize) with a C structure.

Below, you'll find a full working (on Linux) example. It merges what we've learned about epoll with our Server, Client and Reader from previous parts. We've also introduced an Epoll structure. The goal is to start creating some abstraction to eventually deal with having platform-specific implementations (i.e. using epoll on Linux and kqueue on BSD/MacOS). This abstraction is incomplete, but it's a start and something we'll dedicate a whole part to.

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    try server.run(address);
    std.debug.print("STOPPED\n", .{});
}

// 1 minute
const READ_TIMEOUT_MS = 60_000;

const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;

const Server = struct {
    // maximum # of allowed clients
    max: usize,

    loop: Epoll,

    // backs our memory pools and is passed to Client.init for it to
    // create our read and write buffers.
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    read_timeout_list: ClientList,

    // for creating client
    client_pool: std.heap.MemoryPool(Client),
    // for creating nodes for our read_timeout list
    client_node_pool: std.heap.MemoryPool(ClientList.Node),

    fn init(allocator: Allocator, max: usize) !Server {
        const loop = try Epoll.init();
        errdefer loop.deinit();

        return .{
            .max = max,
            .loop = loop,
            .connected = 0,
            .allocator = allocator,
            .read_timeout_list = .{},
            .client_pool = std.heap.MemoryPool(Client).init(allocator),
            .client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
        };
    }

    fn deinit(self: *Server) void {
        self.loop.deinit();
        self.client_pool.deinit();
        self.client_node_pool.deinit();
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);
        var read_timeout_list = &self.read_timeout_list;

        try self.loop.addListener(listener);

        while (true) {
            const next_timeout = self.enforceTimeout();
            const ready_events = self.loop.wait(next_timeout);
            for (ready_events) |ready| {
                switch (ready.data.ptr) {
                    0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
                    else => |nptr| {
                        const events = ready.events;
                        const client: *Client = @ptrFromInt(nptr);

                        if (events & linux.EPOLL.IN == linux.EPOLL.IN) {
                            // this socket is ready to be read
                            while (true) {
                                const msg = client.readMessage() catch {
                                    self.closeClient(client);
                                    break;
                                } orelse break;   // no more messages

                                client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
                                read_timeout_list.remove(client.read_timeout_node);
                                read_timeout_list.append(client.read_timeout_node);

                                client.writeMessage(msg) catch {
                                    self.closeClient(client);
                                    break;
                                };
                            }
                        } else if (events & linux.EPOLL.OUT == linux.EPOLL.OUT) {
                            client.write() catch self.closeClient(client);
                        }
                    }
                }
            }
        }
    }

    fn enforceTimeout(self: *Server) i32 {
        const now = std.time.milliTimestamp();
        var node = self.read_timeout_list.first;
        while (node) |n| {
            const client = n.data;
            const diff = client.read_timeout - now;
            if (diff > 0) {
                // this client's timeout is the first one that's in the
                // future, so we now know the maximum time we can block on
                // epoll_wait before having to call enforceTimeout again
                return @intCast(diff);
            }

            // This client's timeout is in the past. Shut down the read side
            // of the socket. Our next readMessage on it will return an
            // error, and run will then clean up via closeClient.
            posix.shutdown(client.socket, .recv) catch {};
            node = n.next;
        } else {
            // We have no client that times out in the future (if we did
            // we would have hit the return above).
            return -1;
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        const space = self.max - self.connected;
        for (0..space) |_| {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

            const client = try self.client_pool.create();
            errdefer self.client_pool.destroy(client);
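            client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
                posix.close(socket);
                // a plain `return` doesn't trigger errdefer, so release
                // the client ourselves
                self.client_pool.destroy(client);
                log.err("failed to initialize client: {}", .{err});
                return;
            };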
            errdefer client.deinit(self.allocator);

            client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
            client.read_timeout_node = try self.client_node_pool.create();
            errdefer self.client_node_pool.destroy(client.read_timeout_node);

            client.read_timeout_node.* = .{
                .next = null,
                .prev = null,
                .data = client,
            };
            self.read_timeout_list.append(client.read_timeout_node);
            try self.loop.newClient(client);
            self.connected += 1;
        } else {
            // we've run out of space, stop monitoring the listening socket
            try self.loop.removeListener(listener);
        }
    }

    fn closeClient(self: *Server, client: *Client) void {
        self.read_timeout_list.remove(client.read_timeout_node);

        posix.close(client.socket);
        self.client_node_pool.destroy(client.read_timeout_node);
        client.deinit(self.allocator);
        self.client_pool.destroy(client);
        // free up the slot that accept counted for this client
        self.connected -= 1;
    }
};

const Client = struct {
    // This probably shouldn't be a pointer. Epoll is currently lightweight
    // and is safe to copy. But, in Part 8, when we build an abstraction
    // for Epoll and Kqueue, this will be necessary.
    loop: *Epoll,

    socket: posix.socket_t,
    address: std.net.Address,

    // Used to read length-prefixed messages
    reader: Reader,

    // Bytes we still need to send. This is a slice of `write_buf`. When
    // empty, then we're in "read-mode" and are waiting for a message from the
    // client.
    to_write: []u8,

    // Buffer for storing our length-prefixed message
    write_buf: []u8,

    // absolute time, in milliseconds, when this client should timeout if
    // a message isn't received
    read_timeout: i64,

    // Node containing this client in the server's read_timeout_list
    read_timeout_node: *ClientNode,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *Epoll) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .loop = loop,
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
            .read_timeout = 0, // let the server set this
            .read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        allocator.free(self.write_buf);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }

    fn writeMessage(self: *Client, msg: []const u8) !void {
        if (self.to_write.len > 0) {
            // Depending on how you structure your code, this might not be possible
            // For example, in an HTTP server, the application might not control
            // the actual "writeMessage" call, and thus it would not be possible
            // to make more than one writeMessage call per request.
            // For this demo, we'll just return an error.
            return error.PendingMessage;
        }

        if (msg.len + 4 > self.write_buf.len) {
            // Could allocate a dynamic buffer. Could use a large buffer pool.
            return error.MessageTooLarge;
        }

        // copy our length prefix + message to our buffer
        std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
        const end = msg.len + 4;
        @memcpy(self.write_buf[4..end], msg);

        // setup our to_write slice
        self.to_write = self.write_buf[0..end];

        // immediately write what we can
        return self.write();
    }

    // Writes as much of `to_write` as the socket will accept. Switches to
    // write-mode if the write would block and to read-mode once done.
    fn write(self: *Client) !void {
        var buf = self.to_write;
        defer self.to_write = buf;
        while (buf.len > 0) {
            const n = posix.write(self.socket, buf) catch |err| switch (err) {
                error.WouldBlock => return self.loop.writeMode(self),
                else => return err,
            };

            if (n == 0) {
                return error.Closed;
            }
            buf = buf[n..];
        } else {
            return self.loop.readMode(self);
        }
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        var buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
            self.ensureSpace(4 - unprocessed.len) catch unreachable;
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // the length of our message + the length of our prefix
        const total_len = message_len + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};

// We'll eventually need to build a platform abstraction between epoll and
// kqueue. This is a rough start.
const Epoll = struct {
    efd: posix.fd_t,
    ready_list: [128]linux.epoll_event = undefined,

    fn init() !Epoll {
        const efd = try posix.epoll_create1(0);
        return .{.efd = efd};
    }

    fn deinit(self: Epoll) void {
        posix.close(self.efd);
    }

    fn wait(self: *Epoll, timeout: i32) []linux.epoll_event {
        const count = posix.epoll_wait(self.efd, &self.ready_list, timeout);
        return self.ready_list[0..count];
    }

    fn addListener(self: Epoll, listener: posix.socket_t) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = 0},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    fn removeListener(self: Epoll, listener: posix.socket_t) !void {
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_DEL, listener, null);
    }

    fn newClient(self: Epoll, client: *Client) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = @intFromPtr(client)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, client.socket, &event);
    }

    fn readMode(self: Epoll, client: *Client) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = @intFromPtr(client)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
    }

    fn writeMode(self: Epoll, client: *Client) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.OUT,
            .data = .{.ptr = @intFromPtr(client)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
    }
};