Developer Playground

Optimizing Kafka Consumer Performance

Updated: April 10, 2025

Partitions and Pods: 1:1 Mapping

  • Simplest configuration approach
  • Ensures uniform load distribution across consumers
  • Each pod handles exactly one partition
  • Scales linearly with partition count

Disadvantages:

  • Increased overhead for partition leader election and metadata management
  • Higher resource consumption across cluster
  • More complex rebalancing when pods join/leave the cluster
  • Limited by maximum partition count of a topic
Partition-Pod 1:1 Mapping Kafka Topic Partition 0 Partition 1 Partition 2 Partition 3 Consumer Pods Pod 1 (Partition 0) Pod 2 (Partition 1) Pod 3 (Partition 2) Pod 4 (Partition 3) Advantages: Simple, even load: 1 pod = 1 partition Disadvantages: Scales only with partitions, high coordination cost

ConcurrentKafkaListenerContainerFactory

Spring Kafka provides the ConcurrentKafkaListenerContainerFactory to control concurrency within a consumer pod. It determines how many threads will process Kafka messages in parallel.

Key considerations:

  • Maximum concurrency is limited by the number of topic partitions
  • Threads exceeding the number of partitions will stay idle
  • High concurrency may cause increased CPU and memory pressure
  • The default concurrency value is 1 (single-threaded message consumption)
  • Concurrency should generally not exceed the number of partitions assigned to the consumer instance

Implementation example (Kotlin):

@Bean
fun kafkaListenerContainerFactory(
    consumerFactory: ConsumerFactory<String, CloudEvent>
): ConcurrentKafkaListenerContainerFactory<String, CloudEvent> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, CloudEvent>()
    factory.consumerFactory = consumerFactory
    factory.setConcurrency(3) // Adjust based on the number of partitions
    return factory
}

Source code reference (Java):

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    // Other code omitted
    private int concurrency = 1;
    // Other code omitted
}

The ConsumerFactory must be configured for the listener container to function correctly. For example, a DefaultKafkaConsumerFactory can be used with specific deserializers and properties.

Confluent Parallel Consumer Overview

Confluent's Parallel Consumer offers an alternative approach to enhance consumer throughput without needing additional partitions.

Key Features:

  • Allows multiple threads to process messages from a single partition
  • Bypasses scaling limits imposed by partition count
  • Overcomes batch listener limitations, offering better error handling and retry capabilities
  • Supports fine-grained per-message acknowledgments

Limitations:

  • Limited community references and documentation
  • No direct integration with Spring Kafka (as of April 2025)
  • Independent from the Spring ecosystem

Resources:

Confluent Parallel Consumer Kafka Topic Partition 0 Partition 1 Parallel Consumer Parallel Consumer Buffer Worker Pool Worker 1 Worker 2 Worker 3 Worker 4 Benefits of Parallel Consumer: • Enables multi-threaded processing per partition • Scales consumption without increasing partitions • Supports individual message-level acknowledgement • Improves reliability over batch-based listeners

Async Processing After Consume

Another approach to improve throughput is to offload the actual message processing to a separate thread pool after quickly acknowledging receipt of the message from Kafka.

Implementation

@Component
class KafkaMessageHandler(
    private val asyncExecutor: AsyncExecutor
) {
    private val logger = LoggerFactory.getLogger(KafkaMessageHandler::class.java)

    @KafkaListener(topics = ["my-topic"])
    fun handleMessage(message: ConsumerRecord<String, String>) {
        try {
            asyncExecutor.execute {
                try {
                    processMessage(message)
                } catch (e: Exception) {
                    logger.error("Error processing message: {}", e.message, e)
                }
            }
        } catch (e: RejectedExecutionException) {
            logger.warn("Thread pool is full, message will be retried")
            throw e
        }
    }

    private fun processMessage(message: ConsumerRecord<String, String>) {
        // Simulate I/O-bound operation
        Thread.sleep(100)
        logger.info("Processed message: {}", message.value())
    }
}

AsyncExecutor

@Bean(destroyMethod = "shutdown")
fun asyncExecutor(): ExecutorService {
  return Executors.newFixedThreadPool(10)
}

Benefits:

  • Improved throughput for I/O-bound operations
  • Consumer thread quickly acknowledges messages and continues
  • Good for operations where reliability isn't critical

Drawbacks:

  • Increased memory usage: All messages in a batch are stored in memory before processing, leading to higher memory consumption especially with large batches.
  • Higher latency: Batch listener waits for either the batch size to be reached or a timeout to occur, which can delay message processing compared to single-message processing.
  • Difficult to tune batch size: Setting the optimal batch size is non-trivial. A large size increases memory usage; a small size reduces efficiency.
  • Complex error handling: If a single message in a batch fails, it may affect the entire batch. This requires more sophisticated error-handling logic.
  • Complex retry logic: When some messages in a batch fail, you'll need custom logic to isolate and retry only those messages, which adds processing complexity.
Reliable Async Processing Kafka Kafka Consumer submit to ExecutorService ExecutorService processMessage() ack.acknowledge() if success ✅ handleProcessingError() if fail ❌ Retry / DLT 1. Consume message 2. Submit to thread pool 3. Process → Ack / Error ⚠ Backpressure mechanism may be needed if thread pool is overloaded

Batch Listener

Using the @KafkaListener annotation in Spring Kafka, you can configure your consumer to operate in batch mode to enhance processing efficiency.

Implementation

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<
    return factory;
}

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(
    List<String> list,
    @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
    @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Drawbacks:

  • Increased memory usage: All messages in a batch are stored in memory before processing, leading to higher memory consumption especially with large batches.
  • Higher latency: Batch listener waits for either the batch size to be reached or a timeout to occur, which can delay message processing compared to single-message processing.
  • Difficult to tune batch size: Setting the optimal batch size is non-trivial. A large size increases memory usage; a small size reduces efficiency.
  • Complex error handling: If a single message in a batch fails, it may affect the entire batch. This requires more sophisticated error-handling logic.
  • Complex retry logic: When some messages in a batch fail, you'll need custom logic to isolate and retry only those messages, which adds processing complexity.

Advertisement