Apache Hadoop 运行分布式程序方法总结(Streaming方式与原生JAVA接口)

  

1. Hadoop Streaming方式运行程序

Hadoop Streaming 可以运行用 Java 以外的其他语言(如 Shell、Python、Perl)编写的 MapReduce 程序:mapper 和 reducer 只需是从标准输入读取数据、向标准输出写出结果的可执行程序或脚本。其启动脚本示例如下:

             #!/bin/sh
#
# Submit the AVP extraction job to the cluster through Hadoop Streaming.
# All seven positional parameters are required (see the usage message below).

# Validate the argument count; print usage on stderr and fail with a non-zero
# status so callers/schedulers can detect the misuse.
if [ "$#" -ne 7 ]; then
    echo "./bin/avp_platform_startup.sh [USER_NAME] [INPUT_PAT] [OUTPUT_PAT] [MAP_TASKS] [REDUCE_TASKS] [CLASS_ID] [CODE_TYPE]" >&2
    exit 1
fi

# GLOBAL VARS
USER_NAME=$1
INPUT_PAT=$2
OUTPUT_PAT=$3
MAP_TASKS=$4
REDUCE_TASKS=$5
CLASS_ID=$6
CODE_TYPE=$7

# Hadoop Start Area
# Every option line ends with "\" so the whole invocation is a single command.
# The timestamp suffix gives each run a fresh output directory (Hadoop refuses
# to write into an output directory that already exists).
# ${VAR}_suffix is used instead of $VAR\_suffix to delimit variable names.
/home/work/software/hadoop/bin/hadoop jar /home/work/software/hadoop/contrib/streaming/hadoop-streaming.jar \
    -input "/home/$USER/$USER_NAME/output/webpage/$INPUT_PAT/" \
    -output "/home/$USER/$USER_NAME/output/avp/avp-extract-${USER_NAME}_nlp_${OUTPUT_PAT}-$(date +%F-%H-%M-%S)" \
    -mapper "avp_extract_mapper.sh . . $CODE_TYPE" \
    -reducer "avp_extract_reducer.sh" \
    -file ./script/avp_extract_mapper.sh \
    -file ./script/avp_extract_reducer.sh \
    -file ./script/extract_tools/tidy_page.py \
    -file ./script/decode.pl \
    -file ./script/extract_tools/format.py \
    -file ./script/extract_tools/extract_tool.py \
    -file "./class/$CLASS_ID/site.list" \
    -file ./class/"$CLASS_ID"/*.conf \
    -jobconf "mapred.job.name=${USER_NAME}_avp_platform_extract_tools_${OUTPUT_PAT}" \
    -jobconf "mapred.map.tasks=$MAP_TASKS" \
    -jobconf "mapred.reduce.tasks=$REDUCE_TASKS"

2. 使用 Java 原生 API 编写 Hadoop 程序

第一步,将用 Java 编写的代码编译并打包成 JAR 包。下面的示例使用旧版 MapReduce API(org.apache.hadoop.mapred 包)。

MaxTemperatureMapper.java

         1
        package
         oldapi;

         2
         3
        import
         java.io.IOException;

         4
         5
        import
         org.apache.hadoop.io.IntWritable;

         6
        import
         org.apache.hadoop.io.LongWritable;

         7
        import
         org.apache.hadoop.io.Text;

         8
        import
         org.apache.hadoop.mapred.MapReduceBase;

         9
        import
         org.apache.hadoop.mapred.Mapper;

        10
        import
         org.apache.hadoop.mapred.OutputCollector;

        11
        import
         org.apache.hadoop.mapred.Reporter;

        12
        13
        public
        class MaxTemperatureMapper extends MapReduceBase
14implements Mapper<LongWritable, Text, Text, IntWritable> {
1516privatestaticfinalint MISSING = 9999;
1718publicvoid map(LongWritable key, Text value,
19       OutputCollector<Text, IntWritable> output, Reporter reporter)
20throws IOException {
2122     String line = value.toString();
23     String year = line.substring(15, 19);
24int airTemperature;
25if (line.charAt(87) == ‘+‘) { // parseInt doesn‘t like leading plus signs26       airTemperature = Integer.parseInt(line.substring(88, 92));
27     } else {
28       airTemperature = Integer.parseInt(line.substring(87, 92));
29    }
30     String quality = line.substring(92, 93);
31if (airTemperature != MISSING && quality.matches("[01459]")) {
32       output.collect(new Text(year), new IntWritable(airTemperature));
33    }
34  }
35 }

MaxTemperatureReducer.java

         1
        package
         oldapi;

         2
         3
        import
         java.io.IOException;

         4
        import
         java.util.Iterator;

         5
         6
        import
         org.apache.hadoop.io.IntWritable;

         7
        import
         org.apache.hadoop.io.Text;

         8
        import
         org.apache.hadoop.mapred.MapReduceBase;

         9
        import
         org.apache.hadoop.mapred.OutputCollector;

        10
        import
         org.apache.hadoop.mapred.Reducer;

        11
        import
         org.apache.hadoop.mapred.Reporter;

        12
        13
        public
        class MaxTemperatureReducer extends MapReduceBase
14implements Reducer<Text, IntWritable, Text, IntWritable> {
1516publicvoid reduce(Text key, Iterator<IntWritable> values,
17       OutputCollector<Text, IntWritable> output, Reporter reporter)
18throws IOException {
1920int maxValue = Integer.MIN_VALUE;
21while (values.hasNext()) {
22       maxValue = Math.max(maxValue, values.next().get());
23    }
24     output.collect(key, new IntWritable(maxValue));
25  }
26 }

MaxTemperature.java

         1
        package
         oldapi;

         2
         3
        import
         java.io.IOException;

         4
         5
        import
         org.apache.hadoop.fs.Path;

         6
        import
         org.apache.hadoop.io.IntWritable;

         7
        import
         org.apache.hadoop.io.Text;

         8
        import
         org.apache.hadoop.mapred.FileInputFormat;

         9
        import
         org.apache.hadoop.mapred.FileOutputFormat;

        10
        import
         org.apache.hadoop.mapred.JobClient;

        11
        import
         org.apache.hadoop.mapred.JobConf;

        12
        import
         org.apache.hadoop.util.Tool;

        13
        import
         org.apache.hadoop.util.ToolRunner;

        14
        import
         org.apache.hadoop.conf.Configured;

        15
        import
         oldapi.MaxTemperatureMapper;

        16
        import
         oldapi.MaxTemperatureReducer;

        17
        18
        public
        class MaxTemperature extends Configured implements Tool{
1920publicint run(String[] args) throws IOException {
21if (args.length != 2) {
22       System.err.println("Usage: MaxTemperature <input path> <output path>");
23       System.exit(-1);
24    }
2526     JobConf conf = new JobConf(MaxTemperature.class);
27     conf.setJobName("Max temperature");
2829     FileInputFormat.addInputPath(conf, new Path(args[0]));
30     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
3132     conf.setMapperClass(MaxTemperatureMapper.class);
33     conf.setCombinerClass(MaxTemperatureReducer.class);
34     conf.setReducerClass(MaxTemperatureReducer.class);
3536     conf.setOutputKeyClass(Text.class);
37     conf.setOutputValueClass(IntWritable.class);
3839    JobClient.runJob(conf);
40return 0;
41  }
4243publicstaticvoid main(String[] args) throws Exception {
44int exitCode = ToolRunner.run(new MaxTemperature(),args);
45      System.exit(exitCode);
46  }
4748 }

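打包命令如下(会生成 MaxTemperature.jar 文件)。注意:-C 参数指定的目录 MaxTemperature/ 中必须包含编译后的类文件 MaxTemperature/oldapi/*.class(先用 javac 以 Hadoop 的 classpath 编译 MaxTemperature/oldapi/*.java 得到)。只打包 .java 源文件的 JAR 在运行时会因找不到 oldapi.MaxTemperature 类而失败。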

        # Compile the sources against the Hadoop jars so MaxTemperature/oldapi/*.class exists,
        # then package the compiled classes (a jar of .java sources cannot be run).
        javac -classpath "$(hadoop classpath)" -d MaxTemperature/ MaxTemperature/oldapi/*.java
        jar cvf MaxTemperature.jar -C MaxTemperature/ .

第二步,启动运行。

        # Run as user "sniper". The output directory must not exist yet:
        # FileOutputFormat refuses to overwrite an existing one.
        sudo -u sniper hadoop jar MaxTemperature.jar oldapi.MaxTemperature \
            /home/sniper/zhuliang/sample.txt \
            /home/sniper/zhuliang/max-temp-output

附:输入文件样例(sample.txt,每行一条定长记录;复制时不要带行号,否则字段偏移会错位)

        0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
        0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
        0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
        0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
        0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

原文:http://www.cnblogs.com/bjzhuliang/p/3618724.html

相关文章