Go is a powerful language for developing efficient server-side applications. I’ve been playing around with Go lately and decided to share some experiments, so here I present a simple data processing pipeline built with Go channels.

The example is far from being a full framework, but it exercises enough of the language’s features that I hope it can serve as a getting-started guide or a quick reference.

Golang

Go brings a lot of what’s already offered by many other languages (e.g. C, Java), but it has some unique characteristics that make it an appealing language (e.g. ease of deployment, package management, performance, channels).

Channels and Routines

A channel is the mechanism through which two goroutines communicate with one another by passing messages. Channels are based on the semantics of Communicating Sequential Processes (CSP).

Goroutines are independent lightweight threads designed to run concurrently with each other. The Go runtime manages goroutines and multiplexes them onto real operating system threads as needed, optimizing potential parallel execution and CPU usage by avoiding idle time.

This model favors the principle of sharing memory by communicating instead of communicating by sharing memory, as traditional approaches (e.g. Java, C++) do, allowing for a more efficient way of designing concurrent applications by avoiding locks and contention when accessing shared resources.

Channels and goroutines are the basis of some really cool tools these days; I’d recommend taking a look at fwd - network port forwarder as an example.

What are we building?

In this example we build a simple processing pipeline that consumes text lines from a socket and sends them through a series of processes that extract independent words, filter the ones starting with #, and print the result to the console. For this, a set of structures and functions were created so we can experiment and build other kinds of pipelines at will.

How is this designed?

Below is a picture of how this is designed. The basic elements are collectors, processors, and messages. A message is part of the internal domain: it is basically a set of key-value pairs, created in the collector and exchanged through several processors.

GoStreamer Design

Collectors

A collector is a simple structure that handles the commonalities shared by all collectors, such as naming and channel creation/closing.

type Collector struct {
    name    string
    collect CollectFunction
}

It calls a custom function that is responsible for actually collecting the data from a source and publishing one or more messages to a given output channel. The custom function is required to obey a predetermined signature, defined as below.

type CollectFunction func(name string, out chan Message)

The Execute function is responsible for creating and closing the output channel and calling the custom collect function in the process. The custom function that writes into the output channel needs to run inside an independent goroutine. The WaitGroup allows the main process that triggers the goroutine to wait for it to finish. After creating the channel, two goroutines are triggered: one that does the collection and signals when it’s done, and another that waits for the done signal and closes the channel upon receiving it.

func (collector *Collector) Execute() <- chan Message {
    var wg sync.WaitGroup
    wg.Add(1)

    // create the channel
    out := make(chan Message)

    // triggers the routine that executes the work
    go func() {
        collector.collect(collector.name, out)
        wg.Done()
    }()

    // triggers the routine that waits for the work completion to close the channel
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Processors

A processor is an independent unit that shares those same commonalities, like naming and channel creation/closing.

type Processor struct {
    name    string
    process ProcessFunction
    demux   ChannelDemux
}

Unlike collectors, processors receive a stream of messages through an input channel, and they have the ability to demultiplex those messages into multiple internal tasks (goroutines) through the use of internal channels, so they can potentially be processed in parallel. As with collectors, the custom process function must obey a predetermined signature, defined as below:

type ProcessFunction func(name string, input Message, out chan Message)

The Execute function is responsible for creating and closing the output channel, and for calling the demultiplexing function that forwards the messages to the tasks, which execute the custom process function and output the result to the given output channel.

func (processor *Processor) Execute(input <- chan Message) <- chan Message {
    var wg sync.WaitGroup
    numTasks := processor.demux.FanOut()
    wg.Add(numTasks)

    // create the channel
    out := make(chan Message)

    // define the work unit that processes the input stream and signals when it's done
    work := func(taskId int, inputStream <- chan Message) {
        for message := range inputStream {
            processor.process(processor.name, message, out)
        }
        wg.Done()
    }

    // triggers the routine that demultiplex the input stream into multiple streams
    go func() {
        processor.demux.Execute(input)
        for i := 0; i < numTasks; i++ {
            go work(i, processor.demux.Output(i))
        }
    }()

    // triggers the routine that waits for the work completion to close the channel
    go func() {
        wg.Wait()
        close(out)
    }()

    // the channel is returned in the meantime so a process can be created that listens to it
    return out
}

Demultiplexing

There are two provided ways of demultiplexing messages inside a processor: one randomly shuffles the task index, and the other selects the task index based on the message value associated with a given key (e.g. all price updates for the same stock id go through the same channel). However, since the demultiplexer is used as an interface inside the processors, we can inject any other custom implementation.

type ChannelDemux interface {
    Execute(input <- chan Message)
    Output(index int) <- chan Message
    FanOut() int
}

type IndexedChannelDemux struct {
    ChannelDemux
    out   []chan Message
    index IndexFunction
}

The demultiplexer then provides the Execute function, which handles the creation and closing of the required output channels for a given input channel and calls a custom function that does the actual assignment of an index to a given message.

func (demux *IndexedChannelDemux) Execute(input <- chan Message) {
    nchannels := len(demux.out)

    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        for message := range input {
            // assign index
            index := demux.index(nchannels, message)

            // Emit message
            demux.out[index] <- message
        }

        wg.Done()
    }()

    go func() {
        wg.Wait()
        for i := 0; i < nchannels; i++ {
            close(demux.out[i])
        }
    }()
}

