Building A Queue - Part 2
In Part 1, we looked at the building blocks of our queue. Today, we'll look at the code used to write data. As you'll recall, data is written as length-prefixed bytes into a segment. We're using memory mapped files. We'll explore this in greater detail in tomorrow's post. For now, simply know that the memory mapped file is exposed as a []byte and what we write to that slice gets written to disk.
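To make that concrete, here's a rough sketch of how such a mapping might be created (the real code comes in tomorrow's post; mapSegment, the flags and the pre-sizing via Truncate are my assumptions):

func mapSegment(path string) ([]byte, error) {
	// open (or create) the segment file and pre-size it: writes past the
	// end of a mapping fault, so the file must already be MAX_SEGMENT_SIZE
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	if err := f.Truncate(int64(MAX_SEGMENT_SIZE)); err != nil {
		return nil, err
	}
	// MAP_SHARED means writes to the returned slice are backed by the file
	return syscall.Mmap(int(f.Fd()), 0, MAX_SEGMENT_SIZE, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
}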
The simplest implementation might look something like:
func (t *Topic) Write(message []byte) error {
	l := len(message)
	binary.LittleEndian.PutUint32(t.segment.data[t.position:], uint32(l))
	t.position += 4
	copy(t.segment.data[t.position:], message)
	t.position += l
	return nil
}
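For context, the fields this code relies on look roughly like this (a sketch of the shape, not the exact definitions from Part 1):

type Topic struct {
	sync.Mutex          // used below, once we handle concurrent writes
	segment  *Segment   // the segment currently being written to
	position int        // write offset within segment.data
}

type Segment struct {
	Id     uint64
	nextId uint64 // links this segment to the one written after it
	data   []byte // the memory mapped file
}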
We essentially write the 4-byte length prefix and then write the message. Of course, we need to protect against concurrent writes and handle the case where there's no more space in the current segment:
func (t *Topic) Write(message []byte) error {
	l := len(message)
	t.Lock()
	defer t.Unlock()
	if t.position+l+4 > MAX_SEGMENT_SIZE {
		t.expand()
	}
	binary.LittleEndian.PutUint32(t.segment.data[t.position:], uint32(l))
	t.position += 4
	copy(t.segment.data[t.position:], message)
	t.position += l
	return nil
}
func (t *Topic) expand() {
	segment := NewSegment(t)
	t.segment.nextId = segment.Id
	t.segment = segment
	t.position = SEGMENT_HEADER_SIZE
}
The expand method creates a new segment, links the old segment to the new one and repositions our offset.
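NewSegment itself isn't shown in this post. As a sketch, it would allocate an id, create and map a new file, and leave room for the header; the id scheme and file naming here are my guesses, not the post's actual code:

func NewSegment(t *Topic) *Segment {
	id := uint64(time.Now().UnixNano()) // hypothetical id scheme
	path := fmt.Sprintf("%d.seg", id)   // hypothetical file naming
	data, err := mapSegment(path)       // from the earlier sketch
	if err != nil {
		panic(err) // real code would propagate the error
	}
	// the first SEGMENT_HEADER_SIZE bytes are reserved for metadata,
	// such as the id of the next segment in the chain
	return &Segment{Id: id, data: data}
}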
There are other edge cases to worry about, like what happens if a message is larger than a single segment can accommodate. Instead of tackling those types of issues, I'd rather focus on more subtle challenges.
You see, while the above code is a good start, writes to our segment.data, which is a memory mapped region, aren't synchronously flushed to disk. As-is, it's up to the OS to persist the data to the underlying storage. So, when our function returns, we haven't actually guaranteed that the data is persisted.
We can force a sync via the msync system call. msync takes an offset and a length which indicate the part of the data that needs to be flushed. The offset must be a multiple of the OS page size. For example, if we were writing 5 bytes starting from position 10, we'd give an offset of 0 and a length of 15. If we were writing 10 bytes at position 5000, we'd give an offset of 4096 and a length of 914 (assuming an OS page size of 4096).
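Both values fall out of integer division. Here's a small helper (my naming, not code from the queue) that shows the arithmetic:

// page-aligned msync range for n bytes written at position pos
func syncRange(pos, n int) (from, length int) {
	from = pos / pageSize * pageSize // round pos down to a page boundary
	length = pos + n - from          // cover through the last written byte
	return from, length
}

syncRange(10, 5) returns (0, 15) and syncRange(5000, 10) returns (4096, 914), matching the examples above.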
The write function now looks like:
var pageSize = os.Getpagesize()

func (t *Topic) Write(message []byte) error {
	l := len(message)
	t.Lock()
	defer t.Unlock()
	if t.position+l+4 > MAX_SEGMENT_SIZE {
		t.expand()
	}
	// where did we start writing from
	start := t.position
	binary.LittleEndian.PutUint32(t.segment.data[t.position:], uint32(l))
	t.position += 4
	copy(t.segment.data[t.position:], message)
	t.position += l
	// sync from the page boundary at or below start, through to the end
	// of what we just wrote (the length prefix plus the message)
	from := start / pageSize * pageSize
	to := t.position - from
	_, _, errno := syscall.Syscall(syscall.SYS_MSYNC, uintptr(unsafe.Pointer(&t.segment.data[from])), uintptr(to), syscall.MS_SYNC)
	if errno != 0 {
		return errno
	}
	return nil
}
We're getting closer, but what isn't clear to me is whether msync is atomic. That is, if we ask the OS to sync, does it guarantee that either everything will be written (on success) or nothing will be (on failure)? I believe atomicity is only guaranteed on a per-page basis. However, even a 2-byte message can span 2 pages (if t.position is 4095, for example).
This is a serious problem. What happens if we want to write "hello world"? The proper byte sequence for this is: 0B 00 00 00 68 65 6C 6C 6F 20 77 6F 72 6C 64, with 0B 00 00 00 being the little-endian encoding for 11. What if all that gets flushed is 0B 00 00 00 68 65 6C (and the rest of the file remains 00)? When the system restarts, we have no way to tell that the last message is corrupt...in terms of bytes, 0B 00 00 00 68 65 6C 00 00 00 00 00 00 00 00 is completely valid.
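A naive reader makes the problem concrete: it trusts the length prefix completely, so the truncated bytes above decode without error (read here is a sketch, not the queue's actual reader):

func read(data []byte, position int) (message []byte, next int) {
	l := int(binary.LittleEndian.Uint32(data[position:]))
	start := position + 4
	// given the partially flushed bytes above, this happily returns the
	// 11-byte message "hel" followed by 8 zero bytes
	return data[start : start+l], start + l
}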
So far, the only solution that I've come up with is to append a marker byte at the end of each message. So "hello world" actually looks like: 0B 00 00 00 68 65 6C 6C 6F 20 77 6F 72 6C 64 FF (note the extra FF at the end).
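The change to the write path is small. A sketch (MARKER and writeRecord are my names; the space check and the msync length would each need to grow by one byte to cover the marker):

const MARKER = 0xFF // my name for the trailing marker byte

func (t *Topic) writeRecord(message []byte) {
	l := len(message)
	binary.LittleEndian.PutUint32(t.segment.data[t.position:], uint32(l))
	t.position += 4
	copy(t.segment.data[t.position:], message)
	t.position += l
	t.segment.data[t.position] = MARKER
	t.position++
}

A reader would then only trust a message whose payload is immediately followed by FF.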
With the marker byte, we can tell whether a message was fully flushed to disk. Or can we? What if a message spans 3 pages? The marker byte would obviously be on the 3rd page. Is it possible for pages 1 and 3 to be successfully flushed, while 2 fails? I don't know. I hope not. I need to look at how other systems handle this (and whether using a marker byte is even remotely close to a good solution).
The above issue aside, that's it for now. We still need to notify consumers that a new message is available, and we still need to look at how the topic's position is persisted (else we risk overwriting existing messages on a restart). All in good time.