home

TCP Server in Zig - Part 7 - Kqueue

Oct 27, 2024

kqueue is a BSD/MacOS alternative over poll. In most ways, kqueue is similar to the Linux-specific epoll, which itself is important, but important, incremental upgrade to poll. Because kqueue has a single function it superficially looks like poll. But, as we'll soon see, that single function can behave in two different ways, making its API and the integration into our code very similar to epoll.

Because kqueue is rather similar to epoll, this part is shorter as it assumes that you're familiar with topics discussed in part 6, such as edge-triggering and @intToPtr.

Where epoll has one function to modify the epoll file descriptor (epoll_ctl) and one to wait for notifications (epoll_wait), kqueue uses a single function for both purposes: kevent. However, depending on the values passed to kevent, it can either modify the kqueue instance or wait for notifications or both. Thus, the single function can act like either of the epoll functions or combine both in a single call. The kevent function takes 4 parameters:

  1. The kqueue file descriptor which is the kqueue instance that we're modifying and/or waiting on. Created using posix.kqueue.
  2. A list of posix.Kevent that represents notifications we want to add/change/delete. Known as the changelist. Can be empty.
  3. A list of posix.Kevent that indicate readiness. Known as the eventlist. Can be empty.
  4. A timeout as a posix.timespec. Can be null.

The key to understanding this API is knowing that when the eventlist is empty, kevent immediately returns. Thus, callingkevent with an empty eventlist is like calling epoll_ctl. Therefore, like epoll and unlike poll, as long as we have the kqueue instance, we can easily add, remove and change monitors.

The kqueue API has one advantage over epoll: we can apply modification in bulk. Where epoll_ctl takes a single epoll_event, kevent takes an array of Kevent. In other words, with kqueue it should be possible to make fewer system calls.

This is a working example (on BSD / MacOS). To keep it simple and similar to our first epoll sample, we're not leveraging the bulk-modification capabilities of the API but rather add one event at a time (the final example does add them in bulk):

const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        // monitor our listening socket
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                var closed = false;
                var buf: [4096]u8 = undefined;
                const read = posix.read(ready_socket, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
                }

                if (closed) {
                    posix.close(ready_socket);
                }
            }
        }
    }
}

Like with epoll, we can attach arbitrary information via the udata field. Above we're using the file descriptor, but in a more complete example we'd likely use @intFromPtr to get a usize representation of an application-specific "Client" struct. The Kevent struct has two additional fields: fflags and data. These hold flags and data to use for different filters. With sockets, where we're only interested in the READ and WRITE filters, these should be set to zero. In a future part, we'll see a brief example of a different filter which does leverage the fflags field.

With epoll the monitors we add, modify are identified by the 3rd parameter we pass to epoll_ctl. In all the code we've seen so far, that was either the listening socket or the client socket, but more generally, it's the file descriptor to monitor. With kqueue the identifier is the combination of the ident and filter fields. With epoll we toggled a client from read-mode to write-mode by modifying the existing notifier (identified by the socket) with the CTL_MOD operation. In kqueue we'd need delete the read monitor and then add a write monitor. Or, and this is what we do in the full example given at the end, we add both a read and write monitor, but disable the write monitor. We can toggle the mode by disabling the active one and enabling the disabled one:

Read Mode:
key=(ident: socket1, filter: read), enabled=true
key=(ident: socket1, filter: write), enabled=false

Write Mode:
key=(ident: socket1, filter: read), enabled=false
key=(ident: socket1, filter: write), enabled=true

This also means that filter isn't a bitwise flag. To check if the socket is ready for reading, we just have to compare ready.filter == posix.system.EVFILT.READ.

In addition to using the EV.READ and EV.WRITE flags, we can also set EV.ONESHOT, EV.DISPATCH and EV.CLEAR.

EV.ONESHOT removes the notification after readiness has been reported, making it one-time-only. The notification has to be re-added using the EV.ADD flag.

EV.DISPATCH is similar but rather than removing the notification, it disables it (thus, EV.DISPATCH is like EPOLL.ONESHOT). To re-arm the notification, EV.ENABLE or EV.ADD have to be called ("adding" an already added entry, whether it's disabled or not, does not create a duplicate, and will re-enable it if disabled). The difference between removing (ONESHOT) and disabling (DISPATCH) is that disabling and re-enabling is faster but takes a bit more memory since the internal structure is kept. If you intend to frequently re-arm the notification, EV.DISPATCH might be a better choice.

EV.CLEAR is similar to EPOLL.ET, causing kevent to signal state change rather than readiness.

As before, we can take our above code, strip out the "read" handling to see the various behaviors. Only the end of the code was changed

const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();

    {
        // monitor our listening socket
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .flags = posix.system.EV.ADD,
            .filter = posix.system.EVFILT.READ,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                // THIS WAS CHANGED
                std.debug.print(".", .{});
            }
        }
    }
}

