Kafka – Exactly-Once Semantics

In a distributed environment, failure is a common scenario that can happen at any time. In a Kafka environment, a broker can crash, the network can fail, processing can fail, and publishing or consuming messages can fail. These scenarios introduce different kinds of data loss and duplication.

Failure scenarios

A (Ack Failed): The producer published the message successfully with retries enabled (retries > 0) but did not receive the acknowledgement because of a failure. In that case the producer retries the same message, which can introduce a duplicate.

B (Producer process failed in batch messages): The producer was sending a batch of messages and failed after only some of them were published. Once the producer restarts, it republishes all messages from the batch, which introduces duplicates in Kafka.

C (Fire & Forget Failed): The producer published the message with retries=0 (fire and forget). If the send fails, the producer is not aware of it and sends the next message, so the message is lost.
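
Scenarios A and C can be reproduced with a small in-memory model. This is only a sketch: ToyBroker and send are hypothetical names invented for illustration, not part of any Kafka client, and a real broker involves far more than a Python list.

```python
class ToyBroker:
    """In-memory stand-in for a single partition (hypothetical, not a Kafka API)."""

    def __init__(self, lose_ack_for=(), drop_request_for=()):
        self.log = []
        # Messages whose first acknowledgement gets lost on the way back.
        self.lose_ack_for = set(lose_ack_for)
        # Messages whose first request never reaches the broker.
        self.drop_request_for = set(drop_request_for)

    def append(self, message):
        """Return True only when the producer receives an acknowledgement."""
        if message in self.drop_request_for:
            self.drop_request_for.discard(message)
            return False  # nothing was written
        self.log.append(message)
        if message in self.lose_ack_for:
            self.lose_ack_for.discard(message)
            return False  # written, but the producer never hears about it
        return True


def send(broker, message, retries):
    """Send once, then retry up to `retries` times while no ack arrives."""
    for _ in range(retries + 1):
        if broker.append(message):
            return True
    return False


# Scenario A: the write succeeded, the ack was lost, the retry duplicates it.
broker_a = ToyBroker(lose_ack_for={"m1"})
send(broker_a, "m1", retries=3)
print(broker_a.log)  # ['m1', 'm1']

# Scenario C: fire and forget, the failed send is never repeated.
broker_c = ToyBroker(drop_request_for={"m1"})
send(broker_c, "m1", retries=0)
send(broker_c, "m2", retries=0)
print(broker_c.log)  # ['m2']
```

The same retry loop that protects against loss in scenario C is what causes the duplicate in scenario A, which is why retries alone are not enough.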

D (Consumer failed in batch message): A consumer receives a batch of messages from Kafka and commits their offsets manually (enable.auto.commit=false). If the consumer fails before committing to Kafka, it will consume the same records again next time, which produces duplicates on the consumer side.
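
Scenario D can be sketched the same way. ToyConsumer and its sink list are hypothetical stand-ins for a consumer with manual offset commits and for whatever downstream system receives the processed records.

```python
class ToyConsumer:
    """Hypothetical consumer that commits offsets manually after processing."""

    def __init__(self, log, sink):
        self.log = log              # the partition being read
        self.sink = sink            # downstream system that survives a restart
        self.committed_offset = 0   # where reading resumes after a restart

    def poll_and_process(self, max_records, crash_before_commit=False):
        start = self.committed_offset
        batch = self.log[start:start + max_records]
        self.sink.extend(batch)     # side effects happen before the commit
        if crash_before_commit:
            return                  # the offset is never committed
        self.committed_offset = start + len(batch)


log = ["a", "b", "c"]
sink = []
consumer = ToyConsumer(log, sink)
consumer.poll_and_process(3, crash_before_commit=True)  # crash after processing
consumer.poll_and_process(3)                            # restart, same batch again
print(sink)  # ['a', 'b', 'c', 'a', 'b', 'c']
```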

Exactly-Once semantics

With exactly-once semantics, even if a producer resends a message, the message is published and consumed by the consumer exactly once.

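To achieve exactly-once semantics, Kafka relies on the three properties below:

  1. enable.idempotence=true (addresses A and C; B also needs transactions, because a restarted producer gets a new producer ID)
  2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5 (with idempotence enabled, the producer can have up to five in-flight requests per connection and ordering is still preserved)
  3. isolation.level=read_committed (addresses D, together with transactions)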

Enable Idempotence (enable.idempotence=true)

Idempotent delivery enables the producer to write a message to Kafka exactly once to a particular partition of a topic during the lifetime of a single producer, without data loss and with ordering preserved per partition.

“Note that enabling idempotence requires MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to be less than or equal to 5, RETRIES_CONFIG to be greater than 0 and ACKS_CONFIG be ‘all’. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown”
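
The constraints in the note above can be written down as a small check. This is a sketch only: check_idempotent_config is a hypothetical helper, the defaults it fills in are illustrative assumptions, and ValueError merely stands in for the ConfigException the real client throws.

```python
def check_idempotent_config(config):
    """Return a copy of `config` with defaults filled in, or raise ValueError.

    Hypothetical validator that mirrors the three constraints from the note;
    it is not how the Kafka client itself is implemented.
    """
    resolved = dict(config)
    if not resolved.get("enable.idempotence", False):
        return resolved  # the constraints apply only to idempotent producers

    # Assumed defaults, used only when the user did not set a value.
    resolved.setdefault("max.in.flight.requests.per.connection", 5)
    resolved.setdefault("retries", 2147483647)
    resolved.setdefault("acks", "all")

    if resolved["max.in.flight.requests.per.connection"] > 5:
        raise ValueError("max.in.flight.requests.per.connection must be <= 5")
    if resolved["retries"] <= 0:
        raise ValueError("retries must be greater than 0")
    if resolved["acks"] not in ("all", "-1"):
        raise ValueError("acks must be 'all'")
    return resolved


ok = check_idempotent_config({"enable.idempotence": True})
print(ok["acks"])  # all

try:
    check_idempotent_config({"enable.idempotence": True, "acks": "1"})
except ValueError as error:
    print(error)  # acks must be 'all'
```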

To achieve idempotence, Kafka uses a unique ID called the producer ID (PID) together with a sequence number while producing messages. The producer keeps incrementing the sequence number for each message it publishes, and the sequence is tied to its unique PID. The broker always compares the incoming sequence number with the previous one and rejects the message if the new number is not exactly one greater: a number that is not greater indicates a duplicate, and a gap of more than one indicates lost messages.

In a failure scenario, the broker compares the sequence number with the previous one and rejects the message if the sequence has not increased by exactly one.
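
The sequence-number check can be illustrated with a toy partition. ToyPartition and the returned status strings are assumptions made for this sketch; the real broker tracks sequence numbers per producer and partition with more bookkeeping than shown here.

```python
class ToyPartition:
    """Hypothetical partition that accepts a message only at sequence last + 1."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer ID -> last accepted sequence number

    def append(self, pid, seq, message):
        expected = self.last_seq.get(pid, -1) + 1
        if seq < expected:
            return "duplicate"     # already written, typically a retry
        if seq > expected:
            return "out_of_order"  # a gap, so something in between was lost
        self.log.append(message)
        self.last_seq[pid] = seq
        return "ok"


partition = ToyPartition()
print(partition.append(7, 0, "m0"))  # ok
print(partition.append(7, 1, "m1"))  # ok
print(partition.append(7, 1, "m1"))  # duplicate
print(partition.append(7, 3, "m3"))  # out_of_order
print(partition.log)                 # ['m0', 'm1']
```

Compared with scenario A, the retried "m1" no longer lands in the log a second time.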

Transactions (isolation.level)

Transactions give us the ability to atomically update data in multiple topic partitions. Either all the records included in a transaction are saved successfully, or none of them are. Transactions also allow you to commit your consumer offsets in the same transaction as the data you have processed, thereby allowing end-to-end exactly-once semantics.

The producer does not wait for the commit before writing: messages are written to Kafka as they are sent, and the producer marks the transaction boundaries with beginTransaction, commitTransaction and abortTransaction (in case of failure). The consumer sets isolation.level to either read_committed or read_uncommitted:

  • read_committed: The consumer always reads committed data only.
  • read_uncommitted: The consumer reads all messages in offset order without waiting for transactions to be committed.

If a consumer with isolation.level=read_committed reaches a message that belongs to a transaction that has not completed yet, it will not deliver any more messages from this partition until the producer commits or aborts the transaction, or until a transaction timeout occurs. The transaction timeout is set by the producer through the configuration transaction.timeout.ms (default 1 minute).
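
The difference between the two isolation levels can be sketched with a toy log. The read function and the txn_state dictionary are hypothetical; in real Kafka the outcome of a transaction is recorded by markers in the log itself rather than in a separate dictionary.

```python
def read(log, txn_state, isolation_level="read_uncommitted"):
    """Return the values a consumer would see in this toy model.

    log:       list of (value, txn_id), where txn_id is None outside a transaction
    txn_state: txn_id -> "open", "committed" or "aborted"
    """
    if isolation_level == "read_uncommitted":
        return [value for value, _ in log]  # everything, in offset order

    visible = []
    for value, txn_id in log:
        state = "committed" if txn_id is None else txn_state[txn_id]
        if state == "open":
            break                   # wait here until the transaction completes
        if state == "committed":
            visible.append(value)   # records of aborted transactions are skipped
    return visible


log = [("a", "t1"), ("b", "t2"), ("c", "t1"), ("d", None)]

state = {"t1": "committed", "t2": "open"}
print(read(log, state, "read_committed"))    # ['a']
print(read(log, state, "read_uncommitted"))  # ['a', 'b', 'c', 'd']

state["t2"] = "aborted"
print(read(log, state, "read_committed"))    # ['a', 'c', 'd']
```

While t2 is open, the read_committed consumer is held back even from the committed record "c" behind it, which is the blocking behaviour described above.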

Exactly-Once in Producer & Consumer

In the normal case we have a separate producer and consumer. The producer has to be idempotent and at the same time manage transactions, so that the consumer can set isolation.level to read_committed and the whole process becomes an atomic operation. This guarantees that the producer always stays in sync with the source system. Even if the producer crashes or the transaction is aborted, the result stays consistent and a message, or a batch of messages, is published as a unit exactly once.

Likewise, the consumer receives a message, or a batch of messages, as a unit exactly once.

With exactly-once semantics, the producer and consumer together appear as one atomic operation: a message is either published and consumed exactly once, or aborted.

Exactly-Once in Kafka Streams

Kafka Streams consumes messages from topic A, processes them and publishes the results to topic B; once published, it commits (the commit mostly runs under the covers) to flush all state store data to disk.

Exactly-once in Kafka Streams follows the read-process-write pattern and guarantees that these operations are treated as one atomic operation. Since Kafka Streams covers the producer, the consumer and transactions all together, it comes with a dedicated parameter, processing.guarantee, which can be set to exactly_once or at_least_once (Kafka 3.0 and later also provide exactly_once_v2), so you do not have to handle all the parameters separately.

Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics and production to output topics all together. If any one of these steps fails, all of the changes are rolled back.
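
The all-or-nothing behaviour of read-process-write can be sketched as follows. run_transaction is a hypothetical function that stages the output and the new offset together and publishes both only when every record has been processed; it models the idea, not the Kafka Streams implementation.

```python
def run_transaction(input_log, state, transform, fail_at=None):
    """Process all unread records and return the new state, or the old one on failure.

    state is {"offset": next input offset, "output": records written so far}.
    fail_at is an input offset at which a crash is simulated.
    """
    staged = list(state["output"])  # work on a copy, never on the committed state
    for offset in range(state["offset"], len(input_log)):
        if offset == fail_at:
            return state            # abort: neither output nor offset changes
        staged.append(transform(input_log[offset]))
    # Commit: output and offset become visible together.
    return {"offset": len(input_log), "output": staged}


topic_a = ["a", "b", "c"]
start = {"offset": 0, "output": []}

after_crash = run_transaction(topic_a, start, str.upper, fail_at=2)
print(after_crash)  # {'offset': 0, 'output': []}

after_retry = run_transaction(topic_a, after_crash, str.upper)
print(after_retry)  # {'offset': 3, 'output': ['A', 'B', 'C']}
```

Because the offset only moves when the output is committed, the retry after the crash produces each output record once, unlike the consumer in scenario D.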

Setting processing.guarantee to exactly_once automatically provides the parameters below, so you do not need to set them explicitly:

  1. isolation.level=read_committed
  2. enable.idempotence=true
