TCP Server in Zig - Part 6 - Epoll
Oct 23, 2024
In the last two parts we introduced the poll system call and, with it, patterns around evented I/O. poll has the advantage of being simple to use and supported on most platforms. However, as we saw, we need to iterate through all monitored sockets to figure out which ones are ready.
Another awkwardness with poll's API is associating application-specific data with each pollfd being monitored. To know which Client is ready, we need to create a mapping between the pollfd and Client. While this isn't the end of the world, it is something that poll alternatives address.
The first alternative API that we'll look at is epoll. The bulk of what we've learnt about evented I/O is still applicable as-is. However, epoll is a Linux-specific system call. In order to support it alongside other platforms, we'll need to leverage Zig's comptime and some layer of abstraction. This is something we'll look at after exploring kqueue, a BSD/MacOS-specific system call.
The epoll API consists of three separate system calls. epoll_create1 is used to create an epoll file descriptor; you can think of this as our epoll instance. epoll_ctl is used to modify our instance: with it, we can add, modify and remove sockets that we want our instance to monitor. Finally, we have epoll_wait, which blocks until one of the monitored sockets is ready or until the configured timeout is reached.
With poll we specify the sockets to monitor on the blocking call to poll itself. This means we need to maintain our own array of pollfd. With epoll, because we have distinct functions to modify and wait, this ugliness is gone.
Here's a bare-bones example, runnable on Linux:
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const efd = try posix.epoll_create1(0);
defer posix.close(efd);
{
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
}
var ready_list: [128]linux.epoll_event = undefined;
while (true) {
const ready_count = posix.epoll_wait(efd, &ready_list, -1);
for (ready_list[0..ready_count]) |ready| {
const ready_socket = ready.data.fd;
if (ready_socket == listener) {
const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(client_socket);
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = client_socket}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
} else {
var closed = false;
var buf: [4096]u8 = undefined;
const read = posix.read(ready_socket, &buf) catch 0;
if (read == 0) {
closed = true;
} else {
std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
}
if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
posix.close(ready_socket);
}
}
}
}
}
There are a number of interesting things in the above code, especially compared to our first runnable code that used poll. The list of epoll_events that we pass into epoll_wait is filled by epoll_wait itself, and the return value tells us how many events were filled. So instead of having to loop through every monitored socket to see which is ready, we're given a list of ready sockets, i.e. ready_list[0..ready_count]. It's significantly more efficient.
Another change is how much simpler it is to remove a connection. We can explicitly remove a monitor via epoll_ctl and the CTL_DEL operation. But just closing the socket is enough to remove it.
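If we ever do want to remove a monitor without closing the socket, the call is a one-liner. Here's a minimal sketch using the efd and ready_socket from the example above; note that posix.epoll_ctl accepts null for the event when deleting:

// stop monitoring ready_socket without closing it
try posix.epoll_ctl(efd, linux.EPOLL.CTL_DEL, ready_socket, null);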
From the above, we can tell that epoll_ctl takes the epoll file descriptor created via epoll_create1, the operation (CTL_ADD, CTL_DEL or CTL_MOD), the file descriptor to monitor (sockets in our case) and an epoll_event. The epoll_event structure has two fields: events, which is similar to pollfd.events, and data. The data field is an untagged union and allows us to associate arbitrary information with an epoll_event. The value we store in data can be retrieved when iterating through the ready list. Above, we store the socket and retrieve it via data.fd. Storing the socket directly in data is a good start, but we can do more with this field.
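For reference, std.os.linux defines these types roughly as follows (a simplified sketch; the real definition also deals with platform-specific alignment):

// simplified view of the structures used above
const epoll_data = extern union {
    ptr: usize,
    fd: i32,
    u32: u32,
    u64: u64,
};

const epoll_event = extern struct {
    events: u32,
    data: epoll_data,
};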
One of the challenges we faced with poll was associating a pollfd with its corresponding client. We managed to get this working by keeping two slices in sync, but never bothered figuring out how to get a pollfd from a client. With epoll this is much simpler. Specifically, the epoll_event.data field lets us store the numeric representation of a pointer. Or, put differently, we can associate anything we want with an epoll_event.
The benefit of this might not be immediately obvious, but let's add a simple Client and use data to associate our epoll_event directly with a client:
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
const Client = struct {
socket: posix.socket_t,
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
const address = try std.net.Address.parseIp("0.0.0.0", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const efd = try posix.epoll_create1(0);
{
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.ptr = 0}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
}
var ready_list: [128]linux.epoll_event = undefined;
while (true) {
const ready_count = posix.epoll_wait(efd, &ready_list, -1);
for (ready_list[0..ready_count]) |ready| {
switch (ready.data.ptr) {
0 => {
const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(socket);
const client = try allocator.create(Client);
errdefer allocator.destroy(client);
client.* = .{.socket = socket};
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(client)}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, socket, &event);
},
else => |nptr| {
var closed = false;
var buf: [4096]u8 = undefined;
const client: *Client = @ptrFromInt(nptr);
const read = posix.read(client.socket, &buf) catch 0;
if (read == 0) {
closed = true;
} else {
std.debug.print("[{d}] got: {any}\n", .{client.socket, buf[0..read]});
}
if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
posix.close(client.socket);
allocator.destroy(client);
}
},
}
}
}
}
The data.ptr is used to store the usize representation of our client pointer. We use Zig's built-in @intFromPtr to turn a pointer into an integer and then @ptrFromInt to reverse the process. A real hack in the above code is setting data.ptr to 0 for our listening socket. Remember that data is an untagged union: if we used data.fd = listener for our listening socket, but data.ptr = @intFromPtr(client) for client connections, we wouldn't be able to tell which field is active. The "right" way to fix this is to create our own tagged union:
const EventType = union(enum) {
listener: posix.socket_t,
recv: *Client,
};
But that's another layer of allocations and indirection, as the sketch below shows. Instead, we use 0 and hope that the operating system never allocates a Client at memory address 0 (this is more than "hope": aside from embedded systems, most platforms will never allocate memory at address 0).
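To see why, here's a hypothetical sketch of what using EventType would require (assuming allocator, efd and client are in scope, as in the example above):

// the tagged union can't be stored in epoll_event.data directly, so it has to
// be heap-allocated and referenced through data.ptr: an extra allocation and
// an extra pointer dereference for every event
const et = try allocator.create(EventType);
et.* = .{ .recv = client };

var event = linux.epoll_event{
    .events = linux.EPOLL.IN,
    .data = .{ .ptr = @intFromPtr(et) },
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client.socket, &event);

// and, when the event fires, we'd switch on
// @as(*EventType, @ptrFromInt(ready.data.ptr)).*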
@intFromPtr is a built-in function which takes a pointer and gives us its usize value. Now, when a socket is ready, we can get the corresponding client by calling the mirror function, @ptrFromInt, on the data.ptr field:
const client: *Client = @ptrFromInt(nptr);
It might be obvious to some, but for this to work, the client has to be allocated on the heap. We could call @intFromPtr on the address of a stack-allocated value, but that value won't magically remain valid beyond its scope.
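As a tiny standalone illustration of the round trip (a sketch, assuming an allocator is in scope):

const client = try allocator.create(Client); // heap-allocated, so the address stays valid
const n: usize = @intFromPtr(client);        // what we store in event.data.ptr
const back: *Client = @ptrFromInt(n);        // what we recover when the event fires
std.debug.assert(back == client);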
Being able to associate arbitrary data with an epoll_event tightens up our code, but the real benefit is the ease with which we can modify the epoll instance from a client. The way we previously structured our poll-based server, it wasn't possible for writeMessage to be called externally (i.e. from application code) because the Client had no way to alter the server's polls list. We could have had each Client reference a *Server, but it would have been tricky to deal with the moving indexes.
With epoll we still need each Client to have access to the epoll instance (either directly, or via a reference to the *Server), but we can use epoll_ctl to modify the client's monitor. You'll recall that previously our write function would return false to signal an incomplete write and true to signal a completed write. This value was used by our run function to switch to and from write-mode and read-mode. We can now bake this into write:
fn write(self: *Client) !void {
var buf = self.to_write;
defer self.to_write = buf;
while (buf.len > 0) {
const n = posix.write(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => {
var event = linux.epoll_event{
.events = linux.EPOLL.OUT,
.data = .{.ptr = @intFromPtr(self)},
};
return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
},
else => return err,
};
if (n == 0) {
return error.Closed;
}
buf = buf[n..];
} else {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(self)},
};
return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
}
}
That might seem like a minor change, but it enables something you'll almost certainly need: the ability to initiate a write from the application. There is one inefficiency in the above code: on a complete write, we revert to read-mode. However, most of the time, we'll probably never have gone into write-mode to begin with. As we've said before, unless your server is very busy, your client is lagging, or you're sending very large messages, there's a reasonable chance that posix.write will succeed in a single call. In such cases, reverting to read-mode is an unnecessary and wasteful system call. I'll leave this up to you to fix, but one solution would be to add an IOMode enum with two values, read and write, and track which mode the client is currently in. Then we can check whether self.mode == .write before switching back to read-mode; a rough sketch follows.
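Here's one possible version of that fix, assuming Client gains a mode field (both IOMode and mode are names introduced here, not part of the code above):

const IOMode = enum { read, write };

// Client gains a new field: `mode: IOMode = .read`

fn write(self: *Client) !void {
    var buf = self.to_write;
    defer self.to_write = buf;
    while (buf.len > 0) {
        const n = posix.write(self.socket, buf) catch |err| switch (err) {
            error.WouldBlock => {
                // incomplete write: remember that we switched to write-mode
                self.mode = .write;
                var event = linux.epoll_event{
                    .events = linux.EPOLL.OUT,
                    .data = .{.ptr = @intFromPtr(self)},
                };
                return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
            },
            else => return err,
        };
        if (n == 0) {
            return error.Closed;
        }
        buf = buf[n..];
    } else {
        // only pay for the CTL_MOD back to read-mode if we actually left it
        if (self.mode == .write) {
            self.mode = .read;
            var event = linux.epoll_event{
                .events = linux.EPOLL.IN,
                .data = .{.ptr = @intFromPtr(self)},
            };
            return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
        }
    }
}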
We mentioned that poll is always level-triggered. That is, as long as a condition holds, we'll get a notification. For example, if one call to poll notifies us that a certain socket is ready and we then call poll again without reading the socket, we'll get another notification. This is also how epoll works by default, but we can make epoll edge-triggered on a per-socket basis.
Let's go back to our first epoll program, and change the code that handles a client's readiness:
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const efd = try posix.epoll_create1(0);
{
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
}
var ready_list: [128]linux.epoll_event = undefined;
while (true) {
const ready_count = posix.epoll_wait(efd, &ready_list, -1);
for (ready_list[0..ready_count]) |ready| {
const ready_socket = ready.data.fd;
if (ready_socket == listener) {
const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(client_socket);
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.fd = client_socket}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
} else {
std.debug.print(".", .{});
}
}
}
}
For now, only the end of the code has changed. Instead of trying to read the message and echoing it back, we print a dot (.). If you run this code (again, Linux only), connect and send a message, your screen should fill with dots. This is because epoll is, by default, in level-triggered mode and continues to notify us of the socket's readiness.
When adding the monitor for the client socket, if we change events from linux.EPOLL.IN to linux.EPOLL.IN | linux.EPOLL.ET, i.e.:
var event = linux.epoll_event{
.events = linux.EPOLL.IN | linux.EPOLL.ET,
.data = .{.fd = client_socket}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
we make the monitor edge-triggered (ET). Now, if we connect a client and send a message, we'll get a single notification. The monitor isn't deleted or even disabled; if we send another message, we'll get a new notification. epoll is pretty smart too: a new event doesn't have to happen while epoll_wait is blocked, so there is no race condition between adding a monitor and calling epoll_wait. In our above code, if the client sends a message right after the socket is added with epoll_ctl but before epoll_wait is called, we'll still get the notification.
We can also use linux.EPOLL.ONESHOT, which disables notifications for the socket after readiness has been signaled once, including for any new messages. epoll_ctl with the linux.EPOLL.CTL_MOD operation needs to be called to re-enable the monitor. This is obviously different from edge-triggering, since notifications are disabled even for new events. While ONESHOT might be useful to make sure that a single message per client is processed, it does have the disadvantage of requiring many more system calls: one every time we want to re-enable the monitor.
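For example, registering a client socket with ONESHOT and later re-arming it looks roughly like this (a sketch using the variables from the first example):

var event = linux.epoll_event{
    .events = linux.EPOLL.IN | linux.EPOLL.ONESHOT,
    .data = .{.fd = client_socket},
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);

// after a readiness notification is delivered, the monitor is disabled;
// it has to be explicitly re-armed:
try posix.epoll_ctl(efd, linux.EPOLL.CTL_MOD, client_socket, &event);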
Besides the fact that epoll is Linux-specific, it's safe to think of it as a better version of poll: it's more efficient, easier to integrate and has more features. Except for the specifics of the API, everything about evented I/O is unchanged. The @intFromPtr and @ptrFromInt builtins can be useful now and again, but they're mostly used as above, when integrating with a C library that lets you associate arbitrary data (often a usize) with a C structure.
Below, you'll find a full working example (on Linux). It merges what we've learned about epoll with our Server, Client and Reader from previous parts. We've also introduced an Epoll structure. The goal is to start creating some abstraction to eventually deal with having platform-specific implementations (i.e. using epoll on Linux and kqueue on BSD/MacOS). This abstraction is incomplete, but it's a start and something we'll dedicate a whole part to.
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.tcp_demo);
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
var server = try Server.init(allocator, 4096);
defer server.deinit();
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
try server.run(address);
std.debug.print("STOPPED\n", .{});
}
const READ_TIMEOUT_MS = 60_000;
const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;
const Server = struct {
max: usize,
loop: Epoll,
allocator: Allocator,
connected: usize,
read_timeout_list: ClientList,
client_pool: std.heap.MemoryPool(Client),
client_node_pool: std.heap.MemoryPool(ClientList.Node),
fn init(allocator: Allocator, max: usize) !Server {
const loop = try Epoll.init();
errdefer loop.deinit();
return .{
.max = max,
.loop = loop,
.connected = 0,
.allocator = allocator,
.read_timeout_list = .{},
.client_pool = std.heap.MemoryPool(Client).init(allocator),
.client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
};
}
fn deinit(self: *Server) void {
self.loop.deinit();
self.client_pool.deinit();
self.client_node_pool.deinit();
}
fn run(self: *Server, address: std.net.Address) !void {
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const read_timeout_list = &self.read_timeout_list;
try self.loop.addListener(listener);
while (true) {
const next_timeout = self.enforceTimeout();
const ready_events = self.loop.wait(next_timeout);
for (ready_events) |ready| {
switch (ready.data.ptr) {
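// ptr == 0 is the sentinel we registered for the listening socket;
// any other value is the address of a heap-allocated Client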
0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
else => |nptr| {
const events = ready.events;
const client: *Client = @ptrFromInt(nptr);
if (events & linux.EPOLL.IN == linux.EPOLL.IN) {
while (true) {
const msg = client.readMessage() catch {
self.closeClient(client);
break;
} orelse break;
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
read_timeout_list.remove(client.read_timeout_node);
read_timeout_list.append(client.read_timeout_node);
client.writeMessage(msg) catch {
self.closeClient(client);
break;
};
}
} else if (events & linux.EPOLL.OUT == linux.EPOLL.OUT) {
client.write() catch self.closeClient(client);
}
}
}
}
}
}
fn enforceTimeout(self: *Server) i32 {
const now = std.time.milliTimestamp();
var node = self.read_timeout_list.first;
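// the list is ordered from oldest to newest read, so the first client with a
// timeout in the future tells us how long we can block in epoll_wait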
while (node) |n| {
const client = n.data;
const diff = client.read_timeout - now;
if (diff > 0) {
return @intCast(diff);
}
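// this client has timed out; shutting down the receive side makes the socket
// readable, so the read path in run() will see it as closed and clean it up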
posix.shutdown(client.socket, .recv) catch {};
node = n.next;
} else {
return -1;
}
}
fn accept(self: *Server, listener: posix.socket_t) !void {
const space = self.max - self.connected;
for (0..space) |_| {
var address: net.Address = undefined;
var address_len: posix.socklen_t = @sizeOf(net.Address);
const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
const client = try self.client_pool.create();
errdefer self.client_pool.destroy(client);
client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
posix.close(socket);
log.err("failed to initialize client: {}", .{err});
return;
};
errdefer client.deinit(self.allocator);
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
client.read_timeout_node = try self.client_node_pool.create();
errdefer self.client_node_pool.destroy(client.read_timeout_node);
client.read_timeout_node.* = .{
.next = null,
.prev = null,
.data = client,
};
self.read_timeout_list.append(client.read_timeout_node);
try self.loop.newClient(client);
self.connected += 1;
} else {
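// we accepted `space` connections without hitting WouldBlock, so we're at
// capacity; stop monitoring the listener rather than getting accept
// notifications we can't act on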
try self.loop.removeListener(listener);
}
}
fn closeClient(self: *Server, client: *Client) void {
self.read_timeout_list.remove(client.read_timeout_node);
posix.close(client.socket);
self.client_node_pool.destroy(client.read_timeout_node);
client.deinit(self.allocator);
self.client_pool.destroy(client);
}
};
const Client = struct {
loop: *Epoll,
socket: posix.socket_t,
address: std.net.Address,
reader: Reader,
to_write: []u8,
write_buf: []u8,
read_timeout: i64,
read_timeout_node: *ClientNode,
fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *Epoll) !Client {
const reader = try Reader.init(allocator, 4096);
errdefer reader.deinit(allocator);
const write_buf = try allocator.alloc(u8, 4096);
errdefer allocator.free(write_buf);
return .{
.loop = loop,
.reader = reader,
.socket = socket,
.address = address,
.to_write = &.{},
.write_buf = write_buf,
.read_timeout = 0,
.read_timeout_node = undefined,
};
}
fn deinit(self: *const Client, allocator: Allocator) void {
self.reader.deinit(allocator);
allocator.free(self.write_buf);
}
fn readMessage(self: *Client) !?[]const u8 {
return self.reader.readMessage(self.socket) catch |err| switch (err) {
error.WouldBlock => return null,
else => return err,
};
}
fn writeMessage(self: *Client, msg: []const u8) !void {
if (self.to_write.len > 0) {
return error.PendingMessage;
}
if (msg.len + 4 > self.write_buf.len) {
return error.MessageTooLarge;
}
std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
const end = msg.len + 4;
@memcpy(self.write_buf[4..end], msg);
self.to_write = self.write_buf[0..end];
return self.write();
}
fn write(self: *Client) !void {
var buf = self.to_write;
defer self.to_write = buf;
while (buf.len > 0) {
const n = posix.write(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => return self.loop.writeMode(self),
else => return err,
};
if (n == 0) {
return error.Closed;
}
buf = buf[n..];
} else {
return self.loop.readMode(self);
}
}
};
const Reader = struct {
buf: []u8,
pos: usize = 0,
start: usize = 0,
fn init(allocator: Allocator, size: usize) !Reader {
const buf = try allocator.alloc(u8, size);
return .{
.pos = 0,
.start = 0,
.buf = buf,
};
}
fn deinit(self: *const Reader, allocator: Allocator) void {
allocator.free(self.buf);
}
fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
const buf = self.buf;
while (true) {
if (try self.bufferedMessage()) |msg| {
return msg;
}
const pos = self.pos;
const n = try posix.read(socket, buf[pos..]);
if (n == 0) {
return error.Closed;
}
self.pos = pos + n;
}
}
fn bufferedMessage(self: *Reader) !?[]u8 {
const buf = self.buf;
const pos = self.pos;
const start = self.start;
std.debug.assert(pos >= start);
const unprocessed = buf[start..pos];
if (unprocessed.len < 4) {
self.ensureSpace(4 - unprocessed.len) catch unreachable;
return null;
}
const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);
const total_len = message_len + 4;
if (unprocessed.len < total_len) {
try self.ensureSpace(total_len);
return null;
}
self.start += total_len;
return unprocessed[4..total_len];
}
fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
const buf = self.buf;
if (buf.len < space) {
return error.BufferTooSmall;
}
const start = self.start;
const spare = buf.len - start;
if (spare >= space) {
return;
}
const unprocessed = buf[start..self.pos];
std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
self.start = 0;
self.pos = unprocessed.len;
}
};
const Epoll = struct {
efd: posix.fd_t,
ready_list: [128]linux.epoll_event = undefined,
fn init() !Epoll {
const efd = try posix.epoll_create1(0);
return .{.efd = efd};
}
fn deinit(self: Epoll) void {
posix.close(self.efd);
}
fn wait(self: *Epoll, timeout: i32) []linux.epoll_event {
const count = posix.epoll_wait(self.efd, &self.ready_list, timeout);
return self.ready_list[0..count];
}
fn addListener(self: Epoll, listener: posix.socket_t) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = 0},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, listener, &event);
}
fn removeListener(self: Epoll, listener: posix.socket_t) !void {
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_DEL, listener, null);
}
fn newClient(self: Epoll, client: *Client) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(client)},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, client.socket, &event);
}
fn readMode(self: Epoll, client: *Client) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(client)},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
}
fn writeMode(self: Epoll, client: *Client) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.OUT,
.data = .{.ptr = @intFromPtr(client)},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
}
};