Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序(二)
接着Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序(一),现在我们又有了新的需求:根据用户的总流量进行升序排列。1. 环境:CentOS 6.5 32位,在Linux环境中开发。
·
接着Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序(一),现在我们又有了新的需求:根据用户的总流量进行升序排列。
1. 环境:CentOS 6.5 32位,在Linux环境中开发。
2.核心代码如下:
2.1 Mapper类。
package com.npf.hadoop.sort;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.npf.hadoop.FlowBean;
/**
 * Turns each tab-separated input line into a {@link FlowBean} map-output key so that the
 * shuffle phase orders records using {@code FlowBean.compareTo}.
 *
 * <p>Each input line is expected to contain, separated by tabs:
 * {@code phoneNumber, upFlow, downFlow, sumFlow}. A line with fewer fields or a non-numeric
 * flow value makes {@code map} throw (ArrayIndexOutOfBoundsException / NumberFormatException).
 */
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

    // Reused for every record to avoid per-record allocation; this relies on Hadoop
    // serializing the key during context.write, before the next map() call mutates it.
    private final FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = StringUtils.split(value.toString(), "\t");

        flowBean.setPhoneNumber(fields[0].trim());
        // parseLong yields a primitive directly (no boxing as with Long.valueOf).
        flowBean.setUpFlow(Long.parseLong(fields[1].trim()));
        flowBean.setDownFlow(Long.parseLong(fields[2].trim()));
        flowBean.setSumFlow(Long.parseLong(fields[3].trim()));

        // The value carries no data; all information travels in the sortable key.
        context.write(flowBean, NullWritable.get());
    }
}
2.2 Reducer类。
package com.npf.hadoop.sort;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.npf.hadoop.FlowBean;
/**
 * Emits every sorted {@link FlowBean} as (phoneNumber, bean). The bean is written through
 * its {@code toString()}, i.e. {@code upFlow \t downFlow \t sumFlow}.
 */
public class FlowCountSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

    // Reused output key; the record writer consumes it before the next write.
    private final Text phone = new Text();

    @Override
    protected void reduce(FlowBean bean, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Hadoop deserializes the next record's key into 'bean' each time the value iterator
        // advances. Writing once per value therefore keeps every record, even when several
        // beans compare as equal and land in the same reduce group.
        for (NullWritable ignored : values) {
            phone.set(bean.getPhoneNumber());
            context.write(phone, bean);
        }
    }
}
2.3 FlowBean类。
package com.npf.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Per-phone traffic record used as a sortable MapReduce key.
 *
 * <p>Ordering is ascending by {@code sumFlow}. Ties are broken by phone number, then up
 * flow, then down flow, so that distinct records never compare as equal. {@link #equals}
 * and {@link #hashCode} are consistent with {@link #compareTo}.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Returns {@code upFlow \t downFlow \t sumFlow}; the phone number is emitted separately as the output key. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /** Serializes all fields; phoneNumber must be non-null (writeUTF rejects null). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Reads fields in the same order {@link #write} produced them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Orders by total flow ascending, then by phone number (null first), up flow and down
     * flow. The original version never returned 0 and was not antisymmetric for equal
     * totals, which violates the Comparable contract the shuffle sort relies on.
     */
    @Override
    public int compareTo(FlowBean o) {
        int cmp = Long.compare(sumFlow, o.sumFlow);
        if (cmp != 0) {
            return cmp;
        }
        cmp = comparePhones(phoneNumber, o.phoneNumber);
        if (cmp != 0) {
            return cmp;
        }
        cmp = Long.compare(upFlow, o.upFlow);
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(downFlow, o.downFlow);
    }

    /** Equal exactly when {@link #compareTo} returns 0. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        return compareTo((FlowBean) obj) == 0;
    }

    /** Hash over the same fields as {@link #equals}; also used by Hadoop's HashPartitioner. */
    @Override
    public int hashCode() {
        int h = hashLong(sumFlow);
        h = 31 * h + (phoneNumber == null ? 0 : phoneNumber.hashCode());
        h = 31 * h + hashLong(upFlow);
        h = 31 * h + hashLong(downFlow);
        return h;
    }

    /** Null-safe lexicographic comparison; null sorts before any non-null phone. */
    private static int comparePhones(String a, String b) {
        if (a == null) {
            return b == null ? 0 : -1;
        }
        return b == null ? 1 : a.compareTo(b);
    }

    /** Same mixing as Long#hashCode, written out to stay Java-7 compatible. */
    private static int hashLong(long v) {
        return (int) (v ^ (v >>> 32));
    }
}
2.4 runner主程序入口。
package com.npf.hadoop.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.npf.hadoop.FlowBean;
/**
 * Configures and submits the job that sorts per-phone flow records by total flow.
 *
 * <p>Optional arguments: {@code [inputPath [outputPath]]}. Any argument that is omitted
 * falls back to the original hard-coded HDFS path. The process exit code is 0 when the job
 * succeeds and 1 otherwise.
 */
public class FlowCountSortRunner {

    private static final String DEFAULT_INPUT =
            "hdfs://devcitibank:9000/flowCountJob/outputdata";
    private static final String DEFAULT_OUTPUT =
            "hdfs://devcitibank:9000/flowCountJob/outputdatasort";

    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(FlowCountSortRunner.class);

        job.setMapperClass(FlowCountSortMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(FlowCountSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Each reducer only sees its own partition, so one globally ascending output file
        // needs exactly one reducer (guards against a cluster-wide default > 1).
        job.setNumReduceTasks(1);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Propagate job success/failure to the shell instead of always exiting 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. 我们通过Eclipse将我们的程序打成一个Jar包,打到/root目录下面。Jar包的名字我们命名为flowsortcount.jar。
4. ok, 我们来验证下在/root/目录下是否存在我们的Jar包。
5. 验证hadoop集群是否启动。
6. 验证我们在集群中的/flowCountJob/outputdata目录下面是否有我们需要处理的文件。
7.提交flowsortcount.jar到hadoop集群中去处理。
8. 执行成功后,我们到Hadoop集群中查看结果。
9. 源代码已托管到GitHub上面 : https://github.com/HadoopOrganization/HadoopMapReducerFlowCount
更多推荐
已为社区贡献1条内容
所有评论(0)