1. Welcome Developer Playground by Giri

Developer Playground

Optimizing Kafka Consumer Performance

Updated: April 10, 2025

Partitions and Pods: 1:1 Mapping

  • Simplest configuration approach
  • Ensures uniform load distribution across consumers
  • Each pod handles exactly one partition
  • Scales linearly with partition count

Disadvantages:

  • Increased overhead for partition leader election and metadata management
  • Higher resource consumption across cluster
  • More complex rebalancing when pods join/leave the cluster
  • Limited by maximum partition count of a topic
Partition-Pod 1:1 MappingKafka TopicPartition 0Partition 1Partition 2Partition 3Consumer PodsPod 1 (Partition 0)Pod 2 (Partition 1)Pod 3 (Partition 2)Pod 4 (Partition 3)Advantages:Simple, even load: 1 pod = 1 partitionDisadvantages:Scales only with partitions, high coordination cost

ConcurrentKafkaListenerContainerFactory

Spring Kafka provides the ConcurrentKafkaListenerContainerFactory to control concurrency within a consumer pod. It determines how many threads will process Kafka messages in parallel.

Key considerations:

  • Maximum concurrency is limited by the number of topic partitions
  • Threads exceeding the number of partitions will stay idle
  • High concurrency may cause increased CPU and memory pressure
  • The default concurrency value is 1 (single-threaded message consumption)
  • Concurrency should generally not exceed the number of partitions assigned to the consumer instance

Implementation example (Kotlin):

@Bean
fun kafkaListenerContainerFactory(
    consumerFactory: ConsumerFactory<String, CloudEvent>
): ConcurrentKafkaListenerContainerFactory<String, CloudEvent> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, CloudEvent>()
    factory.consumerFactory = consumerFactory
    factory.setConcurrency(3) // Adjust based on the number of partitions
    return factory
}

Source code reference (Java):

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    // Other code omitted
    private int concurrency = 1;
    // Other code omitted
}

The ConsumerFactory must be configured for the listener container to function correctly. For example, a DefaultKafkaConsumerFactory can be used with specific deserializers and properties.

Confluent Parallel Consumer Overview

Confluent's Parallel Consumer offers an alternative approach to enhance consumer throughput without needing additional partitions.

Key Features:

  • Allows multiple threads to process messages from a single partition
  • Bypasses scaling limits imposed by partition count
  • Overcomes batch listener limitations, offering better error handling and retry capabilities
  • Supports fine-grained per-message acknowledgments

Limitations:

  • Limited community references and documentation
  • No direct integration with Spring Kafka (as of April 2025)
  • Independent from the Spring ecosystem

Resources:

Confluent Parallel ConsumerKafka TopicPartition 0Partition 1Parallel ConsumerParallel Consumer BufferWorker PoolWorker 1Worker 2Worker 3Worker 4Benefits of Parallel Consumer:• Enables multi-threaded processing per partition• Scales consumption without increasing partitions• Supports individual message-level acknowledgement• Improves reliability over batch-based listeners

Async Processing After Consume

Another approach to improve throughput is to offload the actual message processing to a separate thread pool after quickly acknowledging receipt of the message from Kafka.

Implementation


@Component
class KafkaMessageHandler {

    private val taskQueue = LinkedBlockingQueue<Runnable>(1000)

    private val asyncExecutor = ThreadPoolExecutor(
        10, 10,
        60L, TimeUnit.SECONDS,
        taskQueue,
        Executors.defaultThreadFactory(),
        RejectedExecutionHandler { runnable, _ ->
            // handle backpressure or overflow
            println("Queue is full. Dropping message or handling backpressure.")
        }
    )

    @KafkaListener(
        topics = ["your-topic"],
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun consume(
        message: CloudEvent,
        ack: Acknowledgment
    ) {
        try {
            asyncExecutor.submit {
                try {
                    processMessage(message)
                    ack.acknowledge()
                } catch (e: Exception) {
                    handleProcessingError(message, e)
                }
            }
        } catch (e: RejectedExecutionException) {
            println("Task rejected. Queue capacity exceeded.")
            handleQueueOverflow(message)
        }
    }

    fun processMessage(message: CloudEvent) {
        // business logic
    }

    fun handleProcessingError(message: CloudEvent, ex: Exception) {
        // error handling logic
    }

    fun handleQueueOverflow(message: CloudEvent) {
        // handle queue overflow (e.g. send to DLT)
    }
}

AsyncExecutor

@Bean(destroyMethod = "shutdown")
fun asyncExecutor(): ExecutorService {
  return Executors.newFixedThreadPool(10)
}

Benefits:

  • Improved throughput for I/O-bound operations
  • Consumer thread quickly acknowledges messages and continues
  • Good for operations where reliability isn't critical

Drawbacks:

  • Overhead in managing failed processing and retries
  • Need for backpressure mechanism if processing is slower than consumption
  • Potential for message loss if service crashes after acknowledging
Reliable Async ProcessingKafkaKafka Consumersubmit to ExecutorServiceExecutorServiceprocessMessage()ack.acknowledge() if success ✅handleProcessingError() if fail ❌Retry / DLT1. Consume message2. Submit to thread pool3. Process → Ack / Error⚠ Backpressure mechanism may be needed if thread pool is overloaded

Batch Listener

Using the @KafkaListener annotation in Spring Kafka, you can configure your consumer to operate in batch mode to enhance processing efficiency.

Implementation

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<
    return factory;
}

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(
    List<String> list,
    @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
    @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Drawbacks:

  • Increased memory usage: All messages in a batch are stored in memory before processing, leading to higher memory consumption especially with large batches.
  • Higher latency: Batch listener waits for either the batch size to be reached or a timeout to occur, which can delay message processing compared to single-message processing.
  • Difficult to tune batch size: Setting the optimal batch size is non-trivial. A large size increases memory usage; a small size reduces efficiency.
  • Complex error handling: If a single message in a batch fails, it may affect the entire batch. This requires more sophisticated error-handling logic.
  • Complex retry logic: When some messages in a batch fail, you'll need custom logic to isolate and retry only those messages, which adds processing complexity.
Copyright © 2025 Giri Labs Inc.·Trademark Policy