TCP Server in Zig - Part 7 - Kqueue
Oct 27, 2024
kqueue is a BSD/macOS alternative to poll. In most ways, kqueue is similar to the Linux-specific epoll, which itself is an important, if incremental, upgrade to poll. Because kqueue has a single function, it superficially looks like poll. But, as we'll soon see, that single function can behave in two different ways, making its API and its integration into our code very similar to epoll.
Because kqueue is rather similar to epoll, this part is shorter, as it assumes that you're familiar with topics discussed in part 6, such as edge-triggering and @ptrFromInt.
Where epoll has one function to modify the epoll file descriptor (epoll_ctl) and one to wait for notifications (epoll_wait), kqueue uses a single function for both purposes: kevent. Depending on the values passed to kevent, it can modify the kqueue instance, wait for notifications, or both. Thus, the single function can act like either of the epoll functions or combine both in a single call. The kevent function takes 4 parameters:
- The kqueue file descriptor, which is the kqueue instance that we're modifying and/or waiting on. Created using posix.kqueue.
- A list of posix.Kevent that represents notifications we want to add/change/delete. Known as the changelist. Can be empty.
- A list of posix.Kevent that kevent fills in to indicate readiness. Known as the eventlist. Can be empty.
- A timeout as a posix.timespec. Can be null.
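The timeout is expressed as seconds plus nanoseconds rather than as a single millisecond count. As a minimal sketch, the conversion might look like the following. Timespec is a hypothetical stand-in for posix.timespec (so the snippet doesn't depend on a particular OS or Zig version), and treating a negative value as "no timeout" is an assumption borrowed from the poll and epoll_wait convention:

```zig
const std = @import("std");

// Hypothetical stand-in for posix.timespec, so this sketch compiles anywhere.
const Timespec = struct {
    sec: i64,
    nsec: i64,
};

// Converts a millisecond timeout into a seconds/nanoseconds pair.
// Assumption: a negative value means "no timeout", represented as null.
fn msToTimespec(timeout_ms: i32) ?Timespec {
    if (timeout_ms < 0) return null;
    return Timespec{
        .sec = @divTrunc(timeout_ms, 1000),
        .nsec = @as(i64, @mod(timeout_ms, 1000)) * std.time.ns_per_ms,
    };
}

pub fn main() void {
    const t = msToTimespec(1500).?;
    std.debug.print("{d}s {d}ns\n", .{ t.sec, t.nsec });
}
```

With a real kevent call, the null case would be passed through as a null timeout pointer.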
The key to understanding this API is knowing that when the eventlist is empty, kevent returns immediately. Thus, calling kevent with an empty eventlist is like calling epoll_ctl. Conversely, calling it with an empty changelist and a non-empty eventlist is like calling epoll_wait. Therefore, like epoll and unlike poll, as long as we have the kqueue instance, we can easily add, remove and change monitors.
The kqueue API has one advantage over epoll: we can apply modifications in bulk. Where epoll_ctl takes a single epoll_event, kevent takes an array of Kevent. In other words, with kqueue it should be possible to make fewer system calls.
This is a working example (on BSD / macOS). To keep it simple and similar to our first epoll sample, we're not leveraging the bulk-modification capabilities of the API but rather adding one event at a time (the final example does add them in bulk):
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        // Empty eventlist: kevent only applies the change and returns.
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        // Empty changelist: kevent only waits for readiness.
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                var closed = false;
                var buf: [4096]u8 = undefined;
                // Treat a read error the same as a closed connection.
                const read = posix.read(ready_socket, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
                }

                if (closed) {
                    posix.close(ready_socket);
                }
            }
        }
    }
}
Like with epoll, we can attach arbitrary information via the udata field. Above we're using the file descriptor, but in a more complete example we'd likely use @intFromPtr to get a usize representation of a pointer to an application-specific "Client" struct. The Kevent struct has two additional fields: fflags and data. These hold flags and data to use for different filters. With sockets, where we're only interested in the READ and WRITE filters, these should be set to zero. In a future part, we'll see a brief example of a different filter which does leverage the fflags field.
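To make the udata round trip concrete, here is a minimal sketch that runs without kqueue. The Client struct and the clientFromUdata helper are hypothetical names for illustration, and reserving 0 for "not a client" is an assumption that mirrors how the full example at the end identifies the listener:

```zig
const std = @import("std");

// Hypothetical application-specific struct, for illustration only.
const Client = struct {
    id: u32,
};

// Recovers the *Client stored in a udata-style usize.
// Assumption: 0 is reserved for "not a client" (e.g. the listener).
fn clientFromUdata(udata: usize) ?*Client {
    if (udata == 0) return null;
    const client: *Client = @ptrFromInt(udata);
    return client;
}

pub fn main() void {
    var client = Client{ .id = 42 };

    // This is the value we'd place in the Kevent's udata field.
    const udata: usize = @intFromPtr(&client);

    // And this is what we'd do with the udata of a ready event.
    if (clientFromUdata(udata)) |c| {
        c.id += 1;
    }
    std.debug.print("id={d}\n", .{client.id});
}
```

The pointer is only valid for as long as the Client it refers to is alive, so the struct has to outlive its registration with kqueue.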
With epoll, the monitors we add, modify or delete are identified by the 3rd parameter we pass to epoll_ctl. In all the code we've seen so far, that was either the listening socket or the client socket, but more generally, it's the file descriptor to monitor. With kqueue, the identifier is the combination of the ident and filter fields. With epoll, we toggled a client from read-mode to write-mode by modifying the existing notifier (identified by the socket) with the CTL_MOD operation. In kqueue, we'd need to delete the read monitor and then add a write monitor. Or, and this is what we do in the full example given at the end, we add both a read and a write monitor, but disable the write monitor. We can then toggle the mode by disabling the active one and enabling the disabled one:
Read Mode:
key=(ident: socket1, filter: read), enabled=true
key=(ident: socket1, filter: write), enabled=false
Write Mode:
key=(ident: socket1, filter: read), enabled=false
key=(ident: socket1, filter: write), enabled=true
This also means that filter isn't a bitwise flag. To check if the socket is ready for reading, we just have to compare ready.filter == posix.system.EVFILT.READ.
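The toggle described above can be sketched without touching the kernel. In this sketch, Change is a hypothetical, trimmed-down stand-in for posix.Kevent, and the constants are redefined locally with the values from the BSD/macOS sys/event.h header so that it compiles on any platform. It only builds the two changes that would go into a changelist:

```zig
const std = @import("std");

// Values from the BSD/macOS <sys/event.h> header, redefined here so the
// sketch compiles on any platform.
const EVFILT_READ: i16 = -1;
const EVFILT_WRITE: i16 = -2;
const EV_ENABLE: u16 = 0x0004;
const EV_DISABLE: u16 = 0x0008;

// Hypothetical, trimmed-down stand-in for posix.Kevent.
const Change = struct {
    ident: usize,
    filter: i16,
    flags: u16,
};

// Changes that switch a socket to write mode: the (ident, READ) monitor is
// disabled and the (ident, WRITE) monitor is enabled.
fn writeModeChanges(socket: usize) [2]Change {
    return [2]Change{
        .{ .ident = socket, .filter = EVFILT_READ, .flags = EV_DISABLE },
        .{ .ident = socket, .filter = EVFILT_WRITE, .flags = EV_ENABLE },
    };
}

// Filters are compared for equality, never tested with a bitwise AND.
fn isReadable(filter: i16) bool {
    return filter == EVFILT_READ;
}

pub fn main() void {
    for (writeModeChanges(7)) |c| {
        std.debug.print("ident={d} filter={d} flags=0x{x}\n", .{ c.ident, c.filter, c.flags });
    }
}
```

Both entries share the same ident. It's the filter that makes them two distinct monitors.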
In addition to flags such as EV.ADD and EV.DISABLE, which add and disable a monitor, we can also set EV.ONESHOT, EV.DISPATCH and EV.CLEAR.
EV.ONESHOT removes the notification after readiness has been reported, making it one-time-only. The notification has to be re-added using the EV.ADD flag.
EV.DISPATCH is similar, but rather than removing the notification, it disables it (thus, EV.DISPATCH is like EPOLL.ONESHOT). To re-arm the notification, EV.ENABLE or EV.ADD has to be used ("adding" an already added entry, whether it's disabled or not, does not create a duplicate, and will re-enable it if disabled). The difference between removing (ONESHOT) and disabling (DISPATCH) is that disabling and re-enabling is faster but takes a bit more memory since the internal structure is kept. If you intend to frequently re-arm the notification, EV.DISPATCH might be a better choice.
EV.CLEAR is similar to EPOLL.ET, causing kevent to signal state changes rather than readiness.
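Unlike filter, flags is a bitwise field, so these options are OR'd together with EV.ADD. The sketch below summarizes the three behaviors as code. The Mode enum and both helper functions are hypothetical names, and the constants are redefined locally with their sys/event.h values so that the snippet runs on any platform:

```zig
const std = @import("std");

// Values from the BSD/macOS <sys/event.h> header.
const EV_ADD: u16 = 0x0001;
const EV_ENABLE: u16 = 0x0004;
const EV_ONESHOT: u16 = 0x0010;
const EV_CLEAR: u16 = 0x0020;
const EV_DISPATCH: u16 = 0x0080;

// Hypothetical names for the behaviors described above.
const Mode = enum { level, oneshot, dispatch, clear };

// Flags to use when first registering a monitor.
fn addFlags(mode: Mode) u16 {
    return switch (mode) {
        .level => EV_ADD,
        .oneshot => EV_ADD | EV_ONESHOT,
        .dispatch => EV_ADD | EV_DISPATCH,
        .clear => EV_ADD | EV_CLEAR,
    };
}

// Flags needed to re-arm after a notification, or null if none are needed.
fn rearmFlags(mode: Mode) ?u16 {
    return switch (mode) {
        // The monitor was removed: add it again (ONESHOT must be repeated
        // if it should remain one-time-only).
        .oneshot => EV_ADD | EV_ONESHOT,
        // The monitor was only disabled: enable it again.
        .dispatch => EV_ENABLE,
        // The monitor stays armed.
        .level, .clear => null,
    };
}

pub fn main() void {
    std.debug.print("dispatch: add=0x{x}\n", .{addFlags(.dispatch)});
}
```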
As before, we can take our above code and strip out the "read" handling to see the various behaviors. Only the end of the code was changed:
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .flags = posix.system.EV.ADD,
            .filter = posix.system.EVFILT.READ,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    // Change these flags to try ONESHOT, DISPATCH or CLEAR.
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                // We deliberately never read, so the socket stays "ready".
                std.debug.print(".", .{});
            }
        }
    }
}
If you connect to the above and send a message, your screen will get flooded with dots (.) as kevent will continuously notify about the socket's readiness (since we never read from it). Changing the flags for the added client socket from posix.system.EV.ADD to one of:
- posix.system.EV.ADD | posix.system.EV.ONESHOT,
- posix.system.EV.ADD | posix.system.EV.DISPATCH, or
- posix.system.EV.ADD | posix.system.EV.CLEAR
will show how each behaves. For the first two, ONESHOT and DISPATCH, no matter how much data we send, we'll only ever get a single notification. We'd need to re-add or re-enable (aka, re-arm) the notification. For CLEAR, we'll get a single notification each time new data becomes ready.
Although kqueue and epoll are platform-specific, they're quite similar, allowing us to create a simple abstraction to target either platform - the topic of our next part. Furthermore, their similarity has the benefit of resulting in a rather short post!
A more complete example is included below, including our Server, Client and writes. Here you'll see the udata field used to store a pointer to a Client (via @intFromPtr and @ptrFromInt).
This code leverages the bulk-modification capabilities of kevent. When we add or modify a notification, we "stage" these in a local change_list. Only when the change_list is full or KQueue.wait is called do we apply the changes. In the latter case, applying the changes and waiting for readiness is done in a single system call. All of this is a simple but effective way to reduce the number of system calls we must make.
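To get a feel for the savings, here's a toy model of the staging logic that runs anywhere. Stager is a hypothetical name and no system calls are made. The flushes counter only tracks how many times a real implementation would have to call kevent purely to apply changes:

```zig
const std = @import("std");

// Toy model of the staging logic. `flushes` counts how many times a real
// implementation would call kevent only to apply staged changes.
fn Stager(comptime capacity: usize) type {
    return struct {
        pending: usize = 0,
        flushes: usize = 0,

        const Self = @This();

        // Stage one change, flushing first if the list is already full.
        fn queue(self: *Self) void {
            if (self.pending == capacity) {
                self.flushes += 1;
                self.pending = 0;
            }
            self.pending += 1;
        }

        // Waiting applies whatever is still pending as part of the same
        // call, so it doesn't count as an extra flush.
        fn wait(self: *Self) void {
            self.pending = 0;
        }
    };
}

pub fn main() void {
    var stager = Stager(16){};
    for (0..40) |_| stager.queue();
    std.debug.print("flushes={d} pending={d}\n", .{ stager.flushes, stager.pending });
}
```

In this model, 40 staged changes with a capacity of 16 cause 2 flushes, with the remaining 8 changes riding along with the next wait. Applying each change individually would have taken 40 calls.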
const std = @import("std");
const net = std.net;
const posix = std.posix;
const system = std.posix.system;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("0.0.0.0", 5882);
    try server.run(address);
    std.debug.print("STOPPED\n", .{});
}
const READ_TIMEOUT_MS = 60_000;

const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;

const Server = struct {
    max: usize,
    loop: KQueue,
    allocator: Allocator,
    connected: usize,
    read_timeout_list: ClientList,
    client_pool: std.heap.MemoryPool(Client),
    client_node_pool: std.heap.MemoryPool(ClientList.Node),

    fn init(allocator: Allocator, max: usize) !Server {
        const loop = try KQueue.init();
        errdefer loop.deinit();

        return .{
            .max = max,
            .loop = loop,
            .connected = 0,
            .allocator = allocator,
            .read_timeout_list = .{},
            .client_pool = std.heap.MemoryPool(Client).init(allocator),
            .client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
        };
    }

    fn deinit(self: *Server) void {
        self.loop.deinit();
        self.client_pool.deinit();
        self.client_node_pool.deinit();
    }
    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);

        // The pointer itself is never reassigned, so it's declared const.
        const read_timeout_list = &self.read_timeout_list;
        try self.loop.addListener(listener);

        while (true) {
            const next_timeout = self.enforceTimeout();
            const ready_events = try self.loop.wait(next_timeout);
            for (ready_events) |ready| {
                switch (ready.udata) {
                    // udata 0 is reserved for the listener (see addListener)
                    0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
                    else => |nptr| {
                        const filter = ready.filter;
                        const client: *Client = @ptrFromInt(nptr);

                        if (filter == system.EVFILT.READ) {
                            while (true) {
                                const msg = client.readMessage() catch {
                                    self.closeClient(client);
                                    break;
                                } orelse break;

                                client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
                                read_timeout_list.remove(client.read_timeout_node);
                                read_timeout_list.append(client.read_timeout_node);

                                client.writeMessage(msg) catch {
                                    self.closeClient(client);
                                    break;
                                };
                            }
                        } else if (filter == system.EVFILT.WRITE) {
                            client.write() catch self.closeClient(client);
                        }
                    }
                }
            }
        }
    }
    // Shuts down timed-out clients and returns the number of milliseconds
    // until the next timeout, or -1 if no client is waiting.
    fn enforceTimeout(self: *Server) i32 {
        const now = std.time.milliTimestamp();

        var node = self.read_timeout_list.first;
        while (node) |n| {
            const client = n.data;
            const diff = client.read_timeout - now;
            if (diff > 0) {
                return @intCast(diff);
            }

            posix.shutdown(client.socket, .recv) catch {};
            node = n.next;
        } else {
            return -1;
        }
    }
    fn accept(self: *Server, listener: posix.socket_t) !void {
        const space = self.max - self.connected;
        for (0..space) |_| {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };
            // Don't leak the socket if anything below returns an error.
            errdefer posix.close(socket);

            const client = try self.client_pool.create();
            errdefer self.client_pool.destroy(client);
            client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
                // We return without an error here, which skips the errdefers
                // above, so we clean up explicitly.
                posix.close(socket);
                self.client_pool.destroy(client);
                log.err("failed to initialize client: {}", .{err});
                return;
            };
            errdefer client.deinit(self.allocator);

            client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
            client.read_timeout_node = try self.client_node_pool.create();
            errdefer self.client_node_pool.destroy(client.read_timeout_node);

            client.read_timeout_node.* = .{
                .next = null,
                .prev = null,
                .data = client,
            };

            // Stage the kqueue changes before linking the node, so that a
            // failure never leaves a destroyed node in the timeout list.
            try self.loop.newClient(client);
            self.read_timeout_list.append(client.read_timeout_node);
            self.connected += 1;
        } else {
            // We've reached max connections: stop watching the listener.
            try self.loop.removeListener(listener);
        }
    }
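    fn closeClient(self: *Server, client: *Client) void {
        self.read_timeout_list.remove(client.read_timeout_node);

        posix.close(client.socket);
        self.client_node_pool.destroy(client.read_timeout_node);
        client.deinit(self.allocator);
        self.client_pool.destroy(client);

        // Free up the slot. Note: if the listener was disabled because we
        // reached max, this example doesn't re-enable it.
        self.connected -= 1;
    }
};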
const Client = struct {
    loop: *KQueue,

    socket: posix.socket_t,
    address: std.net.Address,

    reader: Reader,

    // The part of write_buf that still has to be sent.
    to_write: []u8,
    write_buf: []u8,

    read_timeout: i64,
    read_timeout_node: *ClientNode,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *KQueue) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .loop = loop,
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
            .read_timeout = 0,
            .read_timeout_node = undefined,
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        allocator.free(self.write_buf);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }

    fn writeMessage(self: *Client, msg: []const u8) !void {
        if (self.to_write.len > 0) {
            return error.PendingMessage;
        }

        if (msg.len + 4 > self.write_buf.len) {
            return error.MessageTooLarge;
        }

        // 4-byte little-endian length prefix, followed by the message.
        std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);

        const end = msg.len + 4;
        @memcpy(self.write_buf[4..end], msg);

        self.to_write = self.write_buf[0..end];
        return self.write();
    }

    fn write(self: *Client) !void {
        var buf = self.to_write;
        defer self.to_write = buf;
        while (buf.len > 0) {
            const n = posix.write(self.socket, buf) catch |err| switch (err) {
                error.WouldBlock => return self.loop.writeMode(self),
                else => return err,
            };

            if (n == 0) {
                return error.Closed;
            }
            buf = buf[n..];
        } else {
            return self.loop.readMode(self);
        }
    }
};
const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        // Never reassigned, so it must be const.
        const buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
            self.ensureSpace(4 - unprocessed.len) catch unreachable;
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // Widen before adding so that a huge length prefix can't overflow.
        const total_len = @as(usize, message_len) + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        // Move the unprocessed bytes to the front of the buffer.
        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};
const KQueue = struct {
    kfd: posix.fd_t,
    event_list: [128]system.Kevent = undefined,
    change_list: [16]system.Kevent = undefined,
    change_count: usize = 0,

    fn init() !KQueue {
        const kfd = try posix.kqueue();
        return .{.kfd = kfd};
    }

    fn deinit(self: KQueue) void {
        posix.close(self.kfd);
    }

    fn wait(self: *KQueue, timeout_ms: i32) ![]system.Kevent {
        // A negative timeout (enforceTimeout returns -1 when there are no
        // clients) means "no timeout", which kevent expresses as null.
        var ts: posix.timespec = undefined;
        var timeout: ?*const posix.timespec = null;
        if (timeout_ms >= 0) {
            ts = .{
                .sec = @intCast(@divTrunc(timeout_ms, 1000)),
                .nsec = @intCast(@mod(timeout_ms, 1000) * 1000000),
            };
            timeout = &ts;
        }

        // Apply any staged changes and wait for readiness in a single call.
        const count = try posix.kevent(self.kfd, self.change_list[0..self.change_count], &self.event_list, timeout);
        self.change_count = 0;
        return self.event_list[0..count];
    }

    fn addListener(self: *KQueue, listener: posix.socket_t) !void {
        try self.queueChange(.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });
    }

    fn removeListener(self: *KQueue, listener: posix.socket_t) !void {
        try self.queueChange(.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });
    }

    fn newClient(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });

        // The write monitor is added up front, but starts out disabled.
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.ADD | posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn readMode(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ENABLE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn writeMode(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .flags = posix.system.EV.ENABLE,
            .filter = posix.system.EVFILT.WRITE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn queueChange(self: *KQueue, event: system.Kevent) !void {
        var count = self.change_count;
        if (count == self.change_list.len) {
            // Our change_list is full: apply it now, without waiting.
            _ = try posix.kevent(self.kfd, &self.change_list, &.{}, null);
            count = 0;
        }
        self.change_list[count] = event;
        self.change_count = count + 1;
    }
};