MapReduce Composite Key Operation - Part 2

Composite Key Operation: In complex jobs that involve multiple columns, a simple single-field key may not be enough. For example, if we need to calculate population grouped by country and state, the choice of key matters; a well-chosen composite key solves the problem easily. We can create a composite key by implementing the WritableComparable interface, which lets Hadoop treat it like any other WritableComparable key. Below is a composite key with two fields, country and state. It overrides compareTo() to sort first by country and then by state, and provides write() and readFields() methods to serialize and deserialize the attributes.

private static class CompositeGroupKey implements WritableComparable<CompositeGroupKey> {
    String country;
    String state;

    // No-argument constructor is required so Hadoop can instantiate the key before calling readFields()
    public CompositeGroupKey() {
    }

    public CompositeGroupKey(String country, String state) {
        this.country = country;
        this.state = state;
    }

    // Serialize both fields to the intermediate output
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, country);
        WritableUtils.writeString(out, state);
    }

    // Deserialize both fields in the same order they were written
    public void readFields(DataInput in) throws IOException {
        this.country = WritableUtils.readString(in);
        this.state = WritableUtils.readString(in);
    }

    // Sort by country first, then by state
    public int compareTo(CompositeGroupKey pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        return intcnt == 0 ? state.compareTo(pop.state) : intcnt;
    }

    @Override
    public String toString() {
        return country + ":" + state;
    }
}
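
Hadoop's default HashPartitioner routes each key to a reducer using the key's hashCode(), so a composite key should also override hashCode() (and, as standard Java practice, equals()) so that identical (country, state) pairs always land on the same reducer. The class above omits them; a minimal sketch, assuming both fields are always non-null, could be added to CompositeGroupKey like this:

@Override
public int hashCode() {
    // Combine both fields so equal (country, state) keys hash to the same reducer
    return country.hashCode() * 163 + state.hashCode();
}

@Override
public boolean equals(Object obj) {
    if (!(obj instanceof CompositeGroupKey))
        return false;
    CompositeGroupKey other = (CompositeGroupKey) obj;
    return country.equals(other.country) && state.equals(other.state);
}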

We will use the above composite key in a MapReduce job that counts the total population grouped by country and state.

Input:

Country  State  City  Population (Mil)
USA      CA     Su    12
USA      CA     SA    42
USA      CA     Fr    23
USA      MO     XY    23
USA      MO     AB    19
USA      MO     XY    23
USA      MO     AB    19
IND      TN     AT    11
IND      TN     KL    10

Output:

Country  State  Total Population (Mil)
IND      TN     21
USA      CA     77
USA      MO     84

Mapper Program: Once the composite key is defined, we create the mapper class, which consumes the input produced by the InputFormat. The InputFormat splits the file into InputSplits, and one map task is launched for each split; the split is logical, not physical. Each map task transforms its input records into key-value pairs, where the key must implement WritableComparable and the value must implement Writable. The Writable interface provides the ability to serialize the data to disk, and WritableComparable additionally allows keys to be sorted. The framework first invokes setup(Context), then invokes map(KEYIN, VALUEIN, Context) once for every record in the input split, and finally invokes cleanup(Context) for any cleanup activity. We extend the generic Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> class, whose type parameters indicate the input and output key and value types, and override its map() method to process the input data as below.

public static class GroupMapper extends Mapper<LongWritable, Text, CompositeGroupKey, IntWritable> {
    // Reusable writable for the population value
    private final IntWritable populat = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is: Country,State,City,Population
        String line = value.toString();
        String[] keyvalue = line.split(",");
        populat.set(Integer.parseInt(keyvalue[3].trim()));
        CompositeGroupKey cntry = new CompositeGroupKey(keyvalue[0].trim(), keyvalue[1].trim());
        context.write(cntry, populat);
    }
}

In the map method above, the key is the byte offset of the line (LongWritable) and the value is the line itself (Text). Inside the method we split the line, build the composite key and the population value, and emit them through the Context. The Context fills an in-memory buffer with the key-value pairs and later spills them to local disk. The map output is partitioned and sorted by key and written to local disk as intermediate data; the partition determines which reducer receives each key. MapReduce can also run a local combiner that pre-aggregates the intermediate map output to cut down the amount of data transferred from the map tasks to the reducers.

Reducer: The reducers copy the intermediate map output from the mappers' local disks over HTTP, and each key is routed to a single reducer. Before the reduce logic runs, the framework shuffles, merges, and sorts the key-value pairs. The reducer then processes the collection of values for each key and writes the result to disk. Below is the reduce method, which is invoked once per key, receiving that key together with the collection of its values.

public static class GroupReducer extends Reducer<CompositeGroupKey, IntWritable, CompositeGroupKey, IntWritable> {
    @Override
    public void reduce(CompositeGroupKey key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all population values that share the same (country, state) key
        int cnt = 0;
        for (IntWritable value : values) {
            cnt = cnt + value.get();
        }
        context.write(key, new IntWritable(cnt));
    }
}
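
Because the aggregation here is a plain sum, the same reducer class can also act as the local combiner mentioned earlier. This is an optional optimization, not part of the original driver code, and would be enabled in the job setup with:

job.setCombinerClass(GroupReducer.class);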

Job: To run the MapReduce program we set the Mapper, Reducer, and other properties on the Job. Job (together with Configuration) is the main configuration class, and it holds MapReduce parameters such as the Mapper, Reducer, Combiner, InputFormat, OutputFormat, Comparator, and so on. The code below shows how to create and run the job based on the map and reduce code above.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "GroupMR");
    job.setJarByClass(GroupMR.class);
    job.setMapperClass(GroupMapper.class);
    job.setReducerClass(GroupReducer.class);
    job.setOutputKeyClass(CompositeGroupKey.class);
    job.setOutputValueClass(IntWritable.class);
    // Optional: control the input split sizes in bytes (min must not exceed max)
    FileInputFormat.setMinInputSplitSize(job, 10);
    FileInputFormat.setMaxInputSplitSize(job, 100);
    FileInputFormat.addInputPath(job, new Path("/Local/data/Country.csv"));
    FileOutputFormat.setOutputPath(job, new Path("/Local/data/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

This main method submits the MapReduce job. Before submitting it we set the Mapper class, Reducer class, output key class, and output value class, and we can also configure the FileInputFormat and FileOutputFormat paths.

Running: Running the MapReduce program is quite straightforward: package the Java application into a JAR, say hadoopessence.jar, and run it from the command line as hadoop jar hadoopessence.jar org.hadoopessence.compositesorting /inputfolder /outputfolder, where the input and output folders are paths in HDFS.

Reference: Hadoop Essence: The Beginner's Guide to Hadoop & Hive
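
Once the job completes, the result can be viewed directly from HDFS, for example with hadoop fs -cat /outputfolder/part-r-00000 (the path here is just whatever output directory the job wrote to). With the default TextOutputFormat and the toString() defined on the composite key, each output line contains country:state followed by the total population, matching the output table above.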
