Developer Playground
Optimizing Kafka Consumer Performance
Partitions and Pods: 1:1 Mapping
Advantages:
- Simplest configuration approach
- Ensures uniform load distribution across consumers
- Each pod handles exactly one partition
- Scales linearly with partition count
Disadvantages:
- Increased overhead for partition leader election and metadata management
- Higher resource consumption across cluster
- More complex rebalancing when pods join or leave the consumer group
- Pod count cannot usefully exceed the topic's partition count
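To make the partition limit concrete, here is a minimal sketch in plain Java of how partitions spread across pods. The class `PartitionAssignment` and its round-robin rule are illustrative assumptions; in a real deployment the consumer group's assignor decides the mapping.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignment {
    // Spread partition ids 0..partitions-1 across pods in round-robin order.
    // Illustrative only: the real assignment is made by the consumer group.
    // pods must be positive.
    static List<List<Integer>> assign(int partitions, int pods) {
        List<List<Integer>> result = new ArrayList<>();
        for (int pod = 0; pod < pods; pod++) {
            result.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitions; partition++) {
            result.get(partition % pods).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 3)); // [[0], [1], [2]]
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]
    }
}
```

With three partitions and five pods, the last two pods receive nothing, which is why adding pods beyond the partition count does not add throughput.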
ConcurrentKafkaListenerContainerFactory
Spring Kafka provides the ConcurrentKafkaListenerContainerFactory to control concurrency within a consumer pod.
Its concurrency setting determines how many threads consume Kafka messages in parallel.
Key considerations:
- Maximum concurrency is limited by the number of topic partitions
- Threads exceeding the number of partitions will stay idle
- High concurrency may cause increased CPU and memory pressure
- The default concurrency value is 1 (single-threaded message consumption)
- Concurrency should generally not exceed the number of partitions assigned to the consumer instance
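The considerations above reduce to simple arithmetic. The following sketch is plain Java with hypothetical helper names (`concurrencyPerPod`, `idleThreads`); it assumes partitions are spread evenly across pods and is not part of Spring Kafka.

```java
class ConcurrencySizing {
    // Threads per pod needed to cover every partition: ceil(partitions / pods).
    // pods must be positive.
    static int concurrencyPerPod(int partitions, int pods) {
        return (partitions + pods - 1) / pods;
    }

    // Threads across the whole consumer group that would never be assigned a partition.
    static int idleThreads(int partitions, int pods, int concurrency) {
        return Math.max(0, pods * concurrency - partitions);
    }

    public static void main(String[] args) {
        System.out.println(concurrencyPerPod(12, 4)); // 3
        System.out.println(idleThreads(12, 4, 5));    // 8
    }
}
```

For a 12-partition topic consumed by 4 pods, a concurrency of 3 covers every partition; raising it to 5 only adds 8 idle threads.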
Implementation example (Kotlin):
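import io.cloudevents.CloudEvent
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory

// Declare this function inside a @Configuration class.
@Bean
fun kafkaListenerContainerFactory(
    consumerFactory: ConsumerFactory<String, CloudEvent>
): ConcurrentKafkaListenerContainerFactory<String, CloudEvent> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, CloudEvent>()
    factory.consumerFactory = consumerFactory
    factory.setConcurrency(3) // Adjust based on the number of partitions
    return factory
}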
Source code reference (Java):
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    // Other code omitted
    private int concurrency = 1;
    // Other code omitted
}
The ConsumerFactory must be configured for the listener container to function correctly.
For example, a DefaultKafkaConsumerFactory can be used with specific deserializers and properties.
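As a rough illustration, the properties for such a factory could be assembled as below. The keys are standard Kafka consumer settings written as plain strings so the snippet needs no Kafka dependency; the broker address and group id are placeholder assumptions. With Spring Kafka on the classpath, the map would typically be passed to `new DefaultKafkaConsumerFactory<>(props)`. For CloudEvent payloads, the value deserializer would be swapped for a CloudEvent-aware one.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerProps {
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```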
Confluent Parallel Consumer Overview
Confluent's Parallel Consumer offers an alternative approach to enhance consumer throughput without needing additional partitions.
Key Features:
- Allows multiple threads to process messages from a single partition
- Bypasses scaling limits imposed by partition count
- Overcomes batch listener limitations, offering better error handling and retry capabilities
- Supports fine-grained per-message acknowledgments
Limitations:
- Limited community references and documentation
- No direct integration with Spring Kafka (as of April 2025)
- Independent from the Spring ecosystem
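The core idea, several threads working on records from one partition while records with the same key stay in order, can be sketched with the JDK alone. The class below is a hypothetical illustration of that idea, not the Parallel Consumer's API: each key is hashed to one single-threaded "lane", so same-key tasks run in submission order while different keys may run concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class KeyOrderedExecutor {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyOrderedExecutor(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // Same key -> same single-threaded lane -> tasks for that key run in order.
    void submit(String key, Runnable task) {
        lanes.get(Math.floorMod(key.hashCode(), lanes.size())).execute(task);
    }

    // Stop accepting work and wait for queued tasks; false on timeout or interrupt.
    boolean shutdownAndWait(long timeoutSeconds) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            boolean done = true;
            for (ExecutorService lane : lanes) {
                done &= lane.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeyOrderedExecutor executor = new KeyOrderedExecutor(4);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            executor.submit("order-1", () -> System.out.println("order-1 event " + n));
            executor.submit("order-2", () -> System.out.println("order-2 event " + n));
        }
        executor.shutdownAndWait(5);
    }
}
```

Events for each key print in submission order; how the two keys interleave varies from run to run.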
Async Processing After Consume
Another approach to improve throughput is to offload the actual message processing to a separate thread pool after quickly acknowledging receipt of the message from Kafka.
Implementation
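import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import java.util.concurrent.ExecutorService
import java.util.concurrent.RejectedExecutionException

@Component
class KafkaMessageHandler(
    // Injected from the asyncExecutor bean, which is an ExecutorService
    private val asyncExecutor: ExecutorService
) {
    private val logger = LoggerFactory.getLogger(KafkaMessageHandler::class.java)

    @KafkaListener(topics = ["my-topic"])
    fun handleMessage(message: ConsumerRecord<String, String>) {
        try {
            // Hand the record to the pool and return to the consumer loop immediately
            asyncExecutor.execute {
                try {
                    processMessage(message)
                } catch (e: Exception) {
                    logger.error("Error processing message: {}", e.message, e)
                }
            }
        } catch (e: RejectedExecutionException) {
            // Rethrowing lets the listener container's error handling redeliver the record
            logger.warn("Thread pool is full, message will be retried")
            throw e
        }
    }

    private fun processMessage(message: ConsumerRecord<String, String>) {
        // Simulate I/O-bound operation
        Thread.sleep(100)
        logger.info("Processed message: {}", message.value())
    }
}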
AsyncExecutor
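@Bean(destroyMethod = "shutdown")
fun asyncExecutor(): ExecutorService {
    // Bounded queue so a saturated pool rejects work with RejectedExecutionException;
    // Executors.newFixedThreadPool(10) queues without limit and never reports "full".
    // The capacity of 100 is an illustrative value.
    return ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue<Runnable>(100))
}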
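How the pool behaves when saturated decides whether the listener ever sees back-pressure. A pool created with `Executors.newFixedThreadPool` queues tasks without bound and rejects work only after shutdown. A bounded queue with the default abort policy throws `RejectedExecutionException` when full. The sketch below shows a third option from the JDK, `CallerRunsPolicy`, which runs the overflow task on the submitting thread and so slows the submitter down instead of failing. The class and method names are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedExecutors {
    // Fixed-size pool with a bounded queue. When all workers are busy and the queue
    // is full, CallerRunsPolicy executes the task on the thread that submitted it.
    static ExecutorService boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ExecutorService pool = boundedPool(2, 4);
        for (int i = 0; i < 10; i++) {
            final int n = i;
            pool.execute(() -> System.out.println(
                    "task " + n + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
```

In a Kafka listener, running overflow work on the consumer thread delays the next poll, so this trades rejection for slower consumption. Long tasks can still push the consumer past its poll interval.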
Benefits:
- Improved throughput for I/O-bound operations
- The consumer thread hands each message off and returns to polling immediately
- Suited to operations where occasional message loss is acceptable
Drawbacks:
- Risk of message loss: the listener returns before processing finishes, so an offset can be committed for a message that later fails or is still queued when the pod stops.
- No ordering guarantee: messages from the same partition may complete out of order across pool threads.
- Errors bypass the container: exceptions thrown inside the pool are only logged, so the container's retry and error handling never see them.
- Pool sizing: thread count and queue capacity must be bounded and tuned, otherwise a slow downstream system lets queued messages pile up in memory.
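When processing is handed to a pool, records can finish out of order, so an offset is only safe to commit up to the lowest record still in flight. The following is a minimal sketch of that bookkeeping for a single partition. `OffsetTracker` is a hypothetical helper, and connecting it to manual offset commits in the listener is left out.

```java
import java.util.TreeSet;

class OffsetTracker {
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private long highestSeen = -1;

    // Call when a record is handed to the worker pool.
    synchronized void started(long offset) {
        inFlight.add(offset);
        highestSeen = Math.max(highestSeen, offset);
    }

    // Call when a worker has finished the record successfully.
    synchronized void completed(long offset) {
        inFlight.remove(offset);
    }

    // Offset that is safe to commit, using Kafka's "next offset to read" convention:
    // the lowest unfinished offset, or one past the highest seen when nothing is
    // in flight. Returns -1 if no record has been tracked yet.
    synchronized long committableOffset() {
        if (!inFlight.isEmpty()) {
            return inFlight.first();
        }
        return highestSeen < 0 ? -1 : highestSeen + 1;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.started(5);
        tracker.started(6);
        tracker.started(7);
        tracker.completed(6);
        tracker.completed(7);
        System.out.println(tracker.committableOffset()); // 5
        tracker.completed(5);
        System.out.println(tracker.committableOffset()); // 8
    }
}
```

While offset 5 is unfinished, nothing beyond it can be committed even though 6 and 7 are done; a restart would then reprocess 6 and 7, so processing needs to tolerate duplicates.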
Batch Listener
Using the @KafkaListener annotation in Spring Kafka, you can configure your consumer to operate in batch mode to enhance processing efficiency.
Implementation
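import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true); // deliver each poll's records to the listener as one list
    return factory;
}

// Header constants are named as in Spring Kafka 3.x.
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(
        List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    // ...
}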
Drawbacks:
- Increased memory usage: All messages in a batch are held in memory before processing, so memory consumption grows with batch size.
- Higher latency: A batch is whatever one poll returns, capped by max.poll.records. If fetch.min.bytes and fetch.max.wait.ms are raised to build larger batches, early records wait longer than they would with single-message processing.
- Difficult to tune batch size: Setting the optimal batch size is non-trivial. A large size increases memory usage; a small size reduces efficiency.
- Complex error handling: If a single message in a batch fails, it may affect the entire batch. This requires more sophisticated error-handling logic.
- Complex retry logic: When some messages in a batch fail, you'll need custom logic to isolate and retry only those messages, which adds processing complexity.
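One common way to limit the cost of a partial failure is to find the first failing record, treat everything before it as done, and retry from that index. Spring Kafka's `BatchListenerFailedException` carries such an index for the error handler. The sketch below shows only the index-finding step in plain Java, with hypothetical names.

```java
import java.util.List;
import java.util.function.Consumer;

class BatchProcessing {
    // Process records in order. Returns the index of the first record whose handler
    // threw, or -1 if the whole batch succeeded. Records before the returned index
    // were processed successfully; the rest still need to be retried.
    static <T> int firstFailedIndex(List<T> batch, Consumer<T> handler) {
        for (int i = 0; i < batch.size(); i++) {
            try {
                handler.accept(batch.get(i));
            } catch (RuntimeException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "", "d");
        int failedAt = firstFailedIndex(batch, value -> {
            if (value.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        });
        System.out.println(failedAt); // 2
    }
}
```

In a batch listener, the returned index is what would be reported to the error handler so that only the failed record and those after it are redelivered.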