Go (Golang) is a powerful language for developing efficient server-side applications. I’ve been playing around with Go lately and decided to share some experiments. So here I present a simple data processing pipeline using Go channels.
The example is far from being a full framework, but it exercises enough language features that I hope it can be useful as a getting-started guide or a quick reference.
Go brings a lot of what’s already offered by many other languages (e.g. C, Java, etc.) but has some characteristics that make it an appealing language (e.g. ease of deployment, package management, performance, channels).
Channels and Goroutines
A channel is how two goroutines communicate with one another through message passing. Channels are based on the semantics of Communicating Sequential Processes (CSP).
Goroutines are independent, lightweight threads of execution designed to run concurrently with each other. The Go runtime manages the goroutines and multiplexes them onto real operating system threads as needed, making parallel execution possible and reducing CPU idle time.
This model favors the principle of sharing memory by communicating instead of communicating by sharing memory, as traditional approaches (e.g. Java, C++) do. That often makes concurrent applications simpler to design, because much of the locking and contention around shared resources can be avoided.
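As a minimal sketch of the idea (the function names `produce` and `consume` are made up for this illustration), one goroutine can hand values to another over a channel without any explicit lock:

```go
package main

import "fmt"

// produce starts a goroutine that sends each word on a channel and closes
// the channel afterwards, so the receiver knows when the stream has ended.
func produce(words []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, w := range words {
			out <- w
		}
	}()
	return out
}

// consume drains the channel into a slice; the range loop ends on close.
func consume(in <-chan string) []string {
	var got []string
	for w := range in {
		got = append(got, w)
	}
	return got
}

func main() {
	fmt.Println(consume(produce([]string{"hello", "#go", "world"})))
}
```

The channel is unbuffered here, so every send waits for a matching receive; that handoff is the only synchronization the two goroutines need.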
Channels and goroutines are the basis of some really cool tools these days. I’d recommend taking a look at fwd, a network port forwarder, as an example.
What are we building?
In this example we are building a simple processing pipeline that consumes a text line from a socket and sends it through a series of processes that extract the individual words, keep the ones starting with # and print the result to the console. For this, a set of structures and functions was created so we can experiment and build other kinds of pipelines at will.
How is this designed?
Below is a picture of how this is designed. The basic elements are collectors, processors and messages. A message is part of the internal domain. It’s basically a set of key value pairs that are created in the collector and exchanged through several processors.
A collector is a simple structure that handles what is common to all collectors, such as naming and channel creation/closing.
It calls a custom function that is responsible for actually collecting the data from a source and publishing it as one or more messages to a given output channel. The custom function is required to follow a predetermined signature, defined as below.
The execute function is responsible for creating and closing the output channel and for calling the custom collect function along the way. The custom function that writes into the output channel needs to run inside an independent goroutine. The WaitGroup allows the main process that triggers the goroutine to wait for it to finish. After the channel is created, two goroutines are started: one that does the collection and signals when it’s done, and another that waits for the done signal and closes the channel upon receiving it.
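A minimal sketch of how a collector along these lines might look. The names `Message`, `CollectFunc`, `Collector` and `fixedLines`, the use of a string-to-string map, and the exact signature are all assumptions made for illustration, not the code from the full example:

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for the message type: key/value pairs.
type Message map[string]string

// CollectFunc is an assumed signature for a custom collect function.
type CollectFunc func(name string, out chan<- Message)

// Collector holds what is common to all collectors: a name and the
// custom function that does the actual collecting.
type Collector struct {
	Name    string
	Collect CollectFunc
}

// Execute creates the output channel and starts two goroutines: one runs
// the collect function and signals when it is done, the other waits for
// that signal and then closes the channel.
func (c *Collector) Execute() <-chan Message {
	out := make(chan Message)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.Collect(c.Name, out)
	}()
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fixedLines is a toy collect function that emits two hard-coded lines.
func fixedLines(name string, out chan<- Message) {
	for _, line := range []string{"first line", "second line"} {
		out <- Message{"source": name, "line": line}
	}
}

func main() {
	c := &Collector{Name: "demo", Collect: fixedLines}
	for m := range c.Execute() {
		fmt.Println(m["source"], m["line"])
	}
}
```

Because the channel is closed only after the collect function returns, the consumer can simply range over it and stop when the stream ends.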
A processor is an independent unit that shares those same common concerns, such as naming and channel creation/closing.
Unlike collectors, processors receive a stream of messages through an input channel, and they can demultiplex those messages into multiple internal tasks (goroutines) through internal channels, so the messages can potentially be processed in parallel. As with collectors, the custom process function must follow a predetermined signature, defined as below:
The execute function is responsible for creating and closing the output channel and for calling the demultiplexing function. The demultiplexer forwards the messages to the tasks, which execute the custom process function and write the result to a given output channel.
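A rough sketch of a processor built on these ideas. `ProcessFunc`, `Processor`, the `Tasks` field and the `upper` function are hypothetical names, and the demultiplexing is a plain round-robin written inline to keep the example short:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Message is a hypothetical stand-in for the message type.
type Message map[string]string

// ProcessFunc is an assumed signature for a custom process function.
type ProcessFunc func(name string, in <-chan Message, out chan<- Message)

// Processor runs Process in Tasks goroutines (Tasks must be at least 1).
type Processor struct {
	Name    string
	Tasks   int
	Process ProcessFunc
}

// Execute fans the input out to the tasks over internal channels and
// closes the shared output channel once every task has finished.
func (p *Processor) Execute(in <-chan Message) <-chan Message {
	out := make(chan Message)
	internal := make([]chan Message, p.Tasks)
	var wg sync.WaitGroup
	for i := range internal {
		internal[i] = make(chan Message)
		wg.Add(1)
		go func(ch <-chan Message) {
			defer wg.Done()
			p.Process(p.Name, ch, out)
		}(internal[i])
	}
	// Demultiplex: hand each incoming message to one task, then close the
	// internal channels so the tasks know the input has ended.
	go func() {
		i := 0
		for m := range in {
			internal[i%p.Tasks] <- m
			i++
		}
		for _, ch := range internal {
			close(ch)
		}
	}()
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// upper is a toy process function that upper-cases each line.
func upper(name string, in <-chan Message, out chan<- Message) {
	for m := range in {
		out <- Message{"line": strings.ToUpper(m["line"])}
	}
}

func main() {
	in := make(chan Message)
	go func() {
		defer close(in)
		for _, l := range []string{"a", "b", "c"} {
			in <- Message{"line": l}
		}
	}()
	p := &Processor{Name: "upper", Tasks: 2, Process: upper}
	for m := range p.Execute(in) {
		fmt.Println(m["line"])
	}
}
```

With more than one task the output order is not guaranteed, which is the price of processing messages in parallel.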
There are two provided ways of demultiplexing messages inside a processor. One picks a task index at random; the other selects the task index based on the message value associated with a given key (e.g. all price updates for the same stock id go through the same channel). Since the demultiplexer is used as an interface inside the processors, we can also inject any other custom implementation.
The demultiplexer provides an execute function that handles the creation and closing of the required output channels for a given input channel and calls a custom function that does the actual assignment of an index to a given message.
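The two strategies could be sketched roughly as follows. A function type (`IndexFunc`) is used here instead of an interface purely for brevity, and `RandomIndex`, `KeyIndex` and `Demux` are hypothetical names; the FNV hash is just one reasonable way to map a key to an index:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sync"
)

// Message is a hypothetical stand-in for the message type.
type Message map[string]string

// IndexFunc picks which of n output channels a message should go to.
type IndexFunc func(m Message, n int) int

// RandomIndex spreads messages across the outputs at random.
func RandomIndex(_ Message, n int) int { return rand.Intn(n) }

// KeyIndex hashes the value stored under key, so messages with the same
// value always land on the same output.
func KeyIndex(key string) IndexFunc {
	return func(m Message, n int) int {
		h := fnv.New32a()
		h.Write([]byte(m[key]))
		return int(h.Sum32() % uint32(n))
	}
}

// Demux creates n output channels, routes every input message to one of
// them using index, and closes all outputs when the input is closed.
func Demux(in <-chan Message, n int, index IndexFunc) []<-chan Message {
	outs := make([]chan Message, n)
	readOnly := make([]<-chan Message, n)
	for i := range outs {
		outs[i] = make(chan Message)
		readOnly[i] = outs[i]
	}
	go func() {
		for m := range in {
			outs[index(m, n)] <- m
		}
		for _, ch := range outs {
			close(ch)
		}
	}()
	return readOnly
}

func main() {
	in := make(chan Message)
	go func() {
		defer close(in)
		for _, p := range []string{"10", "11", "12"} {
			in <- Message{"stock": "ACME", "price": p}
		}
	}()
	var wg sync.WaitGroup
	for i, ch := range Demux(in, 2, KeyIndex("stock")) {
		wg.Add(1)
		go func(i int, ch <-chan Message) {
			defer wg.Done()
			for m := range ch {
				fmt.Println("task", i, "got price", m["price"])
			}
		}(i, ch)
	}
	wg.Wait()
}
```

All three price updates share the same stock id, so they reach the same task in the order they were sent. Every output channel needs a reader, otherwise the routing goroutine blocks.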
Now let’s build our pipeline using the mechanisms defined above.
Sample Custom Functions
Let’s first create a sample collect function that reads from a socket and creates a message to send downstream.
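A collect function of this kind might look like the sketch below. It is written against `io.Reader`, so it works with any `net.Conn`; `collectLines` and `demo` are hypothetical names, and `net.Pipe` stands in for a real socket so the example runs without a server:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// Message is a hypothetical stand-in for the message type.
type Message map[string]string

// collectLines reads newline-terminated text from r and emits one message
// per line until the other side closes the connection.
func collectLines(r io.Reader, out chan<- Message) {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		out <- Message{"line": scanner.Text()}
	}
}

// demo wires collectLines to an in-memory connection pair and returns the
// lines that were collected.
func demo() []string {
	client, server := net.Pipe()
	go func() {
		defer client.Close()
		fmt.Fprintln(client, "learning #go with #channels")
		fmt.Fprintln(client, "no tags here")
	}()
	out := make(chan Message)
	go func() {
		defer close(out)
		defer server.Close()
		collectLines(server, out)
	}()
	var lines []string
	for m := range out {
		lines = append(lines, m["line"])
	}
	return lines
}

func main() {
	for _, l := range demo() {
		fmt.Println(l)
	}
}
```

Against a real socket, the connection returned by `net.Dial("tcp", addr)` could be passed to `collectLines` in place of the pipe, with `addr` being whatever host and port the data source listens on.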
Now let’s create a sample process function that picks the lines and splits them into words.
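A sketch of such a splitting step, assuming the line travels under a `"line"` key and each word is sent on under a `"word"` key (both key names, and the functions `splitWords` and `wordsOf`, are made up for this illustration):

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a hypothetical stand-in for the message type.
type Message map[string]string

// splitWords emits one message per whitespace-separated word in each line.
func splitWords(in <-chan Message, out chan<- Message) {
	for m := range in {
		for _, w := range strings.Fields(m["line"]) {
			out <- Message{"word": w}
		}
	}
}

// wordsOf feeds lines through splitWords and gathers the resulting words.
func wordsOf(lines ...string) []string {
	in, out := make(chan Message), make(chan Message)
	go func() {
		defer close(in)
		for _, l := range lines {
			in <- Message{"line": l}
		}
	}()
	go func() {
		defer close(out)
		splitWords(in, out)
	}()
	var words []string
	for m := range out {
		words = append(words, m["word"])
	}
	return words
}

func main() {
	fmt.Println(wordsOf("learning #go with #channels")) // [learning #go with #channels]
}
```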
And now another sample process function that drops the words that do not start with a # and sends the ones that do downstream.
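The filter could be sketched like this, again assuming words travel under a hypothetical `"word"` key; `keepHashtags` and `hashtagsOf` are illustrative names:

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a hypothetical stand-in for the message type.
type Message map[string]string

// keepHashtags forwards only the words that start with '#'.
func keepHashtags(in <-chan Message, out chan<- Message) {
	for m := range in {
		if strings.HasPrefix(m["word"], "#") {
			out <- m
		}
	}
}

// hashtagsOf feeds words through keepHashtags and gathers what survives.
func hashtagsOf(words ...string) []string {
	in, out := make(chan Message), make(chan Message)
	go func() {
		defer close(in)
		for _, w := range words {
			in <- Message{"word": w}
		}
	}()
	go func() {
		defer close(out)
		keepHashtags(in, out)
	}()
	var tags []string
	for m := range out {
		tags = append(tags, m["word"])
	}
	return tags
}

func main() {
	fmt.Println(hashtagsOf("learning", "#go", "with", "#channels")) // [#go #channels]
}
```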
We just need one more custom process function that will pick up the filtered messages and log them to the console.
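A minimal sketch of that last step, printing to standard output. The names `publish` and `demo` and the `"word"` key are assumptions; returning a count is only there to make the behavior easy to check:

```go
package main

import "fmt"

// Message is a hypothetical stand-in for the message type.
type Message map[string]string

// publish prints every word it receives and returns how many it printed.
// It returns only once the input channel has been closed.
func publish(in <-chan Message) int {
	n := 0
	for m := range in {
		fmt.Println(m["word"])
		n++
	}
	return n
}

// demo sends two filtered words to publish and reports the count.
func demo() int {
	in := make(chan Message)
	go func() {
		defer close(in)
		for _, w := range []string{"#go", "#channels"} {
			in <- Message{"word": w}
		}
	}()
	return publish(in)
}

func main() {
	fmt.Println("published:", demo())
}
```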
Connecting the dots
And finally, let’s glue all of them together in a pipeline.
The call to <-publisher.Execute(filtered) makes the main process a consumer of the pipeline’s output stream, so it waits for as long as the channel is open.
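To illustrate the wiring, here is a self-contained sketch that replaces the collectors and processors with a single hypothetical `stage` helper and uses plain strings instead of messages. Only the name `RunPipeline` comes from the text; its signature and return value here are assumptions, and an in-memory list of lines stands in for the socket:

```go
package main

import (
	"fmt"
	"strings"
)

// stage runs fn in its own goroutine and returns its output channel,
// which is closed as soon as fn returns.
func stage(in <-chan string, fn func(in <-chan string, out chan<- string)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		fn(in, out)
	}()
	return out
}

// split emits every whitespace-separated word of every line.
func split(in <-chan string, out chan<- string) {
	for line := range in {
		for _, w := range strings.Fields(line) {
			out <- w
		}
	}
}

// filter forwards only the words that start with '#'.
func filter(in <-chan string, out chan<- string) {
	for w := range in {
		if strings.HasPrefix(w, "#") {
			out <- w
		}
	}
}

// RunPipeline connects source -> split -> filter -> publish and blocks
// until the last stage closes its output channel.
func RunPipeline(lines []string) []string {
	var published []string
	// publish never sends on out, so receiving from its channel simply
	// blocks until the whole pipeline has drained.
	publish := func(in <-chan string, out chan<- string) {
		for w := range in {
			fmt.Println(w)
			published = append(published, w)
		}
	}
	source := make(chan string)
	go func() {
		defer close(source)
		for _, l := range lines {
			source <- l
		}
	}()
	words := stage(source, split)
	filtered := stage(words, filter)
	<-stage(filtered, publish)
	return published
}

func main() {
	RunPipeline([]string{"learning #go with #channels", "no tags here"})
}
```

Running it prints `#go` and `#channels`, one per line. Closing each channel when its stage finishes is what lets the shutdown ripple from the source down to the final receive in `RunPipeline`.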
The main function now just needs to be a lightweight function that calls the RunPipeline method above.
In this example we saw how to build a simple data processing pipeline through the use of Go channels. The full example can be found here with instructions on how to build and run it. It also contains some utilities like the ability to configure the application through a properties file. Feel free to check it out and experiment. Feedback is also welcome.
Go also provides an awesome testing package out of the box that makes it easy to write tests, with simple semantics that are already inspiring changes in testing libraries used in other languages. One can take a look at Dan’s JGoTesting as an example of this.
The learning curve for Go is gentle, and it lets you build really fast applications.
Go-ing now, thanks for reading.