homedark

Introduction To Golang: Buffered Channels

Apr 27, 2013

In previous posts, we've seen some simple and not so simple examples of using channels. So far, our experimentations have been limited to unbuffered channels. Unbuffered channels block receivers until data is available on the channel and senders until a receiver is available. As you might be able to guess from the name, buffered channels will only block a sender once the buffer fills up.

Let's start with a meaningless example:

var c = make(chan int, 5)

func main() {
  go worker(1)
  for i := 0; i < 10; i++ {
    c <- i
    println(i)
  }
}

func worker(id int) {
  for {
    _ = <-c
    time.Sleep(time.Second)
  }
}

The above code creates a buffered channel with 5 slots (I'm not sure what the official Go terminology for this is, but I think of them as slots) along with a worker which slowly receives data from this channel. What do you think happens to the sender, which is writing to the channel? It'll quickly take up all the slots, then block until the worker frees one up by receiving from the channel. The output that you'll see is 0,1,2,3,4,5 printed right away, then 6,7,8 and 9 printed every second. Buffered channels are 0-based (which is why you see 0-5 and not 0-4 as you might have expected).

How's this useful? Let's look at a simple but real example: logging requests for a web server. We have hundreds of requests coming in per second, which we'd like to log. For performance reasons, we want to buffer these logs in memory and only periodically write then to disk. Our first approach, without a buffered channel, might look something like:

var channel = make(chan []byte)
func init() {
  go worker(8192)
}

func Log(req *http.Request) {
  channel <- createLog(req)
}

func createLog(req *http.Request) []byte {
  var buffer bytes.Buffer
  buffer.Write([]byte(req.RemoteAddr))
  buffer.Write([]byte("\t"))
  buffer.Write([]byte(req.URL.Path))
  buffer.Write([]byte("\n"))
  return buffer.Bytes()
}

func worker(size int) {
  buffer := make([]byte, size)
  position := 0
  for {
    entry := <- channel
    length := len(entry)
    if length > size {
      //todo handle a message that we can't fit in our buffer (log an error? save directly to disk?)
    }
    if (length + position) > size {
      //todo flush buffer to disk
      //we'll want to write buffer[0:position] to avoid including data from a previous pass
      position = 0
    }
    copy(buffer[position:], entry)
    position += length
  }
}

There's a bit going on here. First, we create a channel and start a worker. Our Log method takes a request, converts that into entry which is sent to the channel. When the worker gets an entry it'll append it to its buffer. Before it can do that though, it needs to make sure it has enough free space. If it doesn't, it writes the buffer to disk.

(As an aside, we re-use our []byte array like this to avoid having to reallocate small chunks of memory over and over again. This reduces fragmentation and GC. It isn't relevant to the discussion of of channel buffered, but I despise code that over-allocates buffers/byte arrays).

Under normal conditions things run smoothly since all we are doing is copying bytes around. However, whenever we flush to disk, our main request handling goroutine will block while sending to the channel (channel <- createLog(req)). The solution? More workers and a buffered channel. The change is slight:

const workerCount = 4
var channel = make(chan []byte, workerCount)
func init() {
  for i := 0; i < workerCount; i++ {
    go worker(8192)
  }
}

While one worker is busy flushing to disk, others will [hopefully] be available to process events.