If you connect to the above and send a message, your screen will get flooded with dots (.) as kevent will continuously notify about the sockets readiness (since we never read from it). Changing the filter for the added client socket from: posix.system.EV.ADD to one of:

will show how each behaves. For the first two, ONESHOT and DISPATCH no matter how much data we send, we'll only ever get a single notification. We'd need to re-add or re-enable (aka, re-arm) the notification. For CLEAR we'll get a single notification each time new data becomes ready.

Although kqueue and epoll are platform-specific, they're quite similar, allowing us to create a simple abstraction to target either platform - the topic of our next part. Furthermore, their similarity has the benefit of resulting in a rather short post!

A more complete example is included below, including our Server, Client and writes. Here you'll see the udata field used to aClient (via @intFromPtr and @ptrFromInt).

This code leverages the bulk-modification capabilities of kevent. When we add or modify a notification, we "stage" these in a local change_list. Only when the change_list is full or KQueue.wait is called do we apply the changes. In the latter case, applying the changes and waiting for readiness is done in a single system call. All of this is a simple but effective way to reduce the number of system calls we must make.

const std = @import("std");
const net = std.net;
const posix = std.posix;
const system = std.posix.system;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("0.0.0.0", 5882);
    try server.run(address);
    std.debug.print("STOPPED\n", .{});
}

// 1 minute
const READ_TIMEOUT_MS = 60_000;

const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;

const Server = struct {
    // maximum # of allowed clients
    max: usize,

    loop: KQueue,

    // creates our polls and clients slices and is passed to Client.init
    // for it to create our read buffer.
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    read_timeout_list: ClientList,

    // for creating client
    client_pool: std.heap.MemoryPool(Client),
    // for creating nodes for our read_timeout list
    client_node_pool: std.heap.MemoryPool(ClientList.Node),

    fn init(allocator: Allocator, max: usize) !Server {
        const loop = try KQueue.init();
        errdefer loop.deinit();

        const clients = try allocator.alloc(*Client, max);
        errdefer allocator.free(clients);

        return .{
            .max = max,
            .loop = loop,
            .connected = 0,
            .allocator = allocator,
            .read_timeout_list = .{},
            .client_pool = std.heap.MemoryPool(Client).init(allocator),
            .client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
        };
    }

    fn deinit(self: *Server) void {
        self.loop.deinit();
        self.client_pool.deinit();
        self.client_node_pool.deinit();
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);
        var read_timeout_list = &self.read_timeout_list;

        try self.loop.addListener(listener);

        while (true) {
            const next_timeout = self.enforceTimeout();
            const ready_events = try self.loop.wait(next_timeout);
            for (ready_events) |ready| {
                switch (ready.udata) {
                    0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
                    else => |nptr| {
                        const filter = ready.filter;
                        const client: *Client = @ptrFromInt(nptr);

                        if (filter == system.EVFILT.READ) {
                            // this socket is ready to be read
                            while (true) {
                                const msg = client.readMessage() catch {
                                    self.closeClient(client);
                                    break;
                                } orelse break;   // no more messages

                                client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
                                read_timeout_list.remove(client.read_timeout_node);
                                read_timeout_list.append(client.read_timeout_node);

                                client.writeMessage(msg) catch {
                                    self.closeClient(client);
                                    break;
                                };
                            }
                        } else if (filter == system.EVFILT.WRITE) {
                            client.write() catch self.closeClient(client);
                        }
                    }
                }
            }
        }
    }

    fn enforceTimeout(self: *Server) i32 {
        const now = std.time.milliTimestamp();
        var node = self.read_timeout_list.first;
        while (node) |n| {
            const client = n.data;
            const diff = client.read_timeout - now;
            if (diff > 0) {
                // this client's timeout is the first one that's in the
                // future, so we now know the maximum time we can block on
                // poll before having to call enforceTimeout again
                return @intCast(diff);
            }

            // This client's timeout is in the past. Close the socket
            // Ideally, we'd call server.removeClient() and just remove the
            // client directly. But within this method, we don't know the
            // client_polls index. When we move to epoll / kqueue, this problem
            // will go away, since we won't need to maintain polls and client_polls
            // in sync by index.
            posix.shutdown(client.socket, .recv) catch {};
            node = n.next;
        } else {
            // We have no client that times out in the future (if we did
            // we would have hit the return above).
            return -1;
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        const space = self.max - self.connected;
        for (0..space) |_| {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

            const client = try self.client_pool.create();
            errdefer self.client_pool.destroy(client);
            client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
                posix.close(socket);
                log.err("failed to initialize client: {}", .{err});
                return;
            };
            errdefer client.deinit(self.allocator);

            client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
            client.read_timeout_node = try self.client_node_pool.create();
            errdefer self.client_node_pool.destroy(client.read_timeout_node);

            client.read_timeout_node.* = .{
                .next = null,
                .prev = null,
                .data = client,
            };
            self.read_timeout_list.append(client.read_timeout_node);
            try self.loop.newClient(client);
            self.connected += 1;
        } else {
            // we've run out of space, stop monitoring the listening socket
            try self.loop.removeListener(listener);
        }
    }

    fn closeClient(self: *Server, client: *Client) void {
        self.read_timeout_list.remove(client.read_timeout_node);

        posix.close(client.socket);
        self.client_node_pool.destroy(client.read_timeout_node);
        client.deinit(self.allocator);
        self.client_pool.destroy(client);
    }
};

