Blog

Handling duplicate messages using the Idempotent consumer pattern

Let’s imagine that you developing a message handler for an enterprise application. Unless the application is based entirely on streaming, it’s very likely that the message handler will need to update a database. For example, in the Customers and Orders application, the Customer Service defines the OrderEventConsumer class event handler, which handles an OrderCreated event by attempting to reserve credit for the order. Similarly, in the FTGO application, the Order History Service handles Order events by creating or updating the corresponding item in a DynamoDB-based CQRS view.

An application typically uses a message broker, such as Apache Kafka or RabbitMQ, that implements at-least once delivery. At-least once delivery ensures that messages will be delivered. It does mean, however, that the message broker can invoke a message handler repeatedly for the same message. You must use the Idempotent Consumer pattern to ensure that your message handlers correctly handle duplicate messages. In this post, I’ll describe why duplicate messages can occur and the problems they cause. You learn how to prevent those problems by making your message handlers idempotent. Let’s start by looking at why a message broker can deliver a message more than once.

Why can duplicate messages occur?

At a very high-level, a message handler executes the following pseudo code:

while (true) {
   Read message
   Begin database transaction
   … update one or more business objects…
   Commit database transaction
  Acknowledge message
}

A message handler loops repeatedly executing the following three steps. First, it reads a message from the message broker. Second, the message handler updates the database, Finally, it acknowledges the message, which tells the message broker that it has been processed successfully and should not be redelivered.

It’s possible, however, that the message handler successfully updates the database but somehow fails to acknowledge the message. The message handler might crash, for example. Alternatively, the broker might crash and lose the acknowledgement. A message broker that guarantees at-least once deliver recovers from these kinds of failures by repeatedly delivering the message until it has been successfully processed. As a result, a message handler can execute the database transaction multiple times for the same message.

If you are using a message broker, such as Apache Kafka, that offers a form of exactly once semantics, you might think that your application won’t encounter duplicate messages. But if you read the fine print, you will discover that the guarantee only applies to Apache Kafka messaging. Specifically, if Apache Kafka invokes a message handler more than once for the same message, it detects and discards any duplicate messages produced by the handler. The message handler will still execute the database transaction repeatedly.

Idempotency is important

A message handler must be idempotent: the outcome of processing the same message repeatedly must be the same as processing the message once. Some message handlers are inherently idempotent but others need to be made idempotent. Whether or not a message handler is naturally idempotent depends on the details of the business logic.

Consider, for example, a message handler for AccountDebited events that updates the account’s current balance in a CQRS view. On the one hand, if the AccountDebited event has a currentBalance attribute then the handler simply updates the view with that new value. This message handler is naturally idempotent and can be safely process the same event multiple times.

But on the other hand, let’s imagine that the AccountDebited event contains only the debit amount. The message handler must update the current balance by subtracting the debit amount. This message handle is not idempotent because if it processed the same message repeatedly then the current balance would be incorrect. To prevent this bug, the application must use a mechanism to make the message handler idempotent.

Tracking message IDs

You can make a message handler idempotent by recording in the database the IDs of the messages that it has processed successfully. When processing a message, a message handler can detect and discard duplicates by querying the database. There are a couple of different places to store the message IDs. One option is for the message handler to use a separate PROCESSED_MESSAGES table. The other option is for the message handler to store the IDs in the business entities that it creates or updates. Let’s first look at how to use a PROCESSED_MESSAGES table.

Save messages in a PROCESSED_MESSAGE table

One straightforward solution is to track the message IDs that have been processed in a separate table Here is the pseudo code that implements this algorithm.

while (true) {
   Read message
   Begin database transaction
    INSERT into PROCESSED_MESSAGE (subscriberId, ID) VALUES(subscriberId, message.ID)
    … update one or more business objects…
   Commit transaction
  Acknowledge message
}

After starting the database transaction, the message handler inserts the message’s ID into the PROCESSED_MESSAGE table. Since the (subscriberId, messageID) is the PROCESSED_MESSAGE table’s primary key the INSERT will fail if the message has been already processed successfully. The message handler can then abort the transaction and acknowledge the message.

Eventuate Tram’s SqlTableBasedDuplicateMessageDetector implements this behavior. You can enable SqlTableBasedDuplicateMessageDetector in your application by adding a dependency on eventuate-tram-spring-consumer-jdbc and then using either @EnableAutoConfiguration or @Import(TramConsumerJdbcAutoConfiguration.class).

Tracking IDs in a PROCESSED_MESSAGE table is conceptually straightforward. But it relies on the database supporting transactions that span multiple tables. That’s not a problem when using SQL database but it’s not always suitable when using a NoSQL database. Let’s look at an alternative approach that does not require multi-table database transactions.

Store message IDs in the business entities created or updated by message handlers

Instead of storing the IDs in a separate table, a message handler can store them in the entities that it creates and updates. A message handler that works this way typically creates or updates an entity and performs a duplicate check using a single database operation. Since this approach doesn’t rely on general purpose transactions, it’s particularly useful when developing message handlers that use a NoSQL database.

For example, the FTGO application’s Order History Service implements a DynamoDB-based CQRS view. The view consists of the ftgo-order-history table, which contains an an item for each order. It’s kept up to date by event handlers for Order and Delivery events.

One way to make these message handlers idempotent is for each Order table item to have a set valued messageIDs attribute. This attribute stores the IDs of the messages that have triggered the creation or the update of the table item. A message handler creates or update an Order using a DynamoDB update operation that adds the message’s ID to the messageIDs attribute. The update is made idempotent by using a conditional expression that verifies that the messageIDs attribute doesn’t already contain the ID.

The Order History Service service uses a variation of this approach that exploits the fact that event IDs published by an aggregate instance increase monotonically. Rather than storing all IDs, a message handler can simply store the highest event ID published by each aggregate instance. An Order item has one or more attributes named events.<<aggregateType>>.<<aggregateId>> whose value is the highest event ID seen so far from each aggregate instance. An update operation uses a conditional expression that verifies that either this attribute does not exist or that its value is less than current event’s ID. This way of tracking message IDs works especially well for the Order History Service service since only a small number of aggregates publish events that trigger the creation and updating of a given Order table item.

Summary

A message broker can deliver the same message repeatedly. To prevent duplicate messages from causing bugs, a message handlers must use the Idempotent Consumer pattern. If a message handler is not inherently idempotent, it must record successfully processed messages and discard duplicates. A message handler can store IDs in either a PROCESSED_MESSAGES table or in the business entities that it creates or updates.

Learn

To learn more about how to develop message-based microservices:

Posts

Book

Training

Learn more about workshops and bootcamps that enable you to successfully develop applications with the microservices architecture.

Delivered in-person and remotely.

Take a look at my self-paced, virtual bootcamp on distributed data patterns in microservice architecture.

Consulting

Learn more about consulting services that derisk and accelerate your microservices initiative.