本文共 4796 字,大约阅读时间需要 15 分钟。
常见的outputformat实现类
使用场景:
![常见的 OutputFormat 实现类与使用场景](https://s1.ax1x.com/2020/11/02/BrGyK1.png)
实例操作:
![自定义 OutputFormat 实例需求说明](https://s1.ax1x.com/2020/11/02/BrGqVf.png)
Mapper:
package com.out.io;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Identity-style mapper: emits every input line (with a trailing newline
 * appended) as the key, so the custom OutputFormat downstream can route
 * each record to a file based on its content.
 *
 * Input:  (byte offset, line of text)
 * Output: (line + "\n", NullWritable)
 */
public class outMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Reused across map() calls to avoid one Text allocation per record.
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Append the line terminator here because the custom RecordWriter
        // writes raw bytes and does not add one itself.
        outKey.set(value.toString() + "\n");
        context.write(outKey, NullWritable.get());
    }
}
Reducer:
package com.out.io;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Pass-through reducer: writes the key once for every occurrence in the
 * values iterable, preserving duplicate input lines in the final output.
 *
 * Input:  (line, [NullWritable, ...])
 * Output: (line, NullWritable) — one record per original occurrence
 */
public class outReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Iterate the values so duplicate lines are not collapsed into one.
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
        }
    }
}
outformat:
package com.out.io;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.lib.FilterOutputFormat;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;//import javax.xml.soap.Text;public class outformat extends FileOutputFormat{ @Override public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new FilterRecord(taskAttemptContext); }}
文件读写类,实现主要逻辑:RecordWriter
package com.out.io;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * RecordWriter that routes each record to one of two files: lines containing
 * "atguigu" go to atguigu.log, everything else goes to other.log.
 *
 * The record text is written as raw UTF-8 bytes; the mapper is expected to
 * have appended the line terminator already.
 */
public class FilterRecord extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream fosatguigu;
    private FSDataOutputStream fosother;

    public FilterRecord(TaskAttemptContext job) {
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            // Fixed typo: was "atguidu.log", which did not match the
            // "atguigu" routing keyword used in write().
            fosatguigu = fs.create(new Path("d:/mapreduceoutput/out5/atguigu.log"));
            fosother = fs.create(new Path("d:/mapreduceoutput/out5/other.log"));
        } catch (IOException e) {
            // Fail fast: swallowing the exception would leave both streams
            // null and cause an opaque NullPointerException in write().
            throw new IllegalStateException("Failed to open output streams", e);
        }
    }

    @Override
    public void write(Text text, NullWritable nullWritable)
            throws IOException, InterruptedException {
        // Explicit charset — the no-arg getBytes() uses the platform default.
        byte[] bytes = text.toString().getBytes(StandardCharsets.UTF_8);
        if (text.toString().contains("atguigu")) {
            fosatguigu.write(bytes);
        } else {
            fosother.write(bytes);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        // closeStream() is null-safe and logs (rather than throws) on error,
        // so the second stream is closed even if the first close fails.
        IOUtils.closeStream(fosatguigu);
        IOUtils.closeStream(fosother);
    }
}
Driver:
package com.out.io;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class outDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{ "d:/mapreduceinput/input5","d:/mapreduceoutput/out5"}; Job job = Job.getInstance(new Configuration()); job.setJarByClass(outDriver.class); job.setMapperClass(outMapper.class); job.setReducerClass(outReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setOutputFormatClass(outformat.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean res = job.waitForCompletion(true); System.exit(res?0:1); }}
转载地址:http://aicki.baihongyu.com/