Apache Kafka Consumer Rebalance

A consumer rebalance decides which consumer is responsible for which subset of the available partitions of one or more topics. For example, you might have a topic with 20 partitions and 10 consumers; at the end of a rebalance, you might expect each consumer to be reading from 2 partitions. If you shut down 5 of those consumers, you might expect each remaining consumer to have 4 partitions after a rebalance has completed. Consumer rebalance is a dynamic partition assignment that Kafka handles automatically.
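
The arithmetic of such an assignment can be checked with a small sketch. This is only an illustration: the round-robin function and the consumer names are made up for this example and are not Kafka's own assignor.

```python
def assign_round_robin(partitions, consumers):
    """Hand out partitions to consumers one at a time, in order."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = list(range(20))

# 20 partitions shared by 10 consumers: 2 partitions each.
ten = assign_round_robin(partitions, [f"consumer-{i}" for i in range(10)])
print({consumer: len(owned) for consumer, owned in ten.items()})

# Half of the consumers are shut down: the remaining 5 get 4 partitions each.
five = assign_round_robin(partitions, [f"consumer-{i}" for i in range(5)])
print({consumer: len(owned) for consumer, owned in five.items()})
```

Every partition has exactly one owner in both cases; only the number of partitions per consumer changes.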

The Group Coordinator is one of the brokers; it is responsible for communicating with consumers to carry out rebalances between them. In earlier versions ZooKeeper stored this metadata, but in later versions it is stored on the brokers. The Group Coordinator receives heartbeats from all consumers of a consumer group, so it is aware of each consumer's liveness, and it manages their committed offsets on partitions.

Group Leader: One consumer in the consumer group acts as the Group Leader. It is chosen by the Group Coordinator and is responsible for making partition assignment decisions on behalf of all consumers in the group.

Rebalance Scenarios:

  1. A consumer group subscribes to a topic.
  2. A consumer instance fails to send a heartbeat within session.timeout.ms.
  3. A consumer's processing takes longer than the poll timeout (max.poll.interval.ms).
  4. A consumer in the consumer group throws an exception.
  5. A new partition is added.
  6. Scaling consumers up or down: a consumer is added to, or removed from, the group manually.
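
The two timeout-driven scenarios in the list above can be sketched as a simple check. This is a simplified model, not broker code: the function name, the timestamps and the timeout values are assumptions chosen for illustration; only the two configuration names in the comments are real Kafka consumer settings.

```python
from dataclasses import dataclass


@dataclass
class Timeouts:
    # Illustrative values; check the defaults of your Kafka version.
    session_timeout_ms: int = 45_000     # session.timeout.ms
    max_poll_interval_ms: int = 300_000  # max.poll.interval.ms


def rebalance_reason(now_ms, last_heartbeat_ms, last_poll_ms, timeouts=Timeouts()):
    """Return why a consumer would be dropped from the group, or None."""
    if now_ms - last_heartbeat_ms > timeouts.session_timeout_ms:
        return "session timeout: no heartbeat received in time"
    if now_ms - last_poll_ms > timeouts.max_poll_interval_ms:
        return "poll timeout: processing between polls took too long"
    return None


# Heartbeats still arrive, but the last poll was 400 seconds ago.
print(rebalance_reason(now_ms=400_000, last_heartbeat_ms=399_000, last_poll_ms=0))
```

A consumer that heartbeats on time can therefore still cause a rebalance if its processing between polls is too slow.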

Consumer Rebalance

A consumer rebalance is initiated when a consumer requests to join a group or leaves a group. The Group Leader receives a list of all active consumers from the Group Coordinator and decides which partition(s) to assign to each consumer by using a PartitionAssignor. Once the Group Leader has finalized the partition assignment, it sends the assignment list to the Group Coordinator, which sends this information back to the consumers. The Group Coordinator sends each consumer only its own partitions, not the partitions assigned to other consumers; only the Group Leader is aware of all consumers and their assigned partitions. After the rebalance is complete, consumers start sending heartbeats to the Group Coordinator to signal that they are alive. Consumers send an OffsetFetch request to the Group Coordinator to get the last committed offsets for their assigned partitions, and then start consuming messages from the newly assigned partitions.
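
The flow above can be sketched in a few lines. This is a toy model under stated assumptions: the function and member names are hypothetical, the leader is simply taken to be the first member in the list, and the assignment strategy is a plain round-robin slice rather than a real assignor.

```python
def rebalance(member_ids, partitions):
    """Model one rebalance round: join, leader assignment, sync."""
    # Join phase: the coordinator picks a leader (assumed here: first member).
    leader = member_ids[0]

    # The leader sees the full member list and computes the whole assignment.
    full_assignment = {
        member: partitions[index::len(member_ids)]
        for index, member in enumerate(member_ids)
    }

    # Sync phase: the coordinator gives each member only its own partitions.
    responses = {member: full_assignment[member] for member in member_ids}
    return leader, full_assignment, responses


leader, full_assignment, responses = rebalance(["c1", "c2", "c3"], list(range(6)))
print("leader:", leader)
print("what c2 receives:", responses["c2"])
```

Only `full_assignment`, which lives with the leader in this model, contains every member's partitions; each entry of `responses` carries a single member's share.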

State Management

While rebalancing, the Group Coordinator sets its state to Rebalance and waits for all consumers to rejoin the group.

When the group starts rebalancing, the Group Coordinator first switches its state to Rebalance so that all interacting consumers are notified to rejoin the group. Once all consumers have rejoined, the Group Coordinator creates a new generation ID and notifies all consumers of it. The group then proceeds to the sync stage, where consumers send a sync request and wait until the Group Leader finishes generating the new partition assignment. Once the consumers have received their new partition assignment, the group moves to the stable stage.
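
The stages described here can be modelled as a small state machine. This is a sketch only: the class and method names are invented for this example, and the state names follow the wording of this article rather than the broker's internal names.

```python
class GroupState:
    """Toy model of the coordinator's view of one consumer group."""

    def __init__(self):
        self.state = "Stable"
        self.generation = 0
        self.pending = set()  # members that still have to rejoin

    def start_rebalance(self, members):
        self.state = "Rebalance"
        self.pending = set(members)

    def rejoin(self, member):
        self.pending.discard(member)
        if self.state == "Rebalance" and not self.pending:
            # Everyone is back: new generation, move on to the sync stage.
            self.generation += 1
            self.state = "Sync"

    def assignments_delivered(self):
        if self.state == "Sync":
            self.state = "Stable"


group = GroupState()
group.start_rebalance(["c1", "c2"])
group.rejoin("c1")
group.rejoin("c2")
print(group.state, group.generation)  # Sync 1
group.assignments_delivered()
print(group.state, group.generation)  # Stable 1
```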

Static Membership

This rebalancing is quite a heavy operation, as it requires all consumers to stop and wait for their newly assigned partitions. Each rebalance always creates a new generation ID, which means everything is refreshed. To reduce this overhead, Kafka 2.3+ introduced Static Membership (KIP-345), which avoids unnecessary rebalances.

With Static Membership, consumer state persists, and on a rebalance the same assignment is applied again. It uses the new group.instance.id setting to persist member identity. So even in the worst case, where the member ID is reshuffled, the same consumer instance ID still gets the same partition assignment:

instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

And after consumer A restarts:

instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}   
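
The listing above can be reproduced with a minimal sketch. The class below is hypothetical and only mimics the bookkeeping: assignments are keyed by instance ID, while member IDs are handed out fresh on every join.

```python
import itertools


class StaticGroup:
    """Toy coordinator that remembers assignments per group.instance.id."""

    def __init__(self):
        self._next_member_id = itertools.count(1)
        self.member_ids = {}   # instance id -> current member id
        self.assignments = {}  # instance id -> assigned partitions

    def join(self, instance_id, partitions=None):
        # A (re)joining member always gets a new member id ...
        self.member_ids[instance_id] = next(self._next_member_id)
        # ... but a known instance id keeps its previous assignment.
        if instance_id not in self.assignments:
            self.assignments[instance_id] = partitions
        return self.member_ids[instance_id], self.assignments[instance_id]


group = StaticGroup()
group.join("A", {0, 1, 2})
group.join("B", {3, 4, 5})
group.join("C", {6, 7, 8})

# Consumer A restarts and rejoins with the same instance id.
member_id, assignment = group.join("A")
print(member_id, sorted(assignment))  # 4 [0, 1, 2]
```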

Ref:

  1. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership
  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

Kafka – Exactly-Once Semantics

In a distributed environment, failure is a very common scenario that can happen at any time. In a Kafka environment, a broker can crash, the network can fail, and processing, publishing or consuming messages can fail. These different scenarios introduce different kinds of data loss and duplication.

Failure scenarios

A (Ack Failed): The producer published a message successfully with retries > 0 but did not receive the acknowledgement due to a failure. In that case the producer retries the same message, which might introduce a duplicate.

B (Producer process failed in batch messages): The producer was sending a batch of messages and failed after only a few had been published successfully. Once the producer restarts, it republishes all messages from the batch, which introduces duplicates in Kafka.

C (Fire & Forget Failed): The producer published a message with retries=0 (fire and forget). In case of a failure the producer is not aware of it and sends the next message, which causes the message to be lost.

D (Consumer failed in batch message): A consumer receives a batch of messages from Kafka and manually commits their offsets (enable.auto.commit=false). If the consumer fails before committing to Kafka, the next time it will consume the same records again, which produces duplicates on the consumer side.
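
Scenario D is easy to reproduce in a toy model. The sketch below does not use a Kafka client: the log is a plain list, and the function name and the `crash_before_commit` flag are hypothetical stand-ins for a consumer that dies between processing and committing.

```python
def consume(log, committed_offset, batch_size, crash_before_commit=False):
    """Process one batch and return (processed records, new committed offset)."""
    batch = log[committed_offset:committed_offset + batch_size]
    processed = list(batch)  # stand-in for real processing
    if crash_before_commit:
        # The work was done, but the offset was never committed.
        return processed, committed_offset
    return processed, committed_offset + len(batch)


log = ["m0", "m1", "m2", "m3"]
seen = []

records, offset = consume(log, 0, batch_size=2, crash_before_commit=True)
seen += records
records, offset = consume(log, offset, batch_size=2)  # restart from offset 0
seen += records

print(seen)    # ['m0', 'm1', 'm0', 'm1']
print(offset)  # 2
```

The first two records are processed twice because the restart resumes from the last committed offset, not from the last processed record.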

Exactly-Once semantics

With exactly-once semantics, even if a producer tries to resend a message, the message is published, and consumed by the consumer, exactly once.

To achieve exactly-once semantics, Kafka uses the 3 properties below:

  1. enable.idempotence=true (addresses A, B & C)
  2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION<=5 (the producer may have at most 5 in-flight requests per connection)
  3. isolation.level=read_committed (addresses D)

Enable Idempotence (enable.idempotence=true)

Idempotent delivery enables the producer to write a message to Kafka exactly once to a particular partition of a topic during the lifetime of a single producer, without data loss and with ordering preserved per partition.

“Note that enabling idempotence requires MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to be less than or equal to 5, RETRIES_CONFIG to be greater than 0 and ACKS_CONFIG be ‘all’. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown”
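
The rule in the quote can be expressed as a small validation function. This is a sketch of the rule only, not the client's code: the function and the `ConfigException` class here are stand-ins written for this example, and the fallback values are assumptions; only the configuration key names are real producer settings.

```python
class ConfigException(ValueError):
    """Stand-in for the exception mentioned in the quote (not the real class)."""


def resolve_idempotent_config(user_config):
    """Fill in missing values and reject combinations that break idempotence."""
    config = {
        # Assumed "suitable values" when the user sets nothing.
        "max.in.flight.requests.per.connection": 5,
        "retries": 2147483647,
        "acks": "all",
    }
    config.update(user_config)
    config["enable.idempotence"] = True

    if config["max.in.flight.requests.per.connection"] > 5:
        raise ConfigException("max.in.flight.requests.per.connection must be <= 5")
    if config["retries"] <= 0:
        raise ConfigException("retries must be greater than 0")
    if str(config["acks"]) not in ("all", "-1"):
        raise ConfigException("acks must be 'all'")
    return config


print(resolve_idempotent_config({"retries": 3}))

try:
    resolve_idempotent_config({"acks": "1"})
except ConfigException as error:
    print("rejected:", error)
```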

To achieve idempotence, Kafka uses a unique ID, called the producer ID (PID), and a sequence number while producing messages. The producer increments the sequence number on each message it publishes, and the sequence is tied to its unique PID. The broker always compares the current sequence number with the previous one and rejects the message if the new one is not exactly one greater than the previous one: a number that is not greater indicates a duplicate, while a number that is more than one greater indicates lost messages.

In a failure scenario, the broker compares the sequence number with the previous one and rejects the message if the sequence number has not increased by exactly one.
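
The sequence check can be sketched as follows. This is a simplified model of a single partition: the class name and the return strings are invented for this example, and sequence numbers are assumed to start at 0 for each PID.

```python
class PartitionLog:
    """Toy partition that accepts a record only if its sequence is last + 1."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer id (PID) -> last accepted sequence number

    def append(self, pid, seq, value):
        expected = self.last_seq.get(pid, -1) + 1
        if seq < expected:
            return "duplicate"     # already written, not appended again
        if seq > expected:
            return "out_of_order"  # gap: an earlier message is missing
        self.records.append(value)
        self.last_seq[pid] = seq
        return "ok"


log = PartitionLog()
print(log.append(pid=1, seq=0, value="a"))  # ok
print(log.append(pid=1, seq=0, value="a"))  # duplicate (retry after a lost ack)
print(log.append(pid=1, seq=2, value="c"))  # out_of_order (seq 1 is missing)
print(log.append(pid=1, seq=1, value="b"))  # ok
print(log.records)                          # ['a', 'b']
```

The retried message from scenario A arrives with a sequence number the log has already seen, so it is not written a second time.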

Transaction (isolation.level)

Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be. They also allow you to commit your consumer offsets in the same transaction along with the data you have processed, thereby allowing end-to-end exactly-once semantics.

The producer does not wait for each message to be written to Kafka; instead it marks the transaction boundaries with beginTransaction, commitTransaction and abortTransaction (in case of failure). The consumer sets isolation.level to either read_committed or read_uncommitted:

  • read_committed: The consumer always reads committed data only.
  • read_uncommitted: The consumer reads all messages in offset order without waiting for transactions to be committed.

If a consumer with isolation.level=read_committed reaches a control message for a transaction that has not completed, it will not deliver any more messages from this partition until the producer commits or aborts the transaction or a transaction timeout occurs. The transaction timeout is determined by the producer using the configuration transaction.timeout.ms (default 1 minute).
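
The difference between the two isolation levels can be illustrated with a simplified model of one partition. Everything here is an assumption made for the sketch: the log is a list of tuples, transactions are identified by made-up IDs such as "t1", and commit and abort markers are plain entries in the list.

```python
def read(log, isolation_level):
    """Return the values a consumer would be handed from this partition."""
    # Which transactions already have a commit or abort marker in the log?
    outcome = {txn: kind for kind, txn, *_ in log if kind in ("commit", "abort")}

    delivered = []
    for kind, txn, *rest in log:
        if kind != "data":
            continue  # markers themselves are never delivered
        if isolation_level == "read_committed":
            if txn not in outcome:
                break  # open transaction: deliver nothing beyond this point
            if outcome[txn] == "abort":
                continue  # aborted data is skipped
        delivered.append(rest[0])
    return delivered


log = [
    ("data", "t1", "a"),
    ("data", "t2", "b"),
    ("commit", "t1"),
    ("data", "t3", "c"),
    ("abort", "t2"),
    ("data", "t3", "d"),
]

print(read(log, "read_uncommitted"))  # ['a', 'b', 'c', 'd']
print(read(log, "read_committed"))    # ['a']

# Once t3 commits, its messages become visible as well.
print(read(log + [("commit", "t3")], "read_committed"))  # ['a', 'c', 'd']
```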

Exactly-Once in Producer & Consumer

Under normal conditions we have a separate producer and consumer. The producer has to be idempotent and at the same time manage transactions, so that the consumer can set isolation.level to read_committed and the whole process becomes an atomic operation. This guarantees that the producer always stays in sync with the source system. Even if the producer crashes or a transaction is aborted, the result is always consistent: a message, or a batch of messages, is published as a unit once.

Likewise, the consumer receives a message, or a batch of messages, as a unit once.

With exactly-once semantics, the producer together with the consumer appears as an atomic operation that operates as one unit: a message is either published and consumed exactly once, or aborted.

Exactly-Once in Kafka Streams

Kafka Streams consumes messages from topic A, processes them and publishes messages to topic B; once they are published, it uses a commit (which mostly runs under the covers) to flush all state store data to disk.

Exactly-once in Kafka Streams is a read-process-write pattern which guarantees that these operations are treated as one atomic operation. Since Kafka Streams covers the producer, the consumer and transactions all together, it comes with a special parameter, processing.guarantee, which can be set to exactly_once or at_least_once, so you do not have to handle all the parameters separately.

Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics and production to output topics all together. If any one of these steps fails, all of the changes are rolled back.
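
The all-or-nothing behaviour can be illustrated with a toy model. This is not the Kafka Streams API: the state is a plain dictionary, the processing step is a made-up counter, and the `fail_at` parameter is a hypothetical switch used to simulate a failure part-way through.

```python
import copy


def process_atomically(state, record, fail_at=None):
    """Apply store, changelog, output and offset updates together or not at all."""
    draft = copy.deepcopy(state)  # all changes go to a private copy first
    try:
        draft["store"][record] = draft["store"].get(record, 0) + 1
        if fail_at == "changelog":
            raise RuntimeError("changelog write failed")
        draft["changelog"].append((record, draft["store"][record]))
        if fail_at == "output":
            raise RuntimeError("output write failed")
        draft["output"].append(record.upper())
        draft["offset"] += 1
    except RuntimeError:
        return state  # roll back: the original state is untouched
    return draft      # commit: every change becomes visible at once


state = {"offset": 0, "store": {}, "changelog": [], "output": []}

state = process_atomically(state, "a")
print(state["offset"], state["output"])  # 1 ['A']

state = process_atomically(state, "b", fail_at="output")
print(state["offset"], state["output"])  # 1 ['A']
```

After the simulated failure neither the state store nor the changelog contains anything about record "b", so reprocessing it later does not count it twice.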

Setting processing.guarantee to exactly_once automatically provides the parameters below, so you do not need to set them explicitly:

  1. isolation.level=read_committed
  2. enable.idempotence=true
  3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5