const Client = struct {
    loop: *KQueue,

    socket: posix.socket_t,
    address: std.net.Address,

    // Used to read length-prefixed messages
    reader: Reader,

    // Bytes we still need to send. This is a slice of `write_buf`. When
    // empty, then we're in "read-mode" and are waiting for a message from the
    // client.
    to_write: []u8,

    // Buffer for storing our lenght-prefixed messaged
    write_buf: []u8,

    // absolute time, in millisecond, when this client should timeout if
    // a message isn't received
    read_timeout: i64,

    // Node containing this client in the server's read_timeout_list
    read_timeout_node: *ClientNode,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *KQueue) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .loop = loop,
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
            .read_timeout = 0, // let the server set this
            .read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        allocator.free(self.write_buf);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }

    fn writeMessage(self: *Client, msg: []const u8) !void {
        if (self.to_write.len > 0) {
            // Depending on how you structure your code, this might not be possible
            // For example, in an HTTP server, the application might not control
            // the actual "writeMessage" call, and thus it would not be possible
            // to make more than one writeMessage call per request.
            // For this demo, we'll just return an error.
            return error.PendingMessage;
        }

        if (msg.len + 4 > self.write_buf.len) {
            // Could allocate a dynamic buffer. Could use a large buffer pool.
            return error.MessageTooLarge;
        }

        // copy our length prefix + message to our buffer
        std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
        const end = msg.len + 4;
        @memcpy(self.write_buf[4..end], msg);

        // setup our to_write slice
        self.to_write = self.write_buf[0..end];

        // immediately write what we can
        return self.write();
    }

    // Returns `false` if we didn't manage to write the whole mssage
    // Returns `true` if the message is fully written
    fn write(self: *Client) !void {
        var buf = self.to_write;
        defer self.to_write = buf;
        while (buf.len > 0) {
            const n = posix.write(self.socket, buf) catch |err| switch (err) {
                error.WouldBlock => return self.loop.writeMode(self),
                else => return err,
            };

            if (n == 0) {
                return error.Closed;
            }
            buf = buf[n..];
        } else {
            return self.loop.readMode(self);
        }
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        var buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
            self.ensureSpace(4 - unprocessed.len) catch unreachable;
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // the length of our message + the length of our prefix
        const total_len = message_len + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};

// We'll eventually need to build a platform abstractions between epoll and
// kqueue. This is a rough start.
const KQueue = struct {
    kfd: posix.fd_t,
    event_list: [128]system.Kevent = undefined,
    change_list: [16]system.Kevent = undefined,
    change_count: usize = 0,

    fn init() !KQueue {
        const kfd = try posix.kqueue();
        return .{.kfd = kfd};
    }

    fn deinit(self: KQueue) void {
        posix.close(self.kfd);
    }

    fn wait(self: *KQueue, timeout_ms: i32) ![]system.Kevent {
        const timeout = posix.timespec{
            .sec = @intCast(@divTrunc(timeout_ms, 1000)),
            .nsec = @intCast(@mod(timeout_ms, 1000) * 1000000),
        };
        const count = try posix.kevent(self.kfd, self.change_list[0..self.change_count], &self.event_list, &timeout);
        self.change_count = 0;
        return self.event_list[0..count];
    }

    fn addListener(self: *KQueue, listener: posix.socket_t) !void {
        // ok to use EV.ADD to renable the listener if it was previous
        // disabled via removeListener
        try self.queueChange(.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });
    }

    fn removeListener(self: *KQueue, listener: posix.socket_t) !void {
        try self.queueChange(.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });
    }

    fn newClient(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.ADD | posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn readMode(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ENABLE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });

    }

    fn writeMode(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .flags = posix.system.EV.ENABLE,
            .filter = posix.system.EVFILT.WRITE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn queueChange(self: *KQueue, event: system.Kevent) !void {
        var count = self.change_count;
        if (count == self.change_list.len) {
            // our change_list batch is full, apply it
            _ = try posix.kevent(self.kfd, &self.change_list, &.{}, null);
            count = 0;
        }
        self.change_list[count] = event;
        self.change_count = count + 1;
    }
};