Spark is a popular data processing framework in the Big Data landscape. It is implemented in Scala but provides APIs for non-JVM languages such as Python.

Python offers a rich set of features for numerical analysis and machine learning, which makes it a powerful language to pair with Spark data processing.

In this post we will build a Python application that reads text from an input network socket and prints the occurrence count of each word, accumulating the counts in its internal state.

Preparing the environment

To use Spark with Python we will need to install the Spark distribution itself and the Py4J Python library. Py4J enables a Python application to access JVM objects dynamically. The following is one way of doing it from the command line; feel free to use your favourite method instead.

curl http://d3kbcqa49mib13.cloudfront.net/spark-1.6.2-bin-hadoop2.6.tgz | tar -xzf - -C . && mv spark-1.6.2-bin-hadoop2.6 spark
pip install py4j==0.9

At the time of this writing, it's important to use Py4J 0.9 because the latest version (0.10) has an issue when used with Spark Streaming.

Now we just need to define where our Spark home directory is and how Python can find the required Spark libraries.

export SPARK_HOME=<path/to/spark/folder>
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

You may want to add the above to your ~/.bashrc or ~/.bash_profile file.

Word occurrences count example

Our example will take text from an input socket stream in micro-batches, count the word occurrences and update the internal state, which will then be printed to the standard output stream.

Spark Stateful Streaming Example

Below is the full Python code to get the example working. In the next section, we will break it into pieces to analyse each part of it.
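What follows is a minimal end-to-end sketch, assembled from the snippets analysed below (the pprint, start and awaitTermination calls at the end are the standard wiring needed to actually run it):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def updateTotalCount(currentCount, countState):
    # currentCount: list of counts for this key in the current micro-batch
    # countState: running total so far (None on the first batch)
    if countState is None:
        countState = 0
    return sum(currentCount, countState)


sc = SparkContext("local[*]", "StreamWordCounter")
ssc = StreamingContext(sc, 10)
ssc.checkpoint("/tmp")

text = ssc.socketTextStream("localhost", 9999)

countStream = text.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

totalCounts = countStream.updateStateByKey(updateTotalCount)
totalCounts.pprint()

ssc.start()
ssc.awaitTermination()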

Digesting the example

Creating the streaming context

First you get your Spark context, which is needed to create Spark RDDs (Resilient Distributed Datasets). RDDs are the primary Spark abstraction and provide operations such as transformations (e.g. mapping, filtering) and actions (e.g. aggregations).

To define the Spark context we need to provide the master URL and the application name. The master URL can be a Spark, Mesos or YARN cluster URL, or it can be set to local to run in local mode. The [*] suffix means that we will run Spark with as many worker threads as there are logical cores on the machine. Instead of *, one could set the number of worker threads explicitly (e.g. local[2]).

sc = SparkContext("local[*]", "StreamWordCounter")

The streaming context will be used to hook onto a stream (in this example, it will be a socket stream). It's important to notice that Spark is not a real-time stream processing framework like Storm; we need to define a streaming context with a pre-defined micro-batching interval (10 seconds here).

ssc = StreamingContext(sc, 10)

Checkpoint directory

The checkpoint directory is only necessary because our example is a streaming application. A streaming application is an always-on application that must be fault-tolerant. Checkpoints provide a way of recovering to a safe, stable application snapshot. Spark takes care of that for you; you just need to tell it where to store the checkpoint files.

ssc.checkpoint("/tmp")

Listening to the socket stream

This is pretty straightforward. We just need to instruct our streaming context to listen to a socket on a specified host and port, so Spark can pick up whatever is written there.

text = ssc.socketTextStream("localhost", 9999)

Counting word occurrences

Now the core part: counting the words. The code below translates to English as follows:

For each line in the micro-batch, split it on whitespace to create a list of words, map each word to a (word, count) tuple starting at count 1, and then merge all the tuples with the same key by summing the corresponding count values.

countStream = text.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

Updating the internal state

Spark provides stateful streaming out of the box. For that you need an update function that receives the new values for a key from the current micro-batch together with that key's previous state, and returns the updated state. In our example, the state is a single number (the running total count) and the new values are the counts from the current micro-batch.

totalCounts = countStream.updateStateByKey(updateTotalCount)

Our updateTotalCount function is responsible for taking the latest state and updating it with the new data:

def updateTotalCount(currentCount, countState):
    # currentCount: list of counts for this key in the current micro-batch
    # countState: running total so far (None on the first batch)
    if countState is None:
        countState = 0
    return sum(currentCount, countState)
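
The remaining wiring (also shown in the full listing earlier) prints the running totals for every micro-batch and starts the streaming context:

# Print the first elements of each micro-batch of (word, total) tuples
totalCounts.pprint()

# Start the computation and block until it is stopped or fails
ssc.start()
ssc.awaitTermination()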

Running the example

python StreamWordCounter.py

You can use the nc Linux command to act as a socket stream producer.

$ nc -lk 9999
hello world
hello awesome world
hello there

In your streaming application you should now see the following output:

(u'world', 2)
(u'awesome', 1)
(u'there', 1)
(u'hello', 3)

This is a simple printout of the tuples with the running total for each word, but you can do whatever you want with that data (e.g. save it to files, as sketched below). Take this as a simple quick-start example.
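
For instance, instead of (or in addition to) pprint, each micro-batch of totals could be written out as text files; the output path used here is just a placeholder:

# Writes one directory of text files per micro-batch, named <prefix>-<timestamp>
totalCounts.saveAsTextFiles("/tmp/wordcounts")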

Conclusion

This post was intended as a quick intro to stateful Spark Streaming with Python. We saw how easy it can be to set up a simple example that takes lines from a network socket and counts the occurrences of each word by accumulating the count for each word in the internal state.

Happy Sparking!