Several kinds of sorting in Hadoop
Reposted from: http://www.linuxidc.com/Linux/2013-08/88603.htm
1. Sorting in the map phase
Sorting in the map phase is done on the key. The simplest approach is to wrap the fields you want to sort by in a key object, have that object implement the WritableComparable interface, and override its compareTo method; the shuffle phase then sorts map output according to that definition.
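As a minimal sketch (the class and field names here are illustrative only; the Slog class in the full example below plays the same role), such a key might look like:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative composite key: the shuffle sorts map output by compareTo.
class SortKey implements WritableComparable<SortKey> {
    private String field;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(field);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        field = in.readUTF();
    }

    @Override
    public int compareTo(SortKey o) {
        return field.compareTo(o.field); // ascending by field
    }
}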
2. Sorting in the reduce phase
There is in fact another sort in the reduce phase: a reducer pulls its data from multiple maps, and although each map's output is already sorted, the streams from those maps still have to be merge-sorted. To control that merge we can implement a WritableComparator and register it with job.setSortComparatorClass(XXXXX.class);
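As a sketch of such a comparator for the Slog key defined in the full example below (hypothetical here: the example itself relies on Slog.compareTo and never registers a sort comparator):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical sort comparator that simply delegates to the key's compareTo.
class SlogSortComparator extends WritableComparator {
    protected SlogSortComparator() {
        super(Slog.class, true); // true: deserialize keys so compare() receives objects
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((Slog) a).compareTo((Slog) b);
    }
}
// Registered with: job.setSortComparatorClass(SlogSortComparator.class);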
3. Grouping: job.setGroupingComparatorClass(GroupComparator.class);
In the partition step, each map divides its output by key into local partition files, and each distinct key becomes one record; normally each call to the reduce method processes a single record. The purpose of grouping is to let one reduce call process several records: reduce still reads one record at a time, but after consuming the current record's values it checks whether the next record belongs to the same group as the current key. If it does, reduce keeps reading those values, and the extra records count as processed too; the call only ends when it reaches a record outside the current group. A single reduce call therefore consumes every record in a group, not just one.
What is this good for? Without grouping, records of the same group would be processed in separate, independent reduce calls, so any shared state would have to be passed between them, adding complexity; handled in one call, that state can live in local variables of the method. Finding a maximum, for example, then only requires reading the first value. The complete secondary-sort example below puts the key class, partitioner, and grouping comparator together.
package cn.zb.nginx;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondSort {
    public static class SMapper extends Mapper<LongWritable, Text, Slog, Text> {
        Slog log = new Slog();
        Text active = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is expected to be: userId visitTime action
            StringTokenizer values = new StringTokenizer(value.toString());
            int index = 0;
            while (values.hasMoreTokens()) {
                if (index == 0) {
                    log.setUserId(values.nextToken());
                } else if (index == 1) {
                    log.setVisitTime(Integer.parseInt(values.nextToken()));
                } else {
                    active.set(values.nextToken()); // remaining token: the action
                }
                index++;
            }
            context.write(log, active);
        }
    }
    public static class SReducer extends Reducer<Slog, Text, Text, NullWritable> {
        Text out = new Text();

        @Override
        protected void reduce(Slog log, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            // Take only the first value: with visitTime sorted in descending
            // order and grouping by userId, this is each user's latest record.
            if (iterator.hasNext()) {
                out.set(log.toString() + iterator.next());
                context.write(out, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(SecondSort.class);
            job.setPartitionerClass(UserPartitioner.class);
            job.setGroupingComparatorClass(GroupComparator.class);
            job.setMapperClass(SMapper.class);
            job.setReducerClass(SReducer.class);
            job.setMapOutputKeyClass(Slog.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            //job.setNumReduceTasks(1);
            String devSrc = "c:/wc/src3";
            String devDest = "c:/wc/output3";
            FileInputFormat.addInputPath(job, new Path(devSrc));
            FileOutputFormat.setOutputPath(job, new Path(devDest));
            boolean waitForCompletion = job.waitForCompletion(true);
            System.out.println("#####################" + waitForCompletion + "########################");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
class GroupComparator extends WritableComparator {
    public GroupComparator() {
        super(Slog.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group by userId only, so one reduce call sees all of a user's
        // records; comparing the strings directly gives a consistent order.
        return ((Slog) a).getUserId().compareTo(((Slog) b).getUserId());
    }
}
class UserPartitioner extends Partitioner<Slog, Text> {
    @Override
    public int getPartition(Slog key, Text value, int partitionNum) {
        // Mask the sign bit so a negative hashCode cannot produce a negative partition.
        return (key.getUserId().hashCode() & Integer.MAX_VALUE) % partitionNum;
    }
}
class Slog implements WritableComparable<Slog> {
    private String userId;
    private Integer visitTime;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Integer getVisitTime() {
        return visitTime;
    }

    public void setVisitTime(Integer visitTime) {
        this.visitTime = visitTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();
        visitTime = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeInt(visitTime);
    }
    @Override
    public int compareTo(Slog o) {
        // Primary sort: userId ascending. Compare the strings themselves;
        // comparing hash codes would not give a consistent total order.
        int cmp = userId.compareTo(o.userId);
        if (cmp != 0) {
            return cmp;
        }
        // Secondary sort: visitTime descending, so the first record in each
        // group is the latest visit. Integer values must be compared with
        // compareTo/equals, not != (which is an identity comparison).
        return o.visitTime.compareTo(visitTime);
    }
    @Override
    public String toString() {
        return userId + "\t" + visitTime + "\t";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((userId == null) ? 0 : userId.hashCode());
        result = prime * result + ((visitTime == null) ? 0 : visitTime.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Slog other = (Slog) obj;
        if (userId == null) {
            if (other.userId != null)
                return false;
        } else if (!userId.equals(other.userId))
            return false;
        if (visitTime == null) {
            if (other.visitTime != null)
                return false;
        } else if (!visitTime.equals(other.visitTime))
            return false;
        return true;
    }
}
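For reference, the mapper assumes whitespace-separated input lines of the form "userId visitTime action". With hypothetical input such as:

u001 20130801 login
u001 20130805 click
u002 20130803 view

the job emits one line per user, carrying that user's record with the largest visitTime (the secondary sort is descending, so the reducer only needs the first value of each group):

u001	20130805	click
u002	20130803	view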