
Building A Queue - Part 3

Mar 12, 2015

In Part 2, we saw the code used to write messages into our queue, including the code to make sure they're properly persisted. Our approach relies on appending data to a memory-mapped file and switching to a new file once we reach a specific size. To accomplish this, we keep a reference to the current file and the position within that file for the next write (as an offset from the start).

If our queue ever restarts, we'll be in big trouble unless we persist this positional information. Since segments are linked, all we need is the name of any segment (we could even just scan the file system). Once we have one segment, we can walk through the queue, first by finding the newest segment, then by finding the end of the messages (as suggested by ayende, I switched the marker byte to a CRC32 checksum):

func (t *Topic) findWritePosition(segment *Segment) {
  // find the latest segment
  for segment.nextId != 0 {
    segment = openSegment(segment.nextId)
  }
  t.segment = segment

  // find the end of the data,
  // which will be either a 0 length message
  // or the first invalid message (based on a crc checksum)
  size := uint32(t.segmentSize)
  position := uint32(SEGMENT_HEADER_SIZE)
  for position+8 <= size {
    l := encoder.Uint32(segment.data[position:])
    if l == 0 {
      break
    }
    hash := encoder.Uint32(segment.data[position+4:])
    dataStart := position + 8
    // a corrupt length could point past the end of the segment
    if l > size-dataStart {
      break
    }
    dataEnd := dataStart + l
    if hash != crc32.ChecksumIEEE(segment.data[dataStart:dataEnd]) {
      break
    }
    position = dataEnd
  }
  t.position = position
}
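To see the framing in isolation, here's a minimal sketch that writes and then scans messages in a plain byte slice standing in for a mapped segment. The names appendMessage and findEnd are made up for this example, and I'm assuming the encoder is little-endian; neither is taken from the actual queue code.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// headerSize is the per-message overhead: a 4-byte length plus a 4-byte CRC32.
const headerSize = 8

// encoder is assumed to be little-endian for this sketch.
var encoder = binary.LittleEndian

// appendMessage (hypothetical helper) writes [length][crc32][payload] at
// position and returns the position of the next write. It does no bounds
// checking, for brevity.
func appendMessage(data []byte, position uint32, payload []byte) uint32 {
	encoder.PutUint32(data[position:], uint32(len(payload)))
	encoder.PutUint32(data[position+4:], crc32.ChecksumIEEE(payload))
	copy(data[position+headerSize:], payload)
	return position + headerSize + uint32(len(payload))
}

// findEnd (hypothetical helper) walks the messages from start and returns the
// offset of the first empty, truncated, or corrupt message.
func findEnd(data []byte, start uint32) uint32 {
	size := uint32(len(data))
	position := start
	for position+headerSize <= size {
		l := encoder.Uint32(data[position:])
		if l == 0 {
			break
		}
		dataStart := position + headerSize
		if l > size-dataStart {
			break // the length field points past the end of the buffer
		}
		hash := encoder.Uint32(data[position+4:])
		if hash != crc32.ChecksumIEEE(data[dataStart:dataStart+l]) {
			break
		}
		position = dataStart + l
	}
	return position
}

func main() {
	data := make([]byte, 64) // stands in for a memory-mapped segment
	end := appendMessage(data, 0, []byte("hello"))
	end = appendMessage(data, end, []byte("world!"))
	fmt.Println(findEnd(data, 0) == end) // true

	data[headerSize] ^= 0xff      // corrupt the first payload byte
	fmt.Println(findEnd(data, 0)) // 0
}
```

The checksum is what lets us tell a half-written message from a complete one: a torn write leaves a length and payload that no longer match the stored hash, so the scan stops right before it.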

For our writer, we need a way to persist the current segment reference. For our consumers, we need both the segment and the position (since a consumer hasn't necessarily delivered every pending message).

This is where our other type of file, the state file, comes into play. Each entry in the state file is basically a name, a segment id and a position. The topic's state has the same shape, but its name happens to be blank and it's always the first entry in the file.

What's really cool about this is that, like the segments, we're using a memory-mapped file. But where it made sense to think of the segment file as just a []byte that we write into, things are a bit more indirect with our state file. What we really want is a structure whose position and segment fields we can update:

type State struct {
  name     [32]byte
  segment  uint64
  position uint64
}

One approach would be to serialize and deserialize states to the state file. The problem with that is that we plan on updating the data hundreds of times per second. Therefore, we'll map the byte array directly to our structure. Consider:

package main

import (
  "fmt"
  "unsafe"
)

type Sample struct {
  flag byte
  id   uint32
}

func main() {
  data := []byte{10, 0, 0, 0, 20, 1, 0, 0}
  sample := (*Sample)(unsafe.Pointer(&data[0]))

  fmt.Println(sample.flag, sample.id)
  // prints 10 276 (on a little-endian machine)

  sample.flag = 3
  fmt.Println(data)
  // prints [3 0 0 0 20 1 0 0]
}

As you can see, this is powerful. Just by doing t.position += 4, the underlying byte array is updated, which in turn updates the file. It's also dangerous. Notice that Sample is 8 bytes, not 5 as you might have expected. That's because the structure is padded so that id starts on a 4-byte boundary. The output above also depends on the machine being little-endian. As far as I know, there are no guarantees about the byte representation of a structure. It could change from one platform to another or from one version of Go to another.
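If you want to see where the padding goes, the unsafe package can report a struct's size and the offset and alignment of its fields. Here's a small sketch; sampleLayout is just a helper I made up for the example, and the numbers in the comments are what the standard Go toolchain gives on amd64, not something to rely on everywhere.

```go
package main

import (
	"fmt"
	"unsafe"
)

type Sample struct {
	flag byte
	id   uint32
}

// sampleLayout (hypothetical helper) reports the total size of Sample along
// with the offset and alignment of its id field.
func sampleLayout() (size, idOffset, idAlign uintptr) {
	var s Sample
	return unsafe.Sizeof(s), unsafe.Offsetof(s.id), unsafe.Alignof(s.id)
}

func main() {
	size, idOffset, idAlign := sampleLayout()
	fmt.Println(size)     // 8 with the standard toolchain on amd64
	fmt.Println(idOffset) // 4: three padding bytes follow flag
	fmt.Println(idAlign)  // 4: id must start on a multiple of 4
}
```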

Our approach explains why we're using a fixed-length byte array for the name rather than the more traditional slice. We need a flat, reference-free structure which maps to a contiguous block of memory. This also means that, if we wanted to, we could load a specific state based on its offset:

type States struct {
  data []byte
}

type State struct {
  name     [32]byte
  segment  uint64
  position uint64
}

var stateSize = int(unsafe.Sizeof(State{}))

func (s *States) loadState(index int) *State {
  return (*State)(unsafe.Pointer(&s.data[index * stateSize]))
}
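Here's a rough, runnable sketch of how this could be used. A plain byte slice stands in for the memory-mapped state file, and setName is a helper invented for the example rather than part of the queue. It assumes the slice is suitably aligned for a uint64, which holds for a page-aligned mapping and, in practice, for a heap-allocated slice of this size.

```go
package main

import (
	"fmt"
	"unsafe"
)

type State struct {
	name     [32]byte
	segment  uint64
	position uint64
}

type States struct {
	data []byte
}

var stateSize = int(unsafe.Sizeof(State{}))

func (s *States) loadState(index int) *State {
	return (*State)(unsafe.Pointer(&s.data[index*stateSize]))
}

// setName (hypothetical helper) copies name into the fixed-size array,
// truncating anything past 32 bytes and zeroing the remainder.
func (st *State) setName(name string) {
	st.name = [32]byte{}
	copy(st.name[:], name)
}

func main() {
	// Room for three entries; in the queue this would be the mapped file.
	states := &States{data: make([]byte, 3*stateSize)}

	// Entry 0 is the topic's own state, with a blank name.
	topic := states.loadState(0)
	topic.segment = 7
	topic.position = 1024

	// Entry 1 is a consumer.
	consumer := states.loadState(1)
	consumer.setName("consumer-a")
	consumer.segment = 6
	consumer.position = 512
	consumer.position += 4

	// Loading the same index again sees the update, because both pointers
	// refer to the same bytes.
	fmt.Println(states.loadState(1).position)                  // 516
	fmt.Println(stateSize)                                     // 48
	fmt.Println(string(states.data[stateSize : stateSize+10])) // consumer-a
}
```

Since nothing is copied, there's no separate save step: every field assignment is a write into the backing bytes.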

So, that's all there is to tracking our state. While the technique might not be useful in everyday programming, it does expose some fundamentals of how structures are actually laid out in memory.

As a final note, can you guess what the following code outputs (and why)?

package main

import (
  "fmt"
  "unsafe"
)

type A struct {
  a byte
  b int
  c byte
}

type B struct {
  a byte
  b byte
  c int
}

func main() {
  fmt.Println(unsafe.Sizeof(A{}))
  fmt.Println(unsafe.Sizeof(B{}))
}