From Job to JobConf

ChainReducer and ChainMapper let you chain a mapper after another mapper or after the reducer.

Between the two classes there are only addMapper and setReducer methods. Why is there no addReducer?
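
Presumably because a MapReduce job can only have one reduce phase: the chain pattern is [MAP+ / REDUCE MAP*], so the single reducer is set once with setReducer, and anything that should run after it has to be added as extra mappers inside the reduce task. Below is a minimal driver sketch of that pattern, reusing the identity mapper and reducer listed further down; the ChainDemo class name is made up here, and the per-step configurations are just empty JobConf objects.

package Sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainDemo {

    public static void main(String[] args) throws Exception {
        // JobConf(conf, class) also sets the job jar from the given class.
        JobConf job = new JobConf(new Configuration(), ChainDemo.class);
        job.setJobName("Chain Demo");
        job.setInputFormat(KeyValueTextInputFormat.class);

        // One or more mappers run before the shuffle.
        ChainMapper.addMapper(job, mapper.class,
                Text.class, Text.class, Text.class, Text.class, true, new JobConf(false));

        // Exactly one reduce phase per job, hence setReducer rather than addReducer.
        ChainReducer.setReducer(job, reducer.class,
                Text.class, Text.class, Text.class, Text.class, true, new JobConf(false));

        // Post-reduce work is expressed as additional mappers that run
        // inside the reduce task, right after the reducer.
        ChainReducer.addMapper(job, mapper.class,
                Text.class, Text.class, Text.class, Text.class, true, null);

        FileInputFormat.addInputPath(job, new Path("hdfs://XXXX/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://XXXX/output/"));
        JobClient.runJob(job);
    }
}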

mapper.java

package Sample;

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

// Identity mapper (old mapred API): prints each key/value pair for debugging
// and emits it unchanged.
public class mapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

    public void map(Text key, Text value, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        System.out.println("mapper1~key:" + key.toString() + " value:" + value.toString());
        output.collect(key, value);
    }
}

reducer.java

package Sample;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Identity reducer (old mapred API): prints and re-emits every value of each key.
public class reducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            Text val = values.next();
            System.out.println("reducer1 key:" + key.toString() + " value:" + val.toString());
            output.collect(key, val);
        }
    }
}

HelloHadoop.java

package Sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class HelloHadoop {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // otherArgs could supply the input/output paths instead of the
        // hard-coded ones below.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Old-API driver: JobConf instead of the new-API Job.
        JobConf job = new JobConf(conf);
        job.setJobName("Hello JobConf");

        job.setJarByClass(HelloHadoop.class);
        job.setMapperClass(mapper.class);
        job.setCombinerClass(reducer.class);
        job.setReducerClass(reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // KeyValueTextInputFormat splits each input line into key and value
        // at the first tab character.
        job.setInputFormat(KeyValueTextInputFormat.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://XXXX/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://XXXX/output/"));

        /*
        // Chaining (requires a mapper2/reducer2 with the same Text/Text signature):
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job, reducer2.class, Text.class, Text.class,
                Text.class, Text.class, false, reduceConf);
        ChainReducer.addMapper(job, mapper2.class, Text.class, Text.class,
                Text.class, Text.class, false, null);
        */

        JobClient.runJob(job);
    }
}
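
The title refers to moving the driver from the new-API org.apache.hadoop.mapreduce.Job class back to the old-API JobConf, presumably because ChainMapper and ChainReducer in org.apache.hadoop.mapred.lib work against a JobConf. For comparison, here is a rough sketch of the same identity job written with the new API on a newer Hadoop release (2.x); the HelloHadoopNewApi class and its inner IdentityMapper/IdentityReducer are made-up names for illustration.

package Sample;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HelloHadoopNewApi {

    // Identity mapper in the new API: extend the Mapper class instead of
    // implementing the old Mapper interface.
    public static class IdentityMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Identity reducer in the new API.
    public static class IdentityReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Hello Job");
        job.setJarByClass(HelloHadoopNewApi.class);
        job.setMapperClass(IdentityMapper.class);
        job.setCombinerClass(IdentityReducer.class);
        job.setReducerClass(IdentityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://XXXX/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://XXXX/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}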
