Developer Playground
Spring Kafka provides the ConcurrentKafkaListenerContainerFactory
to control concurrency within a single consumer pod. Its concurrency setting determines how many listener threads, each backed by its own KafkaConsumer, process Kafka messages in parallel; setting it higher than the topic's partition count simply leaves the extra threads idle.
@Bean
fun kafkaListenerContainerFactory(
    consumerFactory: ConsumerFactory<String, CloudEvent>
): ConcurrentKafkaListenerContainerFactory<String, CloudEvent> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, CloudEvent>()
    factory.consumerFactory = consumerFactory
    factory.setConcurrency(3) // Adjust based on the number of partitions
    return factory
}
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    // Other code omitted
    private int concurrency = 1; // Default: a single consumer thread per container
    // Other code omitted
}
The ConsumerFactory
must be configured for the listener container to function correctly. For example, a DefaultKafkaConsumerFactory
can be used with specific deserializers and properties.
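As a sketch of that wiring (the bootstrap address, group id, and the CloudEventDeserializer from the CloudEvents Kafka SDK are placeholder assumptions; adapt them to your environment):

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical wiring; adjust servers, group id, and deserializers to your setup.
@Bean
public ConsumerFactory<String, CloudEvent> consumerFactory() {
    Map<String, Object> props = Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "your-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(
            props, new StringDeserializer(), new CloudEventDeserializer());
}
```

Disabling auto-commit here pairs with the manual acknowledgment patterns shown later.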
Confluent's Parallel Consumer offers an alternative approach to enhance consumer throughput without needing additional partitions.
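The core idea behind it, key-level parallelism, is that records sharing a key are processed in order while different keys proceed concurrently. This can be sketched without Kafka or the Parallel Consumer's actual API by hashing each key to a single-threaded lane (a simplified illustration of the ordering guarantee, not the library's implementation):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyOrderedExecutor {
    private final ExecutorService[] lanes;

    public KeyOrderedExecutor(int parallelism) {
        lanes = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            // One thread per lane preserves per-key ordering.
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void submit(String key, Runnable task) {
        int lane = Math.floorMod(key.hashCode(), lanes.length);
        lanes[lane].submit(task); // same key -> same lane -> sequential execution
    }

    public void shutdown() throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
            lane.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        KeyOrderedExecutor executor = new KeyOrderedExecutor(4);
        Queue<String> seen = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 3; i++) {
            final int n = i;
            executor.submit("order-42", () -> seen.add("order-42:" + n));
        }
        executor.shutdown();
        System.out.println(seen); // → [order-42:0, order-42:1, order-42:2]
    }
}
```

Records for one key never interleave, yet lanes run in parallel, which is how throughput grows without adding partitions.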
Another approach to improve throughput is to offload the actual message processing to a separate thread pool after quickly acknowledging receipt of the message from Kafka.
@Component
class KafkaMessageHandler {
    private val taskQueue = LinkedBlockingQueue<Runnable>(1000)

    private val asyncExecutor = ThreadPoolExecutor(
        10, 10,
        60L, TimeUnit.SECONDS,
        taskQueue,
        Executors.defaultThreadFactory(),
        // AbortPolicy makes submit() throw RejectedExecutionException when the
        // queue is full, so backpressure is handled in the listener below.
        // (A custom handler that only logs would silently swallow rejections
        // and the catch block would never fire.)
        ThreadPoolExecutor.AbortPolicy()
    )
    // Requires AckMode.MANUAL (or MANUAL_IMMEDIATE) on the container factory;
    // in recent Spring Kafka versions, acknowledging from a thread other than
    // the listener thread also requires the container property asyncAcks.
    @KafkaListener(
        topics = ["your-topic"],
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun consume(
        message: CloudEvent,
        ack: Acknowledgment
    ) {
        try {
            asyncExecutor.submit {
                try {
                    processMessage(message)
                    ack.acknowledge() // commit only after successful processing
                } catch (e: Exception) {
                    handleProcessingError(message, e)
                }
            }
        } catch (e: RejectedExecutionException) {
            println("Task rejected. Queue capacity exceeded.")
            handleQueueOverflow(message)
        }
    }
    fun processMessage(message: CloudEvent) {
        // business logic
    }

    fun handleProcessingError(message: CloudEvent, ex: Exception) {
        // error handling logic
    }

    fun handleQueueOverflow(message: CloudEvent) {
        // handle queue overflow (e.g. send to DLT)
    }
}
Alternatively, declare the executor as a Spring-managed bean so it is shut down cleanly with the application context:
@Bean(destroyMethod = "shutdown")
fun asyncExecutor(): ExecutorService {
    return Executors.newFixedThreadPool(10)
}
Using the @KafkaListener
annotation in Spring Kafka, you can also configure the consumer to operate in batch mode, amortizing per-record overhead across each poll to enhance processing efficiency.
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true); // enable batch mode
    return factory;
}
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(
        List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
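The header lists arrive in lockstep with the payload list: element i of keys, partitions, topics, and offsets describes record i. A plain-Java sketch of walking them together (the list contents are made up for illustration):

```java
import java.util.List;

public class BatchIteration {
    public static void main(String[] args) {
        // Stand-in data mirroring the parallel lists a batch listener receives.
        List<String> values = List.of("a", "b");
        List<Integer> keys = List.of(1, 2);
        List<Integer> partitions = List.of(0, 3);
        List<Long> offsets = List.of(100L, 57L);

        for (int i = 0; i < values.size(); i++) {
            System.out.printf("record %d: key=%d partition=%d offset=%d value=%s%n",
                    i, keys.get(i), partitions.get(i), offsets.get(i), values.get(i));
        }
    }
}
```

Because offsets are tracked per record, a failure mid-batch can be narrowed to the offending element rather than discarding the whole poll.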