One of the most common challenges when working with message brokers like Kafka is controlling the rate at which messages are processed. When a large volume of messages arrives in a short period, the resulting burst can overload the system or degrade downstream services. This article explores how to implement rate limiting in Spring Kafka consumers using an internal queue approach.
We'll implement a rate-limited consumer using Spring Kafka with the following approach: a ConcurrentLinkedQueue serves as an internal buffer, and a scheduled task drains it at a fixed rate. Let's look at the key parts of the implementation:
@Component
class MobilePushRequestConsumer(
    private val historyService: MobilePushHistoryWriteService,
    private val producer: MobilePushProducer
) {
    private val logger = LoggerFactory.getLogger(javaClass)
    private val queue: ConcurrentLinkedQueue<MobilePushRequest> = ConcurrentLinkedQueue()

    @Value("\${mobile-push.process-per-second}")
    private lateinit var processPerSecond: String

    @PostConstruct
    private fun setup() {
        logger.info("plus reward push process per second : {}", processPerSecond)
    }

    @KafkaListener(
        topics = [MOBILE_PUSH_FROM_OTHER_SERVICE],
        containerFactory = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY
    )
    fun consumeTargetMobilePushRequest(message: CloudEvent) {
        try {
            // Deserialize the CloudEvent payload into a MobilePushRequest
            val data = CloudEventUtils.mapData(
                message,
                PojoCloudEventDataMapper.from(jacksonObjectMapper(), MobilePushRequest::class.java)
            )?.value!!
            val type = MobilePushType.fromParamKey(data.type)

            // For the rate-limited message type, buffer in the queue instead of processing immediately
            if (type == MobilePushType.HEAVY) {
                queue.add(data)
                logger.info("Added to queue: {}", data)
            } else {
                // Process other types immediately
                // ...processing logic omitted...
            }
        } catch (e: Exception) {
            logger.error("An error occurred while consuming the request", e)
            throw e
        }
    }

    // Scheduled task that runs every second to process messages from the queue
    @Scheduled(fixedRate = 1000)
    fun processQueue() {
        val batch = mutableListOf<MobilePushRequest>()
        // Take at most the configured number of messages from the queue
        repeat(processPerSecond.toInt()) {
            val request = queue.poll() ?: return@repeat
            batch.add(request)
        }

        if (batch.isNotEmpty()) {
            logger.info("Processing batch of size: {}", batch.size)
            try {
                // Process each message in the batch
                batch.forEach { request ->
                    // Processing logic omitted...
                }
                logger.info("Batch processing complete")
            } catch (e: Exception) {
                logger.error("Error during batch processing", e)
            }
        }
    }
}
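Stripped of the Spring wiring, the core of processQueue() is the drain step: take at most N items from the queue per tick and leave the rest for the next run. A minimal standalone sketch (drainBatch is a name introduced here for illustration):

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Pull at most `limit` items from the queue; whatever remains waits
// for the next scheduler tick, which is what caps the throughput.
fun <T> drainBatch(queue: ConcurrentLinkedQueue<T>, limit: Int): List<T> {
    val batch = mutableListOf<T>()
    repeat(limit) {
        val item = queue.poll() ?: return@repeat // queue empty: skip this iteration
        batch.add(item)
    }
    return batch
}

fun main() {
    val queue = ConcurrentLinkedQueue((1..120).toList())
    println(drainBatch(queue, 50).size) // 50 (first tick)
    println(drainBatch(queue, 50).size) // 50 (second tick)
    println(drainBatch(queue, 50).size) // 20 (remainder)
}
```

Note that, as in the article's code, `return@repeat` only skips the current iteration rather than breaking out of the loop; once the queue is empty, the remaining iterations are cheap no-op polls, so the behavior is still correct.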
Draining is handled by a @Scheduled task that runs at a fixed rate (1000 ms). The processing rate is controlled by a configuration property:
# application.yml
mobile-push:
  process-per-second: 50  # Process 50 notifications per second
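Because the scheduler fires every 1000 ms and drains at most process-per-second messages per run, that property is also the maximum sustained throughput, which makes capacity math straightforward. A quick back-of-the-envelope helper (illustrative only; drainSeconds is not part of the article's code):

```kotlin
// Seconds needed to drain a burst, given the per-second processing cap
// (ceiling division: a partial final batch still takes a full tick).
fun drainSeconds(queued: Int, perSecond: Int): Int =
    (queued + perSecond - 1) / perSecond

fun main() {
    println(drainSeconds(10_000, 50)) // a burst of 10,000 drains in 200 s
}
```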
Proper error handling is crucial in any message processing system. When processing fails, we can use a Dead Letter Topic (DLT) to store messages that couldn't be processed:
@Scheduled(fixedRate = 1000)
fun processQueue() {
    val batch = mutableListOf<MobilePushRequest>()
    repeat(processPerSecond.toInt()) {
        val request = queue.poll() ?: return@repeat
        batch.add(request)
    }

    if (batch.isNotEmpty()) {
        logger.info("Processing batch of size: {}", batch.size)
        batch.forEach { request ->
            try {
                // ...processing logic omitted...
            } catch (e: Exception) {
                logger.error("Error processing message: {}", request, e)
                // Send to Dead Letter Topic
                sendToDLT(request, e)
            }
        }
        logger.info("Batch processing complete")
    }
}
// deadLetterProducer: an injected Kafka producer for the DLT (declaration omitted)
private fun sendToDLT(request: MobilePushRequest, exception: Exception) {
    try {
        // Wrap the original request together with the error details
        val dltMessage = DeadLetterMessage(
            originalMessage = request,
            exceptionMessage = exception.message,
            exceptionClass = exception.javaClass.name,
            timestamp = System.currentTimeMillis()
        )
        // Send to a dedicated Dead Letter Topic
        deadLetterProducer.send("mobile-push-dlt", dltMessage)
        logger.info("Message sent to DLT: {}", request)
    } catch (e: Exception) {
        logger.error("Failed to send message to DLT: {}", request, e)
    }
}
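The article doesn't show the DeadLetterMessage definition; based on how sendToDLT constructs it, it is presumably a simple data class along these lines (the field types are assumptions, and MobilePushRequest is stubbed here so the sketch stands alone):

```kotlin
// Stub standing in for the article's actual request type.
data class MobilePushRequest(val type: String, val userId: Long)

// Assumed shape of the DLT envelope: original payload plus error metadata.
data class DeadLetterMessage(
    val originalMessage: MobilePushRequest,
    val exceptionMessage: String?,   // Exception.message is nullable
    val exceptionClass: String,
    val timestamp: Long
)

fun main() {
    val failed = MobilePushRequest(type = "HEAVY", userId = 42L)
    val envelope = DeadLetterMessage(
        originalMessage = failed,
        exceptionMessage = "downstream timeout",
        exceptionClass = "java.util.concurrent.TimeoutException",
        timestamp = System.currentTimeMillis()
    )
    println(envelope.exceptionClass)
}
```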
With this approach, messages that fail processing are logged with their error details, wrapped together with the exception metadata, and routed to a dedicated Dead Letter Topic, so a single failure never blocks the rest of the batch.
You can also implement a separate consumer for the DLT that could attempt to reprocess messages after a delay or alert operations staff about persistent failures.
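Such a DLT reprocessor ultimately reduces to retry-with-delay logic. A minimal sketch of that core, independent of Kafka (retryWithDelay is a name introduced here; a real reprocessor would drive this from a @KafkaListener on the DLT and alert operators on persistent failure):

```kotlin
// Retry `process` up to maxAttempts times, sleeping between attempts.
// Returns false on persistent failure so the caller can alert operators.
fun <T> retryWithDelay(
    message: T,
    maxAttempts: Int,
    delayMillis: Long,
    process: (T) -> Unit
): Boolean {
    repeat(maxAttempts) { attempt ->
        try {
            process(message)
            return true // non-local return: success ends the retry loop
        } catch (e: Exception) {
            if (attempt < maxAttempts - 1) Thread.sleep(delayMillis)
        }
    }
    return false
}

fun main() {
    var attempts = 0
    val ok = retryWithDelay("push-123", maxAttempts = 3, delayMillis = 10L) {
        attempts++
        if (attempts < 3) throw RuntimeException("still failing")
    }
    println("$ok after $attempts attempts") // true after 3 attempts
}
```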
Implementing rate limiting for Kafka consumers using an internal queue is a simple yet effective approach for controlling message processing rates. This pattern is particularly useful for high-volume notification systems where controlling the throughput is essential for system stability and downstream service protection.
The implementation shown here provides a configurable, stable way to handle varying loads while ensuring all messages are eventually processed. For production systems, also consider durability (the internal queue lives in memory, so buffered messages are lost if the application restarts) and memory growth when the arrival rate exceeds the configured processing rate.