Sample Pipeline

Now let’s build our pipeline using the mechanisms defined above.

GoStreamer Sample

Sample Custom Functions

Now we will make use of the mechanisms above to build the actual pipeline. Let’s first create a sample collect function that reads lines from a socket and creates a message to send downstream.

func TextSocketCollector(name string, out chan streamer.Message) {
    listener, _ := net.Listen("tcp", ":9999")
    conn, _ := listener.Accept()

    // create the reader once; a new reader per iteration could drop buffered data
    reader := bufio.NewReader(conn)

    for {
        line, _ := reader.ReadString('\n')
        line = strings.TrimSuffix(line, "\n")

        out_message := streamer.NewMessage()
        out_message.Put("line", line)

        log.Printf("Received raw message from socket: %s\n", out_message)

        out <- out_message
    }
}

Now let’s create a sample process function that picks the lines and splits them into words.

func WordExtractor(name string, input streamer.Message, out chan streamer.Message) {
    line, _ := input.Get("line").(string)

    words := strings.Split(line, " ")

    for _, word := range words {
        out_message := streamer.NewMessage()
        out_message.Put("word", word)
        log.Printf("Extracted word: %s\n", word)
        out <- out_message
    }
}

And now another sample process function that filters out the words that do not start with a # and sends the ones that do downstream.

func HashTagFilter(name string, input streamer.Message, out chan streamer.Message) {
    word, _ := input.Get("word").(string)

    if strings.HasPrefix(word, "#") {
        out_message := streamer.NewMessage()
        out_message.Put("hashtag", word)
        log.Printf("Filtered hashtag %s\n", word)
        out <- out_message
    }
}

We just need one more custom process function that will pick up the filtered messages and log them to the console.

func HashTagPublisher(name string, input streamer.Message, out chan streamer.Message) {
    hashtag, _ := input.Get("hashtag").(string)
    log.Printf("Publishing %s\n", hashtag)
}

Connecting the dots

And finally, let’s glue all of them together in a pipeline.

func RunPipeline() {
    // build pipeline elements
    collector := streamer.NewCollector("collector",
        TextSocketCollector)

    extractor := streamer.NewProcessor("extractor",
        WordExtractor, streamer.NewIndexedChannelDemux(2, streamer.RandomIndex))

    filter := streamer.NewProcessor("filter",
        HashTagFilter, streamer.NewIndexedChannelDemux(2, streamer.RandomIndex))

    publisher := streamer.NewProcessor("publisher",
        HashTagPublisher, streamer.NewIndexedChannelDemux(5, streamer.NewGroupDemux("hashtag").GroupIndex))

    // execute pipeline
    sequence := collector.Execute()
    extracted := extractor.Execute(sequence)
    filtered := filter.Execute(extracted)
    <-publisher.Execute(filtered)
}

The receive in <-publisher.Execute(filtered) makes the main goroutine a consumer of the pipeline’s output stream, so it blocks for as long as the channel remains open.

The main function now just needs to be a lightweight function that calls the RunPipeline function defined above.

func main() {
    // this is really fast, so we will need the microsecs in the logs to see something :D
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)

    // run pipeline
    RunPipeline()
}

Conclusion

In this example we saw how to build a simple data processing pipeline through the use of Go channels. The full example can be found here, with instructions on how to build and run it. It also contains some utilities, like the ability to configure the application through a properties file. Feel free to check it out and experiment. Feedback is also welcome.

Go also provides an awesome testing package out of the box that makes it easy to build tests with simple semantics, and it is already inspiring evolutions of existing testing libraries in other languages. Take a look at Dan’s JGoTesting as an example.

The learning curve for Go is really gentle, and it lets you build really fast applications.

Go-ing now, thanks for reading.