Apache Kafka Consumer Rebalance

A consumer rebalance decides which consumer is responsible for which subset of the available partitions of one or more topics. For example, you might have a topic with 20 partitions and 10 consumers; at the end of a rebalance, you might expect each consumer to be reading from 2 partitions. If you shut down 5 of those consumers, you might expect each of the remaining 5 consumers to have 4 partitions after the rebalance has completed. Rebalancing is Kafka's dynamic partition assignment mechanism, and Kafka handles it automatically.
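To make the arithmetic concrete, here is a minimal Python sketch of an even, range-style split of one topic's partitions across the consumers of a group. It is a simplification for illustration only: the function name assign_range is hypothetical, and Kafka's built-in assignors also deal with multiple topics and other strategies.

```python
from typing import Dict, List


def assign_range(partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Split partitions 0..partitions-1 of one topic into contiguous ranges.

    Each consumer gets partitions // len(consumers) partitions; the first
    partitions % len(consumers) consumers (in sorted order) get one extra.
    """
    if not consumers:
        return {}  # nobody to assign to
    members = sorted(consumers)
    per_consumer, extra = divmod(partitions, len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# 20 partitions across 10 consumers: 2 partitions each.
ten = assign_range(20, [f"consumer-{i:02d}" for i in range(10)])
print(sorted({len(p) for p in ten.values()}))  # [2]

# Shut down 5 consumers: the remaining 5 get 4 partitions each.
five = assign_range(20, [f"consumer-{i:02d}" for i in range(5)])
print(five["consumer-00"])  # [0, 1, 2, 3]
```

When the partition count does not divide evenly, the first consumers in sorted order simply receive one partition more than the others.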

Group Coordinator: one of the brokers acts as the Group Coordinator for a consumer group and communicates with the group's consumers to carry out rebalances. In older versions, consumer group metadata was stored in ZooKeeper; in newer versions it is stored on the brokers. The Group Coordinator receives heartbeats from every consumer in the group, so it knows which consumers are alive, and it manages the offsets they commit for their partitions.
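Which broker becomes the coordinator is not arbitrary. In the classic group protocol, the group.id is hashed to pick a partition of the internal __consumer_offsets topic, and the broker leading that partition acts as the Group Coordinator. The sketch below reproduces that partition calculation in Python, assuming Java's String.hashCode semantics, Kafka's Utils.abs behaviour and the default offsets.topic.num.partitions of 50; the Python function names are hypothetical.

```python
def java_string_hash(s: str) -> int:
    """Reproduce Java's String.hashCode(): 32-bit signed, over UTF-16 code units."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF  # keep 32 bits, like Java int overflow
    return h - (1 << 32) if h >= (1 << 31) else h  # reinterpret as signed int


def kafka_abs(n: int) -> int:
    """Mirror Kafka's Utils.abs, which maps Integer.MIN_VALUE to 0."""
    return 0 if n == -(1 << 31) else abs(n)


def coordinator_partition(group_id: str, offsets_partitions: int = 50) -> int:
    """Partition of __consumer_offsets whose leader acts as the group's coordinator."""
    return kafka_abs(java_string_hash(group_id)) % offsets_partitions


print(coordinator_partition("hello"))  # 22
```

Because the mapping depends only on the group.id and the partition count of __consumer_offsets, every consumer in the group independently arrives at the same coordinator.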

Group Leader: one of the consumers in the group is chosen by the Group Coordinator to act as the group leader. It is responsible for making the partition assignment decisions on behalf of all consumers in the group.

Rebalance Scenarios:

  1. A consumer group subscribes to one or more topics.
  2. A consumer fails to send a heartbeat within session.timeout.ms.
  3. A consumer spends longer processing records than max.poll.interval.ms allows between two poll() calls.
  4. A consumer in the group throws an exception and crashes.
  5. New partitions are added to a subscribed topic.
  6. The group is scaled up or down: a new consumer is added, or an existing consumer is removed manually.
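The heartbeat and poll-timeout scenarios are governed by two separate consumer settings: session.timeout.ms (how long the coordinator waits for a heartbeat before removing the member) and max.poll.interval.ms (the maximum gap between poll() calls before the consumer is considered failed). The toy check below shows which of the two would cause a member to be dropped; the class and function names are hypothetical and the values are illustrative, not recommendations.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConsumerTimeouts:
    # Named after the real consumer settings; the values are illustrative only.
    session_timeout_ms: int = 45_000      # session.timeout.ms
    max_poll_interval_ms: int = 300_000   # max.poll.interval.ms


def rebalance_reason(now_ms: int, last_heartbeat_ms: int, last_poll_ms: int,
                     cfg: ConsumerTimeouts) -> Optional[str]:
    """Return why this member would be dropped from the group, or None if it is healthy."""
    if now_ms - last_heartbeat_ms > cfg.session_timeout_ms:
        # The coordinator has stopped hearing from the consumer.
        return "session.timeout.ms exceeded: no heartbeat reached the coordinator"
    if now_ms - last_poll_ms > cfg.max_poll_interval_ms:
        # Heartbeats still arrive, but poll() has not been called in time.
        return "max.poll.interval.ms exceeded: processing between poll() calls took too long"
    return None


cfg = ConsumerTimeouts()
# Heartbeat 1 s ago, but the last poll() was 350 s ago.
print(rebalance_reason(400_000, 399_000, 50_000, cfg))
```

Keeping the two checks separate reflects the fact that a consumer can be alive (still heartbeating) and yet stuck in slow processing, which is exactly the long-processing scenario.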

Consumer Rebalance

A consumer rebalance is initiated when a consumer requests to join the group or leaves it. The rebalance then proceeds as follows:

  1. The Group Leader receives the list of all active consumers from the Group Coordinator.
  2. The Group Leader decides which partitions to assign to each consumer, using a partition assignor (the PartitionAssignor interface in older clients, ConsumerPartitionAssignor since Kafka 2.4).
  3. Once the Group Leader has finalized the assignment, it sends the full assignment list to the Group Coordinator, which passes it on to the consumers. Each consumer receives only its own partitions, not those assigned to other consumers, so only the Group Leader knows every consumer and its assigned partitions.
  4. After the rebalance completes, consumers keep sending heartbeats to the Group Coordinator to show that they are alive.
  5. Each consumer sends an OffsetFetch request to the Group Coordinator to get the last committed offsets for its assigned partitions.
  6. Consumers start consuming messages from their newly assigned partitions.
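The rebalance steps above can be modelled in a few lines of Python. This is a toy, in-memory simulation, not the wire protocol: the class and method names are hypothetical, the leader is simply the first member to join, and the assignor is a plain round-robin. It shows the key property of the flow: the leader computes the complete assignment, but each consumer is handed only its own partitions and then fetches committed offsets for just those.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional


def round_robin(members: List[str], partitions: List[int]) -> Dict[str, List[int]]:
    """Leader-side assignor: deal partitions to the sorted members in turn."""
    if not members:
        return {}
    assignment: Dict[str, List[int]] = defaultdict(list)
    ordered = sorted(members)
    for i, partition in enumerate(partitions):
        assignment[ordered[i % len(ordered)]].append(partition)
    return dict(assignment)


class ToyCoordinator:
    """Hypothetical, in-memory stand-in for the Group Coordinator."""

    def __init__(self, partitions: Iterable[int]):
        self.partitions = list(partitions)
        self.members: List[str] = []             # join order
        self.assignment: Dict[str, List[int]] = {}
        self.committed: Dict[int, int] = {}      # partition -> last committed offset

    def join(self, member_id: str) -> None:
        self.members.append(member_id)

    def leader(self) -> str:
        # Simplification: the first member to join acts as Group Leader.
        return self.members[0]

    def rebalance(self) -> None:
        # The coordinator hands the leader the member list; the leader runs
        # the assignor and returns the complete assignment.
        self.assignment = round_robin(self.members, self.partitions)

    def sync(self, member_id: str) -> List[int]:
        # Each member receives only its own partitions.
        return self.assignment.get(member_id, [])

    def offset_fetch(self, partitions: List[int]) -> Dict[int, Optional[int]]:
        # None = nothing committed yet; a real consumer would then apply auto.offset.reset.
        return {p: self.committed.get(p) for p in partitions}


coord = ToyCoordinator(range(6))
for member in ["c1", "c2", "c3"]:
    coord.join(member)
coord.committed = {0: 42}
coord.rebalance()
print(coord.leader())                          # c1
print(coord.sync("c2"))                        # [1, 4]
print(coord.offset_fetch(coord.sync("c1")))    # {0: 42, 3: None}
```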

State Management

When the group starts rebalancing, the Group Coordinator first switches the group's state to rebalance, so that all interacting consumers are notified to rejoin the group, and then waits for all of them to do so. Once every consumer has rejoined, the Group Coordinator creates a new generation ID and notifies all consumers, and the group proceeds to the sync stage: consumers send a sync request and wait until the Group Leader has finished computing the new partition assignment. Once the consumers have received their newly assigned partitions, the group moves to the stable stage.
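The state transitions described above can be sketched as a small state machine. The state names follow those used by Kafka's classic group coordinator (PreparingRebalance, CompletingRebalance, Stable), but the class is a simplified illustration with hypothetical method names, not the broker's implementation; in particular it leaves out the Empty and Dead states and members leaving mid-rebalance.

```python
from enum import Enum
from typing import Iterable


class GroupState(Enum):
    STABLE = "Stable"
    PREPARING_REBALANCE = "PreparingRebalance"    # waiting for members to rejoin
    COMPLETING_REBALANCE = "CompletingRebalance"  # sync stage: waiting for the leader


class ToyGroup:
    """Simplified coordinator-side view of one group's rebalance states."""

    def __init__(self, members: Iterable[str]):
        self.members = set(members)   # consumers the coordinator waits for
        self.rejoined = set()
        self.generation = 0
        self.state = GroupState.STABLE

    def trigger_rebalance(self) -> None:
        # A join, a leave or a failure moves the group into the rebalance state.
        self.state = GroupState.PREPARING_REBALANCE
        self.rejoined.clear()

    def rejoin(self, member: str) -> None:
        if self.state is not GroupState.PREPARING_REBALANCE:
            raise RuntimeError("members rejoin only while a rebalance is being prepared")
        self.rejoined.add(member)
        if self.rejoined == self.members:
            # Everyone is back: start a new generation and enter the sync stage.
            self.generation += 1
            self.state = GroupState.COMPLETING_REBALANCE

    def leader_synced(self) -> None:
        # The leader's assignment has been delivered: the group is stable again.
        if self.state is not GroupState.COMPLETING_REBALANCE:
            raise RuntimeError("no rebalance is waiting for an assignment")
        self.state = GroupState.STABLE


group = ToyGroup(["c1", "c2"])
group.trigger_rebalance()
group.rejoin("c1")
print(group.state.value)                    # PreparingRebalance
group.rejoin("c2")
print(group.state.value, group.generation)  # CompletingRebalance 1
group.leader_synced()
print(group.state.value)                    # Stable
```

Note that the generation ID is bumped once per completed join phase, which is why every rebalance, even one that ends with the same assignment, produces a new generation.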


Static Membership

This rebalancing is quite a heavy operation, because it requires all consumers to stop consuming and wait for their new partition assignments. Every rebalance also creates a new generation ID, which effectively resets the whole group. To reduce this overhead and avoid unnecessary rebalances, Kafka 2.3 introduced Static Membership (KIP-345).

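With Static Membership, a consumer's identity persists across restarts, so the same assignment is applied again. The consumer sets the new group.instance.id configuration to persist its identity, and a static member that restarts and rejoins within session.timeout.ms gets its previous assignment back without triggering a rebalance. So even though the restarted consumer is given a new member ID, the same group.instance.id still receives the same partition assignment. For example, before consumer A restarts: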

instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

And after the restart:

instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}   
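The restart example can be reproduced with a toy coordinator that remembers assignments by group.instance.id instead of by member ID. The class and method names are hypothetical, and the logic is a heavy simplification of KIP-345 (it ignores session timeouts and the fencing of stale members); in a real deployment, each consumer opts in by setting group.instance.id to a value that is unique within the group and stable across restarts.

```python
import itertools
from typing import Dict, List, Tuple


class StaticMembershipCoordinator:
    """Toy model: assignments are remembered per group.instance.id, not per member ID."""

    def __init__(self, assignments: Dict[str, List[int]]):
        self.by_instance = dict(assignments)       # group.instance.id -> partitions
        self.member_of: Dict[str, int] = {}        # group.instance.id -> current member ID
        self._next_member_id = itertools.count(1)

    def join(self, instance_id: str) -> Tuple[int, List[int]]:
        # Every (re)join is handed a fresh member ID ...
        member_id = next(self._next_member_id)
        self.member_of[instance_id] = member_id
        # ... but partitions are looked up by the stable instance ID, so a known
        # instance gets its old assignment back. (An unknown instance raises
        # KeyError here; in Kafka a new member would trigger a real rebalance.)
        return member_id, self.by_instance[instance_id]


coord = StaticMembershipCoordinator({"A": [0, 1, 2], "B": [3, 4, 5], "C": [6, 7, 8]})
for instance in ["A", "B", "C"]:
    print(instance, *coord.join(instance))   # A 1 [0, 1, 2] / B 2 [3, 4, 5] / C 3 [6, 7, 8]
print("A", *coord.join("A"))                 # A 4 [0, 1, 2]  (after A restarts)
```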

Ref:

  1. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership
  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances