The Day an MSK Upgrade Duplicated Every Outbound Record
A managed Kafka cluster upgrade rolled brokers, our consumer re-processed already-handled records, and Mongo happily inserted them again because we never gave them a stable _id. Multiple DLTs, a duplicate-document mess, and the one-line fix that one of our sibling pipelines had been quietly using all along.
We had producer idempotence enabled. We had Kafka transactional commit configured. We had a fixed backoff and a DLT recoverer. And we still ended up with several duplicate records in MongoDB across two of our outbound message channels, every one of them eventually causing a webhook handler to crash with IncorrectResultSizeDataAccessException when it tried to look up the single document it expected.
The trigger was a managed Kafka (MSK) cluster upgrade. The root cause was something else entirely — and it had been sitting in our code waiting for the right kind of bad weather to expose it.
1. TL;DR
- Producer enable.idempotence and Kafka transactional commit do not make your consumer idempotent. They protect writes into Kafka, not side effects outside it.
- Kafka guarantees at-least-once delivery. Any broker rolling restart, coordinator transition, or session timeout can cause a record to be re-delivered after your listener has already finished its side effects.
- If your listener writes to MongoDB by calling repository.save(entity) on an entity whose _id is generated inside the listener, every re-delivery creates a new document. Same business key, different _id.
- The fix is to let the producer mint the _id, ship it in the event payload, and have the consumer rehydrate the entity with that exact _id. Mongo's save becomes replaceOne({_id: X}, ..., upsert: true), which is a single atomic upsert. Re-deliveries replace instead of duplicate.
- One of our sibling pipelines had been doing exactly this for years. It absorbed the same broker storm without a single DLT. The other two pipelines did not, so they took the hit.
- If you only ship one change after reading this post: audit every @KafkaListener that writes to a database and confirm the listener can be re-invoked with the same record and still leave the system in the same state.
2. The Incident: A Cluster of DLTs in Half an Hour
The on-call channel started lighting up with DLT alerts from our writer consumer group. All of them carried the same exception:
IncorrectResultSizeDataAccessException
Query findByExternalMessageId returned non unique result
The listener was a webhook handler. A third-party delivery provider had POSTed back with the delivery status of a message we sent earlier. The handler looked up the original record by its provider-issued ID and was expecting a single document. It was getting two. Or three.
We checked the collection directly:
db.outbound_messages.find(
{external_message_id: "<id-A>"}
).count()
// 2
db.outbound_messages.find(
{external_message_id: "<id-B>"}
).count()
// 3
Two documents for a send that we knew, from upstream logs, had been emitted exactly once. Three documents for another single send. The provider APIs each saw one outbound call. The webhook came back once. Yet the database had multiples.
Something between the producer and the writer was multiplying the records.
3. Trigger vs Root Cause
The natural first move during a postmortem is to point at whatever was different that day. In our case the infra team had announced a managed-Kafka cluster upgrade, rolling the brokers in sequence (~70 minutes per cluster). The DLTs landed right in the middle of the upgrade window for the cluster our notification consumers were attached to. Case closed?
Not quite. The MSK upgrade was the trigger. It was the gust of wind that knocked the dead branch off the tree. The branch had been dead for months. We had simply never had wind strong enough.
Separating trigger from root cause matters because the trigger is one-off and the root cause is permanent. Without the upgrade, the same outcome would have followed from any coordinator transition: a broker restart for a routine OS patch, an availability-zone failover, an ISR shrink during a network blip, a consumer pod rolling restart that overruns the session timeout. We would have lived with a latent bug indefinitely, and it would have fired on a date determined entirely by cloud provider luck.
Heuristic: if the trigger is unique but the root cause is universal, you have a bug. If the trigger and the failure are both universal, you have a regression. If both are unique, you might just have noise. Postmortems that stop at the trigger ship the bug into the next outage.
4. What the MSK Upgrade Looked Like from the Consumer's Point of View
The Kafka broker layer cycles in a rolling fashion: one broker is taken out of rotation, patched, started up, allowed to rejoin the cluster, and then the next. For consumer groups this is mostly invisible — until the broker being cycled happens to be the group coordinator for your consumer group. Then three errors show up in your logs in rapid succession.
First the heartbeats fail because the coordinator broker went down:
[Consumer ... groupId=writer-service] Attempt to heartbeat failed since coordinator b-1.example-cluster.kafka.amazonaws.com:9094 (id: 2) is either not started or not valid; will attempt rejoin
Then offset commits start failing while the coordinator is in transition:
Offset commit failed on partition outbound.message.write-0 at offset 3586709: The coordinator is loading and hence can't process requests.
Offset commit failed on partition outbound.message.write-1 at offset 3588551: This is not the correct coordinator.
These three messages — not started or not valid, coordinator is loading, and not the correct coordinator — are the signature of a broker rolling restart from the consumer's perspective. If you see them clustered together, the broker hosting your group coordinator is changing.
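If you want an alert on this pattern rather than eyeballing logs, a minimal sketch might scan a time-sorted window of consumer log lines for all three fragments quoted above. The LogLine type, the looksLikeCoordinatorMove name, and the 60-second default window are assumptions for illustration, not part of any Kafka or logging library.

```kotlin
// The three fragments from the consumer logs quoted above.
val coordinatorChangeSignature = listOf(
    "is either not started or not valid",
    "The coordinator is loading",
    "This is not the correct coordinator",
)

// Hypothetical log record shape: timestamp plus raw message text.
data class LogLine(val epochMillis: Long, val message: String)

/**
 * Returns true if all three signature fragments appear within [windowMillis]
 * of some starting line. Assumes [lines] are sorted by timestamp.
 */
fun looksLikeCoordinatorMove(lines: List<LogLine>, windowMillis: Long = 60_000): Boolean {
    for ((i, start) in lines.withIndex()) {
        val seen = mutableSetOf<String>()
        for (line in lines.subList(i, lines.size)) {
            // Stop once we leave the window that opened at `start`.
            if (line.epochMillis - start.epochMillis > windowMillis) break
            // Record every signature fragment this line contains.
            coordinatorChangeSignature.filterTo(seen) { line.message.contains(it) }
        }
        if (seen.size == coordinatorChangeSignature.size) return true
    }
    return false
}
```

A match is a hint to expect re-deliveries on the affected partitions, not proof that any occurred.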
Crucially, the offset commit failure does not mean the record was not processed. The listener method had already returned successfully. The Mongo insert was already done. The failure is only on the bookkeeping step — telling Kafka "I'm done with offset N, please don't give that one to me again."
That bookkeeping failure is enough to trigger Kafka's at-least-once recovery. When the consumer re-establishes its session against the new coordinator, it asks "where do I resume?" and the coordinator answers "from the last offset I have on record" — which is the offset before the failed commit. The next poll returns the same record again. The listener fires again. The Mongo insert happens again.
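The sequence above can be modeled with a toy consumer. This is a hypothetical sketch, not the Kafka client API: each rejoinAndPoll call stands for a consumer that has just re-established its session and resumes from the committed offset, and commitSucceeds = false stands for the coordinator being mid-move.

```kotlin
// A toy model, not the Kafka client API: one partition, a committed offset
// held by the "coordinator", and a listener with an external side effect.
class ToyPartitionConsumer(private val records: List<String>) {
    // Next offset to read, as the coordinator has it on record.
    var committedOffset = 0
        private set

    // Stands in for the Mongo writes the listener performs.
    val sideEffects = mutableListOf<String>()

    // Models a consumer that has just rejoined the group: it resumes from the
    // committed offset, runs the listener on a batch, then tries to commit.
    fun rejoinAndPoll(maxRecords: Int, commitSucceeds: Boolean) {
        val start = committedOffset
        val batch = records.drop(start).take(maxRecords)
        batch.forEach { sideEffects += it } // the listener has already run here
        if (commitSucceeds) {
            committedOffset = start + batch.size
        }
        // On a failed commit nothing is recorded, so the next rejoin resumes at `start`.
    }
}
```

With records a, b, c, a failed commit after the first poll of two records followed by a successful poll of three leaves sideEffects as a, b, a, b, c: the writes for a and b ran twice because the commit that would have recorded them never landed.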
Our consumer group experienced this on three separate broker rotations during the upgrade window. One channel got hit once, ending up with two documents per record. Another channel got hit twice, ending up with three.
5. The Root Cause: Listener Mints the _id
Here is the offending listener, simplified:
@KafkaListener(topics = ["outbound.message.write"])
fun consume(event: MessageWriteEvent) {
    val record = OutboundMessage.create(
        recordId = event.recordId, // often null from the producer
        externalMessageId = event.externalMessageId,
        ...
    )
    adaptor.create(record)
}
And the factory:
fun create(recordId: String?, ...): OutboundMessage {
    return OutboundMessage(
        id = recordId?.let { ObjectId(it) } ?: ObjectId.get(), // ← fresh _id whenever recordId is null
        ...
    )
}
The producer was not setting recordId, so every consumer invocation called ObjectId.get(), generating a fresh ObjectId on the spot. When the same event was delivered twice, the listener built two entities with two different _id values but the same external_message_id. repository.save(entity) happily inserted both, because from Mongo's perspective they were two distinct documents with distinct primary keys.
There was no unique index on external_message_id to catch this at the database level. The field was indexed for query performance, but the index was not declared unique. So Mongo had no opinion about whether the second insert should be rejected.
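A toy reproduction of the bug, assuming a collection whose only unique key is _id and using random UUID strings as a stand-in for ObjectId.get(). Doc, ToyCollection, and createDoc are hypothetical names; save mirrors the replace-or-insert-by-_id behavior of save on an entity whose _id is populated.

```kotlin
import java.util.UUID

// Hypothetical stand-in for the stored document, not the real schema.
data class Doc(val id: String, val externalMessageId: String)

class ToyCollection {
    // The only unique key is _id: nothing constrains externalMessageId.
    private val byId = linkedMapOf<String, Doc>()

    // Replace-or-insert keyed on _id, like save() on an entity with a populated _id.
    fun save(doc: Doc) {
        byId[doc.id] = doc
    }

    fun countByExternalId(externalMessageId: String): Int =
        byId.values.count { it.externalMessageId == externalMessageId }
}

// The buggy factory shape: a fresh id whenever the producer sent none.
// UUID strings stand in for ObjectId.get().
fun createDoc(recordId: String?, externalMessageId: String): Doc =
    Doc(id = recordId ?: UUID.randomUUID().toString(), externalMessageId = externalMessageId)
```

Delivering the same event twice with recordId = null leaves two documents for one external id; delivering it three times with a fixed recordId leaves exactly one.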
Three failures lined up:
- The producer side did not stamp a stable primary key into the event payload.
- The consumer side minted a fresh primary key each time it ran.
- The database had no unique constraint on the natural business key (external_message_id) that would have noticed the duplicate.
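Fixing any one of those would have prevented the incident. The first two share a single, local fix: once the producer stamps the key, the consumer factory already uses it, so its ObjectId.get() fallback never fires. That producer change is what we shipped; the unique index is a tracked follow-up.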
6. Why save() with a Stable _id Is Naturally Idempotent
This is the part that took a couple of whiteboard rounds in the postmortem, so it's worth being explicit.
For an entity whose _id is already populated and that has no @Version field, Spring Data MongoDB's MongoRepository.save(entity) does not issue a "check if exists then insert or update" pair of operations. It issues a single Mongo command:
update {
q: { _id: ObjectId("...") },
u: { ...full document... },
upsert: true,
multi: false
}
The wire-level operation is replaceOne(filter, doc, {upsert: true}). Mongo applies it as a single-document write, which is atomic, and the unique index on _id guarantees that at most one document with that _id can exist. There is no separate read. The two paths converge into one wire call:
- If no document matches the filter, insert the document.
- If a document matches, replace it with the new one.
For an at-least-once consumer this is exactly the semantics you want. The first delivery inserts. The second delivery replaces with identical content. Net effect: one document, regardless of how many times the listener ran.
Common Confusion: save vs insert
The names look interchangeable. They are not.
| API call | Behavior when a document with the same _id already exists |
|---|---|
| MongoRepository.save(e) | Replace (no exception) |
| MongoRepository.insert(e) | DuplicateKeyException |
| collection.replaceOne(..., upsert) | Replace (no exception) |
| collection.insertOne(doc) | Duplicate-key write error (E11000) |
The contract is set by the method name. save means "make the database look like this." insert means "add this new thing." Re-running a Kafka consumer wants the first semantic, not the second.
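To make the two contracts in the table concrete, here is a hypothetical in-memory stand-in, not the MongoDB driver: insertOne refuses an existing _id, while replaceOne with upsert = true inserts on the first call and replaces afterwards. ToyMongoCollection, WriteOutcome, and DuplicateKeyError are illustration-only names.

```kotlin
// Illustration-only error type, named after the server's E11000 error.
class DuplicateKeyError(id: String) : RuntimeException("E11000 duplicate key: _id=$id")

enum class WriteOutcome { INSERTED, REPLACED, NO_MATCH }

// Hypothetical in-memory stand-in for a collection keyed on _id.
class ToyMongoCollection<T> {
    private val docs = mutableMapOf<String, T>()

    val size: Int
        get() = docs.size

    // insert semantics, "add this new thing": an existing _id is an error.
    fun insertOne(id: String, doc: T) {
        if (id in docs) throw DuplicateKeyError(id)
        docs[id] = doc
    }

    // replace-with-upsert semantics, "make the database look like this".
    fun replaceOne(id: String, doc: T, upsert: Boolean): WriteOutcome = when {
        id in docs -> { docs[id] = doc; WriteOutcome.REPLACED }
        upsert -> { docs[id] = doc; WriteOutcome.INSERTED }
        else -> WriteOutcome.NO_MATCH
    }
}
```

Three replaceOne calls with the same _id report INSERTED, REPLACED, REPLACED and leave one document; a second insertOne with the same _id throws.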
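One caveat on audit metadata. Spring Data's @CreatedDate is driven by the same isNew() check, and for an entity with neither a @Version field nor a custom Persistable implementation, isNew() returns false as soon as the _id is populated. Auditing then treats every save, including the very first one, as a modification: @LastModifiedDate ticks as expected, but @CreatedDate is never filled in. And because save replaces the whole document, a value written on the first delivery survives a re-delivery only if the rehydrated entity carries it again. If createdAt matters, set it explicitly from something stable in the event so every delivery writes the same value.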
7. The Sibling That Survived
The most useful discovery during the postmortem was an internal one. Our system has three outbound message channels sharing the same writer service, the same broker, and the same Mongo cluster. All three were exposed to the same broker rotation. Only two of them ended up with duplicate documents.
Looking at the surviving channel's producer:
queueTemplate.send(
WRITE_TOPIC,
EventBuilder()
.targets(listOf(WriteTarget(
outbound.accountId,
outbound.id!!.toHexString() // ← the _id is minted here
)))
.build()
)
The producer mints the Mongo _id before the message ever hits Kafka. The consumer rehydrates with that exact id:
OutboundMessage.from(
    id = ObjectId(target.recordId), // ← stable across re-delivery
    ...
)
So when the broker storm caused the surviving channel's partitions to re-deliver, the listener built entities with the same _id the second time. save turned into replaceOne. The database never saw a duplicate.
We could see the re-delivery in the consume logs. Throughput on the surviving channel during the upgrade window spiked from ~85 records/minute baseline to over 320 records/minute as Kafka replayed records that had not been committed. The application happily absorbed the storm because every replay landed on top of an existing document.
That code had been written this way for years, with no relation to any incident. It was simply how the author had chosen to plumb the relationship between the gateway-side entity and the writer-side document. The choice paid out under stress that nobody had anticipated when the code was written.
Lesson: when one part of your system survives an incident that breaks a sibling, study the survivor before designing the fix. The pattern you need may already exist in your codebase and be one git grep away.
8. Four Patterns for Idempotent Consumers
Once you accept that at-least-once is permanent and that any of the triggers (broker rotation, AZ failover, pod restart, GC pause) will eventually cause a redelivery, the only question is which idempotency pattern to apply. In rough order of how cheap and how local they are:
1. Use a natural business key for upsert. If you already have a unique identifier in the event payload (vendor message id, order id, payment intent id), use it as the filter key. The benefit is zero extra plumbing: you store something you were already storing. The cost is making sure the field is unique in the data, which is usually true but worth confirming.
2. Mint the primary key on the producer side and ship it in the event. This is what the surviving channel did. The consumer rehydrates with the same key on every delivery, and save turns into an upsert. Works well when the producer naturally creates the record and the consumer is just persisting it. This is the change we applied to the other two channels.
3. Add a unique index on the business key and swallow the duplicate exception. If you cannot change the producer or the consumer key, you can still let the database enforce uniqueness. Catch DuplicateKeyException and treat it as success. The cost is a slightly more awkward control flow and the need to be careful that the catch does not hide a real bug. The benefit is a defense-in-depth layer that catches any future producer-side path that forgets to send a key.
4. External dedup store. Maintain a set of processed event ids in Redis or a small Mongo collection, check before processing, write after success. This is the most expensive option in terms of latency and operational surface area, so reserve it for cases where 1–3 are not feasible, for example when the side effect is an external HTTP call that the receiving system cannot dedup on its own.
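Pattern 4 in its simplest form might look like the sketch below. ProcessedEventStore is a hypothetical in-memory set standing in for Redis or a small Mongo collection, and handleOnce is an illustrative name.

```kotlin
// Hypothetical dedup store: an in-memory set standing in for Redis or a
// small Mongo collection of processed event ids.
class ProcessedEventStore {
    private val processed = mutableSetOf<String>()

    fun alreadyProcessed(eventId: String): Boolean = eventId in processed

    fun markProcessed(eventId: String) {
        processed += eventId
    }
}

/**
 * Check before processing, record after success.
 * Returns true if the side effect ran, false if the event was skipped.
 */
fun handleOnce(eventId: String, store: ProcessedEventStore, sideEffect: () -> Unit): Boolean {
    if (store.alreadyProcessed(eventId)) return false
    sideEffect()                 // e.g. the external HTTP call
    store.markProcessed(eventId) // reached only if the side effect did not throw
    return true
}
```

Note the gaps this shape leaves: a crash between the side effect and markProcessed re-runs the side effect on redelivery, and two concurrent deliveries of the same event can both pass the check unless the store offers an atomic check-and-set.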
In our incident, pattern 2 was the fix and pattern 3 is the follow-up. The two compose nicely. Pattern 2 handles the common path with no extra code at the writer. Pattern 3 catches the case where a future endpoint is added that forgets to mint the id.
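A sketch of how patterns 2 and 3 compose, using a hypothetical in-memory collection that enforces both the unique _id and a unique index on externalMessageId. Message, ToyMessages, persist, and DuplicateKeyError are assumptions for illustration; in a Spring Data application the exception you would actually catch is org.springframework.dao.DuplicateKeyException.

```kotlin
import java.util.UUID

// Illustration-only error type for a unique-index violation.
class DuplicateKeyError(message: String) : RuntimeException(message)

data class Message(val id: String, val externalMessageId: String, val status: String)

// Toy collection: unique _id plus a unique secondary index on externalMessageId.
class ToyMessages {
    private val byId = mutableMapOf<String, Message>()

    // Upsert by _id (pattern 2), rejecting a second owner of the business key (pattern 3).
    fun save(m: Message) {
        val clash = byId.values.any { it.externalMessageId == m.externalMessageId && it.id != m.id }
        if (clash) throw DuplicateKeyError("E11000 duplicate key: external_message_id=${m.externalMessageId}")
        byId[m.id] = m
    }

    fun count(): Int = byId.size
}

/** Returns true if the write landed, false if it was skipped as a duplicate. */
fun persist(messages: ToyMessages, recordId: String?, externalMessageId: String, status: String): Boolean {
    // A missing recordId models a future producer path that forgets to mint the id.
    val message = Message(recordId ?: UUID.randomUUID().toString(), externalMessageId, status)
    return try {
        messages.save(message)
        true
    } catch (e: DuplicateKeyError) {
        // Another document already owns this business key: treat it as success.
        // This is exactly the catch that could hide a real bug, so keep it narrow.
        false
    }
}
```

Three deliveries with a stable recordId upsert the same document; a delivery that arrives without a recordId hits the unique index and is skipped instead of creating a second document.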
9. What Producer Idempotence and Transactional Commit Do Not Solve
A reasonable response when reading this story is: "we have enable.idempotence=true on the producer, surely that protects us?" It does not. It is worth being precise about which problem each Kafka feature actually addresses.
- 📤 Producer idempotence: prevents the same producer from publishing the same record twice when it retries an in-flight request. Scope: producer → broker. Does nothing for what happens after the consumer reads the record.
- 🔁 Kafka transactional commit (EOS): lets a consumer-producer pair commit "I read offset N and I produced these output records" atomically. Useful when your consumer is itself a producer. Does nothing if your consumer's side effect is a write to an external system like MongoDB.
- 🧱 Application idempotency: the only thing that protects external side effects (database, HTTP, email). It is the consumer's responsibility, not Kafka's. The Kafka features above coexist with the duplicate-insert bug we hit, because none of them ever touches the Mongo write path.
The mental model that gets you through this: Kafka's guarantees are about Kafka. Any state you maintain outside Kafka has to handle re-delivery on its own.
10. The Fix and the Cleanup
The code change was small. On the producer side, the two affected channels now mint an ObjectId alongside the external message id and ship both in the write event:
val recordId = ObjectId.get()
val externalMessageId = externalIdGenerator.generate()
producer.produceWriteEvent(
    command = command,
    externalMessageId = externalMessageId,
    recordId = recordId, // ← new
)
On the consumer side no change was required, because the factory already accepted an optional recordId and used it when present. The bug was that the producer call sites had been silently relying on the default of null.
The data cleanup was less elegant. For each external_message_id that had duplicates, we kept the document with the earliest created_at (which still had the correct upstream status) and removed the rest. Then we redrove the DLT events back to the original topic, where the now-deduplicated documents could be looked up cleanly.
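The selection step of that cleanup can be sketched as a pure function over a projection of the affected documents. StoredDoc and idsToDelete are hypothetical names; the rule encoded is the one described above: keep the earliest createdAt per external_message_id and delete the rest.

```kotlin
import java.time.Instant

// Hypothetical projection of the fields the cleanup needs.
data class StoredDoc(val id: String, val externalMessageId: String, val createdAt: Instant)

/**
 * For every externalMessageId with more than one document, keep the one with
 * the earliest createdAt and return the ids of the others for deletion.
 * sortedBy is stable, so on a createdAt tie the document that appears first
 * in the input is kept.
 */
fun idsToDelete(docs: List<StoredDoc>): List<String> =
    docs.groupBy { it.externalMessageId }
        .values
        .filter { it.size > 1 }
        .flatMap { group -> group.sortedBy { it.createdAt }.drop(1).map { it.id } }
```

An empty result is also the precondition for the unique index in the next step: the index build can only succeed once no external_message_id has more than one document.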
A separate ticket tracks adding a unique index on external_message_id. We do not want to add it before the dedup is done, because the index build will fail in the presence of duplicates. The order is: deduplicate → add index → the database now refuses any future bug of this shape.
11. Takeaways
- At-least-once is permanent. Every Kafka consumer that mutates external state needs a deliberate idempotency story. The question is "which pattern" not "do we need one."
- Trigger and root cause are different. The infra event that exposes a latent application bug is news; the application bug is the work.
- MongoRepository.save is a single atomic upsert keyed on _id whenever the entity arrives with its _id populated. Pair it with a stable, producer-minted _id and you get idempotency for free.
- The signature of a broker rolling restart in your consumer logs is the trio "not started or not valid", "coordinator is loading", and "not the correct coordinator". When you see them together, expect re-deliveries.
- When one part of your system survives an incident and a sibling does not, the survivor is documentation. Read it before reaching for a new design.
- Database-level constraints (unique indexes) are a defense-in-depth layer, not a substitute for application-level idempotency. Use both when the data is critical.
12. Related Reading
- Surviving DocDB Failover with Spring Data MongoDB and Kafka — the prequel that focuses on CircuitBreaker fallback behavior during a failover.
- When Your CircuitBreaker Never Opens — tuning Resilience4j thresholds so they actually fire during real outages.
- The Kafka Event Ordering Illusion — another category of at-least-once surprise that catches teams off guard.