Apache Hadoop 运行分布式程序方法总结(Streaming 方式与原生 Java 接口)
1. Hadoop Streaming方式运行程序
Hadoop Streaming 允许用 Java 以外的语言(如 Shell、Python、Perl)编写 MapReduce 程序:mapper 和 reducer 可以是任意可执行程序,它们从标准输入逐行读取数据,并把以制表符分隔的 key/value 结果写到标准输出。其启动脚本示例如下:
#!/bin/sh
#
# Submit the AVP extraction job to the cluster via Hadoop Streaming.
#
# Usage:
#   ./bin/avp_platform_startup.sh USER_NAME INPUT_PAT OUTPUT_PAT MAP_TASKS REDUCE_TASKS CLASS_ID CODE_TYPE

# Validate arguments: exactly 7 positional parameters are required.
if [ $# -ne 7 ]; then
    # Print usage to stderr and exit non-zero so callers can detect the failure
    # (a bare `exit` would return echo's status, i.e. success).
    echo "./bin/avp_platform_startup.sh [USER_NAME] [INPUT_PAT] [OUTPUT_PAT] [MAP_TASKS] [REDUCE_TASKS] [CLASS_ID] [CODE_TYPE]" >&2
    exit 1
fi

# GLOBAL VARS
USER_NAME=$1
INPUT_PAT=$2
OUTPUT_PAT=$3
MAP_TASKS=$4
REDUCE_TASKS=$5
CLASS_ID=$6
CODE_TYPE=$7

HADOOP_DIR=/home/work/software/hadoop

# Every per-class *.conf file needs its own -file flag. A single "-file ./class/X/*.conf"
# expands to "-file a.conf b.conf", leaving b.conf as a stray argument that is never
# shipped to the tasks. The positional parameters (already copied above) serve as the
# only array available in POSIX sh. If the glob matches nothing, the literal pattern is
# passed through unchanged, exactly as in the original script.
set --
for conf_file in ./class/"$CLASS_ID"/*.conf; do
    set -- "$@" -file "$conf_file"
done

# Hadoop Start Area
# Each option line ends with "\" so the whole invocation is a single command.
"$HADOOP_DIR"/bin/hadoop jar "$HADOOP_DIR"/contrib/streaming/hadoop-streaming.jar \
    -input "/home/$USER/$USER_NAME/output/webpage/$INPUT_PAT/" \
    -output "/home/$USER/$USER_NAME/output/avp/avp-extract-${USER_NAME}_nlp_${OUTPUT_PAT}-$(date +%F-%H-%M-%S)" \
    -mapper "avp_extract_mapper.sh . . $CODE_TYPE" \
    -reducer "avp_extract_reducer.sh" \
    -file ./script/avp_extract_mapper.sh \
    -file ./script/avp_extract_reducer.sh \
    -file ./script/extract_tools/tidy_page.py \
    -file ./script/decode.pl \
    -file ./script/extract_tools/format.py \
    -file ./script/extract_tools/extract_tool.py \
    -file "./class/$CLASS_ID/site.list" \
    "$@" \
    -jobconf "mapred.job.name=${USER_NAME}_avp_platform_extract_tools_${OUTPUT_PAT}" \
    -jobconf "mapred.map.tasks=$MAP_TASKS" \
    -jobconf "mapred.reduce.tasks=$REDUCE_TASKS"
2. 使用 Java 原生接口编写 Hadoop 程序
第一步,将用 Java 编写的代码编译并打包成 JAR 包。
MaxTemperatureMapper.java
1 package oldapi; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapred.MapReduceBase; 9 import org.apache.hadoop.mapred.Mapper; 10 import org.apache.hadoop.mapred.OutputCollector; 11 import org.apache.hadoop.mapred.Reporter; 12 13 public class MaxTemperatureMapper extends MapReduceBase 14implements Mapper<LongWritable, Text, Text, IntWritable> { 1516privatestaticfinalint MISSING = 9999; 1718publicvoid map(LongWritable key, Text value, 19 OutputCollector<Text, IntWritable> output, Reporter reporter) 20throws IOException { 2122 String line = value.toString(); 23 String year = line.substring(15, 19); 24int airTemperature; 25if (line.charAt(87) == ‘+‘) { // parseInt doesn‘t like leading plus signs26 airTemperature = Integer.parseInt(line.substring(88, 92)); 27 } else { 28 airTemperature = Integer.parseInt(line.substring(87, 92)); 29 } 30 String quality = line.substring(92, 93); 31if (airTemperature != MISSING && quality.matches("[01459]")) { 32 output.collect(new Text(year), new IntWritable(airTemperature)); 33 } 34 } 35 }
MaxTemperatureReducer.java
1 package oldapi; 2 3 import java.io.IOException; 4 import java.util.Iterator; 5 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapred.MapReduceBase; 9 import org.apache.hadoop.mapred.OutputCollector; 10 import org.apache.hadoop.mapred.Reducer; 11 import org.apache.hadoop.mapred.Reporter; 12 13 public class MaxTemperatureReducer extends MapReduceBase 14implements Reducer<Text, IntWritable, Text, IntWritable> { 1516publicvoid reduce(Text key, Iterator<IntWritable> values, 17 OutputCollector<Text, IntWritable> output, Reporter reporter) 18throws IOException { 1920int maxValue = Integer.MIN_VALUE; 21while (values.hasNext()) { 22 maxValue = Math.max(maxValue, values.next().get()); 23 } 24 output.collect(key, new IntWritable(maxValue)); 25 } 26 }
MaxTemperature.java
1 package oldapi; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapred.FileInputFormat; 9 import org.apache.hadoop.mapred.FileOutputFormat; 10 import org.apache.hadoop.mapred.JobClient; 11 import org.apache.hadoop.mapred.JobConf; 12 import org.apache.hadoop.util.Tool; 13 import org.apache.hadoop.util.ToolRunner; 14 import org.apache.hadoop.conf.Configured; 15 import oldapi.MaxTemperatureMapper; 16 import oldapi.MaxTemperatureReducer; 17 18 public class MaxTemperature extends Configured implements Tool{ 1920publicint run(String[] args) throws IOException { 21if (args.length != 2) { 22 System.err.println("Usage: MaxTemperature <input path> <output path>"); 23 System.exit(-1); 24 } 2526 JobConf conf = new JobConf(MaxTemperature.class); 27 conf.setJobName("Max temperature"); 2829 FileInputFormat.addInputPath(conf, new Path(args[0])); 30 FileOutputFormat.setOutputPath(conf, new Path(args[1])); 3132 conf.setMapperClass(MaxTemperatureMapper.class); 33 conf.setCombinerClass(MaxTemperatureReducer.class); 34 conf.setReducerClass(MaxTemperatureReducer.class); 3536 conf.setOutputKeyClass(Text.class); 37 conf.setOutputValueClass(IntWritable.class); 3839 JobClient.runJob(conf); 40return 0; 41 } 4243publicstaticvoid main(String[] args) throws Exception { 44int exitCode = ToolRunner.run(new MaxTemperature(),args); 45 System.exit(exitCode); 46 } 4748 }
打包命令(执行后生成 MaxTemperature.jar 文件)。注意:-C 参数指定的目录中必须包含编译好的类文件,目录结构为 MaxTemperature/oldapi/*.class;只含 *.java 源文件的 JAR 包无法被 hadoop jar 运行,打包前需先用 javac 编译源文件。
# Compile first: `hadoop jar` needs .class files, not .java sources.
# -d MaxTemperature writes MaxTemperature/oldapi/*.class next to the sources.
javac -classpath "$(hadoop classpath)" -d MaxTemperature MaxTemperature/oldapi/*.java
jar cvf MaxTemperature.jar -C MaxTemperature/ .
第二步,提交作业运行。
# Submit the job as user "sniper". The output directory must not exist yet;
# FileOutputFormat refuses to overwrite an existing output path.
sudo -u sniper hadoop jar MaxTemperature.jar oldapi.MaxTemperature /home/sniper/zhuliang/sample.txt /home/sniper/zhuliang/max-temp-output
附:输入文件样例
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
原文:http://www.cnblogs.com/bjzhuliang/p/3618724.html