Apache Kafka Consumer Rebalance

Consumer rebalance decides which consumer is responsible for which subset of the available partitions of the subscribed topic(s). For example, you might have a topic with 20 partitions and 10 consumers; at the end of a rebalance, you might expect each consumer to be reading from 2 partitions. If you add 10 more consumers, you might expect each consumer to have 1 partition after the rebalance has completed. Consumer rebalance is a dynamic partition assignment that Kafka handles automatically.

A Group Coordinator is one of the brokers; it is responsible for communicating with consumers to carry out rebalances within a consumer group. In earlier versions ZooKeeper stored the group metadata, but in recent versions it is stored on the brokers. The Group Coordinator receives heartbeats from all consumers of the consumer group, so it is aware of each consumer's liveness, and it manages their committed offsets on partitions.

Group Leader: One consumer in the consumer group acts as the Group Leader. It is chosen by the Group Coordinator and is responsible for making partition assignment decisions on behalf of all consumers in the group.

Rebalance Scenarios:

  1. A consumer group subscribes to one or more topics.
  2. A consumer instance fails to send a heartbeat within the session timeout (session.timeout.ms).
  3. A consumer's processing takes longer than the maximum poll interval (max.poll.interval.ms).
  4. A consumer in the consumer group throws an exception.
  5. A new partition is added.
  6. Consumers are scaled up or down: a new consumer is added or an existing consumer is removed manually.
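
The timeout-related triggers in the list above correspond to consumer settings. The following is a minimal sketch, assuming the standard consumer property names session.timeout.ms, heartbeat.interval.ms and max.poll.interval.ms; the broker address, group name and values are illustrative placeholders rather than recommendations.

```java
import java.util.Properties;

class RebalanceTimeoutConfig {
    // Builds consumer properties that influence when a member is considered dead or stuck.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        p.setProperty("group.id", "demo-group");               // hypothetical group name
        p.setProperty("session.timeout.ms", "45000");          // no heartbeat within this window -> rebalance
        p.setProperty("heartbeat.interval.ms", "3000");        // how often heartbeats are sent
        p.setProperty("max.poll.interval.ms", "300000");       // max gap between poll() calls
        return p;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The heartbeat interval is kept well below the session timeout so that a single delayed heartbeat does not trigger a rebalance.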

Consumer Rebalance

A consumer rebalance is initiated when a consumer requests to join a group or leaves a group. The Group Leader receives a list of all active consumers from the Group Coordinator and decides which partition(s) to assign to each consumer by using a PartitionAssignor. Once the Group Leader finalizes the partition assignment, it sends the assignment list to the Group Coordinator, which sends this information back to the consumers. The Group Coordinator sends each consumer only its own partitions, not the partitions assigned to other consumers; only the Group Leader is aware of all consumers and their assigned partitions. After the rebalance is complete, consumers start sending heartbeats to the Group Coordinator to signal that they are alive. Consumers send an OffsetFetch request to the Group Coordinator to get the last committed offsets for their assigned partitions, and then start consuming messages from the newly assigned partitions.
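
The assignment decision made by the Group Leader can be illustrated with a small sketch. This is plain Java in the spirit of a range-style assignor, not Kafka's actual implementation; the class name, method name and consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignmentSketch {
    // Splits partitions 0..numPartitions-1 into contiguous ranges, one range per consumer.
    // The first (numPartitions % consumers.size()) consumers receive one extra partition.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result; // nobody to assign to
        }
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            consumers.add("consumer-" + i);
        }
        // 20 partitions over 10 consumers: each consumer ends up with 2 partitions.
        assign(consumers, 20).forEach((c, p) -> System.out.println(c + " -> " + p));
    }
}
```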

State Management

While rebalancing, the Group Coordinator sets the group's state to Rebalance and waits for all consumers to rejoin the group.

When the group starts rebalancing, the Group Coordinator first switches the state to Rebalance so that all interacting consumers are notified to rejoin the group. Once the members have rejoined, the Group Coordinator creates a new generation ID and notifies all consumers, and the group proceeds to the Sync stage, where consumers send a sync request and wait until the Group Leader finishes generating the new partition assignment. Once consumers have received their new partition assignment, the group moves to the Stable stage.


Static Membership

This rebalancing is quite a heavy operation, as it requires all consumers to stop and wait for their new partition assignment. Each rebalance also creates a new generation ID, which means everything is refreshed. To reduce this overhead, Kafka 2.3+ introduced Static Membership (KIP-345), which avoids unnecessary rebalances.

With Static Membership, the consumer's state persists and the same assignment is applied after a rebalance. It uses the new group.instance.id setting to persist member identity. So even in the worst case, where a member ID is reshuffled, the same consumer instance ID still gets the same partition assignment:

instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

And after the restart:

instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}   
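
The before-and-after listing above can be reproduced with a toy model. This is a simplified sketch of the idea only, assuming that a coordinator remembers assignments by instance ID; the class and method names are hypothetical and the real protocol involves far more state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StaticMembershipSketch {
    // Assignment remembered per group.instance.id (toy model of the coordinator's state).
    private final Map<String, List<Integer>> assignmentByInstance = new HashMap<>();
    private int nextMemberId = 1;

    // A (re)joining member always receives a fresh member id, but an instance id
    // that is already known keeps the assignment stored for it.
    int join(String instanceId, List<Integer> assignmentIfNew) {
        assignmentByInstance.putIfAbsent(instanceId, assignmentIfNew);
        return nextMemberId++;
    }

    List<Integer> assignmentOf(String instanceId) {
        return assignmentByInstance.get(instanceId);
    }

    public static void main(String[] args) {
        StaticMembershipSketch group = new StaticMembershipSketch();
        group.join("A", Arrays.asList(0, 1, 2));
        group.join("B", Arrays.asList(3, 4, 5));
        group.join("C", Arrays.asList(6, 7, 8));
        // Instance A restarts: new member id, same assignment.
        int newMemberId = group.join("A", Arrays.asList(9));
        System.out.println("instanceId: A, memberId: " + newMemberId
                + ", assignment: " + group.assignmentOf("A"));
    }
}
```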

Ref:

  1. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership
  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

Kafka – Exactly-Once Semantics

In a distributed environment, failure is a very common scenario that can happen at any time. In a Kafka environment, a broker can crash, the network can fail, processing can fail, or publishing or consuming a message can fail. These different scenarios introduce different kinds of data loss and duplication.

Failure scenarios

A (Ack Failed): The producer published a message successfully with retries enabled but did not receive the acknowledgement because of a failure. In that case the producer retries the same message, which might introduce a duplicate.

B (Producer process failed in batch messages): The producer was sending a batch of messages and failed after only some of them were published successfully. Once the producer restarts, it republishes all messages from the batch, which introduces duplicates in Kafka.


C (Fire & Forget Failed): The producer published a message with retries=0 (fire and forget). In case of failure the producer is not aware of it and sends the next message, which causes the message to be lost.


D (Consumer failed in batch message): A consumer receives a batch of messages from Kafka and commits their offsets manually (enable.auto.commit=false). If the consumer fails before committing to Kafka, it will consume the same records again next time, which produces duplicates on the consumer side.


Exactly-Once semantics

With exactly-once semantics, even if a producer resends a message, the message is published and consumed exactly once.

To achieve exactly-once semantics, Kafka uses the following three properties:

  1. enable.idempotence=true (addresses A, B and C)
  2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5 (the producer may have up to five in-flight requests per connection)
  3. isolation.level=read_committed (addresses D)

Enable Idempotence (enable.idempotence=true)

Idempotent delivery enables the producer to write a message to Kafka exactly once to a particular partition of a topic during the lifetime of a single producer, without data loss and with order preserved per partition.

“Note that enabling idempotence requires MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to be less than or equal to 5, RETRIES_CONFIG to be greater than 0 and ACKS_CONFIG be ‘all’. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown”
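
The constraints in the note above can be written down as producer properties. This is a minimal sketch using plain java.util.Properties with the string keys behind the constants named in the quote; the broker address is a placeholder and the class name is hypothetical.

```java
import java.util.Properties;

class IdempotentProducerConfig {
    // Producer properties that satisfy the constraints quoted above.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");            // placeholder address
        p.setProperty("enable.idempotence", "true");
        p.setProperty("acks", "all");                                     // ACKS_CONFIG must be 'all'
        p.setProperty("retries", Integer.toString(Integer.MAX_VALUE));    // RETRIES_CONFIG must be > 0
        p.setProperty("max.in.flight.requests.per.connection", "5");      // must be <= 5
        return p;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```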

To achieve idempotence, Kafka uses a unique ID, called the producer ID or PID, and a sequence number while producing messages. The producer keeps incrementing the sequence number on each message published, and the number is tracked against the unique PID. The broker always compares the current sequence number with the previous one and rejects the message if the new one is not exactly one greater than the previous one: a number that is not greater indicates a duplicate, while a number that is greater by more than one indicates lost messages.


In a failure scenario, the broker compares the sequence number with the previous one and rejects the message if the sequence number has not increased by exactly one.
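
The sequence-number check can be sketched as a toy model of one partition on the broker side. This is an illustration under stated assumptions (sequence numbers start at 0, one sequence per PID), not the broker's real code; all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SequenceCheckSketch {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last sequence number accepted per producer id, for a single partition.
    private final Map<Long, Integer> lastSeqByPid = new HashMap<>();

    Result append(long pid, int seq) {
        int last = lastSeqByPid.getOrDefault(pid, -1);
        if (seq <= last) {
            return Result.DUPLICATE;      // already written: a retry of an acknowledged message
        }
        if (seq != last + 1) {
            return Result.OUT_OF_ORDER;   // gap: an earlier message is missing
        }
        lastSeqByPid.put(pid, seq);
        return Result.ACCEPTED;
    }

    public static void main(String[] args) {
        SequenceCheckSketch partition = new SequenceCheckSketch();
        System.out.println(partition.append(1L, 0)); // first message
        System.out.println(partition.append(1L, 1)); // next in sequence
        System.out.println(partition.append(1L, 1)); // retry of the same message
        System.out.println(partition.append(1L, 3)); // sequence 2 never arrived
    }
}
```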


Transaction (isolation.level)

Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be. It allows you to commit your consumer offsets in the same transaction along with the data you have processed, thereby allowing end-to-end exactly-once semantics.

The producer does not wait for the transaction to finish before writing messages to Kafka; it marks the transaction boundaries with beginTransaction, commitTransaction and abortTransaction (in case of failure). The consumer sets isolation.level to either read_committed or read_uncommitted:

  • read_committed: The consumer always reads committed data only.
  • read_uncommitted: The consumer reads all messages in offset order without waiting for transactions to be committed.

If a consumer with isolation.level=read_committed reaches a message of a transaction that has not yet completed, it will not deliver any more messages from this partition until the producer commits or aborts the transaction or a transaction timeout occurs. The transaction timeout is determined by the producer using the configuration transaction.timeout.ms (default 1 minute).
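
The difference between the two isolation levels can be illustrated with a toy partition log. This is a simplified model, assuming each transaction ID is used once and that commit and abort markers are stored in the log; it is not how the Kafka client is implemented, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IsolationLevelSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final String txnId;
        final String value;
        Entry(Kind kind, String txnId, String value) {
            this.kind = kind;
            this.txnId = txnId;
            this.value = value;
        }
    }

    // read_uncommitted: every data record in offset order.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) out.add(e.value);
        }
        return out;
    }

    // read_committed: skip aborted records and stop at the first record
    // whose transaction has neither committed nor aborted yet.
    static List<String> readCommitted(List<Entry> log) {
        Map<String, Kind> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) outcome.put(e.txnId, e.kind);
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) continue;
            Kind result = outcome.get(e.txnId);
            if (result == null) break;                 // open transaction: wait here
            if (result == Kind.COMMIT) out.add(e.value);
        }
        return out;
    }

    // t1 commits, t2 aborts, t3 is still open, t4 commits after t3 started.
    static List<Entry> demoLog() {
        return Arrays.asList(
                new Entry(Kind.DATA, "t1", "a"), new Entry(Kind.DATA, "t2", "b"),
                new Entry(Kind.COMMIT, "t1", null), new Entry(Kind.ABORT, "t2", null),
                new Entry(Kind.DATA, "t3", "c"), new Entry(Kind.DATA, "t4", "d"),
                new Entry(Kind.COMMIT, "t4", null));
    }

    public static void main(String[] args) {
        System.out.println("read_uncommitted: " + readUncommitted(demoLog()));
        System.out.println("read_committed:   " + readCommitted(demoLog()));
    }
}
```

In this model the committed record d is withheld from the read_committed reader because it sits behind the still-open transaction t3, which mirrors the waiting behaviour described above.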

Exactly-Once in Producer & Consumer

In the normal case we have a separate producer and consumer. The producer has to be idempotent and at the same time manage transactions, so that the consumer can set isolation.level to read_committed and the whole process behaves as an atomic operation. This guarantees that the producer always stays in sync with the source system: even if the producer crashes or a transaction is aborted, it stays consistent and publishes a message or batch of messages as a unit, once.

Likewise, the consumer receives a message or batch of messages as a unit, once.

With exactly-once semantics, the producer together with the consumer appears as an atomic operation that operates as one unit: a message is either published and consumed exactly once, or aborted.

Exactly Once in Kafka Stream

Kafka Streams consumes messages from topic A, processes them, and publishes messages to topic B; once published, it uses a commit (which mostly runs under the covers) to flush all state store data to disk.

Exactly-once in Kafka Streams is a read-process-write pattern that guarantees these operations are treated as one atomic operation. Since Kafka Streams covers the producer, the consumer and transactions all together, it comes with a special parameter, processing.guarantee, which can be set to exactly_once or at_least_once, so that you do not have to handle all the parameters separately.

Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics and production to output topics all together. If any one of these steps fails, all of the changes are rolled back.

Setting processing.guarantee to exactly_once automatically provides the parameters below; you do not need to set them explicitly:

  1. isolation.level=read_committed
  2. enable.idempotence=true
  3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5
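
As a small illustration, the single Kafka Streams setting can be expressed with plain java.util.Properties. The application ID and broker address are hypothetical placeholders; newer Kafka releases also offer an exactly_once_v2 value, which this sketch does not use.

```java
import java.util.Properties;

class StreamsExactlyOnceConfig {
    // One switch instead of configuring producer, consumer and transactions separately.
    static Properties streamsProps() {
        Properties p = new Properties();
        p.setProperty("application.id", "demo-streams-app");   // hypothetical application id
        p.setProperty("bootstrap.servers", "localhost:9092");  // placeholder address
        p.setProperty("processing.guarantee", "exactly_once");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("processing.guarantee"));
    }
}
```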

MapReduce Composite Key Operation-Part2

Composite Key Operation

In complex operations that involve multiple columns, a basic key may not help. For example, if we need to calculate population grouped by country and state, the choice of key matters. If we choose the composite key wisely, it solves the problem easily. We can create a composite key by implementing the WritableComparable interface, which lets it be used like any other WritableComparable object. Below is a composite key with two fields, country and state, which overrides the compareTo method to sort by country and then state. It provides write() and readFields() methods to serialize and deserialize its attributes.

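import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

private static class CompositeGroupKey implements WritableComparable<CompositeGroupKey> {
    String country;
    String state;
    // No-arg constructor: Hadoop instantiates the key reflectively before calling readFields().
    public CompositeGroupKey() {
    }
    public CompositeGroupKey(String country, String state) {
        this.country = country;
        this.state = state;
    }
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, country);
        WritableUtils.writeString(out, state);
    }
    public void readFields(DataInput in) throws IOException {
        this.country = WritableUtils.readString(in);
        this.state = WritableUtils.readString(in);
    }
    // Sort by country first, then by state.
    public int compareTo(CompositeGroupKey pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        return intcnt == 0 ? state.compareTo(pop.state) : intcnt;
    }
    // Equal keys must hash alike so the partitioner routes them to the same reducer.
    @Override
    public int hashCode() {
        return 31 * country.hashCode() + state.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        return o instanceof CompositeGroupKey && compareTo((CompositeGroupKey) o) == 0;
    }
    @Override
    public String toString() {
        return country + ":" + state;
    }
}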

We use the composite key above in a MapReduce job that counts the total population grouped by country and state.

Input

Country, State, City, Population (Mil)
USA, CA, Su, 12
USA, CA, SA, 42
USA, CA, Fr, 23
USA, MO, XY, 23
USA, MO, AB, 19
USA, MO, XY, 23
USA, MO, AB, 19
IND, TN, AT, 11
IND, TN, KL, 10

Output

Country, State, Total Population
IND, TN, 21
USA, CA, 77
USA, MO, 84
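
Before looking at the Hadoop classes, the effect of the composite key can be checked with a plain-Java stand-in. This is only a sketch of the sort-and-sum idea using a TreeMap, not a MapReduce job; the class and method names are hypothetical, and the rows are the sample input above.

```java
import java.util.Map;
import java.util.TreeMap;

class CompositeKeyGroupBySketch {
    // Plain-Java stand-in for the composite key: ordered by country, then state.
    static final class Key implements Comparable<Key> {
        final String country;
        final String state;
        Key(String country, String state) {
            this.country = country;
            this.state = state;
        }
        @Override
        public int compareTo(Key other) {
            int c = country.compareTo(other.country);
            return c != 0 ? c : state.compareTo(other.state);
        }
        @Override
        public String toString() {
            return country + ":" + state;
        }
    }

    // "Map" step: parse each line into (key, population). "Reduce" step: sum per key.
    // TreeMap keeps the keys in compareTo order, like the sorted reducer input.
    static Map<Key, Integer> totals(String[] lines) {
        Map<Key, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            Key key = new Key(f[0].trim(), f[1].trim());
            sums.merge(key, Integer.parseInt(f[3].trim()), Integer::sum);
        }
        return sums;
    }

    static String[] sampleInput() {
        return new String[] {
            "USA, CA, Su, 12", "USA, CA, SA, 42", "USA, CA, Fr, 23",
            "USA, MO, XY, 23", "USA, MO, AB, 19", "USA, MO, XY, 23",
            "USA, MO, AB, 19", "IND, TN, AT, 11", "IND, TN, KL, 10"
        };
    }

    public static void main(String[] args) {
        totals(sampleInput()).forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```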

Mapper Program

Once we have defined the composite key, we create the mapper class, which consumes the input produced by the InputFormat. The InputFormat splits the file and passes each split to an individual map task. A map task transforms the records of its input split into key-value pairs, where the key must implement the WritableComparable interface and the value the Writable interface. The Writable interface provides the ability to write the data to disk, and WritableComparable additionally allows it to be sorted. The number of map tasks is decided by the InputSplits defined in the InputFormat. A split is a logical split, not a physical one. For each input split, MapReduce first invokes the mapper's setup(Context) method, then invokes map(Object, Object, Context) for each record in the split, and finally invokes cleanup(Context) for cleanup activity. We extend the generic Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> class, whose type parameters indicate the input and output key and value types. The mapper class overrides the map() method to process the input data as below

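@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // Each value is one line of the input file: country, state, city, population.
    String line = value.toString();
    String[] keyvalue = line.split(",");
    // populat is an IntWritable field of the mapper; trim() tolerates spaces after the commas.
    populat.set(Integer.parseInt(keyvalue[3].trim()));
    CompositeGroupKey cntry = new CompositeGroupKey(keyvalue[0].trim(), keyvalue[1].trim());
    context.write(cntry, populat);
}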

As the map method above shows, the key is passed as a LongWritable and the value as Text. The value contains one line of the file. Inside the map method we split the line, create the key-value pair and pass it to the intermediate area through the context. The context fills an in-memory I/O buffer with the mapper's key-value output, which later spills to local disk. The map task output is grouped by sorted key and written to local disk as intermediate data. The grouping of map output is defined by the partitioner, which identifies the reducer for each key. MapReduce can also apply a local combiner, which combines intermediate map output before it is passed to the reducer, to cut down the amount of data transferred from the map to the reducer.

Reducer: The reducer copies the intermediate output of the map tasks from local disk over HTTP. Before invoking the reduce method, the framework shuffles, merges and sorts the key-value pairs. The reducer processes the collection of values for each key and writes the result to disk. Below is the reduce method, which is invoked once for each key, so each call receives a single key with its collection of values

@Override
public void reduce(CompositeGroupKey key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    // The Reducer API passes an Iterable; with Iterator this method would not override reduce().
    int cnt = 0;
    for (IntWritable value : values) {
        cnt = cnt + value.get();
    }
    context.write(key, new IntWritable(cnt));
}

Job: To run the MapReduce program we have to set the Mapper, Reducer and other properties on the Job object (JobConf in the older API). It is the main configuration class, which configures MapReduce parameters such as the Mapper, Reducer, Combiner, InputFormat, OutputFormat and Comparator. The code below shows how to create and run the job based on the map and reduce code above

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "GroupMR");
job.setJarByClass(GroupMR.class);
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
// Used for both the map output and the final output, since no separate map output types are set.
job.setOutputKeyClass(CompositeGroupKey.class);
job.setOutputValueClass(IntWritable.class);
// Split size = max(minSize, min(maxSize, blockSize)); the minimum should not exceed the maximum.
FileInputFormat.setMinInputSplitSize(job, 10);
FileInputFormat.setMaxInputSplitSize(job, 100);
FileInputFormat.addInputPath(job, new Path("/Local/data/Country.csv"));
// The output directory must not exist when the job starts.
FileOutputFormat.setOutputPath(job, new Path("/Local/data/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);

This main method runs the MapReduce job. Before running the job we need to set the MapperClass, ReducerClass, OutputKeyClass and OutputValueClass. We can also set the FileInputFormat and FileOutputFormat.

Running: Running the MapReduce program is straightforward: package the Java application in a JAR, say hadoopessence.jar, and run it from the command line as hadoop jar hadoopessence.jar org.hadoopessence.compositesorting /input /output, where /input and /output are folders in HDFS.

Reference: Hadoop Essence: The Beginner’s Guide to Hadoop & Hive

Hadoop MapReduce Group By Operation – Part1

Introduction

This article continues my previous articles on Hadoop MapReduce. In my previous article, taken from my book Hadoop Essence, I discussed the overall MapReduce architecture and how it works, plus a basic Hello World program. In this article I discuss how to write a basic MapReduce program that calculates a sum based on a group-by operation.


Input and Output

This program reads the file country.csv, which contains:

Country CD, State, City, Population (million)
USA, CA, Sunnyvale, 12
USA, CA, SAN JOSE, 42
USA, MO, XY, 23
USA, MO, AB, 19
IND, TN, AT, 11
IND, TN, KL, 10

The MapReduce program processes the file and aggregates the total population grouped by country and state as below:

Country CD, State, Total Population (million)
IND, TN, 21
USA, CA, 54
USA, MO, 42

Environment Setup

Refer to the Setup link to set up and configure Apache Hadoop.

Project Dependency

We have to add the Hadoop dependency to the pom.xml of the Maven project. We can create a Maven-based Java project and add the Hadoop core dependency below to the POM. In this application I am using Hadoop version 1.x.

<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>

Mapper Program

The framework splits the input file and spawns a map task for each split. A map task transforms the records of its input split into key-value pairs, where the key must implement the WritableComparable interface and the value the Writable interface. The Writable interface provides the ability to write the data to disk, and WritableComparable additionally allows it to be sorted. The number of map tasks is decided by the InputSplits defined in the InputFormat. A split is a logical split, not a physical one. For each input split, MapReduce first invokes the mapper's setup(Context) method, then invokes map(Object, Object, Context) for each record in the split, and finally invokes cleanup(Context) for cleanup activity. We extend the generic Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> class, whose type parameters indicate the input and output key and value types. The mapper class overrides the map() method to process the input data as below

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] keyvalue = line.split(",");
    // trim() tolerates the spaces after the commas in country.csv
    cntText.set(keyvalue[0].trim());
    stateText.set(keyvalue[1].trim());
    populat.set(Integer.parseInt(keyvalue[3].trim()));
    Country cntry = new Country(cntText, stateText);
    context.write(cntry, populat);
}
        

In the map method above, the key is the position of the line in the file (typed LongWritable in the full mapper below) and the value is passed as Text. The value contains one line of the file. Inside the map method we split the line, create the key-value pair and pass it to the intermediate area through the context. The context fills an in-memory I/O buffer with the mapper's key-value output, which later spills to local disk. Below is the custom Country key, which I used to sort the values by country and then by state. It implements the WritableComparable interface

private static class Country implements WritableComparable<Country> {
    Text country;
    Text state;

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }

    // No-arg constructor used by Hadoop when it deserializes the key.
    public Country() {
        this.country = new Text();
        this.state = new Text();
    }

    public void write(DataOutput out) throws IOException {
        this.country.write(out);
        this.state.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        this.country.readFields(in);
        this.state.readFields(in);
    }

    // Sort by country first, then by state.
    public int compareTo(Country pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        if (intcnt != 0) {
            return intcnt;
        } else {
            return state.compareTo(pop.state);
        }
    }

    // Equal keys must hash alike so the partitioner routes them to the same reducer.
    @Override
    public int hashCode() {
        return 31 * country.hashCode() + state.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Country && compareTo((Country) o) == 0;
    }

    @Override
    public String toString() {
        return country.toString() + ":" + state.toString();
    }
}

In the key above I have implemented the compareTo method, which the MapReduce framework uses to sort the keys. Below is the full mapper code

// Declared static so that Hadoop can instantiate it when it is nested inside the job class.
public static class GroupMapper extends Mapper<LongWritable, Text, Country, IntWritable> {
    // Reused across map() calls to avoid allocating new objects for every record.
    Text cntText = new Text();
    Text stateText = new Text();
    IntWritable populat = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] keyvalue = line.split(",");
        // trim() tolerates the spaces after the commas in country.csv
        cntText.set(keyvalue[0].trim());
        stateText.set(keyvalue[1].trim());
        populat.set(Integer.parseInt(keyvalue[3].trim()));
        Country cntry = new Country(cntText, stateText);
        context.write(cntry, populat);
    }
}

The map task output is grouped by sorted key and written to local disk as intermediate data. The grouping of map output is defined by the partitioner, which identifies the reducer for each key. MapReduce also provides a local combiner, which combines intermediate map output before passing it to the reducer. It helps to cut down the amount of data transferred from the map to the reducer. Here the key is built from the org.apache.hadoop.io.Text class, which provides methods to serialize and compare text at byte level. The IntWritable value is likewise serializable, as it implements the Writable interface, and supports sorting, as it implements the WritableComparable interface.

Reducer: The reducer copies the intermediate output of the map tasks from local disk over HTTP. Before invoking the reduce method, the framework shuffles, merges and sorts the key-value pairs. The reducer processes the collection of values for each key and writes the result to disk. The reducer class extends the generic Reducer class as below

public static class GroupReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

The generic Reducer class defines the input and output key-value types; here we receive input as <Country, IntWritable> and emit output as <Country, IntWritable>. Below is the reduce method, which is invoked once for each key, so each call receives a single key with its collection of values.

@Override
public void reduce(Country key, Iterable<IntWritable> values, Context context) throws IOException,
        InterruptedException {
    // The Reducer API passes an Iterable; with Iterator this method would not override reduce().
    int cnt = 0;
    for (IntWritable value : values) {
        cnt = cnt + value.get();
    }
    context.write(key, new IntWritable(cnt));
}
        

Job: To run the MapReduce program we have to set the Mapper, Reducer and other properties on the Job object (JobConf in the older API). It is the main configuration class, which configures MapReduce parameters such as the Mapper, Reducer, Combiner, InputFormat, OutputFormat and Comparator. The code below shows how to create and run the job based on the map and reduce code above

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // FileUtils comes from Apache Commons IO; the output directory must not exist when the job starts.
    FileUtils.deleteDirectory(new File("/Local/data/output"));
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "GroupMR");
    job.setJarByClass(GroupMR.class);
    job.setMapperClass(GroupMapper.class);
    job.setReducerClass(GroupReducer.class);
    job.setOutputKeyClass(Country.class);
    job.setOutputValueClass(IntWritable.class);
    // Split size = max(minSize, min(maxSize, blockSize)); the minimum should not exceed the maximum.
    FileInputFormat.setMinInputSplitSize(job, 10);
    FileInputFormat.setMaxInputSplitSize(job, 100);
    FileInputFormat.addInputPath(job, new Path("/Local/data/Country.csv"));
    FileOutputFormat.setOutputPath(job, new Path("/Local/data/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

This main method runs the MapReduce job. Before running the job we need to set the MapperClass, ReducerClass, OutputKeyClass and OutputValueClass. We can also set the FileInputFormat and FileOutputFormat. FileInputFormat is also used to decide the number of map tasks. We can set the input split size, which takes part in deciding the number of map tasks: the split size is computed as Math.max(minSize, Math.min(maxSize, blockSize)). As per the code above, the framework executes the MapReduce job as described in the Job.
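
The split-size rule can be sketched in isolation. This mirrors, as far as I understand it, the computation FileInputFormat performs; the class name and the example sizes are illustrative assumptions.

```java
class SplitSizeSketch {
    // The block size is capped by maxSize, and the result is never smaller than minSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed 128 MB block
        // A 64 MB maximum halves the split size, so twice as many map tasks are created.
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));
        // A tiny maximum of 100 bytes yields 100-byte splits.
        System.out.println(computeSplitSize(blockSize, 10L, 100L));
    }
}
```

The number of map tasks is then roughly the file size divided by the split size, so a smaller split size means more map tasks.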

Running

Running the MapReduce program is straightforward: package the Java application in a JAR, say hadoopessence.jar, and run it from the command line as hadoop jar hadoopessence.jar org.techmytalk.hadoopessence.GroupMR /input /output, where /input and /output are folders in HDFS. For testing, we can run this application directly with Run As in Eclipse.

Summary: In this article I explained a basic MapReduce program that computes a group-by operation.

Reference Hadoop Essence: The Beginner’s Guide to Hadoop & Hive

JAVA NIO – FileLock

File locking was introduced with the java.nio package and is implemented by FileChannel. A FileLock is one of two types: exclusive or shared. A shared lock allows other concurrently running programs to acquire overlapping shared locks. An exclusive lock does not allow other programs to acquire an overlapping lock.

Shared locks are supported where the operating system supports them; if the OS does not support shared locks, a request for a shared lock is treated as a request for an exclusive lock.

Locks are associated with a file, not with individual file handles, channels or threads. If one channel obtains an exclusive lock on a file, a channel in another JVM that requests an overlapping lock blocks until the first channel releases the lock. Since the lock is held on the file on behalf of the whole JVM, it is not advisable to use file locks to coordinate threads within a single JVM.

Below is the locking API in the FileChannel class

public abstract class FileChannel extends AbstractInterruptibleChannel
        implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {

    public abstract FileLock lock(long position, long size, boolean shared) throws IOException;

    public final FileLock lock() throws IOException {
        return lock(0L, Long.MAX_VALUE, false);
    }

    public abstract FileLock tryLock(long position, long size, boolean shared) throws IOException;

    public final FileLock tryLock() throws IOException {
        return tryLock(0L, Long.MAX_VALUE, false);
    }
}

The lock(long position, long size, boolean shared) method acquires a lock on the specified region of a file. The region is specified by its starting position and size, and the last argument specifies whether the lock is shared (true) or exclusive (false). To obtain a shared lock, the channel must be open for reading; an exclusive lock requires the channel to be open for writing.

We can use the lock() method to acquire an exclusive lock on the whole file rather than a specific region. A file can grow, and a locked region has a fixed size, so lock() locks the region from 0 to Long.MAX_VALUE to cover the whole file even as it grows.

The tryLock method, on a specified region or on the full file, does not block while acquiring a lock: it returns immediately instead of waiting. If another process already holds an overlapping lock on the file, it immediately returns null.

The FileLock API is specified as below.

public abstract class FileLock {
    public final FileChannel channel();
    public final long position();
    public final long size();
    public final boolean isShared();
    public final boolean overlaps(long position, long size);
    public abstract boolean isValid();
    public abstract void release() throws IOException;
}

A FileLock encapsulates a specific file region acquired through a FileChannel. A FileLock is associated with a specific FileChannel instance and keeps a reference to it, which can be obtained with the channel() method.

The FileLock lifecycle starts when lock or tryLock is invoked on a FileChannel and ends when release() is called. A FileLock also becomes invalid if the JVM shuts down or the associated channel is closed.

The isShared() method returns whether the lock is shared or exclusive. If shared locks are not supported by the operating system, this method always returns false.

FileLock objects are thread-safe; multiple threads may access a lock object concurrently.

Finally, overlaps(long position, long size) returns whether the lock overlaps the given range.

A FileLock is associated with an underlying file, and other programs waiting on that file can be blocked indefinitely if you do not release the lock; hence it is advisable to always release the lock as shown below

FileLock lock = fileChannel.lock();
try {
    // ... work with the locked file ...
} catch (IOException e) {
    // ... handle the I/O failure ...
} finally {
    lock.release();
}
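
The release pattern above can also be written with try-with-resources, since FileLock is AutoCloseable and closing it releases the lock. The following is a self-contained sketch that locks a temporary file; the class name, method name and file prefix are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class FileLockSketch {
    // Takes an exclusive lock on a whole temporary file and reports whether it was held.
    static boolean lockWholeFile() {
        try {
            Path tmp = Files.createTempFile("filelock-demo", ".txt");
            // An exclusive lock needs a channel that is open for writing.
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE);
                 FileLock lock = channel.lock()) {
                // While we are inside this block, other programs cannot take an overlapping lock.
                return lock.isValid() && !lock.isShared();
            } finally {
                // Lock and channel are already closed here, so the file can be removed.
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("exclusive lock held: " + lockWholeFile());
    }
}
```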

JAVA NIO – Memory-Mapped File

In earlier versions of Java, files were accessed through the conventional file system API. Behind the scenes, the JVM makes read() and write() system calls to transfer data between the OS kernel and the JVM. The JVM uses its own memory space to load and process the file, which causes trouble when processing large data files. The file's pages have to be converted for the JVM before the data is processed, because the OS handles files in pages whereas the JVM uses byte streams, which are not compatible with pages. Since JDK 1.4, Java provides MappedByteBuffer, which establishes a virtual memory mapping from JVM space to file system pages. This removes the overhead of transferring and copying the file's content from OS kernel space to JVM space. The OS uses virtual memory to cache the file outside kernel space, where it can be shared with other non-kernel processes. Java maps the file pages to a MappedByteBuffer directly and processes the file without loading it into the JVM.

The diagram below shows how the JVM maps the file with MappedByteBuffer and processes it without loading the file into the JVM.

JAVA NIO

A MappedByteBuffer maps directly to an open file in virtual memory by using the map method of FileChannel. The MappedByteBuffer object works like a buffer, but its data is stored in a file in virtual memory. The get() method on a MappedByteBuffer fetches data from the file, reflecting the file's current content on disk. Similarly, the put() method updates the content of the file, and the modified content is visible to other readers of the file. Processing a file through a MappedByteBuffer has a big advantage: it does not make a system call for each read or write on the file, which improves latency. In addition, the file's memory pages are cached in virtual memory and accessed directly by the MappedByteBuffer, so they do not consume JVM space. The only drawback is that a page fault occurs if the requested page is not in memory.

Below are some of the key benefits of MappedByteBuffer:

  1. The JVM works directly on virtual memory, so it avoids the read() and write() system calls.
  2. The JVM does not load the file into its own memory; it uses virtual memory instead, which makes it possible to process large data efficiently.
  3. The OS mostly takes care of reading and writing the shared virtual memory without involving the JVM.
  4. The mapping can be used by more than one process, based on the locking provided by the OS. We will discuss locking later.
  5. It also provides the ability to map a region or part of a file.
  6. The mapped data always corresponds to the file data on disk, without a buffer transfer.

 

Note: When the JVM accesses the shared virtual memory, a page fault is generated that brings the file data from the OS into shared memory. The other benefit of memory mapping is that the operating system manages the virtual memory space automatically.

 

public abstract class FileChannel
        extends AbstractInterruptibleChannel
        implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {

    public abstract MappedByteBuffer map(MapMode mode,
            long position, long size) throws IOException;

    public static class MapMode {
        public static final MapMode READ_ONLY;
        public static final MapMode READ_WRITE;
        public static final MapMode PRIVATE;
    }
}

As the code above shows, the map() method of FileChannel takes a mode, a position and a size as arguments and returns a MappedByteBuffer for that part of the file. We can also map the entire file, as shown below:

MappedByteBuffer buffer =
        fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

If you request a size larger than the file, then in write mode the file grows to match the size of the MappedByteBuffer, while in read mode an IOException is thrown because the file cannot be modified. (The Javadoc leaves mapping beyond the end of the file unspecified; this is the behavior of common JDK implementations.) The map() method supports three mapping modes:
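
The effect of mapping past the end of a file can be sketched as follows. This is an illustration under assumptions: the class and method names are hypothetical, the file is a temporary one, and the observed results are those of OpenJDK-based runtimes, since the Javadoc does not specify this case.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MapBeyondEofDemo {

    // Creates a 10-byte temp file for the demo (assumption).
    static Path tenByteFile() throws IOException {
        Path file = Files.createTempFile("map-beyond-eof", ".bin");
        file.toFile().deleteOnExit();
        Files.write(file, new byte[10]);
        return file;
    }

    // READ_WRITE mapping larger than the file: returns the file size afterwards.
    static long sizeAfterReadWriteMap() throws IOException {
        try (FileChannel ch = FileChannel.open(tenByteFile(),
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ch.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            return ch.size();
        }
    }

    // READ_ONLY mapping larger than the file: returns true if an IOException is thrown.
    static boolean readOnlyMapFails() throws IOException {
        try (FileChannel ch = FileChannel.open(tenByteFile(), StandardOpenOption.READ)) {
            try {
                ch.map(FileChannel.MapMode.READ_ONLY, 0, 4096);
                return false;
            } catch (IOException e) {
                return true; // the file cannot be extended through a read-only channel
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("size after READ_WRITE map: " + sizeAfterReadWriteMap());
        System.out.println("READ_ONLY map failed: " + readOnlyMapFails());
    }
}
```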

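  • READ_ONLY: The buffer can only be read; any attempt to modify it throws ReadOnlyBufferException.
  • READ_WRITE: The buffer can be read and updated, and updates are propagated to the file.
  • PRIVATE: Changes made to the MappedByteBuffer are not propagated to the file (copy-on-write).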

Changes made to a PRIVATE mapping are also not visible to other programs that have mapped the same file; instead, they cause private copies of the modified portions of the buffer to be created.
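
A short sketch of the copy-on-write behavior of PRIVATE mode, assuming a temporary demo file; PrivateMappingDemo and its method are hypothetical names. The write is visible through the buffer, but the file on disk keeps its original content.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PrivateMappingDemo {

    // Returns "<first char seen in buffer>/<file content on disk>".
    static String writeThroughPrivateMapping() throws IOException {
        Path file = Files.createTempFile("private-map", ".txt"); // demo file (assumption)
        file.toFile().deleteOnExit();
        Files.write(file, "hello".getBytes(StandardCharsets.US_ASCII));

        char seenInBuffer;
        // PRIVATE requires a channel opened for both reading and writing.
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer priv = ch.map(FileChannel.MapMode.PRIVATE, 0, 5);
            priv.put(0, (byte) 'J');            // modifies a private copy of the page
            seenInBuffer = (char) priv.get(0);  // the buffer sees the change
        }
        String onDisk = new String(Files.readAllBytes(file), StandardCharsets.US_ASCII);
        return seenInBuffer + "/" + onDisk;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeThroughPrivateMapping()); // prints J/hello
    }
}
```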

The map() method throws NonWritableChannelException if the mode is READ_WRITE or PRIVATE but the channel was not opened for both reading and writing. It throws NonReadableChannelException if the channel was not opened for reading.
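
The two exceptions can be provoked with a small sketch. The class and method names are hypothetical and the file is a temporary demo file; the channels come from FileInputStream (read-only) and FileOutputStream (write-only).

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.NonReadableChannelException;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Files;
import java.nio.file.Path;

class MapModeMismatchDemo {

    static Path tempFile() throws IOException {
        Path file = Files.createTempFile("map-mode", ".bin"); // demo file (assumption)
        file.toFile().deleteOnExit();
        Files.write(file, new byte[] {1, 2, 3});
        return file;
    }

    // READ_WRITE mapping on a channel that was opened for reading only.
    static String readWriteOnReadOnlyChannel() throws IOException {
        try (FileInputStream in = new FileInputStream(tempFile().toFile());
             FileChannel ch = in.getChannel()) {
            ch.map(FileChannel.MapMode.READ_WRITE, 0, 1);
            return "mapped";
        } catch (NonWritableChannelException e) {
            return "NonWritableChannelException";
        }
    }

    // READ_ONLY mapping on a channel that was opened for writing only.
    static String readOnlyOnWriteOnlyChannel() throws IOException {
        try (FileOutputStream out = new FileOutputStream(tempFile().toFile());
             FileChannel ch = out.getChannel()) {
            ch.map(FileChannel.MapMode.READ_ONLY, 0, 1);
            return "mapped";
        } catch (NonReadableChannelException e) {
            return "NonReadableChannelException";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readWriteOnReadOnlyChannel());
        System.out.println(readOnlyOnWriteOnlyChannel());
    }
}
```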

 

The sample code below shows how to use MappedByteBuffer:

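import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MemoryMappedSample {
   public static void main(String[] args) throws Exception {
       int count = 10;
       // try-with-resources closes the file; "rw" is required for a READ_WRITE mapping
       try (RandomAccessFile memoryMappedFile = new RandomAccessFile("bigFile.txt", "rw")) {
           // Map the first 'count' bytes of the file
           MappedByteBuffer out = memoryMappedFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, count);
           // Relative put() advances the buffer position after each write
           for (int i = 0; i < count; i++) {
               out.put((byte) 'A');
           }
           // Absolute get(i) reads by index, independent of the buffer position
           for (int i = 0; i < count; i++) {
               System.out.print((char) out.get(i));
           }
       }
   }
}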

JAVA NIO – Channel

A channel uses a ByteBuffer to transfer data between an I/O source and an I/O target. We can put data into a buffer and send it to the channel, or the channel can push data back into a buffer.

Channels provide a direct connection to I/O, transporting data between a ByteBuffer and a file or socket. Because a channel sends and receives data through buffers that are compatible with the OS's own byte buffers, it minimizes the overhead of accessing the OS's filesystem.

A channel represents an open connection to an I/O entity, such as a network socket or a file, for read and write operations. A file channel is not constructed directly; it is obtained by calling the getChannel() method on a RandomAccessFile, FileInputStream, or FileOutputStream object. The Channel interface provides a close() method; once a channel is closed it cannot be reopened, and any further I/O operation on it throws ClosedChannelException.

Below is the Channel interface:

public interface Channel extends Closeable {
    public boolean isOpen();
    public void close() throws IOException;
}

Implementations of the Channel interface depend on native calls supplied by the operating system. The interface provides isOpen() to check whether the channel is open and close() to close an open channel.
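
The life cycle described above (open, close, no reopening) can be sketched as follows. ClosedChannelDemo is a hypothetical name and the file is a temporary demo file; the calls used (isOpen(), close(), read()) are the standard ones.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ClosedChannelDemo {

    // Returns true if the channel reports closed and a read after close() fails.
    static boolean readAfterCloseFails() throws IOException {
        Path file = Files.createTempFile("closed-channel", ".bin"); // demo file (assumption)
        file.toFile().deleteOnExit();

        FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
        boolean openBefore = channel.isOpen(); // true while the connection is open
        channel.close();
        boolean openAfter = channel.isOpen();  // false; a closed channel stays closed

        try {
            channel.read(ByteBuffer.allocate(1));
            return false; // not expected: the read should not succeed
        } catch (ClosedChannelException e) {
            return openBefore && !openAfter;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAfterCloseFails()); // prints true
    }
}
```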

The diagram below shows the overall package structure:

JAVA NIO Channel

The InterruptibleChannel interface marks a channel that can be asynchronously closed and interrupted. This means that if a thread is blocked in an I/O operation on an interruptible channel, another thread may invoke the channel's close() method, and the blocked thread then receives an AsynchronousCloseException.

The WritableByteChannel interface extends the Channel interface and provides the write(ByteBuffer src) method, which writes a sequence of bytes to the channel from the given buffer.

The ReadableByteChannel interface has a read(ByteBuffer) method that reads a sequence of bytes from the channel into the buffer passed as the argument. Only one thread can run a read operation on a channel at a time; if another thread starts a read in the meantime, it blocks until the first operation is complete.
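
A minimal sketch of the read(ByteBuffer) and write(ByteBuffer) methods working together. It assumes in-memory streams wrapped with Channels.newChannel so that it runs without files or sockets; ChannelCopyDemo and copy are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

class ChannelCopyDemo {

    // Copies the text through a small buffer from a readable to a writable channel.
    static String copy(String text) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (ReadableByteChannel in = Channels.newChannel(
                     new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
             WritableByteChannel out = Channels.newChannel(sink)) {

            ByteBuffer buffer = ByteBuffer.allocate(4); // deliberately small to force several rounds
            while (in.read(buffer) != -1) {             // channel fills the buffer
                buffer.flip();                          // switch the buffer from filling to draining
                while (buffer.hasRemaining()) {
                    out.write(buffer);                  // a write may drain only part of the buffer
                }
                buffer.clear();                         // make the buffer ready for the next read
            }
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(copy("channels move bytes through buffers"));
    }
}
```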

NIO defines the channel interfaces, while the implementations are supplied through the SPI (service provider) package and can be replaced by a different provider. A SelectableChannel can be registered with a Selector for multiplexed I/O. The java.nio.channels.spi package provides base classes for channel implementations. For instance, AbstractInterruptibleChannel and AbstractSelectableChannel provide the methods needed by channel implementations that are interruptible or selectable, respectively.

A channel can communicate with an I/O source in both directions, covering the roles that InputStream and OutputStream play separately. Some channels, such as SocketChannel, can also handle a read and a write concurrently, meaning a read in progress does not block a write and vice versa.

Channels Creations

There are two types of channels: file channels and socket channels. Socket channels come in three kinds: SocketChannel, ServerSocketChannel and DatagramChannel. Channels can be created in several ways. For instance, the socket channels have factory methods that create new socket channels, whereas a FileChannel can be created by calling getChannel() on an I/O stream or a RandomAccessFile.

// "host", someport and somelocalport are placeholders
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("host", someport));
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(somelocalport));
DatagramChannel datagramChannel = DatagramChannel.open();
RandomAccessFile randaccfile = new RandomAccessFile("file", "r");
FileChannel fileChannel = randaccfile.getChannel();

Channels can be unidirectional or bidirectional. If a class implements both ReadableByteChannel and WritableByteChannel, it is a bidirectional channel; if it implements only one of them, it is a unidirectional channel.
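
One caveat is worth a sketch: the interfaces a class implements describe what the type allows, not how a particular channel was opened. FileChannel implements both interfaces, yet a FileChannel obtained from a FileInputStream is read-only in practice. The class name below is hypothetical and the file is a temporary demo file.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class ReadOnlyFileChannelDemo {

    // Returns "<implements WritableByteChannel?>:<result of attempting a write>".
    static String probe() throws IOException {
        Path file = Files.createTempFile("unidirectional", ".txt"); // demo file (assumption)
        file.toFile().deleteOnExit();
        Files.write(file, "data".getBytes(StandardCharsets.US_ASCII));

        try (FileInputStream in = new FileInputStream(file.toFile());
             FileChannel channel = in.getChannel()) {
            // By type, every FileChannel is both readable and writable.
            boolean declaresWrite = channel instanceof WritableByteChannel;
            try {
                channel.write(ByteBuffer.wrap(new byte[] {1}));
                return declaresWrite + ":written";
            } catch (NonWritableChannelException e) {
                // The underlying file was opened for reading only.
                return declaresWrite + ":NonWritableChannelException";
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(probe()); // prints true:NonWritableChannelException
    }
}
```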

Scatter/Gather

Scatter/gather I/O makes it possible to perform a single I/O operation across multiple buffers. (The Selector class is a different facility: it multiplexes readiness events from many selectable channels onto a single thread.) In a gathering write, data is collected from multiple buffers, in order, and sent to the channel. In a scattering read, data from the channel is distributed, in order, across multiple buffers.

Scatter/gather operations in turn rely on native OS calls, where the platform supports them, to fill or drain the buffers directly without first copying the data into one intermediate buffer.

public interface ScatteringByteChannel extends ReadableByteChannel {
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    public long read(ByteBuffer[] dsts) throws IOException;
}

public interface GatheringByteChannel extends WritableByteChannel {
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    public long write(ByteBuffer[] srcs) throws IOException;
}

public interface WritableByteChannel extends Channel {
    public int write(ByteBuffer src) throws IOException;
}
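
The multi-buffer read and write methods listed above can be exercised with a FileChannel, which implements both interfaces. This is a minimal sketch under assumptions: the header/body split, the class name and the temporary file are illustrative only.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ScatterGatherDemo {

    // Writes two buffers with one gathering call, then reads them back with a scattering call.
    static String roundTrip() throws IOException {
        Path file = Files.createTempFile("scatter-gather", ".bin"); // demo file (assumption)
        file.toFile().deleteOnExit();

        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ByteBuffer header = ByteBuffer.wrap("HEAD".getBytes(StandardCharsets.US_ASCII));
            ByteBuffer body = ByteBuffer.wrap("payload".getBytes(StandardCharsets.US_ASCII));
            ByteBuffer[] sources = {header, body};
            // Gathering write: buffers are drained in array order; loop in case of a partial write.
            while (header.hasRemaining() || body.hasRemaining()) {
                channel.write(sources);
            }

            channel.position(0); // rewind the file for reading
            ByteBuffer headerIn = ByteBuffer.allocate(4);
            ByteBuffer bodyIn = ByteBuffer.allocate(7);
            ByteBuffer[] targets = {headerIn, bodyIn};
            // Scattering read: the first buffer is filled before the second one is used.
            while (bodyIn.hasRemaining() && channel.read(targets) != -1) {
                // keep reading until both buffers are full or end of file is reached
            }
            return new String(headerIn.array(), StandardCharsets.US_ASCII)
                    + "|" + new String(bodyIn.array(), StandardCharsets.US_ASCII);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip()); // prints HEAD|payload
    }
}
```

Fixed-size headers followed by a body are a typical fit for this pattern, because each part lands in its own buffer without manual slicing.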