Apache Storm is one of the tools in the Big Data toolbox for real-time data processing. This post explains Storm through a simplified analogy with the human digestive system.

The Metaphor

So let’s think of the human digestive system. The purpose of the entire digestion flow is to transform food (e.g. an apple) into nutrients that are sent into our bloodstream. Several parts of the human body participate in the digestion of food.

What comes in and what goes out

  • Ingestibles enter the system
  • Nutrients and waste leave the system

The digestion components

  • Lips
  • Mouth
  • Stomach
  • Small Intestine
  • Large Intestine
  • Villi

Each of the above elements has a well-defined responsibility.

The responsibilities

Let’s say the lips serve as the opening for food intake: they perceive an ingestible as food, which is then sent into the mouth’s input channel. Note that a drug might also be considered an ingestible, though it’s not food, so we will discard it (don’t do drugs!).

The mouth receives the food and is responsible for breaking it into smaller pieces and adding saliva to them. Those small pieces then fall into the esophagus.

The stomach takes those smaller pieces of food from the esophagus and melts them, breaking down proteins into smaller amino acids to form a thick liquid called chyme, which is sent through the pylorus.

The small intestine receives the chyme from the pylorus, extracts the nutrients out of it and sends them into small channels called villi, so they can reach the bloodstream. The remainder of the chyme is sent down through the cecum.

The large intestine receives that absorbed chyme from the cecum, extracts salt and water, and sends the remainder to… Well, you know what happens next.

The human digestive system

What?!

This is nothing more than a food processing pipeline. The whole process takes from 12 to 50 hours to complete. Thankfully, we don’t have to wait that long to start eating again.

Storm is exactly that: a processing pipeline, a data processing pipeline. So let’s map our example onto Storm concepts.

The human digestive system topology

Once an event is perceived as a semantically relevant entity, its information will always travel through the system in the form of a tuple. A tuple is an ordered list of elements.
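In Storm’s Java API, the positions of a tuple are named with Fields and the actual values are carried in Values. A minimal sketch of that pairing (the field names and the chewedApple variable are just illustrative; declarer and collector stand for the objects Storm hands us, as we’ll see in the spout and bolt examples below):

// Fields name the positions of a tuple; Values carry the actual elements.
declarer.declare(new Fields("food", "salivaAmount"));

// The emitted values must match the declared fields, in order.
collector.emit(new Values(chewedApple, 42));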

Topology components and concepts

Spout

Spouts receive and interpret data from the outside world and create the first tuple. The simplest way of implementing a spout is by extending BaseRichSpout.

public class Lips extends BaseRichSpout {

    private SpoutOutputCollector outputCollector;
    private Food food;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("food"));
    }

    @Override
    public void open(Map stormConf, TopologyContext context, SpoutOutputCollector outputCollector) {
        this.outputCollector = outputCollector;

        try {
            this.food = perceiveIngestableAsFood();
            if (food == null) {
                puke();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void nextTuple() {
        outputCollector.emit("food", new Values(food));
    }
}

Bolt

Each subsequent bolt has a well-defined responsibility, like performing some kind of computation over tuples (e.g. transforming, filtering, enriching, publishing to a database). The simplest way of implementing a bolt is to extend BaseBasicBolt.

public class Stomach extends BaseBasicBolt {

    // gastric acid used to melt the food (see the tick tuple section below)
    private GastricAcid gastricAcid = new GastricAcid();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
        Food food = (Food)tuple.getValueByField("food");
        Chyme chyme = produceChyme(food, gastricAcid);
        outputCollector.emit(new Values(chyme));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("chyme"));
    }
}

Streams

Streams are the way Storm components communicate with each other. One can declare multiple streams:

// small intestine bolt
outputFieldsDeclarer.declareStream("villi-channel", new Fields("nutrient"));
outputFieldsDeclarer.declareStream("cecum", new Fields("absorbedChyme"));

or can simply emit to the default one:

// stomach bolt
outputFieldsDeclarer.declare(new Fields("chyme"));

Tick tuples

For the sake of the example, let’s assume that gastric acid is produced periodically by the stomach (we know it’s not strictly like that, but let’s keep the example). That gastric acid is produced internally while the food keeps coming in.

Similarly, Storm has the concept of tick tuples. One can configure a bolt to periodically send a tuple to itself, which will arrive mixed in with the regular tuples. The bolt’s logic needs to identify whether it received a tick tuple or a regular tuple and act accordingly. The frequency of the tick is part of the bolt’s configuration.

public class Stomach extends BaseBasicBolt {
    private GastricAcid gastricAcid = new GastricAcid();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // configure how often a tick tuple will be sent to our bolt
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 20);
        return conf;
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
        if (isTickTuple(tuple)) {
            produceGastricAcid();
            return;
        }

        Food food = (Food)tuple.getValueByField("food");
        Chyme chyme = produceChyme(food, gastricAcid);
        outputCollector.emit(new Values(chyme));
    }

    private boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("chyme"));
    }
}

Acking and Anchoring

These concepts govern how Storm understands whether a tuple has been fully processed or not.

Acking

The tuples that are generated across the graph of spouts and bolts form a tuple tree. Every tuple in the tree must be acked by the component processing it (spout or bolt). If we fail a tuple instead of acking it, Storm’s message guarantee mechanisms will replay it.

So even if we don’t like the ingestible, we need to ack it (mark it as processed), so we don’t try it again. Fortunately, the output collectors used in BaseRichSpout and its sibling BaseBasicBolt already provide implicit acking.
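
If we drop down to BaseRichBolt instead, acking becomes our responsibility. A minimal sketch of what that looks like (digest is a hypothetical helper, and outputCollector is the OutputCollector obtained in prepare):

@Override
public void execute(Tuple tuple) {
    try {
        // anchored emit: the output tuple becomes a child of the input tuple
        outputCollector.emit(tuple, new Values(digest(tuple)));
        outputCollector.ack(tuple);  // tell Storm this tuple is fully processed
    } catch (Exception e) {
        outputCollector.fail(tuple); // ask Storm to replay it
    }
}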

Anchoring

Anchoring is the ability to specify the links between tuples in the tuple tree. For more complex operations like aggregates and joins, we might need to anchor multiple input tuples to a single output tuple.

First declare your anchors:

private List<Tuple> anchors = new ArrayList<>();

Let’s change our stomach example so it anchors food tuples until there is enough gastric acid to melt them.

if (isTickTuple(tuple)) {
    produceGastricAcid();
    return;
}

anchors.add(tuple);
if (!hasEnoughGastricAcid()) {
    return; // hold on to the food until there is enough gastric acid
}

Chyme chyme = produceChyme(getAllPendingFood(anchors), gastricAcid);
outputCollector.emit(anchors, new Values(chyme));
anchors.clear(); // the pending food is now part of the emitted chyme

Now you are telling Storm that all those food input tuples connect to a single chyme output tuple.
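
One caveat: the BasicOutputCollector handed to a BaseBasicBolt automatically anchors each emit to the single input tuple, so a multi-anchored emit like the one above needs the lower-level OutputCollector of BaseRichBolt, and with it we also have to ack the anchored tuples ourselves. A sketch reusing the helpers from the stomach example (tick tuple handling omitted for brevity):

public class Stomach extends BaseRichBolt {

    private OutputCollector outputCollector;
    private GastricAcid gastricAcid = new GastricAcid();
    private final List<Tuple> anchors = new ArrayList<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        anchors.add(tuple);
        if (!hasEnoughGastricAcid()) {
            return; // hold on to the food tuples until there is enough acid
        }

        Chyme chyme = produceChyme(getAllPendingFood(anchors), gastricAcid);
        outputCollector.emit(anchors, new Values(chyme)); // multi-anchored emit
        anchors.forEach(outputCollector::ack);            // manual acking
        anchors.clear();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("chyme"));
    }
}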

Putting everything together

So now we just need to glue it all together. This is done by creating a topology class that connects everything.

public class DigestiveSystemTopology {
    
    private StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();

        // set the spout
        builder.setSpout("lips", new Lips());

        // set the bolts
        builder.setBolt("mouth", new Mouth())
                .globalGrouping("lips");

        int stomachParallelismHint = 2; // to be read from config
        builder.setBolt("stomach-chamber", new Stomach(), stomachParallelismHint)
                .shuffleGrouping("mouth", new Fields("food"));

        builder.setBolt("small-intestine", new SmallIntestine())
                .shuffleGrouping("stomach-chamber");

        builder.setBolt("large-intestine", new LargeIntestine())
                .shuffleGrouping("small-intestine");

        int villiParallelismHint = 10; // to be read from config
        builder.setBolt("villi", new Villi(), villiParallelismHint)
                .fieldsGrouping("small-intestine", "nutrient");

        // build it
        return builder.createTopology();
    }

    public static void main(String[] args) {
        // ... topology startup goes here ...
    }

}
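
For completeness, here is a minimal sketch of what that main method could contain to run the topology in-process (the topology name and worker count are arbitrary, and LocalCluster is meant for development, not production):

public static void main(String[] args) throws Exception {
    Config config = new Config();
    config.setNumWorkers(2); // how many JVMs the topology is spread across

    // run the topology inside the current JVM, handy for development
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("digestive-system", config,
            new DigestiveSystemTopology().buildTopology());
}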

Pretty much self-explanatory. Well, I know, those parallelism and grouping concepts still deserve an explanation…

Parallelism

Imagine we could have a multi-chamber stomach, like a cow or any other ruminant. The first two chambers of a ruminant’s stomach (the rumen and the reticulum) are basically responsible for fermentation.

So if we could just plug another stomach chamber into our body, as well as duplicate any other part, we would be able to accelerate our digestion process.

builder.setBolt("stomach-chamber", new Stomach(), stomachParallelismHint)

Obviously, if we add two stomach chambers but we keep only one small intestine, we will end up with a bottleneck.

The same goes for a topology: spouts and bolts need to be scaled out properly to fit the volume and velocity of the incoming data. For that, Storm has the concepts of workers, executors and tasks.

  • A worker is a subset of a topology; it runs in its own JVM and can hold multiple spouts and bolts. It would be as if we could split our digestive system into parts and spread the digestion components across them.
  • An executor is a thread spawned by a worker, and it can hold several instances of the same component (spout or bolt). It would be as if we could have multiple stomach chambers.
  • A task is what performs the actual data processing. It would be equivalent to a single stomach chamber.

Storm allows one to tune the number of workers, executors and tasks through a simple configuration change, without needing any code change.
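
As a sketch, those three knobs map to configuration like this (the numbers are arbitrary, and the config is what we would later pass to submitTopology):

Config config = new Config();
config.setNumWorkers(2); // 2 JVMs, i.e. 2 digestive systems

builder.setBolt("stomach-chamber", new Stomach(), 2) // 2 executors (threads)
       .setNumTasks(4)                               // 4 stomach instances in total
       .shuffleGrouping("mouth");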

Grouping

Grouping methods allow one to specify how components exchange tuples, i.e. which instance of the destination bolt each tuple goes to.

Global grouping ensures that every output tuple always goes to the same instance of the mouth. That’s good enough here, as we always have one mouth per pair of lips. It should be used with caution, though, as it obviously doesn’t scale.

Shuffle grouping distributes tuples evenly across the destination bolt instances. In this case we have two stomach chambers (weird, huh?) and the system simply doesn’t care which one the food ends up in.

Fields grouping allows one to group tuples by a certain field. So let’s assume the same nutrient always needs to be processed by the same Villi instance. Then we need to group that communication by the nutrient field.

Conclusion

There are a lot of concepts missing from this metaphor, but it covers the basics.

Storm provides the means for creating data processing pipelines, called topologies, which are composed of a graph of spouts and bolts, with internal entities, called tuples, passing between them through streams.

It’s possible to parallelise Storm topologies by increasing the number of workers, executors and tasks with a simple configuration change.

Acking and anchoring are implicit in the base spout and bolt implementations, but one can extend the lower-level base classes in order to achieve more advanced behaviour by doing acking and anchoring manually.

It’s also possible to configure a periodic tick to be sent internally to a bolt in order to do something in a periodic fashion. Note that these ticks come in along with the regular tuples, so the period might not be deterministic. Also, it’s not a good idea to attach any heavy operation to a tick, or we may end up punishing the regular flow.

Hope you’ve enjoyed the metaphor. Happy storming!