Writing a task scheduler in Zig
May 09, 2024
I'm working on an application that needs the ability to schedule tasks. Many applications have a similar need, but requirements can vary greatly. Advanced cases might require persistence and distribution, typically depending on external systems (like a database or queue) to do much of the heavy lifting. My needs are simpler: I don't have a huge variety of tasks or a high number of them.
Thread-Per-Task
At its simplest, we could use a thread with sleep
to schedule some work at some future point:
const std = @import("std");
pub fn main() !void {
const thread = try std.Thread.spawn(.{}, say, .{"hello world!", 3 * std.time.ns_per_s});
thread.join();
}
fn say(msg: []const u8, delay: u64) void {
std.time.sleep(delay);
std.debug.print("{s}\n", .{msg});
}
Above is a standalone example in case you want to run it, but the same approach could easily be incorporated into a larger application. If you're new to Zig, the parameters to Thread.spawn
are probably a little confusing. spawn
takes 3 arguments: a configuration structure, the function to run, and the arguments to pass to the function. The first parameter, .{}
, is something you'll see often in Zig and is the result of two language features: type inference and default field values. Imagine you have this struct:
const Config = struct {
stack_size: usize = 16_384,
allocator: ?std.mem.Allocator = null,
};
And this function:
fn doSomething(config: Config) !void {
...
}
These four calls to run
are equivalent:
run(Config{
.stack_size = 16_384,
.allocator = null,
});
run(Config{});
run(.{
.stack_size = 16_384,
.allocator = null,
});
run(.{});
In short, run(Config{...})
and run(.{...})
are the same, with the compiler inferring the type of the paramter based on the definition of run
.
It's also worth remembering that structure methods are normal functions called with the dot syntax. Above, we specified run
as the function spawn
should call along with the arguments. But you could (and in often cases would want to) call a structure's method:
const std = @import("std");
pub fn main() !void {
var person = Person{.name = "Vegeta"};
const thread = try std.Thread.spawn(
.{},
Person.say,
.{&person, "it's over 9000!", 3 * std.time.ns_per_s}
);
thread.join();
}
const Person = struct {
name: []const u8,
fn say(p: *const Person, msg: []const u8, when: u64) void {
std.time.sleep(when);
std.debug.print("{s} said: {s}\n", .{p.name, msg});
}
};
Be warned though that this example greatly increases the risk of introducing thread-unsafe code. Like our original code, this code has two running threads: the main thread and the thread launched by our call to spawn
. However, in this version, the two threads have access to the same memory, the address of person
. Since our launched thread reads name
, if we changed our main thread to write to name
, we'd have to introduce some form of protection (for example, by using a mutex).
Both examples calls thread.join();
. When we start a thread, we should call either detach
or join
. These don't have to be called immediately. The thread which calls join
blocks until the joined thread ends. Conversely, calling detach
allows the calling thread to continue. If you replace thread.join()
with thread.detach()
the program will exit before the launched thread has time to finish executing. This is because the thread that calls detach
is, in this case, our main thread, and after calling detach
it continues to execute normally. For this program, that means reaching the end of main
and exiting. Programs don't wait for all threads to finish, in this regards, the main / original thread is special.
As a final point, a while (true)
loop can be added to run a recurring job:
fn say(p: *const Person, msg: []const u8, when: u64) void {
while (true) {
std.time.sleep(when);
std.debug.print("{s} said: {s}\n", .{p.name, msg});
}
}
Naive Scheduler
If we're doing more than firing the occasional one-off task, we might run into some limitations with the above approach. Maybe we want some insights/metrics into the pending tasks, such as how many we have and when the next one is scheduled to run. If we have a lot of tasks, or a dynamic number of tasks, it might not be ideal to dedicate a whole thread to each.
To start solving these requirements, we'll create a Scheduler
that can take an arbitrary number of tasks each with their own schedule, and execute them within a single thread. If necessary, we could add a thread pool, but we'll start with a single thread.
We can start with an implementation that takes our above simple example and extends it with a queue of tasks to be run. Before we do that though, let's move beyond our hardcoded run
task and use something more flexible. Specifically, we'll use a tagged union which will be application-specific:
const Task = union(enum) {
say: Say,
db_cleaner: void,
const Say = struct {
person: *Person,
msg: []const u8,
};
pub fn run(task: Task) void {
switch (task) {
.say => |s| std.debug.print("{s} said: {s}\n", .{s.person.name, s.msg}),
.db_cleaner => {
std.debug.print("Cleaning old records from the database\n", .{});
}
}
}
};
In this case, we support two (dummy) tasks. The run
method is important as this will be the function our scheduler will call. The result will be a generic scheduler that can operate on any type of task or sets of tasks.
Next, we create a Job
, which is just a Task
with a run_at
field which is the unix timestamp (in milliseconds) that the task should be run at:
fn Job(comptime T: type) type {
return struct {
task: T,
run_at: i64,
};
}
The last component is our actual Scheduler
. We'll build it up over time, but we can start with a skeleton:
fn Scheduler(comptime T: type) type {
return struct {
queue: Queue,
allocator: Allocator,
const Self = @This();
const Queue = std.DoublyLinkedList(Job(T));
pub fn init(allocator: Allocator) Self {
return .{
.queue = Queue{},
.allocator = allocator,
};
}
pub fn deinit(self: *Self) void {
while (self.queue.pop()) |node| {
self.allocator.destroy(node);
}
}
};
}
Our scheduler will use a doubly linked list for its queue. Later, we'll need to change this, but for now, the ability to add new tasks at the end of the queue and process tasks at the start of the queue is good enough. Our scheduler is missing two parts: the ability to schedule a task and the code to actually run those tasks. Keeping in mind that this is our naive implementation, the first part is easy to implement:
fn Scheduler(comptime T: type) type {
return struct {
...
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
const node = try self.allocator.create(Queue.Node);
node.data = Job(T){
.task = task,
.run_at = run_at,
};
self.queue.append(node);
}
pub fn scheduleIn(self: *Self, task: T, ms: i64) !void {
return self.schedule(task, std.time.milliTimestamp() + ms);
}
};
}
We've added two function. The first, schedule
takes an absolute timestamp. The second, scheduleIn
takes a relative time (from now) in milliseconds. We take an i64
rather than a u64
because this is what std.time.milliTimestamp()
returns (in order to represent dates prior to Jan 1, 1970) so it makes our methods easier to use.
The code to run the tasks is more complicated. We'll start with a buggy implementation:
fn Scheduler(comptime T: type) type {
return struct {
...
pub fn start(self: *Self) !void {
const thread = try std.Thread.spawn(.{}, Self.run, .{self});
thread.detach();
}
fn run(self: *Self) void {
while (true) {
const ms = self.timeUntilNextTask() orelse 1000;
std.time.sleep(@intCast(ms * std.time.ns_per_ms));
if (self.queue.popFirst()) |node| {
defer self.allocator.destroy(node);
node.data.task.run();
}
}
}
fn timeUntilNextTask(self: *Self) ?i64 {
if (self.queue.first) |first| {
return std.time.milliTimestamp() - first.data.run_at;
}
return null;
}
If you were to put it all together, it would probably work, but it has a lot of issues, from incorrect functionality to being thread-unsafe. One obvious issue is our handling of an empty queue. Right now, we sleep for 1 second and see if there's any tasks to run. Consider what that means for the following usage:
var s = Scheduler(Task).init(allocator);
defer s.deinit();
try s.start();
var person = Person{.name = "Vegeta"};
try s.scheduleIn(.{.say = .{.person = &person, .msg = "over 9000!"}}, 8000);
Do you see the issue? We schedule the job to run in 8 seconds, but there's a good chance that run
will sleep for 1 second (due to the queue being empty), wake up and process the newly added task immediately. To fix this, we need to double check the first job:
while (true) {
const ms = self.timeUntilNextTask() orelse 1000;
std.time.sleep(@intCast(ms * std.time.ns_per_ms));
const first = self.queue.first orelse {
continue;
}
const job = first.data;
if (job.run_at > std.time.milliTimestamp()) {
continue;
}
self.queue.remove(first);
self.allocator.destroy(first);
job.data.task.run();
}
But this is hardly the only timing issue we have. We also need to handle the case where timeUntilNextTask
returns a negative value. More seriously, our scheduler assumes that the tasks are inserted in order that they should be run. If we schedule a task to be run in 1 hour, then schedule a task to be run in 10 seconds, our scheduler will sleep for 1 hour and our 2nd task will be run much too late. Finally, our handling of the empty queue isn't ideal; it would be nice to come up with an alternative to waking the thread every second.
The final major issue with this implementation is that it isn't thread-safe. The run
method runs in its own thread, started by the start
method. The schedule
and scheduleIn
methods are meant to be called from the main thread, or any other thread. However, both share and operate on the same queue
field. This type of access has to be synchronized. If one thread happens to append
to the queue
via a call to schedule
, while run
calls remove
, the behavior is undefined; the best possible outcome is a crash.
The threading issue can be fixed by adding a mutex, but the scheduling issue requires more fundamental changes. So while this was a good first step, it's time to look at a more correct implementation.
Scheduler
At a high level, the shortcomings of our naive implementation can be solved by:
- Using a sortable data structure to order pending jobs by their
run_at
field
- Using a thread-safe signal to notify our scheduler when new jobs are added
- Ensuring that all access to our shared data structure is synchornized by a mutex
We begin by replacing our std.DoublyLinkedList
with an std.PriorityQueue
. Like our linked list, the priority queue allows us to append new jobs and pop existing ones off. It also requires that we provide a function to compare two entries to control how entries are ordered. We begin with a new skeleton, which is similar to what we started with before:
fn Scheduler(comptime T: type) type {
return struct {
queue: Queue,
mutex: std.Thread.Mutex,
const Self = @This();
const Queue = std.PriorityQueue(Job(T), void, compare);
fn compare(_: void, a: Job(T), b: Job(T)) std.math.Order {
return std.math.order(a.run_at, b.run_at);
}
pub fn init(allocator: Allocator) Self {
return .{
.mutex = .{},
.queue = Queue.init(allocator, {}),
};
}
pub fn deinit(self: *Self) void {
self.queue.deinit();
}
};
}
Our Queue
has changed type. Not only does the PriorityQueue
take the type of value to store (as DoublyLinkedList
does), it also takes a context and a function to compare two nodes. The context exits for cases where comparison is stateful. Whatever context we pass to Queue.init
gets passed as the first parameter to the compare
function. The context can be used to inform how two values should be compared. In many cases, including our own, there will be no context, and thus we specify a void
context type and pass a void context value ({}
) to init
.
We've also added a std.Thread.Mutex
, which we'll use in the following code to synchronize access to the queue
. The mutex is initialized using the following syntax: .mutex = .{}
. This is the same syntax and the same behavior as Thread.spawn's
config
parameter; again relying on Zig's type inference and default field values.
For now, our schedule
method is similar to the previous one:
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
const job = Job(T){
.task = task,
.run_at = run_at,
};
self.mutex.lock();
defer self.mutex.unlock();
try self.queue.add(job);
}
pub fn scheduleIn(self: *Self, task: T, ms: i64) !void {
return self.schedule(task, std.time.milliTimestamp() + ms);
}
When using the DoublyLinkedList
, we were responsible for managing the Node
memory. We had to create the nodes and free them, both in deinit
and after running the task in run
. The PriorityQueue
has a different interface and manages its nodes internally. Discounting the introduction of our mutex
, this makes both our deinit
and schedule
methods simpler. In fact, you might have noticed that our Scheduler(T)
no longer holds a reference to the allocator since we no longer manage any memory direclty ourselves.
Although this code is useless without the start
and run
methods, it's worth re-iterating how important synchronizing access to self.queue
is. Even with schedule
alone, if our application is multi-threaded, two threads could call schedule
at the same time and thus attempt to mutate self.queue
at the same time, which would be an undefined behavior.
Switching to a PriorityQueue
gets us half way there. We can now be sure that the first task in our queue
is the one that should be run soonest. But we still face the issue of having tasks scheduled out of order. The core of our issue is the call to std.time.sleep
in run
. Even if new tasks get properly ordered within our queue, once our run
thread goes to sleep, there's nothing we can do. Recall our previous code that looked something like:
const ms = self.timeUntilNextTask();
std.time.sleep(@intCast(ms * std.time.ns_per_ms));
Once we go to sleep, newly added tasks cannot be processed until this sleep ends. What we need is a way to wake up the thread when a new task gets scheduled. Better yet, we only need to wake the thread up if a newly scheduled task needs to run before the current sleep will end. The behavior we want is:
- We start the scheduler. The queue is empty. It goes to sleep "forever".
- We schedule a task to run in 10 seconds. We wake the scheduler up, it detects that the next job should run in 10 seconds, so goes to sleep for 10 seconds.
- We immediately schedule a new task to run in 5 seconds. We wake the scheduler up, it detects that the next job should run in 5 seconds, so goes to sleep for 5 seconds.
- We immediately schedule yet another job, but this one to run in 7 seconds. We don't need to wake the scheduler up, because this job is meant to run after our next one (7 > 5).
Again, the PriorityQueue
solves the issue of ordering. To solve the issue of waking up our scheduler, we turn to std.Thread.Condition
. A condition variable provides a thread-safe way for threads to signal each other. Condition variables interact in specific ways with a mutex to provide certain guarantees, and this interaction is important to understand. We'll begin by adding our condition variable:
fn Scheduler(comptime T: type) type {
return struct {
queue: Queue,
mutex: std.Thread.Mutex,
cond: std.Thread.Condition,
pub fn init(allocator: Allocator) Self {
return .{
.cond = .{},
.mutex = .{},
.queue = Queue.init(allocator, {}),
};
}
...
};
}
We'll take small steps, starting with part of our run
method:
fn run(self: *Self) void {
while (true) {
self.mutex.lock();
while (self.queue.peek() == null) {
self.cond.wait(&self.mutex);
}
}
}
There's a bit of magic and beauty in the above code. The first thing that should be obvious is that we need to lock mutex
before we can check if queue
is empty. Again, all access to self.queue
has to be synchronized. But if you're unfamiliar with condition variables, you might have a couple questions. For example, why don't we unlock mutex
? Why do we check if the queue is empty in a loop? How is cond.wait
different than sleep
?
The difference between cond.wait
and sleep
is that cond.wait
blocks until it receives a signal. This is the mechanism we need in order to let newly scheduled tasks wake our scheduler. This isn't quite our final solution. We actually need a mix of the two behaviors: sleeping until it's time to run the next task OR until we receive a signal indicating newly added tasks. For this, we'll eventually use cond.timedWait
, which does exactly what we need, but we'll get to that in a bit.
As for the first question, cond.wait
and cond.timedWait
expect a locked mutex, and internally release the mutex before going to sleep. Furthermore, the mutex is always locked before the call returns. By passing in a locked mutex, and receiving a locked mutex, the condition variable can be setup atomically, without the risk of another thread slipping in.
As for the while loop, it serves two purposes. First, it's possible for multiple threads to wait on the same condition and be woken up by the same signal (called a broadcast in that case). Second, condition variables can be victims of spurious wakeups, that is, being woken up when it really shouldn't. Both cases are solved by re-checking the condition, which is what our inner while
loop does. Keep in mind that when we do wake up, the mutex
will be locked for us. So whenever you see a call to cond.wait
or cond.timedWait
, your mental model should be something like:
fn wait(c: *std.Thread.Condition, mutex: *std.Thread.Mutex) void {
mutex.unlock();
defer mutex.lock();
}
Having run
wait for a signal is only useful if we actually have something that sends a signal. Thus, we change schedule
:
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
const job = Job(T){
.task = task,
.run_at = run_at,
};
self.mutex.lock();
defer self.mutex.unlock();
try self.queue.add(job);
self.cond.signal();
}
The only thing we've changed is to add the last line. This will wake up run
whenever a task is added. But, we can do better than this. As we discussed earlier, we only want to send a signal under two conditions: the queue was empty or the new task is meant to run before the previous earliest task:
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
const job = Job(T){
.task = task,
.run_at = run_at,
};
self.mutex.lock();
defer self.mutex.unlock();
var reschedule = false;
if (self.queue.peek()) |*next| {
if (run_at < next.run_at) {
reschedule = true;
}
} else {
reschedule = true;
}
try self.queue.add(job);
if (reschedule) {
self.cond.signal();
}
}
This is the final implementation of schedule
. But I do want to highlight the fact that cond.signal
doesn't have to be called under lock. We could rewrite the above code like so:
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
const job = Job(T){
.task = task,
.run_at = run_at,
};
var reschedule = false;
{
self.mutex.lock();
defer self.mutex.unlock();
if (self.queue.peek()) |*next| {
if (run_at < next.run_at) {
reschedule = true;
}
} else {
reschedule = true;
}
try self.queue.add(job);
}
if (reschedule) {
self.cond.signal();
}
}
The self.cond.signal()
call is no longer called under lock. This is a tiny bit more efficient since we hold the lock for a shorter amount of time. But it does highlight something important: in Zig, defer
is run at the end of the current block. The above would be riskier to do in Go where defer
is run at the end of the function. Instead, in Go, we wouldn't be able to use defer
and would have to place calls to unlock
at the correct places, keeping in mind that self.queue.add
could fail, and new control flow could be added in the future.
With schedule
finished, all that's left is to fully implement run
:
fn run(self: *Self) void {
while (true) {
self.mutex.lock();
if (self.timeUntilNextTask()) |ms| {
if (ms > 0) {
const ns: u64 = @intCast(std.time.ns_per_ms * ms);
self.cond.timedWait(&self.mutex, ns) catch {
std.debug.assert(err == error.Timeout);
};
}
} else {
self.cond.wait(&self.mutex);
}
const next = self.queue.peek() orelse continue;
if (next.run_at > std.time.milliTimestamp()) {
continue;
}
const job = self.queue.remove();
self.mutex.unlock();
job.task.run();
}
}
fn timeUntilNextTask(self: *Self) ?i64 {
if (self.queue.peek()) |*next| {
return next.run_at - std.time.milliTimestamp();
}
return null;
}
This is a big leap, but it mostly combined everything we've seen so far. When the queue is empty we use cond.wait
, meaning we'll sleep until a task is scheduled and cond.signal
is called. When the queue isn't empty, we use cond.timedWait
. In this case there are two ways for us to wake up. The first is, again, via a signal sent by the schedule
method. The second is if our timedWait
times out, indicating that we have a task to run.
Once we unblock, the logic is the same in either case. First, we're defensive and make sure that (a) we do indeed have a task and (b) it should be run. We can them remove the task from the queue and unlock the queue while the task is run. Unlocking here is important as it allows other threads to schedule new tasks.
Conclusion
There are a lot of features we might want to add, for example metrics or the ability to stop and restart the scheduler. Much like the PriorityQueue
takes a context that is passed back to our compare
function, it makes sense for our Scheduler(T)
to do the same: taking a context on initialization which is passed to each task.
My Zig Utility Library has the complete implementation that can largely be copy and pasted. And while part of the goal was to write a task scheduler, I hope some readers found insight with respect to multi-threaded programming.