home
20 Jan 2013 - By Karl Seguin

Introduction To Golang: Channels (Again)

It can take some time to wrap your head around channels. For this reason, I wanted to expand on the previous post with another, more advanced, example. It's a bit complicated, but quite neat. The problem we are trying to solve is simple though. We have a reverse proxy, in Go, and we want to avoid having multiple threads fetch the same file at the same time. Instead, we'd like one of the threads to download it and block the other threads until the file is ready.

For this example, our entry point will be the Download function. This will get called by multiple threads:

func Download(remotePath string, saveAs string) {

}

To solve this problem we'll create a map which'll map the remotePath to a downloader. If no thread is currently downloading the file we'll register ourselves as the downloader. If someone is already downloading it, we'll block until they are done:

import (
  "time"
)

var downloaders = make(map[string] *downloader)
type downloader struct {
  saveAs string
}

func Download(remotePath string, saveAs string) {
  d, ok := downloaders[remotePath]
  if ok == false {
    d = &downloader{saveAs}
    downloaders[remotePath] = d
    d.download(remotePath)
  } else {
    //TODO 1: need to figure out how to block until d is done
  }
}

func (d *downloader) download(remotePath string) {
  time.Sleep(5 * time.Second) //simuate downloading the file to disk
  //the file is now available at d.saveAs
  delete(downloaders, remotePath)

  //TODO 2: need to signal all blocked threads
}

Our rough skeleton already has a major issue. It's completely possible for two threads to register themselves as a downloader for the same path at the same time. Access to downloaders must be synchronized. We can accomplish this with the synchronization primitives offered by the sync package, such as a mutex or, better yet, a read-write mutex. For this post we'll keep our code unsafe so we can focus on channel (we could also use channels to synchronize access, but the sync package is great for this type of simple stuff)

We have two missing pieces: blocking the latecomers and letting them know when the file is ready. What we want here is an unbuffered channel. An unbuffered channel synchronizes communication; or, put differently, it blocks the sender until the receiver reads form it. We'll add this channel to our downloader:

type downloader struct {
  saveAs string
  observers chan bool
}

And change the code that creates our downloaders:

//old
d = &downloader{saveAs}

//new
d = &downloader{saveAs, make(chan bool)}

With our channel created, the simplest way to block our latecomers is to write to our channel. Todo 1 becomes:

} else {
  d.observers <- true
  //once we get here, d.saveAs will be set
}

Remember, that first line (writing to the channel) will block until the other end reads from. No where in our code are we doing that yet, so for now, it'll block forever. Before we skip to that part, there's at least one way we can improve this. We can make it timeout. The last thing we want is to have a bunch of threads blocked forever because something went wrong in the downloader.

To do this, we'll use Go's select construct. select looks a lot like a switch statement, which can really throw you off at first. What select does, is that it selects a channel from a list. If no channel is ready, it blocks or executes defualt if it has been provided. If multiple channels are ready, it randomly picks one. Let's look at it:

select {
  case d.observers <- true:
    //d.saveAs is ready
  default:
    //handle the timeout (return an error or download the file yourself?)
}

This isn't very good. We know that the other end won't be reading from the channel until the download is done, so we'll immediate jump to the default, without blocking, and get an error. What we really want is to delay the execution of default. To do this we use the time.After function, which'll give us a channel after the specified time:

select {
  case d.observers <- true:
    //d.saveAs is ready
  case <- time.After(5 * time.Second): //adjust the time as needed
    //handle the timeout (return an error or download the file yourself?)
}

We'll go into select and block. We'll unblock under two conditions: our write to observers is read, or 5 seconds goes by and the channel created by time.After writes and unblocks us.

The last part is Todo 2, which is to notify all our blocked latecomers. Now, since these are blocked waiting for a reader, we simply need to read from observers:

func (d *downloader) download(remotePath string) {
  time.Sleep(5 * time.Second) //simuate downloading the file to disk
  //the file is now available at d.saveAs
  delete(downloaders, remotePath)

  for _ := range d.observers {
  }
}

range, when applied to a channel, loops through and reads the channel (when applied to an array or a map, it loops through or reads the array/map). We don't actually care what we are reading, so we discard it by assigning it to _. The above code actually unblocks our latecomers, but it blocks this code. If we have 3 observers, we'll loop 3 times and set them free, but we'll block indefinitely. The solution? select again:

func (d *downloader) download(remotePath string) {
  time.Sleep(5 * time.Second) //simuate downloading the file to disk
  //the file is now available at d.saveAs
  delete(downloaders, remotePath)
  for {
    select{
      case <- d.observers:
      default: return
    }
  }
}

This code will read all the values on d.observers and then, when there are no more values, it'll simply return.

That's it. There's a bunch of improvements we could make. Access to downloaders needs to be safe. Our notification code (the last example we looked at), could be run in its own goroutine so that we don't block the delay it from returning.

A more advanced version could download chunks of data and broadcast that to all the observers as the chunks become available. As-is, our solution blocks until the entire file is downloaded. However, depending on what you are doing, and how big the files are, maybe it's better to stream the data back as it becomes available.

Hopefully this helps you understand how channels can be used and where some of Go's language constructs, like range and select fit.

post tags: concurrencygolang
blog comments powered by Disqus