Developer Playground by Giri

Controlling Processing Rate in Kafka Consumers

Updated: March 31, 2025

Introduction

One of the most common challenges when working with message brokers like Kafka is controlling the rate at which messages are processed. When a large volume of messages arrives in a short period, processing them all at once can overload the system or the downstream services it calls. This article explores how to implement rate limiting in Spring Kafka consumers using an internal queue.

The Problem

  • Need to send push notifications to many users simultaneously
  • Processing all notifications at once can overwhelm notification services
  • Want to control the processing rate (e.g., X notifications per second)
  • Need to maintain message ordering and ensure delivery
Figure: Kafka Consumer with Rate Limiting Queue and DLT. Kafka Topic → (1. Consume) Kafka Consumer reads messages from Kafka and enqueues them → (2. Queue) ConcurrentLinkedQueue → (3. Process) Scheduled Task (1/sec): process N messages → (4a. Success) process message, or (4b. Error) → (5. Send to DLT) Dead Letter Topic: failed messages with error info

The Solution

We'll implement a rate-limited consumer using Spring Kafka with the following approach:

  • Use a ConcurrentLinkedQueue as an internal buffer
  • Configure a Kafka listener to add messages to the queue
  • Create a scheduled task to process messages at a controlled rate
  • Make the processing rate configurable via application properties
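Stripped of Spring, these four pieces fit in a few dozen lines of plain Kotlin. The sketch below is a framework-free illustration of the pattern, not the implementation that follows: enqueue stands in for the Kafka listener, a ScheduledExecutorService stands in for @Scheduled, and the class name RateLimitedProcessor and its maxPerTick parameter are hypothetical.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Hypothetical, framework-free version of the pattern: producers enqueue,
// a scheduled task takes at most `maxPerTick` items per tick.
class RateLimitedProcessor<T>(
    private val maxPerTick: Int,
    private val handler: (T) -> Unit
) {
    private val queue = ConcurrentLinkedQueue<T>()
    private var scheduler: ScheduledExecutorService? = null

    init {
        require(maxPerTick > 0) { "maxPerTick must be positive" }
    }

    // Stands in for the Kafka listener: it only buffers, it never processes.
    fun enqueue(item: T) {
        queue.add(item)
    }

    fun pending(): Int = queue.size

    // One tick: take up to maxPerTick items, stopping early if the queue is empty.
    // Returns how many items were taken from the queue (successful or not).
    fun drainOnce(): Int {
        var taken = 0
        while (taken < maxPerTick) {
            val item = queue.poll() ?: break
            taken++
            try {
                handler(item)
            } catch (e: Exception) {
                // Per-item catch: one failure does not abort the rest of the tick.
                System.err.println("Failed to process $item: ${e.message}")
            }
        }
        return taken
    }

    // Stands in for @Scheduled(fixedRate = ...). The task must never throw,
    // because scheduleAtFixedRate suppresses all later runs after an uncaught exception.
    fun start(periodMillis: Long = 1000) {
        val executor = Executors.newSingleThreadScheduledExecutor()
        executor.scheduleAtFixedRate({
            try {
                drainOnce()
            } catch (t: Throwable) {
                System.err.println("Tick failed: ${t.message}")
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS)
        scheduler = executor
    }

    fun stop() {
        scheduler?.shutdown()
    }
}
```

A single-threaded scheduler keeps ticks from overlapping: if one tick runs long, scheduleAtFixedRate starts the next one late rather than concurrently.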

Implementation

Let's look at the key parts of the implementation:

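import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.CloudEventUtils
import io.cloudevents.jackson.PojoCloudEventDataMapper
import jakarta.annotation.PostConstruct // javax.annotation.PostConstruct on Spring Boot 2.x
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentLinkedQueue

// MobilePushRequest, MobilePushType, MobilePushHistoryWriteService, MobilePushProducer and the
// MOBILE_PUSH_FROM_OTHER_SERVICE / DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY constants are
// application-specific and not shown. @Scheduled methods only run if scheduling is enabled,
// e.g. with @EnableScheduling on a configuration class.
@Component
class MobilePushRequestConsumer(
    private val historyService: MobilePushHistoryWriteService,
    private val producer: MobilePushProducer
) {
    private val logger = LoggerFactory.getLogger(javaClass)
    private val objectMapper = jacksonObjectMapper()

    // Thread-safe buffer: filled by the Kafka listener thread, drained by the scheduler thread
    private val queue = ConcurrentLinkedQueue<MobilePushRequest>()

    // Maximum number of queued messages processed per scheduler run (i.e. per second)
    @Value("\${mobile-push.process-per-second}")
    private var processPerSecond: Int = 0

    @PostConstruct
    private fun setup() {
        logger.info("Mobile push process per second: {}", processPerSecond)
    }

    @KafkaListener(
        topics = [MOBILE_PUSH_FROM_OTHER_SERVICE],
        containerFactory = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY
    )
    fun consumeTargetMobilePushRequest(message: CloudEvent) {
        try {
            // Deserialize the CloudEvent payload into a MobilePushRequest
            val data = CloudEventUtils.mapData(
                message,
                PojoCloudEventDataMapper.from(objectMapper, MobilePushRequest::class.java)
            )?.value ?: throw IllegalArgumentException("CloudEvent ${message.id} has no data")

            val type = MobilePushType.fromParamKey(data.type)

            if (type == MobilePushType.HEAVY) {
                // Throttled type: buffer it; processQueue() handles it later
                queue.add(data)
                logger.info("Added to queue: {}", data)
            } else {
                // Other types are processed immediately on the listener thread
                // ...processing logic omitted...
            }
        } catch (e: Exception) {
            logger.error("An error occurred while consuming the request", e)
            throw e // rethrow so the listener container's error handler sees the failure
        }
    }

    // Runs every second and processes at most processPerSecond messages from the queue
    @Scheduled(fixedRate = 1000)
    fun processQueue() {
        val batch = mutableListOf<MobilePushRequest>()

        // Take up to the configured number of messages, stopping early if the queue runs empty
        while (batch.size < processPerSecond) {
            val request = queue.poll() ?: break
            batch.add(request)
        }

        if (batch.isEmpty()) return

        logger.info("Processing batch of size: {}", batch.size)
        batch.forEach { request ->
            // Catch per message: a failure must not drop the rest of the already-polled batch
            try {
                // ...processing logic omitted...
            } catch (e: Exception) {
                logger.error("Error processing message: {}", request, e)
            }
        }
        logger.info("Batch processing complete")
    }
}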

Key Components

  • ConcurrentLinkedQueue: A thread-safe queue that serves as our buffer for incoming messages
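  • Message Type Filtering: Only messages of a specific type (MobilePushType.HEAVY in the code; in this use case, reward notifications) are added to the queue
  • Scheduled Method: A method annotated with @Scheduled that runs at a fixed rate (every 1000 ms); scheduling must be enabled, e.g. with @EnableScheduling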
  • Configurable Rate: The number of messages processed per batch is controlled by a configuration property
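The type filter is the only point where the two processing paths diverge. A stdlib-only sketch of that routing decision follows, where PushType and PushRequest are hypothetical stand-ins for MobilePushType and MobilePushRequest:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-ins for MobilePushType and MobilePushRequest.
enum class PushType { HEAVY, NORMAL }
data class PushRequest(val id: Int, val type: PushType)

class PushRouter(private val processNow: (PushRequest) -> Unit) {
    // Buffer read by the rate-limited scheduled task.
    val throttled = ConcurrentLinkedQueue<PushRequest>()

    // HEAVY requests are buffered for later; everything else is
    // handled on the listener thread right away.
    fun route(request: PushRequest) {
        when (request.type) {
            PushType.HEAVY -> throttled.add(request)
            PushType.NORMAL -> processNow(request)
        }
    }
}
```

Because the queue is FIFO, throttled requests keep their relative order within one instance, but a NORMAL request that arrives later can still be delivered before them.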

How It Works

  1. The Kafka listener receives messages from the configured topic
  2. Messages of other types are processed immediately in the listener
  3. Messages of the throttled type (MobilePushType.HEAVY in the code, e.g. SYSTEM_PLUS_REWARD reward push notifications) are added to the queue
  4. A scheduled task runs every second, taking up to N messages from the queue
  5. This creates a controlled flow of notifications, limiting to N per second
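To put numbers on step 5: a backlog of B queued messages drained at N per tick needs ceil(B / N) ticks, and within each tick the N messages are processed back to back at the start of the tick rather than spread evenly over the second. The sketch below (both function names are hypothetical) simulates the scheduled task tick by tick; with N = 50, a burst of 1,000 messages takes 20 ticks, i.e. 20 seconds at fixedRate = 1000.

```kotlin
// Simulates the scheduled task: each tick first adds that tick's arrivals to the
// backlog, then processes at most `perTick` messages. Returns the number processed
// in each tick, continuing past the last arrival until the backlog is empty.
fun simulateDrain(arrivalsPerTick: List<Int>, perTick: Int): List<Int> {
    require(perTick > 0) { "perTick must be positive" }
    val processed = mutableListOf<Int>()
    var backlog = 0
    var tick = 0
    while (tick < arrivalsPerTick.size || backlog > 0) {
        backlog += arrivalsPerTick.getOrElse(tick) { 0 }
        val done = minOf(backlog, perTick)
        backlog -= done
        processed.add(done)
        tick++
    }
    return processed
}

// Closed form for a single burst: ceil(backlog / perTick) ticks.
fun ticksToDrain(backlog: Int, perTick: Int): Int = (backlog + perTick - 1) / perTick
```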

Configuration

The processing rate is controlled by a configuration property:

# application.yml (application.properties equivalent: mobile-push.process-per-second=50)
mobile-push:
  process-per-second: 50  # Process 50 notifications per second
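A zero or negative value is accepted by the listing as written and would silently stop the queue from draining, so it can pay to validate the property once at startup. A minimal sketch, assuming a hypothetical parseProcessPerSecond helper that could be called from the existing @PostConstruct method:

```kotlin
// Hypothetical startup check for mobile-push.process-per-second.
// Rejects non-numeric input with a clear message, and rejects zero or negative
// values, which would otherwise leave the queue growing without ever draining.
fun parseProcessPerSecond(raw: String?): Int {
    val value = raw?.trim()?.toIntOrNull()
        ?: throw IllegalArgumentException("mobile-push.process-per-second must be an integer, got '$raw'")
    require(value > 0) { "mobile-push.process-per-second must be positive, got $value" }
    return value
}
```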
        

Benefits

  • Throttles message processing to prevent system overload
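  • Absorbs traffic spikes by buffering them in memory (this is buffering rather than true backpressure: the consumer keeps reading from Kafka no matter how long the queue grows)
  • Reduces load on downstream notification services
  • Rate is configurable through a property, without code changes (the value is read at startup, so changing it requires a restart)
  • The throttle itself drops no messages: every queued message is eventually processed, as long as the application keeps running (see Considerations)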

Error Handling with Dead Letter Topics

Proper error handling is crucial in any message processing system. When processing fails, we can use a Dead Letter Topic (DLT) to store messages that couldn't be processed:

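// Assumed DLT payload type; its fields are inferred from how sendToDLT uses it
data class DeadLetterMessage(
    val originalMessage: MobilePushRequest,
    val exceptionMessage: String?,
    val exceptionClass: String,
    val timestamp: Long
)

// Inside MobilePushRequestConsumer: a processQueue() that routes failures to a DLT.
// deadLetterProducer is an assumed extra dependency (e.g. a KafkaTemplate) injected
// alongside producer.
@Scheduled(fixedRate = 1000)
fun processQueue() {
    val batch = mutableListOf<MobilePushRequest>()
    val limit = processPerSecond.toInt()
    while (batch.size < limit) {
        val request = queue.poll() ?: break
        batch.add(request)
    }

    if (batch.isEmpty()) return

    logger.info("Processing batch of size: {}", batch.size)
    batch.forEach { request ->
        // The try/catch is per message, so one failure does not stop the batch
        try {
            // ...processing logic omitted...
        } catch (e: Exception) {
            logger.error("Error processing message: {}", request, e)
            // Send the failed message to the Dead Letter Topic
            sendToDLT(request, e)
        }
    }
    logger.info("Batch processing complete")
}

private fun sendToDLT(request: MobilePushRequest, exception: Exception) {
    try {
        // Wrap the original request with details about the failure
        val dltMessage = DeadLetterMessage(
            originalMessage = request,
            exceptionMessage = exception.message,
            exceptionClass = exception.javaClass.name,
            timestamp = System.currentTimeMillis()
        )

        // Send to a dedicated Dead Letter Topic. With a KafkaTemplate, send() is asynchronous:
        // broker-side failures are reported through the returned future, not this catch block.
        deadLetterProducer.send("mobile-push-dlt", dltMessage)
        logger.info("Message sent to DLT: {}", request)
    } catch (e: Exception) {
        // If the DLT send itself fails, this log entry is the only remaining trace of the message
        logger.error("Failed to send message to DLT: {}", request, e)
    }
}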

With this approach, messages that fail processing are:

  1. Logged with detailed error information
  2. Wrapped with additional metadata about the failure
  3. Sent to a dedicated Dead Letter Topic for later analysis or retry
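The key property of the loop above is that the try/catch sits inside forEach, so a failing message is diverted to the DLT without aborting the rest of the batch. A stdlib-only sketch of that behavior follows; PushMessage and DeadLetter are hypothetical stand-ins (DeadLetter mirrors the fields passed to DeadLetterMessage), and a plain callback replaces the dead letter producer so the logic can be exercised without Kafka.

```kotlin
// Hypothetical stand-ins for MobilePushRequest and DeadLetterMessage.
data class PushMessage(val id: Int)
data class DeadLetter(
    val originalMessage: PushMessage,
    val exceptionMessage: String?,
    val exceptionClass: String,
    val timestamp: Long
)

// Processes each message independently. Failures are wrapped with error details
// and handed to `sendToDlt` instead of stopping the batch.
// Returns the ids of messages that were processed successfully.
fun processBatch(
    batch: List<PushMessage>,
    process: (PushMessage) -> Unit,
    sendToDlt: (DeadLetter) -> Unit,
    clock: () -> Long = System::currentTimeMillis
): List<Int> {
    val succeeded = mutableListOf<Int>()
    for (message in batch) {
        try {
            process(message)
            succeeded.add(message.id)
        } catch (e: Exception) {
            sendToDlt(DeadLetter(message, e.message, e.javaClass.name, clock()))
        }
    }
    return succeeded
}
```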

You can also implement a separate consumer for the DLT that could attempt to reprocess messages after a delay or alert operations staff about persistent failures.
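Such a DLT consumer needs a rule for when to retry and when to give up and alert. One possible policy is sketched below, under the assumption that each dead letter carries an attempt counter (a hypothetical field; the DeadLetterMessage above has none): exponential backoff capped at a maximum delay, then an alert once a fixed number of attempts is reached. The names DltDecision and decide are illustrative only.

```kotlin
// Hypothetical outcome of inspecting one dead letter.
sealed class DltDecision {
    data class RetryAfter(val delayMillis: Long) : DltDecision()
    object Alert : DltDecision()
}

// attempt = how many times the message has failed so far (1 after the first failure).
// Delay doubles per attempt (base, 2*base, 4*base, ...) up to maxDelayMillis;
// once attempt reaches maxAttempts, operations staff should be alerted instead.
fun decide(
    attempt: Int,
    maxAttempts: Int = 5,
    baseDelayMillis: Long = 1_000,
    maxDelayMillis: Long = 60_000
): DltDecision {
    require(attempt >= 1) { "attempt starts at 1" }
    if (attempt >= maxAttempts) return DltDecision.Alert
    // Cap the shift so very large attempt counts cannot overflow the Long.
    val delay = baseDelayMillis shl minOf(attempt - 1, 30)
    return DltDecision.RetryAfter(minOf(delay, maxDelayMillis))
}
```

How the delay is realized (a scheduled re-send, delayed retry topics, and so on) is a separate choice; Spring Kafka also offers non-blocking retries through retry topics with @RetryableTopic, which may remove the need for a hand-written retry consumer.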

Considerations

  • Memory Usage: The queue uses memory, so consider the potential queue size during traffic spikes
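  • Application Restart: Queue contents are lost when the application restarts or crashes. Because the listener returns as soon as a message is enqueued, its offset is normally committed already, so Kafka will not redeliver it; consider persistent queues for critical notifications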
  • Monitoring: Implement monitoring for queue size to detect potential backlog issues
  • Error Handling: Implement robust error handling to prevent queue processing from stopping
  • Scaling: In clustered environments, each instance will have its own queue and rate limit
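The memory and monitoring points can be tackled together by bounding the buffer and exposing its size as metrics. A sketch with hypothetical names, using ArrayBlockingQueue in place of the unbounded ConcurrentLinkedQueue:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical bounded replacement for the unbounded ConcurrentLinkedQueue.
class BoundedBuffer<T : Any>(capacity: Int) {
    private val queue = ArrayBlockingQueue<T>(capacity)
    private val rejected = AtomicLong()

    // Returns false instead of growing without limit when the buffer is full.
    fun offer(item: T): Boolean {
        val accepted = queue.offer(item)
        if (!accepted) rejected.incrementAndGet()
        return accepted
    }

    fun poll(): T? = queue.poll()

    // Values worth exporting as gauges to spot a growing backlog early.
    fun size(): Int = queue.size
    fun remainingCapacity(): Int = queue.remainingCapacity()
    fun rejectedCount(): Long = rejected.get()
}
```

What the listener should do when offer() returns false is a design decision. Silently dropping the record would lose it, since under the default acknowledgment settings its offset is committed once the listener returns normally; throwing so the error handler sees it, or pausing consumption, are safer options.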

Alternative Approaches

  • Kafka Consumer Pause/Resume: Temporarily pause consumption when processing is backed up
  • External Rate Limiters: Use libraries like Resilience4j or Bucket4j
  • Distributed Rate Limiting: Use Redis-based rate limiters for clustered environments
  • Kafka Streams: For more complex throttling requirements
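Bucket4j is built on the token-bucket algorithm. To show the idea without guessing at any library's API, here is a hand-rolled token bucket; it is not Bucket4j code, and the class name and parameters are hypothetical. Unlike the fixed one-second batch, it refills continuously, so permits are spread over time instead of released as one burst per tick. The clock is injectable so the behavior can be tested deterministically.

```kotlin
// Hand-rolled token bucket: holds at most `capacity` tokens and refills at
// `refillPerSecond` tokens per second. Each permitted message consumes one token.
class TokenBucket(
    private val capacity: Double,
    private val refillPerSecond: Double,
    private val nanoClock: () -> Long = System::nanoTime
) {
    private var tokens = capacity
    private var lastRefill = nanoClock()

    // Returns true and consumes a token if one is available, false otherwise.
    @Synchronized
    fun tryAcquire(): Boolean {
        refill()
        if (tokens >= 1.0) {
            tokens -= 1.0
            return true
        }
        return false
    }

    // Adds tokens for the time elapsed since the last refill, capped at capacity.
    private fun refill() {
        val now = nanoClock()
        val elapsedSeconds = (now - lastRefill) / 1_000_000_000.0
        tokens = minOf(capacity, tokens + elapsedSeconds * refillPerSecond)
        lastRefill = now
    }
}
```

In the consumer, a false from tryAcquire() would mean leaving the message queued for a later attempt.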

Conclusion

Implementing rate limiting for Kafka consumers using an internal queue is a simple yet effective approach for controlling message processing rates. This pattern is particularly useful for high-volume notification systems where controlling the throughput is essential for system stability and downstream service protection.

The implementation shown here provides a configurable, stable way to handle varying loads, and every buffered message is eventually processed as long as the instance keeps running. For production systems, consider the additional factors mentioned in the Considerations section to ensure reliability and robustness.

Copyright © 2025 Giri Labs Inc.