Apache Kafka Consumer Rebalance

Consumer rebalance decide which consumer is responsible for which subset of all available partitions for some topic(s). For example, you might have a topic with 20 partitions and 10 consumers; at the end of a rebalance, you might expect each consumer to be reading from 2 partitions. If you shut down 10 of those consumers, you might expect each consumer to have 1 partitions after a rebalance has completed. Consumer rebalance is dynamic partition assignment which can handle automatically by Kafka.

A Group Coordinator is one of broker responsible to communicate with consumers to achieve rebalances between consumer.In earlier version Zookeeper stored metadata details but latest version it store on brokers.Consumer coordinator receive heartbeat and polling from all consumers of consumer group so he aware about each consumer heartbeat and manager their offset on partitions.

Group Leader: One of consumer of Consumer Group work as group leader which is chosen by Group coordinator and will responsible for making partition assignment decision on behalf of all consumers in a group.

Rebalance Scenarios:

  1. Consumer Group subscribes to any topics
  2. A Consumer instance could not able to send heart beat with session.heart.beat time interval.
  3. Consumer long process exceed poll timeout
  4. Consumer of Consumer group through exception
  5. New partition added.
  6. Scaling Up and Down consumer . Added new consumer or remove existing consumer manually for

Consumer Rebalance

Consumer rebalance initiated when consumer requests to join a group or leaves a group. The Group Leader receive a list of all active consumer from the Group Coordinator. Group Leader decide partition(s) assigned to each consumer by using PartitionAssigner. Once Group Leader finalize partition assignment it send assignments list to Group Coordinator which send back these information to all consumer. Group only send applicable partitions to their consumer not other consumer assigned partitions. Only Group Leader aware about all consumer and its assigned partitions. After the rebalance is complete, consumers start sending Heartbeat to Group Coordinator that its alive. Consumers send a OffsetFetch request to the Group Coordinator to get last committed offsets for their assigned partitions. Consumers start consuming messaged for newly assigned partition.

State Management

While rebalancing, Group coordinator set its state to Rebalance and wait all consumer to re-join the group.

When the Group start rebalancing , the group coordinator first switches its state to rebalance so that all interacting consumers are notified to rejoin the group. Once rebalance completed Group coordinator create new generation ID and notified to all consumers and group proceed to sync stage where consumers send sync request and go to wait until group Leader finish generating new assign partition.Once consumers received new assigne partition they moved to stable stage.

enter image description here

Static Membership

Thies rebalancing is quite heavy operation as it required to stop all consumer and wait to get new assigned partition. On each rebalance always create new generation id means refresh everything. To solve this overhead Kafka 2.3+ introduced Static Membership to reduce unnecessary Rebalance. KIP-345

In Static Membership consumer state will persist and on Rebalance the same assignment will get apply. It uses new group.instance.id to persist member identity. So even in worst case scenario member id get reshuffle to assigne new partition but still same consumer instance id will get same partition assignment

instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

And after the restart:

instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}   

Ref:

  1. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership
  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

Kafka – Exactly-Once Semantics

In distributed environment failure is very common scenario which can be happened any time. In Kafka environment, broker can crash, network failure, failure in processing, failure while publishing message or failure to consume messages etc. These different scenario introduced different kind of data loss and duplication.

Failure scenarios

A (Ack Failed): Producer published message successfully with retry>1 but could not received acknowledge due to failure. In that case Producer will retry same message might introduce duplicate.

B (Producer process failed in batch messages): Producer sending batch of messages it failed with few published success. In that case and once producer will restart it will again republish all message from batch which will introduce duplicate in Kafka.

enter image description here

C (Fire & Forget Failed) Producer published message with retry=0(fire and forget). In case of failure published will not aware and send next message this will cause message lost.

enter image description here

D (Consumer failed in batch message) A consumer receive a batch of messages from Kafka and manually commit their offset (enable.auto.commit=false). If consumer failed before committing to Kafka , next time Consumer will consume the same records again which reproduce duplicate on consumer side.

enter image description here

Exactly-Once semantics

In this case, even if a producer tries to resend a message, it leads to the message will be published and consume by consumer exactly once.

To achieve Exactly-Once semantic in Kafka , it uses below 3 property

  1. enable.idempotence=true (address a, b & c)
  2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5(Producer will always have one in-flight request per connection)
  3. isolation.level=read_committed (address d )

Enable Idempotent(enable.idempotence=true)

Idempotent delivery enables producer to write message to Kafka exactly once to a particular partition of a topic during the lifetime of a single producer without data loss and order per partition.

“Note that enabling idempotence requires MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to be less than or equal to 5, RETRIES_CONFIG to be greater than 0 and ACKS_CONFIG be ‘all’. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown”

To achieve idempotence Kafka uses unique id which is called product id or PID and sequence number while producing messages. Producer keep incrementing sequence number on each message published which map with unique PID. Broker always compare current sequence number with previous one and it reject if new one is not +1 greater than previous one which avoid duplication and same time if more than greater show lost in messages

enter image description here

In failure scenario broker will compare sequence number with previous one and if sequence not increased +1 will reject the message.

enter image description here

Transaction (isolation.level)

Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be.It allows you to commit your consumer offsets in the same transaction along with the data you have processed, thereby allowing end-to-end exactly-once semantics.

Producer doesn’t wait to write message to kafka wherease Producer uses beginTransaction, commitTransaction and abortTransaction(in case of failure) Consumer uses isolation.level either read_committed or read_uncommitted

  • read_committed: Consumer will always read committed data only.
  • read_uncommitted: Read all messages in offset order without waiting for transactions to be committed

