
Creating Custom Kafka Partitioners

If you’ve used Kafka, you’ve likely heard about partitions. Kafka allows you to partition the data in a given topic so that the processing work can be divided among multiple nodes. Partitioning the data thus allows more of it to be processed in parallel.

Kafka’s default logic attempts to distribute messages evenly across the topic’s partitions. When a message has a key, the default partitioner hashes the key to pick a partition; when there is no key, it spreads messages across partitions, essentially round-robin in older clients (newer clients use a "sticky" batching variation). This default distribution works fine for most use cases.
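The round-robin idea can be sketched in a few lines. This is a simplified illustration, not Kafka's actual partitioner code (which also hashes message keys); the class and method names are made up for the example:

```java
// A simplified sketch of round-robin partition assignment:
// each message goes to the next partition in a repeating cycle.
public class RoundRobinSketch {

    private int counter = 0;

    // Returns the partition for the next message.
    int nextPartition(int numPartitions) {
        int p = counter % numPartitions;
        counter++;
        return p;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 6; i++) {
            // With 3 partitions, messages cycle through 0, 1, 2, 0, 1, 2
            System.out.println("message " + i + " -> partition " + rr.nextPartition(3));
        }
    }
}
```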

However, Kafka supports the ability to provide a custom partitioning algorithm. When might you want one? One example is ensuring data locality, so that a given node consistently receives the messages that a service on that node will process further.

If you’re planning to use custom partitioning this quick guide should help you.

Prerequisites and Setup

This guide assumes you already have Kafka and Zookeeper up and running; for more on that, see this guide. We’ll also set up our project using Maven.
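For the Maven build, the project needs the kafka-clients library on the classpath. A minimal dependency sketch follows; the version shown is an assumption, so adjust it to match the Kafka release you're running:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- version is illustrative; match it to your broker -->
    <version>2.8.0</version>
</dependency>
```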

First off, we’ll create a topic with three partitions from the command line.

bin/kafka-topics.sh --create \
                    --bootstrap-server localhost:9092 \
                    --replication-factor 1 \
                    --partitions 3 \
                    --topic custom-partitioned-topic

Coding the Custom Partitioner

Once we have that running we can start coding.

First, we code our partitioner. It will take the value we send from our producer and return the partition that value maps to.

package com.mustardgrain.blog;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }

    @Override
    public int partition(String topic,
                         Object key,
                         byte[] keyBytes,
                         Object value,
                         byte[] valueBytes,
                         Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Interpret the message value itself as the requested partition number.
        int partitionValue = Integer.parseInt((String) value);

        // Clamp out-of-range values to the last partition.
        if (partitionValue > (numPartitions - 1))
            return numPartitions - 1;

        return partitionValue;
    }

}

Since the CustomProducer shown in the next section sends the values 0, 1, and 2, this partitioner will route its messages alternately through partitions 0, 1, and 2.

Coding the Producer to Use the Custom Partitioner

This is a super simple Kafka producer that repeatedly sends the values 0, 1, and 2 to our topic:

package com.mustardgrain.blog;

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class CustomProducer {

    public static void main(String[] args) throws Exception {

        String topicName = "custom-partitioned-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Tell the producer to use our custom partitioner.
        props.put("partitioner.class", "com.mustardgrain.blog.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 40; i++) {
            int v = i % 3;
            producer.send(new ProducerRecord<>(topicName, String.valueOf(v), String.valueOf(v)));
        }
        System.out.println("Messages sent successfully");
        producer.close();
        System.out.println("CustomProducer completed.");
    }
}

Validating Your Partition Logic

You can get info on your topic’s partitions inside your partitioner using:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

You can use the information from the PartitionInfo class in your partitioning logic, for instance to get the number of partitions.
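Once you have the partition count, the rest of our selection rule can be exercised without a broker. The following is a cluster-free sketch of the same logic the partitioner above applies; the class and method names here are just for illustration:

```java
// A minimal, broker-free sketch of our partitioner's selection rule:
// parse the message value as an int and clamp it to the last partition.
public class ClampPartitionSketch {

    static int choosePartition(String value, int numPartitions) {
        int requested = Integer.parseInt(value);
        // Out-of-range values are clamped to the highest partition ID.
        return Math.min(requested, numPartitions - 1);
    }

    public static void main(String[] args) {
        System.out.println(choosePartition("1", 3)); // in range: partition 1
        System.out.println(choosePartition("7", 3)); // clamped: partition 2
    }
}
```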

To make sure that the partitioner is working as expected, you can run the following from your Kafka installation directory:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic custom-partitioned-topic

Another way to check that messages are arriving on the correct partition is to inspect the record metadata in your Kafka consumer. For example:

package com.mustardgrain.blog;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MessageConsumerRunner {

    public static void main(String[] args) {
        MessageConsumer consumer = new MessageConsumer();
        Thread thread = new Thread(consumer);
        thread.start();

        // Let the consumer thread run for a while before the JVM exits.
        try {
            Thread.sleep(100000);
        } catch (InterruptedException ie) {
        }
    }

    private static class MessageConsumer implements Runnable {

        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        public MessageConsumer() {
            Properties props = createConsumerConfig();
            this.consumer = new KafkaConsumer<>(props);
            this.topic = "custom-partitioned-topic";
            this.consumer.subscribe(Collections.singletonList(this.topic));
        }

        private Properties createConsumerConfig() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "custom-partitions");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }

        public void run() {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("Message is " + record.value()
                            + ", printed from partition: " + record.partition());
            }
        }

    }

}

As you can see, when you get a record out of Kafka you can read the ID of the partition the message came from.

Conclusion

If you want to create your own custom partitioner, the key parts are these:

  1. In the producer, make sure you have this line when setting up your properties: props.put("partitioner.class", "class.name.of.CustomPartitioner");
  2. In the code for your partitioner, make sure the class implements the org.apache.kafka.clients.producer.Partitioner interface.

The partition method is where the magic happens. Using the parameters given, plus whatever extra logic you need, return the ID of the partition you want this message pushed to, and Kafka will handle the rest.