Tuesday, March 5, 2013

Custom Partitioners for Kafka 0.8.0

One of the nice features of Kafka is the ability to define the partitioning algorithm. This allows the user to define the key that should be used for partitioning, instead of letting Kafka randomly assign it. However, it wasn't obvious how to do this at first; it turns out you need a couple of things defined to make it work.

First, you need to tell Kafka the name of your Partitioner class:

props.put("partitioner.class", "com.curtinhome.kafka.playproducer.OrganizationPartitioner");

Then tell Kafka how to serialize the key so the Partitioner class can be called with the value:

 props.put("serializer.class", "kafka.serializer.StringEncoder");
 props.put("key.serializer.class", "kafka.serializer.StringEncoder");


Here we're telling Kafka that both the message being written and the key are Strings. If you wanted to pass an Integer key, you'd need to provide a class that converts the Integer to a byte array.
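For example, here's a minimal sketch of such an encoder, assuming the Kafka 0.8 kafka.serializer.Encoder interface (the IntegerEncoder class name is just illustrative):

import java.nio.ByteBuffer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class IntegerEncoder implements Encoder<Integer> {
    public IntegerEncoder(VerifiableProperties props) {
        // Kafka constructs encoders reflectively and passes in a VerifiableProperties
    }

    public byte[] toBytes(Integer key) {
        // Write the Integer key as its 4-byte big-endian representation
        return ByteBuffer.allocate(4).putInt(key).array();
    }
}

You'd then point key.serializer.class at this class instead of the StringEncoder.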

Now when you write the message, you pass the key and the message:

        long events = Long.parseLong(args[0]);
        int blocks = Integer.parseInt(args[1]);

        Random rnd = new Random();
        Properties props = new Properties();
        props.put("broker.list", "broker1.atlnp1:9092,broker2.atlnp1:9092,broker3.atlnp1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.curtinhome.kafka.playproducer.OrganizationPartitioner");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
                // The key (the block number as a String) is what the partitioner will see
                KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("test1", String.valueOf(nBlocks), msg);
                producer.send(data);
            }
        }
        producer.close();


The Partitioner is pretty straightforward:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class OrganizationPartitioner implements Partitioner {
    public OrganizationPartitioner(VerifiableProperties props) {
        // Kafka constructs the partitioner reflectively and passes in a VerifiableProperties
    }

    public int partition(String key, int a_numPartitions) {
        // The key is the organization id sent as a String; mod it by the partition count
        long organizationId = Long.parseLong(key);
        return (int) (organizationId % a_numPartitions);
    }
}

The number of partitions is provided by Kafka based on the topic configuration.

9 comments:

Anonymous said...

Does this mean that while consuming I will be able to choose the message offset from a particular partition?
For example, while running multiple partitions, is it possible to consume from one specific partition, i.e. partition 0?

Chris Curtin said...

Yes. Using the Simple Consumer you can define which partition to listen on and which offset to start reading from (see the sketch below).

With the Consumer Groups you don't have this option, but it handles all the offset management for you, so there is a trade-off in functionality.
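For example, here's a rough sketch of reading only partition 0 using the 0.8 SimpleConsumer javaapi (the broker host and topic name reuse the producer example above, the class name is illustrative, and a real consumer would first look up which broker leads the partition and manage offsets and errors):

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class PartitionZeroReader {
    public static void main(String[] args) {
        // Connect to the broker that currently leads partition 0 of "test1"
        SimpleConsumer consumer =
            new SimpleConsumer("broker1.atlnp1", 9092, 100000, 64 * 1024, "partition0-reader");

        FetchRequest req = new FetchRequestBuilder()
            .clientId("partition0-reader")
            .addFetch("test1", 0, 0L, 100000) // topic, partition, starting offset, fetch size
            .build();
        FetchResponse response = consumer.fetch(req);

        for (MessageAndOffset messageAndOffset : response.messageSet("test1", 0)) {
            // messageAndOffset.message().payload() holds the raw message bytes
            System.out.println("offset: " + messageAndOffset.offset());
        }
        consumer.close();
    }
}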

Anonymous said...

Thanks for such a quick reply Chris. I followed your example and it was helpful. What I was trying to achieve is: if I mention a key (say 10) while producing the message, it should first check whether that partition exists; if it does, it should simply send the message to that partition, otherwise it should send it to partition 0. After doing some searches I found the number of available partitions can be obtained by the TopicMetadataRequest(topic_name) method, but is it possible to check this in my custom partitioner class, so that I can check at runtime whether the key given by the producer is a valid one or not?

Also, what should be the approach if I want to use a string as the key (for example "topic_one_key") in the producer?

Chris Curtin said...

In your partitioner you get the # of partitions passed to you, so you can do the logic you've described there.

As for the string, you need to decide your mapping to the integer partitions. So hash it to an integer, convert the first character to a number (0-25), etc.
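For instance, a minimal sketch of the hashing approach, following the same Partitioner shape as the OrganizationPartitioner above (the class name is just illustrative):

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class StringHashPartitioner implements Partitioner {
    public StringHashPartitioner(VerifiableProperties props) {
    }

    public int partition(String key, int numPartitions) {
        // Mask off the sign bit so the result is always a valid partition number
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}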

Unknown said...

Thanks Chris for the post.
Did you find a way to retrieve messages from Kafka by OrganizationId in your case? As I see it, the consumer would have to apply the same hash function to find the partition by OrgId, and in case of an OrganizationId collision we would need to retrieve all data from the partition and filter on the client side.
Am I right?

Unknown said...

I'm new to Kafka. I have a basic doubt about the code: how are the values key and numPartitions assigned or passed in the above implementation? Suppose my IP is localhost and numPartitions = 3; how would this work?

Chris Curtin said...

The key is being passed by the code creating the message.

In the example, it is nBlocks passed as a String:
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test1", String.valueOf(nBlocks), msg);

This is contrived but can be whatever you want in your data to partition by.

The number of partitions is being passed by Kafka and defined when the topic is configured.

Anonymous said...

Hi Chris,
How does this work with multiple topics? How does the partitioner know for which topic it is invoked? Does Kafka create separate Partitioner instances for each topic?

Thanks,
Ritesh

Sid said...

Hi,
After pushing the message to Kafka, how can I verify which message is going to which partition? More specifically, is there any command line argument which I can use to start a consumer from a single partition directly?