If a consumer with isolation.level=read_committed reaches a control message for a transaction that has not completed, it will not deliver any more messages from this partition until the producer commits or aborts the transaction or a transaction timeout occurs. The transaction timeout is determined by the producer using the configuration transaction.timeout.ms(default 1 minute).

Exactly-Once in Producer & Consumer

In normal condition where we have seperate producer and consumer. Producer has to idempotent and same time manage transaction so consumer can use isolation.level to read only read_committed to make whole process as atomic operation. This makes guarantee that producer will always sync with source system. Even producer crash or transaction aborted , it always be consistent and publish message or batch of message as unit once.

The same consumer will either receive message or batch of message as unit once.

In Exactly-Once semantic Producer along with Consumer will appeared as atomic operation which will operate as one unit. Either publish and get consumed once at all or aborted.

Exactly Once in Kafka Stream

Kafka Stream consume messages from topic A , process and publish message to Topic B and once publish use commit(commit mostly run under cover) to flush all state store data to disk.

Exactly-once in Kafka Stream is read-process-write pattern which guarantee that these operation will be treated as atomic operation. Since Kafka Stream cater producer , consumer and transaction all together Kafka Stream comes special parameter processing.guarantee which could exactly_once or at_least_once which make life easy not to handle all parameters separately.

Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics and production to output topics all together. If any one of these steps fail, all of the changes are rolled back.

processing.guarantee : exactly_once automatically provide below parameters you no need to set explicetly

  1. isolation.level=read_committed
  2. enable.idempotence=true
  3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5

JAVA NIO – FileLock

Locking on File introduced with java.nio package that implemented by FileChannel. A FileLock is two types exclusive and shared. A shared lock allows other concurrently running program to acquire overlapping shared Lock. An Exclusive lock doesn’t allow other program to acquire overlapping lock.

Lock ideally supports shared Lock but it depends OS if OS doesn’t support share lock then it will act as exclusive locking. Locking applied on File not per channel or thread means if channel obtained the exclusive lock then other JVM’ channel will be block till first thread channel resume the lock.

Locks are associated with a file, not with individual file handles or channels Locking applied on File not per channel or thread means if channel obtained the lock before processing the file then other JVM’ channel will be block till first thread channel resume the lock. Since Lock is on File therefore it is not advisable to use lock on a single JVM.

Below is FileLock API in FileChannel class

public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel,
GatheringByteChannel, ScatteringByteChannel {
public abstract FileLock lock(long position, long size, boolean shared) throws IOException;
public final FileLock lock() throws IOException {
return lock(0L, Long.MAX_VALUE, false);
}
public abstract FileLock tryLock(long position, long size, boolean shared) throws IOException;
public final FileLock tryLock() throws IOException {
return tryLock(0L, Long.MAX_VALUE, false);
}
}

The FileLock (long position, long size, boolean shared) acquire lock on specified part of a file. The region of file specified by beginning position and size whereas last argument specified if lock is shared lock (value True) of exclusive (value False). To obtain the shared lock, you need to open file with read permission whereas write permission require exclusive lock.

We can use lock () method to acquire lock on whole file not the specified area on file. File can grow therefore we need to specify the size while obtaining the lock.

Method tryLock on specified area or on full file doesn’t block/hold while acquiring a lock means it immediately return value without going to wait to acquire lock. If other process already acquired lock for specific file then it will immediate return null.

FileLock API specified as below.

public abstract class FileLock {
public final FileChannel channel( )
public final long position( )
public final long size( )
public final boolean isShared( )
public final boolean overlaps (long position, long size)
public abstract boolean isValid( );
public abstract void release( ) throws IOException;
}

The FileLock encapsulates specific region, which is acquired by FileChannel. FileLock associate with specific FileChannel instance and FileLock keep the reference of FileChannel that could be determining by channel () method in FileLock.

FileLock lifecycle will start when lock method or tryLock invoke from FileChannel and end when release () method called. It FileLock also invalid if JVM shutdown or associated channel closed.

isShared() method return whether lock is shared or exclusive. If shared lock is not supported by operating system then this method always return false.

FileLock objects are thread-safe; multiple threads may access a lock object concurrently.

Finally, overlaps(long position, long size) return if lock overlap the given block range or not.

FileLock object associated with an underlying file which occurred deadlock if you don’t release the lock hence it is advisable to always release lock as mentioned below

FileLock lock = fileChannel.lock( )
try {
..............
} catch (IOException) [
..................
} finally {
lock.release( )
}

Grails – RESTful Webservice using JAX-RS plugin

I discussed about RESTful architecture consideration and how to build RESTful web services using Jersey JAX-RS [JSR 311]  in JAVA. Today I am going to discuss how to build the RESTful web services using Grails framework.

GRAILs framework is the platform which provides end to end solution to build web application from scratch to advance intuitive RIA web application. Grails potentially be a one-stop shop for all your enterprise application development. Grails is modular, allowing you to pick and choose which modules are applicable for your application Continue reading

Groovy & Grails Understanding – Part2

Grails

Grails is a web framework based on Groovy and Java which can be deployed into existing Java web servers, e.g. Tomcat or Jetty. Its scaffolding capabilities let you create a new project within a few minutes. Grails is based on the “convention over configuration” idea which allows the application to auto-wires itself based on naming schemes (instead of using configuration files, e.gl XML files). Continue reading

Groovy & Grails Understanding – Part1

Introduction

Enterprises today require agile platform for rapid development of applications with ready assurance to quality of services, compliance to architecture and design standards. Two key things influence our ability to be agile. First, it’s the attitude of everyone involved. Second it’s the languages, framework, and tools we use to get our work done. Continue reading