MapReduce编程之单词去重


在MR编程中,最典型的业务就是求sum,max,min,avg,distinct, group by 还有 join 等操作的实现了。事实上,无论是那种业务。 MapReduce的编程框架已经决定了要把mapper阶段计算出来的key-value会按照key做组划分。所以reduceTask当中的reduce方法,其实接收到的参数就是key相同的一组key-value,然后根据业务逻辑做规约。比如distinct操作。如果需要按照某个字段值进行去重,那么只需要把该要进行去重的字段做key就OK,然后在reducer阶段,再在每一组中输出一个key-value值即可。


下面以一个简单的单词去重作为例子:

直接上源码,部分解释在源码中,请细看:

package com.ghgj.mazh.mapreduce.distinct;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 作者: 马中华:http://blog.csdn.net/zhongqi2513
 * 日期: 2017年10月25日下午12:34:25
 * 
 * 描述:单词去重
 * 
 */
public class DistinctWordMR {

	public static void main(String[] args) throws Exception {
		// 指定hdfs相关的参数
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop06:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");

		Job job = Job.getInstance(conf);
		// 设置jar包所在路径
		job.setJarByClass(DistinctWordMR.class);

		// 指定mapper类和reducer类
		job.setMapperClass(DistinctWordMRMapper.class);
		job.setReducerClass(DistinctWordMRReducer.class);

		// 指定maptask的输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		// 指定reducetask的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 指定该mapreduce程序数据的输入和输出路径
//		Path inputPath = new Path("d:/wordcount/input");
//		Path outputPath = new Path("d:/wordcount/output");
		Path inputPath = new Path("/wc/input");
		Path outputPath = new Path("/wc/output");
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		// 最后提交任务
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);
	}

	/**
	 * 作者: 马中华:http://blog.csdn.net/zhongqi2513
	 * 日期: 2017年10月25日下午12:39:34
	 * 
	 * 描述:单词去重MR中的mapper组件。 读取文件然后切分出单词
	 */
	private static class DistinctWordMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		private Text outkey = new Text();

		/**
		 * 在单词计数的场景中。 把单词作为key输出即可, 不用输出value
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String[] split = value.toString().split(" ");
			for (String word : split) {
				outkey.set(word);
				context.write(outkey, NullWritable.get());
			}
		}
	}

	/**
	 * 作者: 马中华:http://blog.csdn.net/zhongqi2513
	 * 日期: 2017年10月25日下午12:39:20
	 * 
	 * 描述:单词去重的MR程序的reducer组件
	 */
	private static class DistinctWordMRReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

			/**
			 * reduce方法没调用一次,就接收到一组相同的单词。所以,在此因为是去重的业务,所以直接输出一次key即可。就表示这一组单词就取一个。就相当于实现去重的业务
			 */
			context.write(key, NullWritable.get());
		}
	}
}


下面是程序接收到的数据:

hello huangbo
hello xuzheng
hello wangbaoqiang
one two three four five
one two three four
one two three
one two
hello hi


下面是程序输出的结果数据:

five
four
hello
hi
huangbo
one
three
two
wangbaoqiang
xuzheng


从以上的输出结果可以得出一个结论:

1、MapReduce编程框架中,一定会对mapper阶段输出的key-value排序,会按照key-value中的key排序,默认按照自然顺序排序。而且只会按照key进行排序

2、如果一个MapReduce程序,没有reducer阶段,那么mapper和reducer中间的shuffle过程就没有,所以这种情况,是不会排序的,也就是说,只要一个MR程序有reducer阶段,那么该程序一定会对key进行排序。


问题:如果想要进行排序的字段在value中呢,由于MR编程模型只会对key进行排序,所以要怎么实现呢。?



Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