從Job變成JobConf
ChainReducer和ChainMapper可以把mapper加在mapper或reducer後面
都只有addMapper和setReducer兩個方法。為什麼沒有addReducer?
mapper.java
package Sample;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class mapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
System.out.println("mapper1~key:"+key.toString()+" value:"+value.toString());
output.collect(key, value);
}
}
reducer.java
package Sample;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class reducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    /**
     * Identity reduce: emits every value for the key unchanged, logging each
     * pair to the task's stdout along the way.
     *
     * @param key      the grouping key
     * @param values   all values seen for this key
     * @param output   collector that receives each (key, value) pair as-is
     * @param reporter progress reporter (unused here)
     * @throws IOException if the collector fails
     */
    @Override
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        for (Iterator<Text> it = values; it.hasNext(); ) {
            Text current = it.next();
            // Trace line goes to the task attempt's stdout log, not the job output.
            System.out.println("reducer1 key:"+key.toString()+" value:"+current.toString());
            output.collect(key, current);
        }
    }
}
HelloHadoop.java
package Sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
public class HelloHadoop {

    /** Placeholder HDFS paths used when no job arguments are supplied. */
    private static final String DEFAULT_INPUT = "hdfs://XXXX/input";
    private static final String DEFAULT_OUTPUT = "hdfs://XXXX/output/";

    /**
     * Configures and runs a pass-through MapReduce job using the old
     * {@code org.apache.hadoop.mapred} JobConf API.
     *
     * <p>Usage: {@code HelloHadoop [genericOptions] [inputPath outputPath]}.
     * Falls back to the hard-coded placeholder paths when no paths are given,
     * preserving the original behavior.
     *
     * @param args generic Hadoop options followed by optional input/output paths
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip -D/-fs/-jt style generic options; the remainder are job args.
        // (Previously otherArgs was computed but never used.)
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        JobConf job = new JobConf(conf);
        job.setJobName("Hello JobConf");
        job.setJarByClass(HelloHadoop.class);
        job.setMapperClass(mapper.class);
        job.setCombinerClass(reducer.class);
        job.setReducerClass(reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Each input line is split into key and value (tab-separated by default).
        job.setInputFormat(KeyValueTextInputFormat.class);

        // Honor command-line paths when given; otherwise keep the placeholders.
        String inputPath = otherArgs.length > 0 ? otherArgs[0] : DEFAULT_INPUT;
        String outputPath = otherArgs.length > 1 ? otherArgs[1] : DEFAULT_OUTPUT;
        // Use FileInputFormat directly (the imported, idiomatic entry point)
        // instead of calling the inherited static through the subclass.
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        /*
         * Example of chaining a second reduce/map stage with ChainReducer
         * (there is no addReducer: only one reduce stage is allowed, but
         * additional mappers may be appended after it via addMapper):
         *
         * JobConf reduceConf = new JobConf(false);
         * ChainReducer.setReducer(job, reducer2.class, Text.class, Text.class, Text.class, Text.class, false, reduceConf);
         * ChainReducer.addMapper(job, mapper2.class, Text.class, Text.class, Text.class, Text.class, false, null);
         */
        JobClient.runJob(job);
    }
}
留言列表