Reposted from: http://www.linuxidc.com/Linux/2013-08/88603.htm


1. Sorting in the map phase

The map-phase sort orders records by key. The simplest approach is to wrap the fields to be sorted in a key object that implements the WritableComparable interface and overrides its compareTo method; the shuffle phase then sorts by that ordering.
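
As a minimal sketch of such a key (PairKey and its field names are illustrative; the Slog class in the full example below is the complete version of this idea):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairKey implements WritableComparable<PairKey> {
	private String first; // primary sort field
	private int second;   // secondary sort field

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(first);
		out.writeInt(second);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		first = in.readUTF();
		second = in.readInt();
	}

	@Override
	public int compareTo(PairKey o) {
		int cmp = first.compareTo(o.first); // compare the primary field first
		return cmp != 0 ? cmp : Integer.compare(second, o.second);
	}
}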


2. Sorting in the reduce phase

There is in fact another sort in the reduce phase. A reducer's input comes from multiple maps, and the data arriving from each map is already sorted, so the reducer merge-sorts those streams. To customize the merge order we can extend WritableComparator and register it with job.setSortComparatorClass(XXXXX.class); a sketch follows below.
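
As a minimal sketch (the class name SortComparator is hypothetical; the full example below does not register one and simply relies on Slog.compareTo for the ordering):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {
	protected SortComparator() {
		super(Slog.class, true); // true: deserialize keys so compare() receives objects
	}

	@Override
	@SuppressWarnings("unchecked")
	public int compare(WritableComparable a, WritableComparable b) {
		// Delegate to the key's natural order; a custom merge order would go here.
		return a.compareTo(b);
	}
}

It would be registered with job.setSortComparatorClass(SortComparator.class);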


3. Grouping: job.setGroupingComparatorClass(GroupComparator.class);

During partitioning, each map writes its output by key into per-partition local files. Each distinct key forms one record, and by default each call to the reduce method processes exactly one record; the point of grouping is to let a single reduce call process several records.

The reduce method still reads one record and its key, but while it iterates over the value collection, once the current record's values are exhausted the framework checks whether the next record belongs to the same group as the current key. If it does, the framework keeps feeding that record's values into the same iteration and marks the record as consumed, continuing until it reaches a record outside the current group; only then does the reduce call end. In this way one reduce call consumes all the records in a group rather than just one.
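
For illustration, suppose the sorted map output contains the keys (u1, 300), (u1, 200), (u2, 100), where the first component is the userId and the values are made up. Without a grouping comparator, reduce is called three times, once per distinct key. With a comparator that groups on userId alone, reduce is called only twice: once with key (u1, 300) and the values of both u1 records, and once with (u2, 100).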

What is this good for? Without grouping, records belonging to the same group would be handled independently across multiple reduce calls, so any shared state would have to be carried between calls, adding complexity. Handled within a single call, that state can live in ordinary local variables. For example, to find the maximum value in a group you only need to read the first value (given a descending sort), which is exactly what the SReducer below does.


package cn.zb.nginx;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondSort {

	
	public static class SMapper extends Mapper<LongWritable, Text, Slog, Text>{
		
		Slog log = new Slog();
		Text active = new Text();
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
		
			// Input line format: userId visitTime action (whitespace-separated).
			StringTokenizer values = new StringTokenizer(value.toString());
			int index = 0;
			while(values.hasMoreTokens()){
				
				if(index == 0){
					log.setUserId(values.nextToken());
				}else if(index == 1){
					log.setVisitTime(Integer.parseInt(values.nextToken()));
				}else{
					// Any remaining tokens are treated as the action field;
					// if there are several, the last one wins.
					active.set(values.nextToken());
				}
				index++;
			}
			context.write(log, active);
		}
	}
	
	public static class SReducer extends Reducer<Slog, Text, Text, NullWritable>{
		Text out = new Text();
		@Override
		protected void reduce(Slog log, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			Iterator<Text> iterator = values.iterator();
			// Values arrive sorted with the largest visitTime first (see
			// Slog.compareTo), so the first value in the group is the one we
			// want; emit it and stop.
			if(iterator.hasNext()){
				out.set(log.toString() + iterator.next());
				context.write(out, NullWritable.get());
			}
		}
	}
	
	
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		try {
			Job job = Job.getInstance(conf);
			job.setJarByClass(SecondSort.class);
			job.setPartitionerClass(UserPatitioner.class);
			// Group by userId only; the full-key ordering itself comes from
			// Slog.compareTo, so no setSortComparatorClass call is needed here.
			job.setGroupingComparatorClass(GroupComparator.class);
			
			job.setMapperClass(SMapper.class);
			job.setReducerClass(SReducer.class);
			
			job.setMapOutputKeyClass(Slog.class);
			job.setMapOutputValueClass(Text.class);
			
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(NullWritable.class);
			//job.setNumReduceTasks(1);
			String devSrc = "c:/wc/src3";
			String devDest = "c:/wc/output3";
			FileInputFormat.addInputPath(job, new Path(devSrc));
			FileOutputFormat.setOutputPath(job, new Path(devDest));
			
			boolean waitForCompletion = job.waitForCompletion(true);
			System.out.println("#####################"+waitForCompletion+"########################");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

class GroupComparator extends WritableComparator{

	public GroupComparator() {
		super(Slog.class, true); // true: create key instances for compare()
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		
		Slog log1 = (Slog)a;
		Slog log2 = (Slog)b;
		
		// Group by userId only, so all records of one user reach a single
		// reduce call. Compare the strings directly: ordering by hashCode is
		// unreliable (collisions), and the grouping order must be consistent
		// with the primary sort order defined in Slog.compareTo.
		return log1.getUserId().compareTo(log2.getUserId());
	}
	
}
class UserPatitioner extends Partitioner<Slog, Text>{

	@Override
	public int getPartition(Slog key, Text value, int partitionNum) {
		
		// Partition by userId so a user's records all land in one reducer.
		// Mask off the sign bit: hashCode() can be negative, and a negative
		// modulus would yield an invalid partition number.
		return (key.getUserId().hashCode() & Integer.MAX_VALUE) % partitionNum;
	}
	
}

class Slog implements WritableComparable<Slog>{
	private String userId;
	private Integer visitTime;
	public String getUserId() {
		return userId;
	}
	public void setUserId(String userId) {
		this.userId = userId;
	}
	public Integer getVisitTime() {
		return visitTime;
	}
	public void setVisitTime(Integer visitTime) {
		this.visitTime = visitTime;
	}
	
	
	@Override
	public void readFields(DataInput in) throws IOException {
		
		userId = in.readUTF();
		visitTime = in.readInt();
	}
	@Override
	public void write(DataOutput out) throws IOException {
		
		out.writeUTF(userId);
		out.writeInt(visitTime);
	}
	@Override
	public int compareTo(Slog o) {
		
		// Primary order: userId. Compare the strings themselves; comparing
		// hashCodes is not a total order once two different ids collide.
		int cmp = userId.compareTo(o.userId);
		if(cmp != 0){
			return cmp;
		}
		// Secondary order: visitTime descending, so the largest visit time
		// comes first within each user's group.
		return o.visitTime.compareTo(visitTime);
	}
	@Override
	public String toString() {
		return userId + "\t" + visitTime + "\t";
	}
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((userId == null) ? 0 : userId.hashCode());
		result = prime * result
				+ ((visitTime == null) ? 0 : visitTime.hashCode());
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		Slog other = (Slog) obj;
		if (userId == null) {
			if (other.userId != null)
				return false;
		} else if (!userId.equals(other.userId))
			return false;
		if (visitTime == null) {
			if (other.visitTime != null)
				return false;
		} else if (!visitTime.equals(other.visitTime))
			return false;
		return true;
	}
	
	
	
	
}
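
For reference, the mapper expects whitespace-separated lines of the form userId visitTime action (the format is inferred from the mapper code; the sample values below are made up). Given this hypothetical input:

u001 300 login
u001 200 click
u002 100 logout

the job partitions and groups by userId, sorts each group by visitTime descending, and the reducer emits only the first (largest-visitTime) record of each group:

u001	300	login
u002	100	logout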


