Article From:https://www.cnblogs.com/tele-share/p/9688174.html

MapReduce's default InputFormat is TextInputFormat, where the key is the byte offset of each line and the value is the line's text. A custom InputFormat should extend FileInputFormat and override the createRecordReader() method; it can also override isSplitable() to control whether input files are split. Overriding createRecordReader() also requires a custom RecordReader: the InputFormat declares what the key and value types are, while the RecordReader contains the actual read logic. The following example merges small files. The final output key (K) is the file path, and the value (V) is the file's raw bytes.

1.InputFormat

 1 /**
 2  * Custom InputFormat specifies the K, V for reading files. 3  * @author tele
 4  *
 5  */
 6 public class MyInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
 7     /**
 8      * Set up no slicing, use small files as a whole. 9      */
10     @Override
11     protected boolean isSplitable(JobContext context, Path filename) {
12         return false;
13     }
14     
15     @Override
16     public RecordReader<NullWritable,BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
17             throws IOException, InterruptedException {
18         MyRecordReader recordReader = new MyRecordReader();
19         recordReader.initialize(split, context);
20         return recordReader;
21     }
22 }

2.RecordReader

 1 /**
 2  * recordreaderIt is used to read file contents, output file contents, and file path information is kept in split. 3  * @author tele
 4  *
 5  */
 6 public class MyRecordReader extends RecordReader<NullWritable,BytesWritable> {
 7     FileSplit split;
 8     BytesWritable value = new BytesWritable();
 9     boolean flag = false;
10     Configuration conf;
11     int count = 0;
12     
13     /**
14      * Initialization15      */
16     @Override
17     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
18         this.split = (FileSplit) split;
19         conf = context.getConfiguration();    conf = context.getConfiguration();
20     }
21 
22     /**
23      * Business logic processing, this method is used to determine whether there is still a file content to read, will enter twice, the first read content into value, return true, the second call return false24      * If you return to true, you will call getCurrentKey ().GetCurrentValue () to return the content to map.25      * 
26      */
27     @Override
28     public boolean nextKeyValue() throws IOException, InterruptedException {
29         count++;
30         if(!flag) {
31             //Get FS
32             FileSystem fs = FileSystem.get(conf);
33             //Open flow
34             Path path = this.split.getPath();
35             FSDataInputStream fsDataInputStream = fs.open(path);
36             long length = this.split.getLength();
37             byte[] buf = new byte[(int) length];
38             
39             //read
40             IOUtils.readFully(fsDataInputStream, buf, 0,buf.length);
41             value.set(buf, 0, buf.length);
42             
43             //Closed flow
44             IOUtils.closeStream(fsDataInputStream);
45             flag = true;
46         }else {
47             flag = false;
48         }
49         return flag;
50     }
51 
52     @Override
53     public NullWritable getCurrentKey() throws IOException, InterruptedException {
54         return NullWritable.get();
55     }
56 
57     @Override
58     public BytesWritable getCurrentValue() throws IOException, InterruptedException {
59         return value;
60     }
61 
62     @Override
63     public float getProgress() throws IOException, InterruptedException {
64         return flag?1:0;
65     }
66 
67     @Override
68     public void close() throws IOException {
69         
70     }
71 }

3.Mapper

 1 /**
 2  * Output the result to SequenceFileOutPutFormat, the output key is the file path, and value is the content of the file. 3  * @author tele
 4  *
 5  */
 6 public class InputformatMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable/*Text*/> {
 7     Text k = new Text();      
 8 
 9     @Override
10     protected void map(NullWritable key, BytesWritable value,
11             Mapper<NullWritable, BytesWritable, Text, BytesWritable/*Text*/>.Context context)
12             throws IOException, InterruptedException {
13         FileSplit split = (FileSplit) context.getInputSplit();
14         Path path = split.getPath();
15         
16         k.set(path.toString());
17         
18     /*    String result = new String(value.getBytes(),0,value.getLength());
19         context.write(k,new Text(result));*/
20         
21         context.write(k, value);
22     }
23 }

4. Driver (because the output is raw bytes, the OutputFormat must be set to SequenceFileOutputFormat)

 1 /**
 2  * drive 3  * @author tele
 4  *
 5  */
 6 public class InputformatDriver {
 7     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 8         //1.Get job example
 9         Configuration conf = new Configuration();
10         Job job = Job.getInstance(conf);
11         
12         //2.Associated class
13         job.setJarByClass(InputformatDriver.class);
14         job.setMapperClass(InputformatMapper.class);
15         
16         
17         //4.Setting up format
18         job.setInputFormatClass(MyInputFormat.class);
19         //Using SequenceFileOutputFormat as output format
20         job.setOutputFormatClass(SequenceFileOutputFormat.class);
21         
22         //5.data type
23         job.setOutputKeyClass(Text.class);
24         job.setOutputValueClass(BytesWritable.class);
25         
26     //    job.setOutputValueClass(Text.class);
27 
28         //6.Setting input and output paths
29         FileInputFormat.setInputPaths(job,new Path(args[0]));
30         FileOutputFormat.setOutputPath(job,new Path(args[1]));
31         
32         //7.Submission
33         boolean result = job.waitForCompletion(true);
34         System.exit(result?0:1);
35     }
36 }

 

 

Leave a Reply

Your email address will not be published. Required fields are marked *