TCP Server in Zig - Part 6 - Epoll
In the last two parts we introduced the poll system call and, with it, patterns around evented I/O. poll has the advantage of being simple to use and supported on most platforms. However, as we saw, we need to iterate through all monitored sockets to figure out which is ready.
Another awkwardness with poll's API is associating application-specific data with each pollfd being monitored. To know which Client is ready, we need to create a mapping between the pollfd and the Client. While this isn't the end of the world, it is something that poll alternatives address.
The first alternative API that we'll look at is epoll. The bulk of what we've learnt about evented I/O is still applicable as-is. However, epoll is a Linux-specific system call. In order to support it as well as other platforms, we'll need to leverage Zig's comptime as well as some layer of abstraction. This is something we'll look at after exploring kqueue, a BSD/MacOS-specific system call.
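As a rough preview of that abstraction (Epoll and Kqueue below are placeholder names for wrappers we haven't written yet), the platform selection can happen entirely at compile time:
const builtin = @import("builtin");

// Placeholder stubs: these will become real epoll/kqueue wrappers in later parts.
const Epoll = struct {};
const Kqueue = struct {};

// Pick the implementation at compile time based on the target OS.
const Loop = switch (builtin.os.tag) {
    .linux => Epoll,
    .macos, .freebsd, .netbsd, .openbsd => Kqueue,
    else => @compileError("platform not supported"),
};
Since builtin.os.tag is known at compile time, Loop resolves to a single concrete type for the target platform.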
epoll
The epoll API is three separate system calls. epoll_create1 is used to create an epoll file descriptor; you can think of this as our epoll instance. epoll_ctl is used to modify our instance. With it, we can add, modify and remove sockets that we want our instance to monitor. Finally, we have epoll_wait, which blocks until one of the monitored sockets is ready or until the configured timeout is reached.
With poll we specify the sockets to monitor on the blocking call to poll itself. This means we need to maintain our own array of pollfd. With epoll, because we have distinct functions to modify and wait, this ugliness is gone.
Here's a bare-bones runnable (on Linux) example:
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
// epoll_create1 takes flags. We aren't using any in these examples
const efd = try posix.epoll_create1(0);
defer posix.close(efd);
{
// monitor our listening socket
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
}
var ready_list: [128]linux.epoll_event = undefined;
while (true) {
const ready_count = posix.epoll_wait(efd, &ready_list, -1);
for (ready_list[0..ready_count]) |ready| {
const ready_socket = ready.data.fd;
if (ready_socket == listener) {
const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(client_socket);
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = client_socket}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
} else {
var closed = false;
var buf: [4096]u8 = undefined;
const read = posix.read(ready_socket, &buf) catch 0;
if (read == 0) {
closed = true;
} else {
std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
}
if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
posix.close(ready_socket);
}
}
}
}
}
There are a number of interesting things in the above code, especially compared to our first runnable example that used poll. The list of epoll_events that we pass into epoll_wait is filled by the call itself. The return value tells us how many events were filled. So instead of having to loop through every monitored socket to see which is ready, we're given a list of ready sockets, i.e. ready_list[0..ready_count]. It's significantly more efficient.
Another change is how much simpler it is to remove a connection. We can explicitly remove a monitor via epoll_ctl and the CTL_DEL operation, but just closing the socket is enough to remove it.
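If you do want to remove a monitor explicitly, here's a minimal sketch, re-using the efd and ready_socket variables from the example above:
// Explicitly stop monitoring a socket. The event argument can be null for
// CTL_DEL (very old kernels required a non-null, but ignored, pointer).
try posix.epoll_ctl(efd, linux.EPOLL.CTL_DEL, ready_socket, null);
posix.close(ready_socket);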
From the above, we can tell that epoll_ctl takes the epoll file descriptor created via epoll_create1, the operation (CTL_ADD, CTL_DEL or CTL_MOD), the file descriptor to monitor (sockets in our case) and an epoll_event. The epoll_event structure has two fields: events, which is like pollfd.events, and data. The data field is an untagged union which allows us to associate arbitrary information with an epoll_event. The value we store in data can be retrieved when iterating through the ready list. Above, we store the socket and retrieve it from data.fd. Storing the socket directly in data is a good start, but we can do more with this field.
epoll.data.ptr
One of the challenges we faced with poll was associating a pollfd with its corresponding client. We managed to get this working by keeping two slices in sync, but never bothered figuring out how to get a pollfd from a client. With epoll this is much simpler. Specifically, the epoll_event.data field lets us store the numeric representation of a pointer. Or, put differently, we can associate anything we want with an epoll_event.
The benefit of this might not be immediately obvious, but let's add a simple Client and use data to associate our epoll_event directly with a client:
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
// Added this simple client
const Client = struct {
socket: posix.socket_t,
};
pub fn main() !void {
// We now need an allocator
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
const address = try std.net.Address.parseIp("0.0.0.0", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
const efd = try posix.epoll_create1(0);
{
// monitor our listening socket
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.ptr = 0}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
}
var ready_list: [128]linux.epoll_event = undefined;
while (true) {
const ready_count = posix.epoll_wait(efd, &ready_list, -1);
for (ready_list[0..ready_count]) |ready| {
// we're using ptr instead of fd
switch (ready.data.ptr) {
0 => {
const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(socket);
// We create a client
const client = try allocator.create(Client);
errdefer allocator.destroy(client);
client.* = .{.socket = socket};
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
// instead of setting fd, we set ptr which is a usize
.data = .{.ptr = @intFromPtr(client)}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, socket, &event);
},
else => |nptr| {
var closed = false;
var buf: [4096]u8 = undefined;
// we can get the client from data.ptr by reversing
// @intFromPtr with a call to @ptrFromInt
const client: *Client = @ptrFromInt(nptr);
// We should use client.readMessage() when we add our Reader
// back into our Client.
const read = posix.read(client.socket, &buf) catch 0;
if (read == 0) {
closed = true;
} else {
std.debug.print("[{d}] got: {any}\n", .{client.socket, buf[0..read]});
}
if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
posix.close(client.socket);
allocator.destroy(client);
}
},
}
}
}
}
The data.ptr field is used to store the usize representation of our client pointer. We use Zig's built-in @intFromPtr to turn a pointer into an integer and then @ptrFromInt to reverse the process. A real hack in the above code is setting data.ptr to 0 for our listening socket. Remember that data is an untagged union: if we use data.fd = listener for our listening socket, but then data.ptr = @intFromPtr(client) for client connections, we won't be able to tell which field is active. The "right" way to fix this is to create our own tagged union:
const EventType = union(enum) {
listener: posix.socket_t,
recv: *Client,
};
But that's another layer of allocations and indirection. Instead, we use 0 and hope that the operating system never allocates a Client at memory address 0 (this is more than "hope"; aside from embedded systems, most platforms will never allocate memory at address 0).
@intFromPtr is a built-in function which takes a pointer and gives you its usize value. Now, when a socket is ready, we can get the corresponding client by using the reverse function, @ptrFromInt, on the data.ptr field:
const client: *Client = @ptrFromInt(nptr);
It might be obvious to some, but for this to work, a client has to be allocated on the heap. While we could call @intFromPtr on the address of a stack-allocated value, that value won't magically remain valid beyond its scope.
Client to epoll_event
Being able to associate arbitrary data with an epoll_event tightens up our code, but the real benefit is the ease with which we can modify the epoll instance from a client. The way we previously structured our poll-based server, it wasn't possible for writeMessage to be called externally (i.e. from application code) because the Client had no way to alter the server's polls list. We could have had each Client reference a *Server, but it would have been tricky to deal with the moving indexes. With epoll we still need each Client to have access to the epoll instance (either directly, or via a reference to the *Server), but we can use epoll_ctl to modify the client's monitor. You'll recall that previously our write function returned false to signal an incomplete write and true to signal a completed write. This value was used by our run function to switch to and from write-mode and read-mode. We can now bake this into write:
fn write(self: *Client) !void {
var buf = self.to_write;
defer self.to_write = buf;
while (buf.len > 0) {
const n = posix.write(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => {
// switch to write-mode
var event = linux.epoll_event{
.events = linux.EPOLL.OUT,
.data = .{.ptr = @intFromPtr(self)},
};
return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
},
else => return err,
};
if (n == 0) {
return error.Closed;
}
buf = buf[n..];
} else {
// done writing, switch to read-mode
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(self)},
};
return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
}
}
That might seem like a minor change, but it enables something you'll almost certainly need: the ability to initiate a write from the application. There is one inefficiency in the above code: on a complete write, we revert to read-mode. However, most of the time, we'll probably never have gone into write-mode to begin with. As we've said before, unless your server is very busy, your client is lagging, or you're sending very large messages, there's a reasonable chance that posix.write will succeed in a single call. In such cases, reverting to read-mode is an unnecessary and wasteful system call. I'll leave this up to you to fix, but one solution would be to add an IOMode enum with two values, read and write, track which mode the client is currently in, and only switch back to read-mode if self.mode == .write, as sketched below.
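Here's roughly what that could look like. This is just a sketch: the IOMode enum and the mode field don't exist in the Client we've shown, and you'd need to add them:
const IOMode = enum {
    read,
    write,
};

// Assumes Client has gained a `mode: IOMode = .read` field alongside the
// socket, efd and to_write fields used below.
fn write(self: *Client) !void {
    var buf = self.to_write;
    defer self.to_write = buf;
    while (buf.len > 0) {
        const n = posix.write(self.socket, buf) catch |err| switch (err) {
            error.WouldBlock => {
                // incomplete write: switch the monitor to write-mode
                self.mode = .write;
                var event = linux.epoll_event{
                    .events = linux.EPOLL.OUT,
                    .data = .{.ptr = @intFromPtr(self)},
                };
                return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
            },
            else => return err,
        };
        if (n == 0) {
            return error.Closed;
        }
        buf = buf[n..];
    } else {
        // done writing; only pay for the extra system call if we had
        // actually switched into write-mode
        if (self.mode == .write) {
            self.mode = .read;
            var event = linux.epoll_event{
                .events = linux.EPOLL.IN,
                .data = .{.ptr = @intFromPtr(self)},
            };
            return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
        }
    }
}
Whether the saved epoll_ctl call matters depends entirely on how often your writes actually block.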
Edge-Triggered
We mentioned that poll is always level-triggered. That is, as long as a condition holds, we'll get a notification. For example, if one call to poll notifies us that a certain socket is ready and we then call poll again without reading the socket, we'll get another notification. This is also how epoll works by default. But we can make epoll edge-triggered on a per-socket basis.
Let's go back to our first epoll program and change the code that handles a client's readiness:
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
pub fn main() !void {
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
// epoll_create1 takes flags. We aren't using any in these examples
const efd = try posix.epoll_create1(0);
{
// monitor our listening socket
var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
}
var ready_list: [128]linux.epoll_event = undefined;
while (true) {
const ready_count = posix.epoll_wait(efd, &ready_list, -1);
for (ready_list[0..ready_count]) |ready| {
const ready_socket = ready.data.fd;
if (ready_socket == listener) {
const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
errdefer posix.close(client_socket);
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.fd = client_socket}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
} else {
// THIS HAS CHANGED
std.debug.print(".", .{});
}
}
}
}
For now, only the end of the code has changed. Instead of trying to read the message and echoing it back, we print a dot. If you run this code (again, Linux only), connect and send a message, your screen should get filled with dots. This is because epoll is, by default, in level-triggered mode and continues to notify us of the socket's readiness.
When adding the monitor for the client socket, if we change events from linux.EPOLL.IN to linux.EPOLL.IN | linux.EPOLL.ET, i.e.:
var event = linux.epoll_event{
// THIS IS CHANGED
.events = linux.EPOLL.IN | linux.EPOLL.ET,
.data = .{.fd = client_socket}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
we make the monitor edge-triggered (ET). Now, if we connect a client and send a message, we'll get a single notification. The monitor isn't deleted or even disabled; if we send another message, we'll get a new notification. epoll is pretty smart too: a new event doesn't have to happen while epoll_wait is blocked, so there's no race condition between adding a monitor and calling epoll_wait. In our above code, if the client sends a message right after the socket is added with epoll_ctl but before epoll_wait is called, we will still get the notification.
We can also use linux.EPOLL.ONESHOT, which disables notifications for the socket after readiness has been signaled once, including for any new messages. epoll_ctl with the linux.EPOLL.CTL_MOD operation needs to be called to re-enable the monitor. This is obviously different from edge-triggered, since notifications are disabled even for new events. While ONESHOT might be useful to make sure that a single message per client is processed at a time, it does have the disadvantage of requiring many more system calls: one every time we want to re-enable the monitor.
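As a sketch, re-using efd and client_socket from the earlier examples, a ONESHOT registration and re-arm might look like this:
// Register the client socket with ONESHOT: after one readiness
// notification, the monitor is disabled (but not removed).
var event = linux.epoll_event{
    .events = linux.EPOLL.IN | linux.EPOLL.ONESHOT,
    .data = .{.fd = client_socket},
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);

// ... later, once we've handled the message and want more notifications,
// re-arm the monitor:
try posix.epoll_ctl(efd, linux.EPOLL.CTL_MOD, client_socket, &event);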
Conclusion
Besides the fact that epoll is Linux-specific, it's safe to think of it as a better version of poll. It's more efficient, easier to integrate and has more features. Except for the specifics of the API, everything about evented I/O is unchanged. @intFromPtr and @ptrFromInt can be useful now and again, but they're mostly used as above: when integrating with a C library that lets you associate arbitrary data (often a usize) with a C structure.
Below, you'll find a full working (on Linux) example. It merges what we've learned about epoll with our Server, Client and Reader from previous parts. We've also introduced an Epoll structure. The goal is to start creating some abstraction to eventually deal with having platform-specific implementations (i.e. using epoll on Linux and kqueue on BSD/MacOS). This abstraction is incomplete, but it's a start and something we'll dedicate a whole part to.
Appendix A - Code
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.tcp_demo);
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
var server = try Server.init(allocator, 4096);
defer server.deinit();
const address = try std.net.Address.parseIp("127.0.0.1", 5882);
try server.run(address);
std.debug.print("STOPPED\n", .{});
}
// 1 minute
const READ_TIMEOUT_MS = 60_000;
const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;
const Server = struct {
// maximum # of allowed clients
max: usize,
loop: Epoll,
// used by our client/node memory pools and passed to Client.init for it
// to create its read and write buffers.
allocator: Allocator,
// The number of clients we currently have connected
connected: usize,
read_timeout_list: ClientList,
// for creating clients
client_pool: std.heap.MemoryPool(Client),
// for creating nodes for our read_timeout list
client_node_pool: std.heap.MemoryPool(ClientList.Node),
fn init(allocator: Allocator, max: usize) !Server {
const loop = try Epoll.init();
errdefer loop.deinit();
return .{
.max = max,
.loop = loop,
.connected = 0,
.allocator = allocator,
.read_timeout_list = .{},
.client_pool = std.heap.MemoryPool(Client).init(allocator),
.client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
};
}
fn deinit(self: *Server) void {
self.loop.deinit();
self.client_pool.deinit();
self.client_node_pool.deinit();
}
fn run(self: *Server, address: std.net.Address) !void {
const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
const protocol = posix.IPPROTO.TCP;
const listener = try posix.socket(address.any.family, tpe, protocol);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 128);
var read_timeout_list = &self.read_timeout_list;
try self.loop.addListener(listener);
while (true) {
const next_timeout = self.enforceTimeout();
const ready_events = self.loop.wait(next_timeout);
for (ready_events) |ready| {
switch (ready.data.ptr) {
0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
else => |nptr| {
const events = ready.events;
const client: *Client = @ptrFromInt(nptr);
if (events & linux.EPOLL.IN == linux.EPOLL.IN) {
// this socket is ready to be read
while (true) {
const msg = client.readMessage() catch {
self.closeClient(client);
break;
} orelse break; // no more messages
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
read_timeout_list.remove(client.read_timeout_node);
read_timeout_list.append(client.read_timeout_node);
client.writeMessage(msg) catch {
self.closeClient(client);
break;
};
}
} else if (events & linux.EPOLL.OUT == linux.EPOLL.OUT) {
client.write() catch self.closeClient(client);
}
}
}
}
}
}
fn enforceTimeout(self: *Server) i32 {
const now = std.time.milliTimestamp();
var node = self.read_timeout_list.first;
while (node) |n| {
const client = n.data;
const diff = client.read_timeout - now;
if (diff > 0) {
// this client's timeout is the first one that's in the
// future, so we now know the maximum time we can block on
// epoll_wait before having to call enforceTimeout again
return @intCast(diff);
}
// This client's timeout is in the past. Shut down the socket's
// receiving end; the next epoll_wait will report it as ready, the
// read will fail and the client will be removed through the normal
// closeClient path in run().
posix.shutdown(client.socket, .recv) catch {};
node = n.next;
} else {
// We have no client that times out in the future (if we did
// we would have hit the return above).
return -1;
}
}
fn accept(self: *Server, listener: posix.socket_t) !void {
const space = self.max - self.connected;
for (0..space) |_| {
var address: net.Address = undefined;
var address_len: posix.socklen_t = @sizeOf(net.Address);
const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
const client = try self.client_pool.create();
errdefer self.client_pool.destroy(client);
client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
posix.close(socket);
log.err("failed to initialize client: {}", .{err});
return;
};
errdefer client.deinit(self.allocator);
client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
client.read_timeout_node = try self.client_node_pool.create();
errdefer self.client_node_pool.destroy(client.read_timeout_node);
client.read_timeout_node.* = .{
.next = null,
.prev = null,
.data = client,
};
self.read_timeout_list.append(client.read_timeout_node);
try self.loop.newClient(client);
self.connected += 1;
} else {
// we've run out of space, stop monitoring the listening socket
try self.loop.removeListener(listener);
}
}
fn closeClient(self: *Server, client: *Client) void {
self.read_timeout_list.remove(client.read_timeout_node);
posix.close(client.socket);
self.client_node_pool.destroy(client.read_timeout_node);
client.deinit(self.allocator);
self.client_pool.destroy(client);
}
};
const Client = struct {
// This probably shouldn't be a pointer. Epoll is currently lightweight
// and is safe to copy. But, in Part 8, when we build an abstraction
// for Epoll and Kqueue, this will be necessary.
loop: *Epoll,
socket: posix.socket_t,
address: std.net.Address,
// Used to read length-prefixed messages
reader: Reader,
// Bytes we still need to send. This is a slice of `write_buf`. When
// it's empty, we're in "read-mode" and waiting for a message from the
// client.
to_write: []u8,
// Buffer for storing our length-prefixed messages
write_buf: []u8,
// absolute time, in milliseconds, when this client should timeout if
// a message isn't received
read_timeout: i64,
// Node containing this client in the server's read_timeout_list
read_timeout_node: *ClientNode,
fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *Epoll) !Client {
const reader = try Reader.init(allocator, 4096);
errdefer reader.deinit(allocator);
const write_buf = try allocator.alloc(u8, 4096);
errdefer allocator.free(write_buf);
return .{
.loop = loop,
.reader = reader,
.socket = socket,
.address = address,
.to_write = &.{},
.write_buf = write_buf,
.read_timeout = 0, // let the server set this
.read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
};
}
fn deinit(self: *const Client, allocator: Allocator) void {
self.reader.deinit(allocator);
allocator.free(self.write_buf);
}
fn readMessage(self: *Client) !?[]const u8 {
return self.reader.readMessage(self.socket) catch |err| switch (err) {
error.WouldBlock => return null,
else => return err,
};
}
fn writeMessage(self: *Client, msg: []const u8) !void {
if (self.to_write.len > 0) {
// Depending on how you structure your code, this might not be possible
// For example, in an HTTP server, the application might not control
// the actual "writeMessage" call, and thus it would not be possible
// to make more than one writeMessage call per request.
// For this demo, we'll just return an error.
return error.PendingMessage;
}
if (msg.len + 4 > self.write_buf.len) {
// Could allocate a dynamic buffer. Could use a large buffer pool.
return error.MessageTooLarge;
}
// copy our length prefix + message to our buffer
std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
const end = msg.len + 4;
@memcpy(self.write_buf[4..end], msg);
// setup our to_write slice
self.to_write = self.write_buf[0..end];
// immediately write what we can
return self.write();
}
// Writes what it can to the socket. If the write would block, switches
// the client's monitor to write-mode; once the full message has been
// written, switches back to read-mode.
fn write(self: *Client) !void {
var buf = self.to_write;
defer self.to_write = buf;
while (buf.len > 0) {
const n = posix.write(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => return self.loop.writeMode(self),
else => return err,
};
if (n == 0) {
return error.Closed;
}
buf = buf[n..];
} else {
return self.loop.readMode(self);
}
}
};
const Reader = struct {
buf: []u8,
pos: usize = 0,
start: usize = 0,
fn init(allocator: Allocator, size: usize) !Reader {
const buf = try allocator.alloc(u8, size);
return .{
.pos = 0,
.start = 0,
.buf = buf,
};
}
fn deinit(self: *const Reader, allocator: Allocator) void {
allocator.free(self.buf);
}
fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
var buf = self.buf;
while (true) {
if (try self.bufferedMessage()) |msg| {
return msg;
}
const pos = self.pos;
const n = try posix.read(socket, buf[pos..]);
if (n == 0) {
return error.Closed;
}
self.pos = pos + n;
}
}
fn bufferedMessage(self: *Reader) !?[]u8 {
const buf = self.buf;
const pos = self.pos;
const start = self.start;
std.debug.assert(pos >= start);
const unprocessed = buf[start..pos];
if (unprocessed.len < 4) {
self.ensureSpace(4 - unprocessed.len) catch unreachable;
return null;
}
const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);
// the length of our message + the length of our prefix
const total_len = message_len + 4;
if (unprocessed.len < total_len) {
try self.ensureSpace(total_len);
return null;
}
self.start += total_len;
return unprocessed[4..total_len];
}
fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
const buf = self.buf;
if (buf.len < space) {
return error.BufferTooSmall;
}
const start = self.start;
const spare = buf.len - start;
if (spare >= space) {
return;
}
const unprocessed = buf[start..self.pos];
std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
self.start = 0;
self.pos = unprocessed.len;
}
};
// We'll eventually need to build a platform abstractions between epoll and
// kqueue. This is a rough start.
const Epoll = struct {
efd: posix.fd_t,
ready_list: [128]linux.epoll_event = undefined,
fn init() !Epoll {
const efd = try posix.epoll_create1(0);
return .{.efd = efd};
}
fn deinit(self: Epoll) void {
posix.close(self.efd);
}
fn wait(self: *Epoll, timeout: i32) []linux.epoll_event {
const count = posix.epoll_wait(self.efd, &self.ready_list, timeout);
return self.ready_list[0..count];
}
fn addListener(self: Epoll, listener: posix.socket_t) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = 0},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, listener, &event);
}
fn removeListener(self: Epoll, listener: posix.socket_t) !void {
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_DEL, listener, null);
}
fn newClient(self: Epoll, client: *Client) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(client)},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, client.socket, &event);
}
fn readMode(self: Epoll, client: *Client) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{.ptr = @intFromPtr(client)},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
}
fn writeMode(self: Epoll, client: *Client) !void {
var event = linux.epoll_event{
.events = linux.EPOLL.OUT,
.data = .{.ptr = @intFromPtr(client)},
};
try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
}
};