TCP Server in Zig - Part 7 - Kqueue
kqueue is a BSD/macOS alternative to poll. In most ways, kqueue is similar to the Linux-specific epoll, which is itself an important, but incremental, upgrade to poll. Because kqueue has a single function, it superficially looks like poll. But, as we'll soon see, that single function can behave in two different ways, making its API and its integration into our code very similar to epoll.
Because kqueue is rather similar to epoll, this part is shorter: it assumes that you're familiar with the topics discussed in part 6, such as edge-triggering and @ptrFromInt.
kevent
Where epoll has one function to modify the epoll file descriptor (epoll_ctl) and one to wait for notifications (epoll_wait), kqueue uses a single function for both purposes: kevent. Depending on the values passed to it, kevent can modify the kqueue instance, wait for notifications, or do both. Thus, the single function can act like either of the epoll functions or combine both in a single call. The kevent function takes 4 parameters:
- The kqueue file descriptor, which is the kqueue instance that we're modifying and/or waiting on. Created using posix.kqueue.
- A list of posix.Kevent that represents notifications we want to add/change/delete. Known as the changelist. Can be empty.
- A list of posix.Kevent that indicate readiness. Known as the eventlist. Can be empty.
- A timeout as a posix.timespec. Can be null.
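Since application code usually tracks timeouts in milliseconds, the timeout parameter tends to need a small conversion. The following is only a sketch: timeoutFromMs is a hypothetical helper name, the .sec and .nsec field names assume the same Zig version as the rest of this post, and treating a negative value as "no timeout" is a convention assumed here, not something kqueue requires.

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: converts a millisecond timeout into the optional
// timespec that kevent expects. A negative value is assumed to mean
// "block indefinitely", which is expressed as null.
fn timeoutFromMs(ms: i32) ?posix.timespec {
    if (ms < 0) return null;
    return posix.timespec{
        .sec = @intCast(@divTrunc(ms, 1000)),
        .nsec = @intCast(@rem(ms, 1000) * 1_000_000),
    };
}

pub fn main() void {
    if (timeoutFromMs(1500)) |ts| {
        std.debug.print("sec={d} nsec={d}\n", .{ ts.sec, ts.nsec });
    }
}
```

The optional can then be turned into the pointer kevent takes with something like `if (timeout) |*ts| ts else null`.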
The key to understanding this API is knowing that when the eventlist is empty, kevent returns immediately. Thus, calling kevent with an empty eventlist is like calling epoll_ctl. Therefore, like epoll and unlike poll, as long as we have the kqueue instance, we can easily add, remove and change monitors.
The kqueue API has one advantage over epoll: we can apply modifications in bulk. Where epoll_ctl takes a single epoll_event, kevent takes an array of Kevent. In other words, with kqueue it should be possible to make fewer system calls.
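To make the bulk capability concrete, here is a minimal sketch (BSD / macOS only) that registers two sockets with a single kevent call. The addRead and monitorBoth helpers are hypothetical names used for illustration; the Kevent fields and the posix.kevent call follow the same usage as the rest of this post.

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: builds an "add a read monitor" change for `socket`.
fn addRead(socket: posix.socket_t) posix.Kevent {
    return .{
        .ident = @intCast(socket),
        .filter = posix.system.EVFILT.READ,
        .flags = posix.system.EV.ADD,
        .fflags = 0,
        .data = 0,
        .udata = @intCast(socket),
    };
}

// Hypothetical helper: registers both sockets with one system call
// instead of two.
fn monitorBoth(kfd: posix.fd_t, a: posix.socket_t, b: posix.socket_t) !void {
    const changes = [_]posix.Kevent{ addRead(a), addRead(b) };
    // The eventlist is empty, so kevent applies the changes and returns
    // without waiting for readiness.
    _ = try posix.kevent(kfd, &changes, &.{}, null);
}
```

With epoll, the equivalent would take two separate epoll_ctl calls.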
This is a working example (on BSD / macOS). To keep it simple and similar to our first epoll sample, we're not leveraging the bulk-modification capabilities of the API but rather adding one event at a time (the final example does add them in bulk):
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        // monitor our listening socket
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                var closed = false;
                var buf: [4096]u8 = undefined;
                const read = posix.read(ready_socket, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
                }

                if (closed) {
                    posix.close(ready_socket);
                }
            }
        }
    }
}
Like with epoll, we can attach arbitrary information via the udata field. Above we're using the file descriptor, but in a more complete example we'd likely use @intFromPtr to get a usize representation of a pointer to an application-specific "Client" struct. The Kevent struct has two additional fields: fflags and data. These hold filter-specific flags and data. With sockets, where we're only interested in the READ and WRITE filters, these should be set to zero. In a future part, we'll see a brief example of a different filter which does leverage the fflags field.
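The pointer round-trip through udata can be sketched on its own, without any kqueue calls. The Client struct and the toUdata / fromUdata helpers below are hypothetical stand-ins for illustration; only @intFromPtr and @ptrFromInt are the real builtins.

```zig
const std = @import("std");

// Hypothetical stand-in for an application-specific client.
const Client = struct {
    id: u32,
};

// Pack a pointer into the usize-sized udata slot when adding the monitor...
fn toUdata(client: *Client) usize {
    return @intFromPtr(client);
}

// ...and recover it when kevent reports the event as ready.
fn fromUdata(udata: usize) *Client {
    return @ptrFromInt(udata);
}

pub fn main() void {
    var client = Client{ .id = 7 };
    const udata = toUdata(&client);
    const same = fromUdata(udata);
    std.debug.print("client id: {d}\n", .{same.id});
}
```

The pointer must stay valid for as long as the monitor exists, which is why the full example allocates clients from a pool rather than on the stack.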
With epoll, the monitors we add, modify or delete are identified by the 3rd parameter we pass to epoll_ctl. In all the code we've seen so far, that was either the listening socket or the client socket, but more generally, it's the file descriptor to monitor. With kqueue, the identifier is the combination of the ident and filter fields. With epoll, we toggled a client from read-mode to write-mode by modifying the existing monitor (identified by the socket) with the CTL_MOD operation. In kqueue, we'd need to delete the read monitor and then add a write monitor. Or, and this is what we do in the full example given at the end, we add both a read and a write monitor, but disable the write monitor. We can then toggle the mode by disabling the active one and enabling the disabled one:
Read Mode:
key=(ident: socket1, filter: read), enabled=true
key=(ident: socket1, filter: write), enabled=false
Write Mode:
key=(ident: socket1, filter: read), enabled=false
key=(ident: socket1, filter: write), enabled=true
This also means that filter isn't a bitwise flag. To check if the socket is ready for reading, we just have to compare ready.filter == posix.system.EVFILT.READ.
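As a rough sketch of the toggle (BSD / macOS only), switching a client to write-mode means sending two changes, one per (ident, filter) key. The switchToWrite and isReadable names are hypothetical, and the sketch assumes both monitors were already added for the socket.

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: disables the read monitor and enables the write
// monitor for `socket`. Assumes both were previously added with EV.ADD.
fn switchToWrite(kfd: posix.fd_t, socket: posix.socket_t, udata: usize) !void {
    const changes = [_]posix.Kevent{
        .{
            .ident = @intCast(socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = udata,
        },
        .{
            .ident = @intCast(socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.ENABLE,
            .fflags = 0,
            .data = 0,
            .udata = udata,
        },
    };
    _ = try posix.kevent(kfd, &changes, &.{}, null);
}

// filter is a plain value, not a bitmask, so equality is the right check.
fn isReadable(ready: posix.Kevent) bool {
    return ready.filter == posix.system.EVFILT.READ;
}
```

Switching back to read-mode is the mirror image: disable the WRITE filter and enable the READ filter.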
Edge-Triggered
In addition to the EVFILT.READ and EVFILT.WRITE filters, which select what we're monitoring, we can also set the EV.ONESHOT, EV.DISPATCH and EV.CLEAR flags to control how notifications are delivered.
EV.ONESHOT removes the notification after readiness has been reported, making it one-time-only. The notification has to be re-added using the EV.ADD flag.
EV.DISPATCH is similar but rather than removing the notification, it disables it (thus, EV.DISPATCH is like EPOLL.ONESHOT). To re-arm the notification, a change with the EV.ENABLE or EV.ADD flag has to be submitted ("adding" an already added entry, whether it's disabled or not, does not create a duplicate, and will re-enable it if disabled). The difference between removing (ONESHOT) and disabling (DISPATCH) is that disabling and re-enabling is faster but takes a bit more memory since the internal structure is kept. If you intend to frequently re-arm the notification, EV.DISPATCH might be a better choice.
EV.CLEAR is similar to EPOLL.ET, causing kevent to signal state change rather than readiness.
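Re-arming a DISPATCH monitor could look something like the sketch below (BSD / macOS only). The rearmRead name is hypothetical. Passing EV.ADD together with EV.DISPATCH, rather than EV.ENABLE alone, is a conservative choice assumed here so that the re-armed entry is explicitly given the same one-notification behavior.

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: re-arms a read monitor that was disabled after
// delivering a notification. Re-adding an existing (ident, filter) pair
// does not create a duplicate; it updates and re-enables the entry.
fn rearmRead(kfd: posix.fd_t, socket: posix.socket_t, udata: usize) !void {
    _ = try posix.kevent(kfd, &.{.{
        .ident = @intCast(socket),
        .filter = posix.system.EVFILT.READ,
        .flags = posix.system.EV.ADD | posix.system.EV.DISPATCH,
        .fflags = 0,
        .data = 0,
        .udata = udata,
    }}, &.{}, null);
}
```

A typical place to call this would be after the application has finished reading from the socket and is ready to be notified again.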
As before, we can take our earlier code and strip out the "read" handling to see the various behaviors. Only the end of the code was changed:
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        // monitor our listening socket
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .flags = posix.system.EV.ADD,
            .filter = posix.system.EVFILT.READ,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                // THIS WAS CHANGED
                std.debug.print(".", .{});
            }
        }
    }
}
If you connect to the above and send a message, your screen will get flooded with dots (.) as kevent will continuously notify us about the socket's readiness (since we never read from it). Changing the flags for the added client socket from posix.system.EV.ADD to one of:
- posix.system.EV.ADD | posix.system.EV.ONESHOT,
- posix.system.EV.ADD | posix.system.EV.DISPATCH, or
- posix.system.EV.ADD | posix.system.EV.CLEAR
will show how each behaves. For the first two, ONESHOT and DISPATCH, no matter how much data we send, we'll only ever get a single notification. We'd need to re-add or re-enable (aka, re-arm) the notification. For CLEAR, we'll get a single notification each time new data becomes ready.
Conclusion
Although kqueue and epoll are platform-specific, they're quite similar, allowing us to create a simple abstraction to target either platform - the topic of our next part. Furthermore, their similarity has the benefit of resulting in a rather short post!
A more complete example is included below, including our Server, Client and writes. Here you'll see the udata field used to reference a Client (via @intFromPtr and @ptrFromInt).
This code leverages the bulk-modification capabilities of kevent. When we add or modify a notification, we "stage" these in a local change_list. Only when the change_list is full or KQueue.wait is called do we apply the changes. In the latter case, applying the changes and waiting for readiness is done in a single system call. All of this is a simple but effective way to reduce the number of system calls we must make.
Appendix A - Code
const std = @import("std");
const net = std.net;
const posix = std.posix;
const system = std.posix.system;
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.tcp_demo);
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
var server = try Server.init(allocator, 4096);
defer server.deinit();
const address = try std.net.Address.parseIp("0.0.0.0", 5882);
try server.run(address);
std.debug.print("STOPPED\n", .{});
}
// 1 minute
const READ_TIMEOUT_MS = 60_000;
const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;
const Server = struct {
// maximum # of allowed clients
max: usize,
loop: KQueue,
// backs our memory pools and is passed to Client.init so that each
// client can allocate its read and write buffers.
allocator: Allocator,
// The number of clients we currently have connected
connected: usize,
read_timeout_list: ClientList,
// for creating client
client_pool: std.heap.MemoryPool(Client),
// for creating nodes for our read_timeout list
client_node_pool: std.heap.MemoryPool(ClientList.Node),
fn init(allocator: Allocator, max: usize) !Server {
const loop = try KQueue.init();
return .{
.max = max,
.loop = loop,
.connected = 0,
.allocator = allocator,
.read_timeout_list = .{},
.client_pool = std.heap.MemoryPool(Client).init(allocator),
.client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
};
}
fn deinit(self: *Server) void {
self.loop.deinit();
self.client_pool.deinit();
self.client_node_pool.deinit();
}
fn run(self: *Server, address: std.net.Address) !void {
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
var read_timeout_list = &self.read_timeout_list;
try self.loop.addListener(listener);
while (true) {
const next_timeout = self.enforceTimeout();
const ready_events = try self.loop.wait(next_timeout);
for (ready_events) |ready| {
switch (ready.udata) {
0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
else => |nptr| {
const filter = ready.filter;
const client: *Client = @ptrFromInt(nptr);
if (filter == system.EVFILT.READ) {
// this socket is ready to be read
while (true) {
const msg = client.readMessage() catch {
self.closeClient(client);
break;
} orelse break; // no more messages
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
read_timeout_list.remove(client.read_timeout_node);
read_timeout_list.append(client.read_timeout_node);
client.writeMessage(msg) catch {
self.closeClient(client);
break;
};
}
} else if (filter == system.EVFILT.WRITE) {
client.write() catch self.closeClient(client);
}
}
}
}
}
}
fn enforceTimeout(self: *Server) i32 {
const now = std.time.milliTimestamp();
var node = self.read_timeout_list.first;
while (node) |n| {
const client = n.data;
const diff = client.read_timeout - now;
if (diff > 0) {
// this client's timeout is the first one that's in the
// future, so we now know the maximum time we can block in
// kevent before having to call enforceTimeout again
return @intCast(diff);
}
// This client's timeout is in the past. Shut down the read side of
// the socket so that the next read on it fails, at which point the
// main loop cleans the client up via closeClient.
posix.shutdown(client.socket, .recv) catch {};
node = n.next;
} else {
// We have no client that times out in the future (if we did
// we would have hit the return above).
return -1;
}
}
fn accept(self: *Server, listener: posix.socket_t) !void {
const space = self.max - self.connected;
for (0..space) |_| {
var address: net.Address = undefined;
var address_len: posix.socklen_t = @sizeOf(net.Address);
const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
const client = try self.client_pool.create();
errdefer self.client_pool.destroy(client);
client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
posix.close(socket);
log.err("failed to initialize client: {}", .{err});
return;
};
errdefer client.deinit(self.allocator);
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
client.read_timeout_node = try self.client_node_pool.create();
errdefer self.client_node_pool.destroy(client.read_timeout_node);
client.read_timeout_node.* = .{
.next = null,
.prev = null,
.data = client,
};
self.read_timeout_list.append(client.read_timeout_node);
try self.loop.newClient(client);
self.connected += 1;
} else {
// we've run out of space, stop monitoring the listening socket
try self.loop.removeListener(listener);
}
}
fn closeClient(self: *Server, client: *Client) void {
self.read_timeout_list.remove(client.read_timeout_node);
posix.close(client.socket);
self.client_node_pool.destroy(client.read_timeout_node);
client.deinit(self.allocator);
self.client_pool.destroy(client);
}
};
const Client = struct {
loop: *KQueue,
socket: posix.socket_t,
address: std.net.Address,
// Used to read length-prefixed messages
reader: Reader,
// Bytes we still need to send. This is a slice of `write_buf`. When
// empty, then we're in "read-mode" and are waiting for a message from the
// client.
to_write: []u8,
// Buffer for storing our length-prefixed messages
write_buf: []u8,
// absolute time, in milliseconds, when this client should timeout if
// a message isn't received
read_timeout: i64,
// Node containing this client in the server's read_timeout_list
read_timeout_node: *ClientNode,
fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *KQueue) !Client {
const reader = try Reader.init(allocator, 4096);
errdefer reader.deinit(allocator);
const write_buf = try allocator.alloc(u8, 4096);
errdefer allocator.free(write_buf);
return .{
.loop = loop,
.reader = reader,
.socket = socket,
.address = address,
.to_write = &.{},
.write_buf = write_buf,
.read_timeout = 0, // let the server set this
.read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
};
}
fn deinit(self: *const Client, allocator: Allocator) void {
self.reader.deinit(allocator);
allocator.free(self.write_buf);
}
fn readMessage(self: *Client) !?[]const u8 {
return self.reader.readMessage(self.socket) catch |err| switch (err) {
error.WouldBlock => return null,
else => return err,
};
}
fn writeMessage(self: *Client, msg: []const u8) !void {
if (self.to_write.len > 0) {
// Depending on how you structure your code, this might not be possible
// For example, in an HTTP server, the application might not control
// the actual "writeMessage" call, and thus it would not be possible
// to make more than one writeMessage call per request.
// For this demo, we'll just return an error.
return error.PendingMessage;
}
if (msg.len + 4 > self.write_buf.len) {
// Could allocate a dynamic buffer. Could use a large buffer pool.
return error.MessageTooLarge;
}
// copy our length prefix + message to our buffer
std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
const end = msg.len + 4;
@memcpy(self.write_buf[4..end], msg);
// setup our to_write slice
self.to_write = self.write_buf[0..end];
// immediately write what we can
return self.write();
}
// Writes as much of `to_write` as the socket will accept. Switches the
// client to write-mode if the socket would block, and back to read-mode
// once the whole message has been written.
fn write(self: *Client) !void {
var buf = self.to_write;
defer self.to_write = buf;
while (buf.len > 0) {
const n = posix.write(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => return self.loop.writeMode(self),
else => return err,
};
if (n == 0) {
return error.Closed;
}
buf = buf[n..];
} else {
return self.loop.readMode(self);
}
}
};
const Reader = struct {
buf: []u8,
pos: usize = 0,
start: usize = 0,
fn init(allocator: Allocator, size: usize) !Reader {
const buf = try allocator.alloc(u8, size);
return .{
.pos = 0,
.start = 0,
.buf = buf,
};
}
fn deinit(self: *const Reader, allocator: Allocator) void {
allocator.free(self.buf);
}
fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
var buf = self.buf;
while (true) {
if (try self.bufferedMessage()) |msg| {
return msg;
}
const pos = self.pos;
const n = try posix.read(socket, buf[pos..]);
if (n == 0) {
return error.Closed;
}
self.pos = pos + n;
}
}
fn bufferedMessage(self: *Reader) !?[]u8 {
const buf = self.buf;
const pos = self.pos;
const start = self.start;
std.debug.assert(pos >= start);
const unprocessed = buf[start..pos];
if (unprocessed.len < 4) {
self.ensureSpace(4 - unprocessed.len) catch unreachable;
return null;
}
const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);
// the length of our message + the length of our prefix
const total_len = message_len + 4;
if (unprocessed.len < total_len) {
try self.ensureSpace(total_len);
return null;
}
self.start += total_len;
return unprocessed[4..total_len];
}
fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
const buf = self.buf;
if (buf.len < space) {
return error.BufferTooSmall;
}
const start = self.start;
const spare = buf.len - start;
if (spare >= space) {
return;
}
const unprocessed = buf[start..self.pos];
std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
self.start = 0;
self.pos = unprocessed.len;
}
};
// We'll eventually need to build a platform abstraction between epoll and
// kqueue. This is a rough start.
const KQueue = struct {
kfd: posix.fd_t,
event_list: [128]system.Kevent = undefined,
change_list: [16]system.Kevent = undefined,
change_count: usize = 0,
fn init() !KQueue {
const kfd = try posix.kqueue();
return .{.kfd = kfd};
}
fn deinit(self: KQueue) void {
posix.close(self.kfd);
}
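fn wait(self: *KQueue, timeout_ms: i32) ![]system.Kevent {
// A negative timeout (enforceTimeout returns -1 when no client is pending)
// means "block until something is ready", which kevent expresses as null.
const timeout: ?posix.timespec = if (timeout_ms < 0) null else posix.timespec{
.sec = @intCast(@divTrunc(timeout_ms, 1000)),
.nsec = @intCast(@mod(timeout_ms, 1000) * 1000000),
};
const timeout_ptr: ?*const posix.timespec = if (timeout) |*ts| ts else null;
const count = try posix.kevent(self.kfd, self.change_list[0..self.change_count], &self.event_list, timeout_ptr);
self.change_count = 0;
return self.event_list[0..count];
}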
fn addListener(self: *KQueue, listener: posix.socket_t) !void {
// ok to use EV.ADD to re-enable the listener if it was previously
// disabled via removeListener
try self.queueChange(.{
.ident = @intCast(listener),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ADD,
.fflags = 0,
.data = 0,
.udata = 0,
});
}
fn removeListener(self: *KQueue, listener: posix.socket_t) !void {
try self.queueChange(.{
.ident = @intCast(listener),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = 0,
});
}
fn newClient(self: *KQueue, client: *Client) !void {
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ADD,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.WRITE,
.flags = posix.system.EV.ADD | posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
}
fn readMode(self: *KQueue, client: *Client) !void {
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.WRITE,
.flags = posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = 0,
});
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.ENABLE,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
}
fn writeMode(self: *KQueue, client: *Client) !void {
try self.queueChange(.{
.ident = @intCast(client.socket),
.filter = posix.system.EVFILT.READ,
.flags = posix.system.EV.DISABLE,
.fflags = 0,
.data = 0,
.udata = 0,
});
try self.queueChange(.{
.ident = @intCast(client.socket),
.flags = posix.system.EV.ENABLE,
.filter = posix.system.EVFILT.WRITE,
.fflags = 0,
.data = 0,
.udata = @intFromPtr(client),
});
}
fn queueChange(self: *KQueue, event: system.Kevent) !void {
var count = self.change_count;
if (count == self.change_list.len) {
// our change_list batch is full, apply it
_ = try posix.kevent(self.kfd, &self.change_list, &.{}, null);
count = 0;
}
self.change_list[count] = event;
self.change_count = count + 1;
}
};