Writing a task scheduler in Zig

May 09, 2024

I'm working on an application that needs the ability to schedule tasks. Many applications have a similar need, but requirements can vary greatly. Advanced cases might require persistence and distribution, typically depending on external systems (like a database or queue) to do much of the heavy lifting. My needs are simpler: I don't have a huge variety of tasks or a high number of them.

Thread-Per-Task

At its simplest, we could use a thread with sleep to schedule some work at some future point:

const std = @import("std");

pub fn main() !void {
  const thread = try std.Thread.spawn(.{}, say, .{"hello world!", 3 * std.time.ns_per_s});
  thread.join();
}

fn say(msg: []const u8, delay: u64) void {
  std.time.sleep(delay);
  std.debug.print("{s}\n", .{msg});
}

Above is a standalone example in case you want to run it, but the same approach could easily be incorporated into a larger application. If you're new to Zig, the parameters to Thread.spawn are probably a little confusing. spawn takes 3 arguments: a configuration structure, the function to run, and the arguments to pass to the function. The first parameter, .{}, is something you'll see often in Zig and is the result of two language features: type inference and default field values. Imagine you have this struct:

const Config = struct {
  stack_size: usize = 16_384,
  allocator: ?std.mem.Allocator = null,
};

And this function:

fn run(config: Config) !void {
  ...
}

These four calls to run are equivalent:

// 1 - explicit config and explicit field values
run(Config{
  .stack_size = 16_384,
  .allocator = null,
});

// 2 - explicit Config, using the default field values
run(Config{});

// 3 - implicit (inferred) Config type and explicit field values
run(.{
  .stack_size = 16_384,
  .allocator = null,
});

// 4 - implicit (inferred) Config type, using the default field values
run(.{});

In short, run(Config{...}) and run(.{...}) are the same, with the compiler inferring the type of the parameter based on the definition of run.

It's also worth remembering that structure methods are normal functions called with the dot syntax. In our first example, we specified say as the function spawn should call, along with its arguments. But you could (and in many cases would want to) call a structure's method:

const std = @import("std");

pub fn main() !void {
  var person = Person{.name = "Vegeta"};
  const thread = try std.Thread.spawn(
    .{},
    Person.say,
    .{&person, "it's over 9000!", 3 * std.time.ns_per_s}
  );
  thread.join();
}

const Person = struct {
  name: []const u8,

  fn say(p: *const Person, msg: []const u8, when: u64) void {
    std.time.sleep(when);
    std.debug.print("{s} said: {s}\n", .{p.name, msg});
  }
};

Be warned though that this example greatly increases the risk of introducing thread-unsafe code. Like our original code, this code has two running threads: the main thread and the thread launched by our call to spawn. However, in this version, the two threads have access to the same memory, the address of person. Since our launched thread reads name, if we changed our main thread to write to name, we'd have to introduce some form of protection (for example, by using a mutex).
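As a rough sketch of what that protection could look like, here's a variant of Person that guards name with a std.Thread.Mutex. The mutex field and the rename method are hypothetical additions for illustration; they aren't part of the example above.

```zig
const std = @import("std");

// Hypothetical variant of Person: the mutex field and rename method
// are additions for this sketch only.
const Person = struct {
  name: []const u8,
  mutex: std.Thread.Mutex = .{},

  // reads name while holding the lock
  fn say(p: *Person, msg: []const u8) void {
    p.mutex.lock();
    defer p.mutex.unlock();
    std.debug.print("{s} said: {s}\n", .{p.name, msg});
  }

  // writes name while holding the same lock
  fn rename(p: *Person, name: []const u8) void {
    p.mutex.lock();
    defer p.mutex.unlock();
    p.name = name;
  }
};

pub fn main() !void {
  var person = Person{.name = "Vegeta"};
  const thread = try std.Thread.spawn(.{}, Person.say, .{&person, "it's over 9000!"});

  // runs concurrently with say, but never touches name at the same time
  person.rename("Goku");
  thread.join();
}
```

Which name gets printed depends on which thread acquires the lock first. The mutex doesn't impose an order; it only guarantees that the read and the write never overlap.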

Every example so far calls thread.join(). When we start a thread, we should call either detach or join, though not necessarily right away. The thread which calls join blocks until the joined thread ends. Conversely, calling detach allows the calling thread to continue. If you replace thread.join() with thread.detach(), the program will exit before the launched thread has time to finish executing. This is because the thread that calls detach is, in this case, our main thread, and after calling detach it continues to execute normally. For this program, that means reaching the end of main and exiting. Programs don't wait for all threads to finish; in this regard, the main (original) thread is special.

As a final point, a while (true) loop can be added to run a recurring job:

fn say(p: *const Person, msg: []const u8, when: u64) void {
  while (true) {
    std.time.sleep(when);
    std.debug.print("{s} said: {s}\n", .{p.name, msg});
  }
}

Naive Scheduler

If we're doing more than firing the occasional one-off task, we might run into some limitations with the above approach. Maybe we want some insights/metrics into the pending tasks, such as how many we have and when the next one is scheduled to run. If we have a lot of tasks, or a dynamic number of tasks, it might not be ideal to dedicate a whole thread to each.

To start solving these requirements, we'll create a Scheduler that can take an arbitrary number of tasks each with their own schedule, and execute them within a single thread. If necessary, we could add a thread pool, but we'll start with a single thread.

We can start with an implementation that takes our above simple example and extends it with a queue of tasks to be run. Before we do that though, let's move beyond our hardcoded say function and use something more flexible. Specifically, we'll use a tagged union which will be application-specific:

const Task = union(enum) {
  say: Say,
  db_cleaner: void,

  const Say = struct {
    person: *Person,
    msg: []const u8,
  };

  pub fn run(task: Task) void {
    switch (task) {
      .say => |s| std.debug.print("{s} said: {s}\n", .{s.person.name, s.msg}),
      .db_cleaner => {
        std.debug.print("Cleaning old records from the database\n", .{});
        // TODO,
      }
    }
  }
};

In this case, we support two (dummy) tasks. The run method is important as this will be the function our scheduler will call. The result will be a generic scheduler that can operate on any type of task or sets of tasks.

Next, we create a Job, which is just a Task with a run_at field which is the unix timestamp (in milliseconds) that the task should be run at:

fn Job(comptime T: type) type {
  return struct {
    task: T,
    run_at: i64,
  };
}

The last component is our actual Scheduler. We'll build it up over time, but we can start with a skeleton:

fn Scheduler(comptime T: type) type {
  return struct {
    queue: Queue, // Queue is defined a few lines down
    allocator: Allocator,

    // "Self" is a nicer name than "Scheduler(T)"
    const Self = @This();

    // "Queue" is a nicer name than "std.DoublyLinkedList(Job(T))"
    const Queue = std.DoublyLinkedList(Job(T));

    pub fn init(allocator: Allocator) Self {
      return .{
        .queue = Queue{},
        .allocator = allocator,
      };
    }

    pub fn deinit(self: *Self) void {
      // free any linked list nodes we have in our queue
      // for any jobs we haven't run yet.
      while (self.queue.pop()) |node| {
        self.allocator.destroy(node);
      }
    }
  };
}

Our scheduler will use a doubly linked list for its queue. Later, we'll need to change this, but for now, the ability to add new tasks at the end of the queue and process tasks at the start of the queue is good enough. Our scheduler is missing two parts: the ability to schedule a task and the code to actually run those tasks. Keeping in mind that this is our naive implementation, the first part is easy to implement:

fn Scheduler(comptime T: type) type {
  return struct {
    ...

    pub fn schedule(self: *Self, task: T, run_at: i64) !void {
      const node = try self.allocator.create(Queue.Node);
      node.data = Job(T){
        .task = task,
        .run_at = run_at,
      };
      self.queue.append(node);
    }

    pub fn scheduleIn(self: *Self, task: T, ms: i64) !void {
      return self.schedule(task, std.time.milliTimestamp() + ms);
    }
  };
}

We've added two functions. The first, schedule, takes an absolute timestamp. The second, scheduleIn, takes a relative time (from now) in milliseconds. We take an i64 rather than a u64 because this is what std.time.milliTimestamp() returns (in order to represent dates prior to Jan 1, 1970), so it makes our methods easier to use.

The code to run the tasks is more complicated. We'll start with a buggy implementation:

fn Scheduler(comptime T: type) type {
  return struct {
    ...
    // This is the public method that's called to start the scheduler.
    // It launches a new thread.
    pub fn start(self: *Self) !void {
      const thread = try std.Thread.spawn(.{}, Self.run, .{self});
      thread.detach();
    }

    fn run(self: *Self) void {
      while (true) {
        const ms = self.timeUntilNextTask() orelse 1000;
        std.time.sleep(@intCast(ms * std.time.ns_per_ms));

        if (self.queue.popFirst()) |node| {
          // make sure we free the linked list node once we're
          // done running the task
          defer self.allocator.destroy(node);

          // run the task! The node data is the Job, which we set in
          // the schedule method above
          node.data.task.run();
        }
      }
    }

    fn timeUntilNextTask(self: *Self) ?i64 {
      if (self.queue.first) |first| {
        // time remaining until the job is due (negative if it's overdue)
        return first.data.run_at - std.time.milliTimestamp();
      }
      return null;
    }
  };
}

If you were to put it all together, it would probably work, but it has a lot of issues, from incorrect functionality to being thread-unsafe. One obvious issue is our handling of an empty queue. Right now, we sleep for 1 second and see if there are any tasks to run. Consider what that means for the following usage:

var s = Scheduler(Task).init(allocator);
defer s.deinit();

try s.start();
var person = Person{.name = "Vegeta"};
try s.scheduleIn(.{.say = .{.person = &person, .msg = "over 9000!"}}, 8000);

Do you see the issue? We schedule the job to run in 8 seconds, but there's a good chance that run will sleep for 1 second (due to the queue being empty), wake up and process the newly added task immediately. To fix this, we need to double check the first job:

while (true) {
  const ms = self.timeUntilNextTask() orelse 1000;
  std.time.sleep(@intCast(ms * std.time.ns_per_ms));

  const first = self.queue.first orelse {
    // queue is still empty, go back to the start of the loop
    // and try again
    continue;
  };
  const job = first.data;
  if (job.run_at > std.time.milliTimestamp()) {
    // we woke up too early, go back to the start of the loop
    // and try again.
    continue;
  }
  self.queue.remove(first);
  self.allocator.destroy(first);
  // job is a copy of the node's data, so it's safe to use
  // after the node has been destroyed
  job.task.run();
}

But this is hardly the only timing issue we have. We also need to handle the case where timeUntilNextTask returns a negative value, which happens whenever the first task is already overdue. More seriously, our scheduler assumes that the tasks are inserted in the order that they should be run. If we schedule a task to be run in 1 hour, then schedule a task to be run in 10 seconds, our scheduler will sleep for 1 hour and our 2nd task will be run much too late. Finally, our handling of the empty queue isn't ideal; it would be nice to come up with an alternative to waking the thread every second.
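The negative case matters because @intCast to an unsigned type is illegal for negative values (a panic in safe build modes). One way to guard against it, sketched below with a hypothetical sleepNs helper that isn't part of the scheduler, is to treat an overdue task as "don't sleep at all":

```zig
const std = @import("std");

// Hypothetical helper: converts a possibly negative delay in
// milliseconds into a nanosecond value that is safe to sleep for.
fn sleepNs(ms: i64) u64 {
  // an overdue task (negative delay) should run right away
  if (ms <= 0) return 0;
  const positive: u64 = @intCast(ms);
  return positive * std.time.ns_per_ms;
}

pub fn main() void {
  std.debug.print("{d}\n", .{sleepNs(-250)}); // 0
  std.debug.print("{d}\n", .{sleepNs(3)});    // 3000000
}
```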

The final major issue with this implementation is that it isn't thread-safe. The run method runs in its own thread, started by the start method. The schedule and scheduleIn methods are meant to be called from the main thread, or any other thread. However, both share and operate on the same queue field. This type of access has to be synchronized. If one thread happens to append to the queue via a call to schedule, while run calls remove, the behavior is undefined; the best possible outcome is a crash.

The threading issue can be fixed by adding a mutex, but the scheduling issue requires more fundamental changes. So while this was a good first step, it's time to look at a more correct implementation.

Scheduler

At a high level, the shortcomings of our naive implementation can be solved by:

  1. Using a sortable data structure to order pending jobs by their run_at field
  2. Using a thread-safe signal to notify our scheduler when new jobs are added
  3. Ensuring that all access to our shared data structure is synchronized by a mutex

We begin by replacing our std.DoublyLinkedList with an std.PriorityQueue. Like our linked list, the priority queue allows us to append new jobs and pop existing ones off. It also requires that we provide a function to compare two entries to control how entries are ordered. We begin with a new skeleton, which is similar to what we started with before:

fn Scheduler(comptime T: type) type {
  return struct {
    queue: Queue,
    mutex: std.Thread.Mutex,

    const Self = @This();
    const Queue = std.PriorityQueue(Job(T), void, compare);

    fn compare(_: void, a: Job(T), b: Job(T)) std.math.Order {
      return std.math.order(a.run_at, b.run_at);
    }

    pub fn init(allocator: Allocator) Self {
      return .{
        .mutex = .{},
        .queue = Queue.init(allocator, {}),
      };
    }

    pub fn deinit(self: *Self) void {
      self.queue.deinit();
    }
  };
}

Our Queue has changed type. Not only does the PriorityQueue take the type of value to store (as DoublyLinkedList does), it also takes a context and a function to compare two nodes. The context exists for cases where comparison is stateful. Whatever context we pass to Queue.init gets passed as the first parameter to the compare function. The context can be used to inform how two values should be compared. In many cases, including our own, there will be no context, and thus we specify a void context type and pass a void context value ({}) to init.
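To see the ordering on its own, here's a small standalone sketch. Entry is a hypothetical stand-in for Job(T) that keeps only the run_at field, and the code assumes the std.PriorityQueue API as it exists in the Zig version used throughout this post (init taking an allocator and a context).

```zig
const std = @import("std");

// Hypothetical stand-in for Job(T): only the field used for ordering.
const Entry = struct { run_at: i64 };

fn compare(_: void, a: Entry, b: Entry) std.math.Order {
  return std.math.order(a.run_at, b.run_at);
}

pub fn main() !void {
  var queue = std.PriorityQueue(Entry, void, compare).init(std.heap.page_allocator, {});
  defer queue.deinit();

  // added out of order on purpose
  try queue.add(.{.run_at = 3000});
  try queue.add(.{.run_at = 1000});
  try queue.add(.{.run_at = 2000});

  // removeOrNull returns the entry with the smallest run_at,
  // or null once the queue is empty
  while (queue.removeOrNull()) |entry| {
    std.debug.print("{d}\n", .{entry.run_at});
  }
}
```

This prints 1000, 2000 and 3000, in that order, regardless of the order in which the entries were added.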

We've also added a std.Thread.Mutex, which we'll use in the following code to synchronize access to the queue. The mutex is initialized using the following syntax: .mutex = .{}. This is the same syntax and the same behavior as Thread.spawn's config parameter; again relying on Zig's type inference and default field values.

For now, our schedule method is similar to the previous one:

pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };
  self.mutex.lock();
  defer self.mutex.unlock();
  try self.queue.add(job);
}

// unchanged
pub fn scheduleIn(self: *Self, task: T, ms: i64) !void {
  return self.schedule(task, std.time.milliTimestamp() + ms);
}

When using the DoublyLinkedList, we were responsible for managing the Node memory. We had to create the nodes and free them, both in deinit and after running the task in run. The PriorityQueue has a different interface and manages its nodes internally. Discounting the introduction of our mutex, this makes both our deinit and schedule methods simpler. In fact, you might have noticed that our Scheduler(T) no longer holds a reference to the allocator since we no longer manage any memory directly ourselves.

Although this code is useless without the start and run methods, it's worth reiterating how important synchronizing access to self.queue is. Even with schedule alone, if our application is multi-threaded, two threads could call schedule at the same time and thus attempt to mutate self.queue at the same time, which would be undefined behavior.

Switching to a PriorityQueue gets us half way there. We can now be sure that the first task in our queue is the one that should be run soonest. But we still face the issue of having tasks scheduled out of order. The core of our issue is the call to std.time.sleep in run. Even if new tasks get properly ordered within our queue, once our run thread goes to sleep, there's nothing we can do. Recall our previous code that looked something like:

const ms = self.timeUntilNextTask();
std.time.sleep(@intCast(ms * std.time.ns_per_ms));

Once we go to sleep, newly added tasks cannot be processed until this sleep ends. What we need is a way to wake up the thread when a new task gets scheduled. Better yet, we only need to wake the thread up if a newly scheduled task needs to run before the current sleep will end. The behavior we want is:

  1. We start the scheduler. The queue is empty. It goes to sleep "forever".
  2. We schedule a task to run in 10 seconds. We wake the scheduler up, it detects that the next job should run in 10 seconds, so goes to sleep for 10 seconds.
  3. We immediately schedule a new task to run in 5 seconds. We wake the scheduler up, it detects that the next job should run in 5 seconds, so goes to sleep for 5 seconds.
  4. We immediately schedule yet another job, but this one to run in 7 seconds. We don't need to wake the scheduler up, because this job is meant to run after our next one (7 > 5).

Again, the PriorityQueue solves the issue of ordering. To solve the issue of waking up our scheduler, we turn to std.Thread.Condition. A condition variable provides a thread-safe way for threads to signal each other. Condition variables interact in specific ways with a mutex to provide certain guarantees, and this interaction is important to understand. We'll begin by adding our condition variable:

fn Scheduler(comptime T: type) type {
  return struct {
    queue: Queue,
    mutex: std.Thread.Mutex,
    cond: std.Thread.Condition,

    pub fn init(allocator: Allocator) Self {
      return .{
        .cond = .{},
        .mutex = .{},
        .queue = Queue.init(allocator, {}),
      };
    }

    ...
  };
}

We'll take small steps, starting with part of our run method:

fn run(self: *Self) void {
  while (true) {
    self.mutex.lock();
    while (self.queue.peek() == null) {
      self.cond.wait(&self.mutex);
    }
    // TODO
  }
}

There's a bit of magic and beauty in the above code. The first thing that should be obvious is that we need to lock mutex before we can check if queue is empty. Again, all access to self.queue has to be synchronized. But if you're unfamiliar with condition variables, you might have a couple questions. For example, why don't we unlock mutex? Why do we check if the queue is empty in a loop? How is cond.wait different than sleep?

The difference between cond.wait and sleep is that cond.wait blocks until it receives a signal. This is the mechanism we need in order to let newly scheduled tasks wake our scheduler. This isn't quite our final solution. We actually need a mix of the two behaviors: sleeping until it's time to run the next task OR until we receive a signal indicating newly added tasks. For this, we'll eventually use cond.timedWait, which does exactly what we need, but we'll get to that in a bit.
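As a quick illustration of the timeout half of that behavior, the sketch below waits on a condition variable that nobody ever signals. The timedOut helper is hypothetical and exists only for this example.

```zig
const std = @import("std");

// Hypothetical helper: returns true when the wait ended because the
// timeout elapsed, false when we were woken up some other way.
fn timedOut(timeout_ns: u64) bool {
  var mutex = std.Thread.Mutex{};
  var cond = std.Thread.Condition{};

  // timedWait expects the mutex to be locked
  mutex.lock();
  defer mutex.unlock();

  // error.Timeout is the only error timedWait can return
  cond.timedWait(&mutex, timeout_ns) catch return true;

  // nobody signals cond in this sketch, so getting here
  // means we were woken up spuriously
  return false;
}

pub fn main() void {
  const result = timedOut(50 * std.time.ns_per_ms);
  std.debug.print("timed out: {}\n", .{result});
}
```

Since nothing signals the condition, this should normally report a timeout after roughly 50 milliseconds. A spurious wakeup could make it return early, which is exactly why real code re-checks its condition after waking.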

As for the first question, cond.wait and cond.timedWait expect a locked mutex, and internally release the mutex before going to sleep. Furthermore, the mutex is always re-locked before the call returns. By passing in a locked mutex, and receiving a locked mutex, the condition variable can be set up atomically, without the risk of another thread slipping in.

As for the while loop, it serves two purposes. First, it's possible for multiple threads to wait on the same condition and be woken up by the same signal (called a broadcast in that case). Second, condition variables can be victims of spurious wakeups, that is, a waiting thread can be woken up when it really shouldn't be. Both cases are solved by re-checking the condition, which is what our inner while loop does. Keep in mind that when we do wake up, the mutex will be locked for us. So whenever you see a call to cond.wait or cond.timedWait, your mental model should be something like:

fn wait(c: *std.Thread.Condition, mutex: *std.Thread.Mutex) void {
  // do some setup
  // ...

  mutex.unlock();

  // whatever happens, we'll always return with this locked
  defer mutex.lock();

  // wait for signal
  // or timeout if calling timedWait
  // ...
}
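To see the wait loop in isolation, here's a minimal standalone sketch with one waiting thread and one signaling thread. The Shared struct and the waiter function are hypothetical names used only for this example; in our scheduler, the queue being non-empty plays the role of ready.

```zig
const std = @import("std");

// Hypothetical shared state for this sketch.
const Shared = struct {
  mutex: std.Thread.Mutex = .{},
  cond: std.Thread.Condition = .{},
  ready: bool = false,
};

fn waiter(shared: *Shared) void {
  shared.mutex.lock();
  defer shared.mutex.unlock();

  // re-check the condition after every wakeup, which guards
  // against spurious wakeups
  while (!shared.ready) {
    shared.cond.wait(&shared.mutex);
  }
  std.debug.print("ready\n", .{});
}

pub fn main() !void {
  var shared = Shared{};
  const thread = try std.Thread.spawn(.{}, waiter, .{&shared});

  {
    // change the shared state under lock
    shared.mutex.lock();
    defer shared.mutex.unlock();
    shared.ready = true;
  }
  shared.cond.signal();
  thread.join();
}
```

Because ready is only read and written while the mutex is held, it doesn't matter which thread gets there first. If main sets ready before the waiter starts, the waiter sees ready is already true and never waits; otherwise it waits and is woken by the signal.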

Having run wait for a signal is only useful if we actually have something that sends a signal. Thus, we change schedule:

pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };

  self.mutex.lock();
  defer self.mutex.unlock();
  try self.queue.add(job);
  self.cond.signal(); // <-- added this
}

The only thing we've changed is to add the last line. This will wake up run whenever a task is added. But, we can do better than this. As we discussed earlier, we only want to send a signal under two conditions: the queue was empty or the new task is meant to run before the previous earliest task:

pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };

  self.mutex.lock();
  defer self.mutex.unlock();

  var reschedule = false;
  if (self.queue.peek()) |*next| {
    if (run_at < next.run_at) {
      reschedule = true;
    }
  } else {
    // the queue was empty
    reschedule = true;
  }

  try self.queue.add(job);

  if (reschedule) {
    // only send the signal under the above two conditions
    self.cond.signal();
  }
}

This is the final implementation of schedule. But I do want to highlight the fact that cond.signal doesn't have to be called under lock. We could rewrite the above code like so:

pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };

  var reschedule = false;
  {
    self.mutex.lock();
    defer self.mutex.unlock();

    if (self.queue.peek()) |*next| {
      if (run_at < next.run_at) {
        reschedule = true;
      }
    } else {
      reschedule = true;
    }
    try self.queue.add(job);

    // mutex is unlocked here by the above defer
  }

  if (reschedule) {
    // Our new job is scheduled before our previous earliest job
    // (or we had no previous jobs)
    // We need to reset our schedule
    self.cond.signal();
  }
}

The call to self.cond.signal() is no longer made under lock. This is a tiny bit more efficient since we hold the lock for a shorter amount of time. But it does highlight something important: in Zig, defer runs at the end of the current block. The above would be riskier to do in Go, where defer runs at the end of the function. In Go, we wouldn't be able to use defer here and would have to place calls to unlock at the correct places, keeping in mind that self.queue.add could fail and that new control flow could be added in the future.
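If the block-scoped behavior of defer is new to you, this small sketch records the order in which three steps run. The deferOrder function is a hypothetical helper written for this example, with a string standing in for the call to unlock.

```zig
const std = @import("std");

// Hypothetical helper: returns the order in which the three steps ran.
fn deferOrder() [3][]const u8 {
  var steps: [3][]const u8 = undefined;
  var n: usize = 0;
  {
    // stands in for "defer self.mutex.unlock()"
    defer {
      steps[n] = "deferred unlock";
      n += 1;
    }
    steps[n] = "inside block";
    n += 1;
  }
  // the defer above has already run by the time we get here
  steps[n] = "after block";
  return steps;
}

pub fn main() void {
  const steps = deferOrder();
  for (steps) |step| {
    std.debug.print("{s}\n", .{step});
  }
}
```

This prints "inside block", "deferred unlock" and "after block", in that order: the deferred code runs when the inner block ends, not when the function returns.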

With schedule finished, all that's left is to fully implement run:

fn run(self: *Self) void {
  while (true) {
    self.mutex.lock();
    if (self.timeUntilNextTask()) |ms| {
      if (ms > 0) {
        const ns: u64 = @intCast(std.time.ns_per_ms * ms);
        self.cond.timedWait(&self.mutex, ns) catch |err| {
          // only error possible from timedWait
          std.debug.assert(err == error.Timeout);
        };
      }
    } else {
      self.cond.wait(&self.mutex);
    }
    // at this point, mutex is locked

    // possible that we were victims of a spurious wakeup
    // so we must always be defensive

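    // do we even have a job...?
    const next = self.queue.peek() orelse {
      // release the lock before looping, since the top of the
      // loop locks it again
      self.mutex.unlock();
      continue;
    };

    // ... and if so, should it be run now?
    if (next.run_at > std.time.milliTimestamp()) {
      self.mutex.unlock();
      continue;
    }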

    // ok, this should be run
    const job = self.queue.remove();

    // don't want to hold the lock while processing the job
    self.mutex.unlock();

    // run it
    job.task.run();
  }
}

fn timeUntilNextTask(self: *Self) ?i64 {
  // assumed to be called while mutex.lock is held
  if (self.queue.peek()) |*next| {
    return next.run_at - std.time.milliTimestamp();
  }
  return null;
}

This is a big leap, but it mostly combines everything we've seen so far. When the queue is empty we use cond.wait, meaning we'll sleep until a task is scheduled and cond.signal is called. When the queue isn't empty, we use cond.timedWait. In this case there are two ways for us to wake up. The first is, again, via a signal sent by the schedule method. The second is if our timedWait times out, indicating that we have a task to run.

Once we unblock, the logic is the same in either case. First, we're defensive and make sure that (a) we do indeed have a task and (b) it should be run. We can then remove the task from the queue and unlock the mutex while the task is run. Unlocking here is important as it allows other threads to schedule new tasks.

Conclusion

There are a lot of features we might want to add, for example metrics or the ability to stop and restart the scheduler. Much like the PriorityQueue takes a context that is passed back to our compare function, it makes sense for our Scheduler(T) to do the same: taking a context on initialization which is passed to each task.
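As a rough idea of what passing a context to each task could look like, here's a sketch where Task.run takes a second parameter. The App struct and its db_name field are hypothetical, and this isn't necessarily how the library mentioned below does it.

```zig
const std = @import("std");

// Hypothetical application context; the field is illustrative only.
const App = struct { db_name: []const u8 };

const Task = union(enum) {
  db_cleaner: void,

  // same shape as our earlier run method, plus the context
  pub fn run(task: Task, app: *const App) void {
    switch (task) {
      .db_cleaner => std.debug.print("Cleaning old records from {s}\n", .{app.db_name}),
    }
  }
};

pub fn main() void {
  const app = App{.db_name = "main"};
  const task = Task{.db_cleaner = {}};

  // the scheduler would make this call from its run loop,
  // passing along the context it was initialized with
  task.run(&app);
}
```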

My Zig Utility Library has the complete implementation that can largely be copied and pasted. And while part of the goal was to write a task scheduler, I hope some readers found insight with respect to multi-threaded programming.