TCP Server in Zig - Part 5a - Poll
Oct 15, 2024
One of the reasons we introduced multithreading was to get around the fact that our `read` and, to a lesser extent, `accept` and `write`, block. In our initial single-threaded implementation, rather than pushing our server to its limits, we spent a lot of time idle, waiting for data to come in. Multithreading helped to unblock the main thread so that new connections could be accepted - as long as we had enough workers to handle them - while processing existing connections. But threads are relatively heavyweight constructs and it isn't particularly efficient to spawn them and then have them blocked waiting for data.
There are two complementary parts to improving our design: non-blocking I/O and event notification.
It's possible to put a socket in non-blocking mode. When enabled, functions which normally block, such as `accept`, `read` or `write`, will return `error.WouldBlock` (or `EAGAIN` in C) rather than blocking. As we're about to see, it's hard to take advantage of this on its own, but we're looking at it first to get a feel for it. Consider what happens if we go back to one of our earlier single-threaded implementations and enable non-blocking sockets (two lines have changed: the socket type passed to `posix.socket` and the flags passed to `posix.accept`):
```zig
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    // CHANGED: SOCK.NONBLOCK puts the listening socket in non-blocking mode
    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    var buf: [128]u8 = undefined;
    while (true) {
        // CHANGED: the 4th parameter puts the connected socket in non-blocking mode
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
            std.debug.print("error accept: {}\n", .{err});
            continue;
        };
        defer posix.close(socket);

        const stream = std.net.Stream{ .handle = socket };
        const read = try stream.read(&buf);
        if (read == 0) {
            continue;
        }
        try stream.writeAll(buf[0..read]);
    }
}
```
When we create our listening socket, using `posix.socket`, we now set the `SOCK.NONBLOCK` flag. Similarly, when we `accept`, we now pass that same flag, `SOCK.NONBLOCK`, as our fourth parameter. The first usage puts our listening socket in non-blocking mode (we'll see what that accomplishes shortly). The second usage puts a newly connected socket in non-blocking mode. This second usage is a special case. There are actually two accept system calls: `accept` and `accept4`. The first one, `accept`, only takes 3 parameters, whereas `accept4` takes a 4th parameter. That 4th parameter is for any flags, like `SOCK.NONBLOCK`, that we want to apply to the connected socket. If Zig's `posix.accept` detects that `accept4` is available, it uses it; otherwise it calls `accept` and then calls `fcntl` to set the appropriate flags. `accept4` does what Part 3 talked about (minimizing system calls) by combining `accept` with `fcntl`.
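To get a feel for that fallback path, here's a minimal sketch of the second system call on its own: reading a descriptor's status flags and writing them back with the non-blocking bit set. `setNonBlocking` is a hypothetical helper, and it assumes a POSIX target where Zig models `O_*` flags as the packed struct `posix.O`. The test uses a pipe rather than a socket, since the non-blocking flag behaves the same way for pipes and sockets:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: puts an already-open descriptor into non-blocking
/// mode with two system calls, F_GETFL followed by F_SETFL. This is the extra
/// work accept4 saves us when it's available.
fn setNonBlocking(fd: posix.fd_t) !void {
    const flags = try posix.fcntl(fd, posix.F.GETFL, 0);
    // posix.O is a packed struct; this is the position of its NONBLOCK bit
    const nonblock: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK");
    _ = try posix.fcntl(fd, posix.F.SETFL, flags | nonblock);
}
```

Once the flag is set, a read with nothing to read fails immediately with `error.WouldBlock` instead of waiting.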
If you try to run the above code, you should get a rapid and endless stream of:
```
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
```
We said that `SOCK.NONBLOCK` makes functions like `accept` return an error instead of blocking. Given that our `accept` looks like:
```zig
while (true) {
    const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
        std.debug.print("error accept: {}\n", .{err});
        continue;
    };
    // ... read from and write to socket
}
```
The output makes sense. There's no obvious solution either. We could sleep whenever `accept` returns `error.WouldBlock`, but that would be worse than leaving the socket in blocking mode - at least in blocking mode we're woken up as soon as there's a connection waiting to be accepted.
If we keep our listening socket in blocking mode, but put connected sockets in non-blocking mode, we'll face the same problem. When our `read` returns `error.WouldBlock`, what can we realistically do? All of our implementations so far depend on one thread processing one connection. We need to fundamentally rethink our approach.
We need to break out of the one-thread-per-connection pattern, which is something we can only do now that we've discovered non-blocking I/O. We probably need to start organizing our code a little better, but you can start thinking about having an array of sockets that we can loop through and try to read from. As an incomplete prototype, something like:
```zig
for (sockets) |s| {
    const n = posix.read(s, &buf) catch |err| {
        switch (err) {
            // no data available yet, try the next socket
            error.WouldBlock => {},
            else => posix.close(s),
        }
        continue;
    };

    if (n == 0) {
        // the peer closed the connection
        posix.close(s);
        continue;
    }
    process(buf[0..n]);
}
```
This is possible because `posix.read` won't block when no data is available. If `error.WouldBlock` is returned, we can skip to the next socket in our list. There are at least two major problems with this prototype. The first is that this will result in a tight loop when no socket is ready. We previously talked about the importance of minimizing system calls, but the above code would result in a massive number of calls to `read` as we constantly poll each socket, hoping that one has data.
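Here's one pass of that prototype as code that compiles, assuming every descriptor is already in non-blocking mode. `readPass` is a hypothetical name, and the test uses pipes in place of sockets. It makes the cost visible: every pass issues one `read` per descriptor, whether or not that descriptor has data.

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: one pass over every (non-blocking) descriptor.
/// Returns how many descriptors actually produced bytes.
fn readPass(fds: []const posix.fd_t, buf: []u8) !usize {
    var ready: usize = 0;
    for (fds) |fd| {
        // one system call per descriptor, ready or not
        const n = posix.read(fd, buf) catch |err| switch (err) {
            error.WouldBlock => continue, // nothing yet, try the next one
            else => return err,
        };
        if (n == 0) continue; // peer closed; a real server would remove fd
        ready += 1;
        // ... process buf[0..n]
    }
    return ready;
}
```

Calling `readPass` in a `while (true)` loop is exactly the tight loop described above: when nothing is ready, it spins, making system calls that accomplish nothing.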
A larger issue is the need to associate state with each connection. This was trivial in our thread-per-connection model, but gets more complicated now that we need to track multiple connections. For example, if we consider that a `read` can return less than or more than a single "message" (see Part 2), it seems unlikely that we can share a buffer for all sockets. This is something that we started solving when we introduced a `Client`. Our above prototype can be improved if we think in terms of clients rather than sockets:
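```zig
for (clients) |*client| {
    // a single read might contain several messages, so keep going
    // until readMessage returns null (no complete message available)
    while (true) {
        const message = client.readMessage() catch {
            client.close();
            break;
        } orelse break;

        client.process(message) catch {
            client.close();
            break;
        };
    }
}
```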
We're getting further and further away from working code, but the key change here is that a `Client` encapsulates the state necessary for reading and [eventually] writing messages. Our fictional `readMessage` could fail, but it could also return null to indicate that there isn't enough data to form a complete message yet - maybe because we don't have all the bytes, or maybe because `posix.read` returned `error.WouldBlock`. If you're wondering why we have an inner loop, recall that we're not just concerned about reading less than a whole message, but also about reading more than a single message. It's possible that `readMessage`, which would eventually call `posix.read`, fills our buffer with multiple messages. We need to process them all.
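To make the inner loop concrete, here's a sketch that pulls complete messages out of bytes a single read might have produced. It assumes the 4-byte little-endian length prefix that our `Reader` uses; `nextMessage` is a hypothetical helper for illustration, not part of the `Reader` itself:

```zig
const std = @import("std");

/// Hypothetical helper: returns the next complete length-prefixed message in
/// data[start.*..] and advances start past it, or returns null if only part of
/// a message (or of its 4-byte length prefix) is available.
fn nextMessage(data: []const u8, start: *usize) ?[]const u8 {
    const unprocessed = data[start.*..];
    if (unprocessed.len < 4) return null;
    const len = std.mem.readInt(u32, unprocessed[0..4], .little);
    const total = @as(usize, len) + 4;
    if (unprocessed.len < total) return null;
    start.* += total;
    return unprocessed[4..total];
}
```

Given a buffer holding "hello", then "ok", then the first byte of a 3-byte message, a `while (nextMessage(data, &start)) |msg|` loop yields two messages and stops, leaving the partial one for when more bytes arrive.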
So far, this is admittedly very abstract, but remember that in Part 3 we created a stateful, message-aware `Reader`. Our `client.readMessage` can be a wrapper around `reader.readMessage` which translates `error.WouldBlock` into `null`:
```zig
fn readMessage(self: *Client) !?[]const u8 {
    return self.reader.readMessage(self.socket) catch |err| switch (err) {
        // no complete message, and the socket has no more data for now
        error.WouldBlock => return null,
        else => return err,
    };
}
```
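The same translation works for a plain `posix.read`. A minimal sketch, with `readSome` as a hypothetical name; the test uses a non-blocking pipe because it reports `error.WouldBlock` just like a non-blocking socket:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: like posix.read, but "no data yet" is a normal
/// outcome (null) rather than an error, so callers can use `orelse`.
fn readSome(fd: posix.fd_t, buf: []u8) !?usize {
    return posix.read(fd, buf) catch |err| switch (err) {
        error.WouldBlock => return null,
        else => return err,
    };
}
```

Keeping `error.WouldBlock` out of the error set means a caller's `catch` only has to deal with real failures.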
While this seems promising, constantly looping through our clients, hoping that one or more has data, isn't going to be efficient.
The `poll` system call lets us register file descriptors (like sockets) with the operating system and be notified when certain events, like reading or writing, can be done without blocking. For example, we can give `poll` an array of sockets and it'll block until one is ready to be read. It's a simple API, a single function, but it's a change in how we think about serving clients. We're going to start by looking at a basic implementation which is unconcerned with things like message boundaries. Our goal, for now, is to get familiar with `poll`'s API and this new way of working with sockets.
```zig
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // Room for 4095 clients: slot 0 is reserved for the listening socket.
    var polls: [4096]posix.pollfd = undefined;
    polls[0] = .{
        .fd = listener,
        .events = posix.POLL.IN,
        .revents = 0,
    };
    // number of initialized entries in polls, including the listener
    var poll_count: usize = 1;

    while (true) {
        // only pass the initialized entries; -1 means wait indefinitely
        _ = try posix.poll(polls[0..poll_count], -1);

        // the listening socket is special-cased, outside of the loop
        if (polls[0].revents != 0) {
            const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
            polls[poll_count] = .{
                .fd = socket,
                // 0, so the loop below skips it until the next poll
                .revents = 0,
                .events = posix.POLL.IN,
            };
            poll_count += 1;
        }

        var i: usize = 1;
        while (i < poll_count) {
            const polled = polls[i];
            const revents = polled.revents;
            if (revents == 0) {
                // not ready, move on to the next socket
                i += 1;
                continue;
            }

            var closed = false;
            if (revents & posix.POLL.IN == posix.POLL.IN) {
                var buf: [4096]u8 = undefined;
                // treat a read error like a closed connection
                const read = posix.read(polled.fd, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ polled.fd, buf[0..read] });
                }
            }

            if (closed or (revents & posix.POLL.HUP == posix.POLL.HUP)) {
                posix.close(polled.fd);
                // swap-remove: move the last entry into slot i and shrink.
                // Don't advance i: slot i now holds an entry we haven't checked.
                const last_index = poll_count - 1;
                polls[i] = polls[last_index];
                poll_count = last_index;
            } else {
                i += 1;
            }
        }
    }
}
```
The above is a working example. You can run it and connect up to 4095 clients - any more and it'll crash, something we can and will eventually fix. There are a number of things to go over. The `posix.pollfd` structure has three fields:

- `fd`: the file descriptor that we're polling,
- `events`: a bitwise list of events we care about; for now that's only `POLL.IN`, but later, when we look at writing, it'll be `POLL.IN | POLL.OUT`,
- `revents`: the ready events set by the `poll` system call. This tells us which, if any, events are ready.
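To see `events` and `revents` in action without a network, we can poll one end of a pipe with a timeout of `0`, which makes `poll` return immediately instead of waiting. `pollOnce` is a hypothetical helper for illustration:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: polls a single descriptor for POLL.IN without waiting
/// (timeout 0) and returns the pollfd, with revents filled in by the kernel.
fn pollOnce(fd: posix.fd_t) !posix.pollfd {
    var fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
    _ = try posix.poll(&fds, 0);
    return fds[0];
}
```

Before anything is written to the pipe, `revents` comes back as 0; after a write, it has the `POLL.IN` bit set.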
The first thing we do is set up a `pollfd` for our listening socket, registering our interest in `POLL.IN`. For a listening socket, `POLL.IN` indicates that we can `accept` without blocking. We set and keep this at index zero throughout the lifetime of our program. When `posix.poll` returns, it means that at least one of the monitored file descriptors is ready. We iterate through them all, looking for any where `revents != 0`. We special-case our listening socket, always at `polls[0]` - we need to call `accept` and register the new connection. As an optimization, we've moved that out of our loop. This means we don't have to check `if (i == 0)` for each iteration of our loop.
We could have used a `std.ArrayList(posix.pollfd)` to make adding and removing entries a little easier, but, at least for the purpose of learning, I prefer the explicit code to handle new connections and disconnects.
You might be wondering why we check if `revents` has the `POLL.HUP` flag, even though we didn't register our interest in `POLL.HUP`. `POLL.HUP` is always monitored even if we don't explicitly ask for it (the same is true of `POLL.ERR` and `POLL.NVAL`). We check `POLL.IN` first, but we could check `POLL.HUP` first instead. For example, an HTTP server might prefer checking `POLL.HUP` and removing disconnected clients, ignoring any pending data they've sent. When `POLL.IN` is set, you might be wondering if `read` can fail and/or return zero bytes. It can. When the peer closes its side of the connection, the socket is reported as readable and `read` returns zero bytes: that's how we detect the disconnect. `read` could also return `error.WouldBlock` if another thread called `read` on the same socket, draining it, and we'd get zero bytes if the buffer passed into `read` was zero-length. In this single-threaded example, with a fixed-length buffer, those last two cases aren't possible. But it's better to be safe than sorry: we should always assume `read` can return zero bytes or an error.
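A pipe whose writing end has been closed behaves like a disconnected peer, which makes this easy to try. In the sketch below, `pollThenRead` is a hypothetical helper; exactly which flags `poll` sets in that situation (`POLL.IN`, `POLL.HUP` or both) varies by platform, so it only reports whether the descriptor was flagged at all:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: polls fd for POLL.IN without waiting, then reads once.
/// `ready` is true if poll set any revents flag (which ones varies by platform).
fn pollThenRead(fd: posix.fd_t, buf: []u8) !struct { ready: bool, n: usize } {
    var fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
    _ = try posix.poll(&fds, 0);
    const n = try posix.read(fd, buf);
    return .{ .ready = fds[0].revents != 0, .n = n };
}
```

After closing the write end, the read end is reported as ready and `read` returns 0: end-of-stream, not an error.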
The `poll` call unblocks when at least one of the monitored file descriptors is ready. But the only way to know which are ready is to iterate through all of them, checking if `revents != 0`. Even with a modest upper limit of 4K clients, this isn't ideal. There isn't much we can do about it. However, `poll` does return the number of entries that have a non-zero `revents`. So while our code will still be O(N), we can at least stop iterating once we've processed that many ready sockets:
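```zig
while (true) {
    // poll returns the number of entries with a non-zero revents
    var pending = try posix.poll(polls[0..poll_count], -1);

    if (polls[0].revents != 0) {
        // the listening socket counts towards pending
        pending -= 1;
        // ... accept the new connection, as before
    }

    var i: usize = 1;
    while (pending > 0) {
        const polled = polls[i];
        const revents = polled.revents;
        if (revents == 0) {
            i += 1;
            continue;
        }
        // found one of the ready sockets
        pending -= 1;
        // ... process polled as before; that code also advances i
        // (or doesn't, when the connection is removed)
    }
}
```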
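The early exit can be isolated in a small function that works on plain `pollfd` values, so it can be tested without any real descriptors. `scanReady` is a hypothetical helper: rather than processing sockets, it records the indices of ready entries and returns how many entries it examined:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: writes the indices of ready entries into `out` and
/// returns how many entries were examined. It stops as soon as `pending`
/// ready entries (the count returned by poll) have been seen.
fn scanReady(polls: []const posix.pollfd, pending: usize, out: []usize) usize {
    var remaining = pending;
    var i: usize = 0;
    while (remaining > 0 and i < polls.len) : (i += 1) {
        if (polls[i].revents == 0) continue;
        out[pending - remaining] = i;
        remaining -= 1;
    }
    return i;
}
```

With 8 entries where only indices 1 and 3 are ready, and `pending` equal to 2, it examines 4 entries and never touches the last four.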
Both epoll and kqueue, platform-specific alternatives to poll, elegantly solve this problem. We'll cover both those APIs in following parts.
When the listening socket is ready to accept a new connection, that is, when `polls[0].revents != 0`, we call `accept` once. But because our socket is in non-blocking mode, we could keep calling it until it returns `error.WouldBlock`:
```zig
if (polls[0].revents != 0) {
    // keep accepting until there are no more pending connections
    while (true) {
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => break,
            else => return err,
        };
        polls[poll_count] = .{
            .fd = socket,
            .revents = 0,
            .events = posix.POLL.IN,
        };
        poll_count += 1;
    }
}
```
`poll` tells us the listening socket can accept without blocking (i.e. that it's ready). But it doesn't tell us how many connections are waiting to be accepted. Both approaches, looping and not looping, work because `poll` is always level-triggered. This means that `poll` continues to notify us so long as the socket is ready. This is different from edge-triggered notification, which only notifies us when the state changes (we'll learn more about this in future parts).

Put differently, if there are four connections waiting to be accepted when `poll` returns, but we only accept one, the next call to `poll` will re-notify us that our listening socket is ready because of the three still-pending connections.
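Level-triggering is easy to observe with a pipe, which reports readiness the same way a socket does. In this sketch, `isReadable` is a hypothetical helper that polls with a timeout of `0`. If we consume only part of the available data, `poll` keeps reporting the descriptor as readable; only once we drain it (reading until `error.WouldBlock`, like the accept loop above) does it stop:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: true if poll reports fd as readable right now.
fn isReadable(fd: posix.fd_t) !bool {
    var fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
    const n = try posix.poll(&fds, 0);
    return n > 0 and fds[0].revents & posix.POLL.IN != 0;
}
```

An edge-triggered API would instead notify us once, when the data first arrived, and stay silent about the bytes we left behind.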
In the above code, when a socket is ready, we simply print whatever bytes we were able to read. But as we've discussed a few times, TCP deals with streams of bytes and isn't "message"-aware. We need to add more state so that we can handle a `read` returning only part of a message or more than a single message. We already saw a pseudo-implementation of this above, when we looked at non-blocking I/O. To use that same approach, we have a minor problem: `poll` doesn't allow us to associate arbitrary data with the `pollfd`, so we need to create and manage this association ourselves.
I think it's time that we introduced a `Server` struct to help us keep our code tidy. First, its fields and its `init` and `deinit` functions. We'll see a full `Client` implementation shortly, but for now, it's just a thin wrapper around the `Reader` from previous parts.
```zig
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

const Server = struct {
    allocator: Allocator,
    // number of connected clients
    connected: usize,
    // polls[0] is the listening socket; polls[1..] are the clients
    polls: []posix.pollfd,
    // clients[N] is monitored by polls[N+1]
    clients: []Client,
    // polls[1..], so that clients and client_polls share the same index
    client_polls: []posix.pollfd,

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
        };
    }

    fn deinit(self: *Server) void {
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
    }
};
```
The first client that connects will obviously be at `clients[0]`, but it'll be at `polls[1]` because our listening socket is always at `polls[0]`. This isn't a requirement, it's merely our own convention. But given that we want to poll both our listening socket and connected sockets, it's an efficient way to do things. However, it means that `clients[N]` corresponds to `polls[N+1]`. This +1 offset is error-prone and annoying, so we also create a `client_polls` which will be set to `polls[1..]`. Now when clients connect and disconnect, we just touch `clients` and `client_polls`, which both share the same offset.
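Because `client_polls` is a slice into `polls` rather than a copy, writing through one is visible through the other. A small illustration, where `register` is a hypothetical helper and no sockets are involved:

```zig
const std = @import("std");
const posix = std.posix;

/// Hypothetical helper: registers a socket for client index `n`. Because
/// client_polls is polls[1..], this writes polls[n + 1] with no explicit +1.
fn register(client_polls: []posix.pollfd, n: usize, fd: posix.fd_t) void {
    client_polls[n] = .{ .fd = fd, .events = posix.POLL.IN, .revents = 0 };
}
```

Registering client 0 fills `polls[1]`, and registering client 2 fills `polls[3]`, which is exactly the offset we want `poll` to see.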
Next we can add a `run` function. This is almost the same code as our first example with `poll`, but I've extracted some of the functionality into separate functions (which we'll see next):
```zig
fn run(self: *Server, address: std.net.Address) !void {
    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // polls[0] is always our listening socket
    self.polls[0] = .{
        .fd = listener,
        .revents = 0,
        .events = posix.POLL.IN,
    };

    while (true) {
        // poll the listener plus every connected client
        _ = try posix.poll(self.polls[0..self.connected + 1], -1);

        if (self.polls[0].revents != 0) {
            // one or more connections are ready to be accepted
            self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
        }

        var i: usize = 0;
        while (i < self.connected) {
            const revents = self.client_polls[i].revents;
            if (revents == 0) {
                // not ready (or just accepted), move on
                i += 1;
                continue;
            }

            const client = &self.clients[i];
            if (revents & posix.POLL.IN == posix.POLL.IN) {
                // process every complete message we have
                while (true) {
                    const msg = client.readMessage() catch {
                        // removeClient moves the last client into slot i,
                        // so don't advance i
                        self.removeClient(i);
                        break;
                    } orelse {
                        // no more complete messages for now
                        i += 1;
                        break;
                    };
                    std.debug.print("got: {s}\n", .{msg});
                }
            } else {
                // HUP, ERR or NVAL without IN: the connection is unusable
                self.removeClient(i);
            }
        }
    }
}
```
The goal here is that the client at `clients[N]` is being monitored by `polls[N+1]`. But, to avoid that nasty +1, we use `client_polls`, which is a slice of `polls`: `polls[1..]`. This means that when `client_polls[N]` is ready, we can access the corresponding client at `clients[N]`.
I'm not sure I love the idea of extracting `accept` and `removeClient` into their own functions. After all, they're only called from a single place and I like being able to read code without having to chase after it. But, ask me another day and I'll give you another answer:
```zig
fn accept(self: *Server, listener: posix.socket_t) !void {
    // accept every pending connection
    while (true) {
        var address: net.Address = undefined;
        var address_len: posix.socklen_t = @sizeOf(net.Address);
        const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            // no more pending connections
            error.WouldBlock => return,
            else => return err,
        };

        const client = Client.init(self.allocator, socket, address) catch |err| {
            posix.close(socket);
            log.err("failed to initialize client: {}", .{err});
            return;
        };

        const connected = self.connected;
        self.clients[connected] = client;
        self.client_polls[connected] = .{
            .fd = socket,
            .revents = 0,
            .events = posix.POLL.IN,
        };
        self.connected = connected + 1;
    }
}

fn removeClient(self: *Server, at: usize) void {
    const client = self.clients[at];
    posix.close(client.socket);
    client.deinit(self.allocator);

    // swap-remove: move the last client (and its pollfd) into the freed slot
    const last_index = self.connected - 1;
    self.clients[at] = self.clients[last_index];
    self.client_polls[at] = self.client_polls[last_index];
    self.connected = last_index;
}
```
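`removeClient`'s swap-remove is what keeps the two slices aligned: whatever moves in `clients` moves identically in `client_polls`. Here's a simplified sketch with two hypothetical parallel arrays, `u32` ids standing in for clients and `i32` values standing in for pollfds:

```zig
const std = @import("std");

/// Hypothetical, simplified version of removeClient: two arrays linked by
/// index. The last entry of both arrays moves into the freed slot.
fn swapRemove(ids: []u32, fds: []i32, len: *usize, at: usize) void {
    const last = len.* - 1;
    ids[at] = ids[last];
    fds[at] = fds[last];
    len.* = last;
}
```

Removal is O(1), but the order of entries changes. That's why `run` doesn't advance `i` after removing a client: slot `i` now holds a client it hasn't looked at yet.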
It seems like a lot more code, but the approach is the same as our initial example that used `poll`. Rather than having two slices, `clients` and `client_polls`, which are linked by index, we could have used a `std.AutoHashMap(posix.socket_t, Client)`. Then the code would have looked something like:
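```zig
for (self.polls[1 .. self.connected + 1]) |p| {
    const revents = p.revents;
    if (revents == 0) {
        continue;
    }
    // look the client up by its socket instead of by index
    const client = self.clients.getPtr(p.fd) orelse unreachable;
    // ... read from client as before
}
```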
Other parts of the code would also have been simplified - hashmaps have a way of doing that. But it isn't how I'd do it - because it would probably be less efficient - and this series isn't about taking the easy path.
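For comparison, here's a minimal sketch of that hashmap alternative. `Conn` and `recordRead` are hypothetical stand-ins for `Client` and the per-socket processing, and the sketch assumes `posix.socket_t` and `pollfd.fd` are the same type, as they are on the POSIX targets this series uses:

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical per-connection state, standing in for Client.
const Conn = struct { bytes_read: usize = 0 };

/// Hypothetical helper: finds the connection's state via the pollfd's
/// descriptor rather than via its position in the polls array.
fn recordRead(conns: *std.AutoHashMap(posix.socket_t, Conn), p: posix.pollfd, n: usize) void {
    const conn = conns.getPtr(p.fd) orelse unreachable;
    conn.bytes_read += n;
}
```

Removing a connection becomes `conns.remove(fd)` with no index bookkeeping, at the cost of hashing on every lookup.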
Although the `poll` system call is simple, we had to make substantial changes to our code, and our mindset, to accommodate this different way of interacting with sockets. State becomes both more critical and more challenging to maintain compared to previous approaches where we had a thread per connection.
The next part will continue to expand on the above, adding writes, timeouts, connection limits and looking at how we can combine what we've learnt here with our previous experience with multi-threading. Future parts will then look at platform-specific alternatives to `poll`: `epoll` for Linux and `kqueue` for BSD/MacOS. These are not only faster and more scalable than `poll` but also offer additional features. However, fundamentally, most of what we're learning here will translate directly to those APIs. Although it's relatively simple to build a system that supports both platform-specific APIs, `poll` has the significant benefit of being cross-platform. If you don't need the extra performance/scale or features of the platform-specific APIs, I suggest you stick with `poll`.
As a quick aside, there's also the `select` system call, which is older and more limited than `poll`. Unless you're targeting a very old platform, you should always use `poll` instead of `select`. But you will see "select" mentioned/referenced now and again, so it's good to at least be aware of it.
For reference, here's the complete code:

```zig
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    try server.run(address);
}

const Server = struct {
    allocator: Allocator,
    // number of connected clients
    connected: usize,
    // polls[0] is the listening socket; polls[1..] are the clients
    polls: []posix.pollfd,
    // clients[N] is monitored by polls[N+1]
    clients: []Client,
    // polls[1..], so that clients and client_polls share the same index
    client_polls: []posix.pollfd,

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
        };
    }

    fn deinit(self: *Server) void {
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);

        // polls[0] is always our listening socket
        self.polls[0] = .{
            .fd = listener,
            .revents = 0,
            .events = posix.POLL.IN,
        };

        while (true) {
            // poll the listener plus every connected client
            _ = try posix.poll(self.polls[0..self.connected + 1], -1);

            if (self.polls[0].revents != 0) {
                // one or more connections are ready to be accepted
                self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
            }

            var i: usize = 0;
            while (i < self.connected) {
                const revents = self.client_polls[i].revents;
                if (revents == 0) {
                    // not ready (or just accepted), move on
                    i += 1;
                    continue;
                }

                const client = &self.clients[i];
                if (revents & posix.POLL.IN == posix.POLL.IN) {
                    // process every complete message we have
                    while (true) {
                        const msg = client.readMessage() catch {
                            // removeClient moves the last client into slot i,
                            // so don't advance i
                            self.removeClient(i);
                            break;
                        } orelse {
                            // no more complete messages for now
                            i += 1;
                            break;
                        };
                        std.debug.print("got: {s}\n", .{msg});
                    }
                } else {
                    // HUP, ERR or NVAL without IN: the connection is unusable
                    self.removeClient(i);
                }
            }
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        // accept every pending connection
        while (true) {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                // no more pending connections
                error.WouldBlock => return,
                else => return err,
            };

            const client = Client.init(self.allocator, socket, address) catch |err| {
                posix.close(socket);
                log.err("failed to initialize client: {}", .{err});
                return;
            };

            const connected = self.connected;
            self.clients[connected] = client;
            self.client_polls[connected] = .{
                .fd = socket,
                .revents = 0,
                .events = posix.POLL.IN,
            };
            self.connected = connected + 1;
        }
    }

    fn removeClient(self: *Server, at: usize) void {
        const client = self.clients[at];
        posix.close(client.socket);
        client.deinit(self.allocator);

        // swap-remove: move the last client (and its pollfd) into the freed slot
        const last_index = self.connected - 1;
        self.clients[at] = self.clients[last_index];
        self.client_polls[at] = self.client_polls[last_index];
        self.connected = last_index;
    }
};

const Client = struct {
    reader: Reader,
    socket: posix.socket_t,
    address: std.net.Address,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        return .{
            .reader = reader,
            .socket = socket,
            .address = address,
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            // no complete message, and the socket has no more data for now
            error.WouldBlock => return null,
            else => return err,
        };
    }
};

const Reader = struct {
    buf: []u8,
    // where the next read should write to
    pos: usize = 0,
    // where the next message starts
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        const buf = self.buf;
        while (true) {
            // a complete message might already be buffered
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }

            // bufferedMessage made sure buf[pos..] has room
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];

        if (unprocessed.len < 4) {
            // we need the whole 4-byte length prefix to fit after start
            // (can't fail as long as buf is at least 4 bytes)
            self.ensureSpace(4) catch unreachable;
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);
        // widen before adding so a huge prefix can't overflow a u32
        const total_len = @as(usize, message_len) + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    // Makes sure there's room for `space` bytes starting at `start`, moving
    // any unprocessed bytes to the front of buf if necessary.
    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};
```