Article From:

kafkaWhen sending a message, it needs to be encapsulated into a ProducerRecord:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;

What we need to pay attention to is partition and key.

kafkaWhen calling send, it actually put the message in memory and did not send it out. Before entering the memory queue, which partiton should the accounting message be placed in?

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // ignore
    int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); // partitonThe partiton used for the specific placement of a computer message
    if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            return result.future;
    // ignore

Let’s analyze the partiton method:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition(); // ProducerRecordMedium partiton parameters

        if (partition != null) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
            int lastPartition = partitions.size() - 1;
            // they have given us a partition, use it
            if (partition < 0 || partition > lastPartition) {
                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
            return partition; // When partiton is specified, the message is sent to the specified partiton.

        // Otherwise, use partitioner to calculate the transmitted partiton based on the key parameter of ProducerRecord.
        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,

You can specify the “partitioner.class” configuration item in the configuration using custom partitioner, and custom partitioner needs to implement the Partitioner interface:

public interface Partitioner extends Configurable {

     * Compute the partition for the given record.
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

     * This is called when partitioner is closed.
    public void close();


If no “partitioner.class” configuration is specified, the default partitioner:DefaultPartitioner is used. Let’s look at the allocation of DefaultPartitioner

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); // Gets the partiton list, which is updated when metadata is updated, and metadata is updated every 30s by default.
        int numPartitions = partitions.size();
        if (keyBytes == null) { // If ProducerRecord does not transmit key, it starts with a random number and uses round-robin mode.
            int nextValue = counter.getAndIncrement(); // counterBeing initialized as a random value, increasing each time
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        } else { // Select a patition by hash for keyBytes
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Leave a Reply

Your email address will not be published. Required fields are marked *