博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mapreduce-自定义outputformat
阅读量:3967 次
发布时间:2019-05-24

本文共 4796 字,大约阅读时间需要 15 分钟。

常见的outputformat实现类

使用场景:

![常见的OutputFormat实现类与使用场景](https://s1.ax1x.com/2020/11/02/BrGyK1.png)

实例操作:

![自定义OutputFormat实例需求](https://s1.ax1x.com/2020/11/02/BrGqVf.png)

Mapper:

package com.out.io;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that forwards every input line unchanged as the output key.
 *
 * <p>Input:  {@code <LongWritable byteOffset, Text line>} from TextInputFormat.
 * Output: {@code <Text line + "\n", NullWritable>} — the trailing newline is
 * re-appended here because the custom RecordWriter downstream writes raw bytes
 * and does not insert record separators itself.
 */
public class outMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Reused across map() calls to avoid one allocation per input record.
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        outKey.set(value.toString() + "\n");
        context.write(outKey, NullWritable.get());
    }
}

Reducer:

package com.out.io;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that passes each key through once per occurrence.
 *
 * <p>Iterating the values (rather than writing the key a single time) preserves
 * duplicate input lines: a line that appeared N times in the input is emitted
 * N times to the output.
 */
public class outReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
        }
    }
}

outformat:

package com.out.io;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Custom OutputFormat that routes records to two fixed log files via
 * {@link FilterRecord} instead of the standard part-r-xxxxx files.
 *
 * <p>Extends FileOutputFormat so the framework still performs its usual
 * output-path existence check and commit protocol.
 */
public class outformat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        // All per-record routing logic lives in the RecordWriter.
        return new FilterRecord(taskAttemptContext);
    }
}

文件读写类,实现主要逻辑:RecordWriter

package com.out.io;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * RecordWriter holding the main routing logic: lines containing "atguigu"
 * go to one file, everything else to a second file.
 *
 * <p>Not thread-safe; Hadoop drives each RecordWriter from a single task
 * thread, so no synchronization is needed here.
 */
public class FilterRecord extends RecordWriter<Text, NullWritable> {

    private final FSDataOutputStream fosAtguigu;
    private final FSDataOutputStream fosOther;

    /**
     * Opens both target files eagerly.
     *
     * @param job task context supplying the filesystem configuration
     * @throws IOException if either output file cannot be created; propagated
     *         instead of being swallowed — the original printed the stack trace
     *         and left the streams null, guaranteeing an NPE on the first write()
     */
    public FilterRecord(TaskAttemptContext job) throws IOException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        // NOTE(review): "atguidu.log" looks like a typo for "atguigu.log" —
        // kept byte-identical here because the path is observable output.
        fosAtguigu = fs.create(new Path("d:/mapreduceoutput/out5/atguidu.log"));
        fosOther = fs.create(new Path("d:/mapreduceoutput/out5/other.log"));
    }

    /**
     * Routes one record by substring match on its text.
     *
     * @param text          the line to write (already newline-terminated by the mapper)
     * @param nullWritable  unused placeholder value
     */
    @Override
    public void write(Text text, NullWritable nullWritable)
            throws IOException, InterruptedException {
        String line = text.toString();
        // Explicit UTF-8: the no-arg getBytes() used the platform default
        // charset, which corrupts non-ASCII input on some systems.
        byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        if (line.contains("atguigu")) {
            fosAtguigu.write(payload);
        } else {
            fosOther.write(payload);
        }
    }

    /** Closes both streams; closeStream suppresses close-time IOExceptions. */
    @Override
    public void close(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        IOUtils.closeStream(fosAtguigu);
        IOUtils.closeStream(fosOther);
    }
}

Driver:

package com.out.io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver wiring the custom-OutputFormat job together.
 *
 * <p>Exit code is 0 on success, 1 on failure.
 */
public class outDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Use the hard-coded local debug paths only when no arguments are
        // supplied; the original unconditionally overwrote args, making it
        // impossible to run the job against any other input/output location.
        if (args.length < 2) {
            args = new String[]{"d:/mapreduceinput/input5", "d:/mapreduceoutput/out5"};
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(outDriver.class);

        job.setMapperClass(outMapper.class);
        job.setReducerClass(outReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Records are written by FilterRecord, but FileOutputFormat still
        // needs an output path for the _SUCCESS marker and commit protocol.
        job.setOutputFormatClass(outformat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

转载地址:http://aicki.baihongyu.com/

你可能感兴趣的文章
Platform总线
查看>>
Platform总线
查看>>
Linux驱动程序中的platform总线详…
查看>>
Linux驱动程序中的platform总线详…
查看>>
按键驱动--platform设备的例子
查看>>
按键驱动--platform设备的例子
查看>>
mini2440按键驱动及详细解释(转)
查看>>
mini2440按键驱动及详细解释(转)
查看>>
在中断上下文使用disable_irq()的…
查看>>
在中断上下文使用disable_irq()的…
查看>>
内核定时器
查看>>
内核定时器
查看>>
中断与内核定时器
查看>>
中断与内核定时器
查看>>
source insight的疑问
查看>>
source insight的疑问
查看>>
Linux输入子系统 input_dev 概述
查看>>
Linux输入子系统 input_dev 概述
查看>>
A new I/O memory access mechanis…
查看>>
A new I/O memory access mechanis…
查看>>