Following up on Storm as a Metaphor, this post introduces you to Apache Kafka with a simple metaphor between Kafka and traditional newspaper home delivery.

Kafka is one of the most popular message brokers around. It’s a common choice for Big Data processing applications, since it can be used for high-throughput communication of data between different systems. This post walks through the analogy.

The Metaphor

So let’s just say you receive your favourite newspaper every day at home. It’s not the newspaper’s publishing company that delivers it to you, because that simply would not scale.

If that were the case, the publisher would have to know every single customer while still having to produce a newspaper full of quality news. Also, each customer would have to know all the publishers. Instead, the publishers delegate the delivery work to a broker, a distribution company, and the customers subscribe to that company only.

Then you can just subscribe to your favourite newspaper through that broker. Or you can subscribe to a newspaper and a magazine. If you subscribe only to the newspaper, you will not receive any magazine.

Distribution Company

Typically a distribution company is spread across different warehouses, each one with its own stock of newspapers and magazines on different shelves. This allows redundancy and better scaling across cities.

Subscribers are not aware of where the newspapers and magazines are stored, as long as they can subscribe to them. The beauty of this is that the publishers can scale at will and the consumers can subscribe or unsubscribe, without any of them knowing about each other.

How does this map to Apache Kafka?

Kafka is a message broker. Publishers produce messages and send them to a broker’s topic, where they are persisted in the broker’s commit log for a pre-configured period of time. These messages can then be consumed by multiple subscribers. Kafka is persistent: its commit log is what stores the messages.

Kafka Distribution Company

Anatomy of a topic

A topic consists of a set of partitions, each one representing an ordered sequence of keyed messages. If no partition number is specified, the message’s key is hashed to determine the partition to produce to. Please note that order is only kept inside one partition, so if you want to ensure some kind of ordering, key affinity can be used to get all related messages into the same partition, as sketched below.

Kafka Topic Anatomy
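
As a minimal sketch of key affinity, reusing the producer and the packaging helper introduced later in this post, you could produce all of a family’s copies with the family name as the key (the topic and key here are just made-up examples):

// every message with the key "petersons" is hashed to the same partition,
// so the relative order of that family's copies is preserved
producer.send(new ProducerRecord<String, byte[]>("awesome.newspaper.topic", "petersons", packaging(subscription)));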

The producer

The producer is responsible for the packaging and dispatching. Here’s the setup:

// PropertiesLoader.fromFile is this post's helper for reading a .properties file
Properties properties = PropertiesLoader.fromFile("newspaper-publisher.properties");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);

And here are the publisher’s properties:

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

The packaging process is simply serialization into byte[] (since the example configures the ByteArraySerializer for values).

// pack it
byte[] pkg = packaging(subscription);
// dispatch it
producer.send(new ProducerRecord<String, byte[]>(topic, pkg));
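
Note that send is asynchronous and returns a Future. If you want a delivery confirmation (did the package reach the warehouse, and on which shelf?), you can also pass a callback; here’s a minimal sketch:

// dispatch it and get notified once the broker has stored it
producer.send(new ProducerRecord<String, byte[]>(topic, pkg), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the package never made it to the warehouse
            exception.printStackTrace();
        } else {
            // delivered: metadata tells us the shelf (partition) and position (offset)
            System.out.println("stored in partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});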

The consumer

The consumer is responsible for receiving the package, unpacking it, paying for it, and doing whatever it wants with it. Here’s the setup:

Properties properties = PropertiesLoader.fromFile("petersons.properties");
KafkaConsumer<String, byte[]> subscriber = new KafkaConsumer<>(properties);
subscriber.subscribe(newArrayList("awesome.newspaper.topic", "awesome.magazine.topic"));

And here are the subscriber’s properties:

bootstrap.servers=localhost:9092
group.id=petersons.family
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

The consumption process polls records from the subscribed topics and handles each one:

ConsumerRecords<String, byte[]> records = subscriber.poll(timeout);
for (ConsumerRecord<String, byte[]> record : records) {
    if (record.topic().equals("awesome.newspaper.topic")) {
        // receive it
        byte[] pkg = record.value();
        // unpack it
        Newspaper newspaper = unpackaging(pkg);
        // payment!!
        subscriber.commitSync();
        // read the newspaper
        processData(newspaper);
    }
}

The commit does not have to be synchronous. If you are receiving a newspaper at home, you have three payment options (sketched in the snippet below):

  • Pay as you receive each copy – a call to commitSync
  • Leave a cheque, and payment will eventually happen in the meantime – a call to commitAsync
  • Scheduled payment from your bank account – configure the consumer properties enable.auto.commit and auto.commit.interval.ms, in which case you don’t call commitSync or commitAsync at all.
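
Here is a rough sketch of the three payment options (the auto-commit values are just examples):

// 1. pay on delivery: blocks until the offsets are committed
subscriber.commitSync();

// 2. leave a cheque: returns immediately, the commit happens in the background
subscriber.commitAsync();

// 3. scheduled payment: configured in the consumer properties, no explicit commit calls
//    enable.auto.commit=true
//    auto.commit.interval.ms=5000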

Consumer group

Let’s say that the subscription is made based on your family name (assuming there are no duplicate surnames), so that you don’t receive a separate copy for each member of your family. In the same way, another person subscribing to the same title won’t pick up your newspaper, but a different copy of it.

If multiple consumers have the same group identifier (family name), all consumers from that group will be acting as if they were consuming from a regular queue instead of a topic.
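
As a sketch, whether two readers share one copy or each get their own is just a matter of the group.id consumer property (the family names are made up):

# Mr. and Mrs. Peterson share one subscription: the topic's partitions are split
# between them, and each copy is delivered to only one of them
group.id=petersons.family

# The Smiths next door have their own subscription and receive every copy as well
group.id=smiths.family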

Kafka offset

Once the courier delivers the newspaper, the customer acknowledges the delivery by paying for it. When this happens, the issue number is written down as delivered, so the next time the courier won’t deliver the same issue again.

The offset is basically the position in the commit log where a specific subscriber is at a given moment for a specific topic. This is abstracted by Kafka’s consumer API.
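
If you are curious, the consumer API does let you peek at those numbers; a small sketch, for a partition currently assigned to this consumer (partition 0 is an arbitrary choice):

TopicPartition partition = new TopicPartition("awesome.newspaper.topic", 0);
// the next issue number the courier will bring us
long position = subscriber.position(partition);
// the last issue we actually paid for (null if nothing was committed yet)
OffsetAndMetadata committed = subscriber.committed(partition);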

Apache Zookeeper used to be the place to keep track of Kafka’s offsets in previous versions of Kafka, although consumers are free to choose a different storage location (e.g. MySQL). Since version 0.9.x, Kafka offers consumers the ability to store the offsets in a durable, replicated, highly available topic inside Kafka itself.

Apache Zookeeper is still used by Kafka to coordinate the broker nodes (the distribution company’s warehouses). Each partition has one node acting as the leader and all the other replicas acting as followers. A leader handles all the read/write requests for the partitions it’s responsible for. A partition is a small distribution unit inside the broker. If one of the nodes fails, the followers take over its partitions, so the subscribers are never aware of the failure.

Going back and forth

Consumers can go forward and backward in the commit log by selecting the offset to read from. That’s useful, for example, if you have an issue in production and you want to replay all the messages from that very same topic by pointing a dark (non-live) environment at it to trace the problem.

That can be done by setting the auto.offset.reset property to earliest (or smallest for older consumer versions).

In case a customer wakes up a bit dizzy one morning and doesn’t remember anything from the last week, he can ask the distribution company for the whole week’s news to get up to date, or even for everything since ever, where “ever” represents the entire stock. That stock has a maximum size, otherwise the distribution warehouses would run out of capacity.

This might be useful to deal with an application’s bootstrap. If you stop an application consuming from Kafka that is configured to read from latest, it does not lose its committed offset, so whenever it restarts it resumes from whatever position it stopped at, because that position is persisted. It would be like a family going on holidays and, after coming back, reading and paying for all the pending copies.

But you can configure the consumer to bootstrap reading from the earliest offset, in which case it will start reading from the beginning of the retained log. In that case, don’t forget to ensure that whatever operation you perform on the consumed information is idempotent. Otherwise you may end up with strange behaviours.
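
Besides auto.offset.reset, the consumer API also lets you jump to an explicit position, which is handy for the “replay last week” scenario; a minimal sketch (the offset value is arbitrary):

TopicPartition partition = new TopicPartition("awesome.newspaper.topic", 0);
// ask for everything still in stock, from the oldest copy onwards
// (0.9 varargs API; newer clients take a collection of partitions instead)
subscriber.seekToBeginning(partition);
// or jump to a specific issue number
subscriber.seek(partition, 42L);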

Schema

Messages are sent in a specific format that must be understood on both sides (the schema). The key.serializer and value.serializer properties tell the producer what kind of package we are sending our subscription in; the format must be known on both sides, so the consumer has the matching key.deserializer and value.deserializer properties.
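
Instead of packaging by hand into byte[], you could also plug a custom format straight into those properties by implementing Kafka’s Serializer interface. A hypothetical sketch (the Newspaper type and its toBytes method are this post’s make-believe, not part of Kafka):

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class NewspaperSerializer implements Serializer<Newspaper> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure for this sketch
    }

    @Override
    public byte[] serialize(String topic, Newspaper newspaper) {
        // "pack" the newspaper into bytes, in whatever format both sides agreed on
        return newspaper.toBytes();
    }

    @Override
    public void close() {
        // nothing to release
    }
}

The consumer side would then implement the matching Deserializer interface.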

There are several popular options for (de)serializing when using Kafka: some are schema-less (such as plain JSON), while others are schema-oriented (such as Avro, which the example repository at the end of this post uses). The choice may depend on your requirements.

Retention policy

This distribution company has enough capacity to store quite a big stock of newspapers for, let’s say, a week. After that time the old newspapers just become garbage and go to the recycling bin (please recycle, save the planet!).

In Kafka, the equivalent of the distribution company’s maximum stock retention is defined by Kafka’s retention policy, which can be set in the server configuration (log.retention.hours). This configuration can be overridden on a per-topic basis (with the topic-level retention.ms setting).

It defines for how long we keep the log history in Kafka or, in other words, when it will start to be discarded (e.g. after 168 hours).
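
For reference, a minimal server.properties sketch of the example above:

# keep the whole stock for one week (168 hours)
log.retention.hours=168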

Conclusion

In this post we saw how Kafka can be compared to a newspaper distribution company, as well as how the various entities involved fit together.

Kafka is a publish-subscribe message broker for high-throughput communication of data, which scales very well.

There is a lot more to say; take this as an introductory explanation. The official documentation is very complete. A good explanation of some details can also be found in Kafka in a Nutshell by Kevin Sookocheff. Cloudera’s Kafka for Beginners post also contains a good introduction, not only to Kafka itself but also to its differences from Apache Flume.

Kafka can be complemented with data processing technologies such as Storm or Spark in order to make something meaningful out of data or create data processing pipelines easily.

Full producer and consumer examples using Avro schemas (API version 0.9.0.1) can be found in this GitHub repository. Feel free to play around with it.

Happy messaging.