接着《Hadoop 2.4.1 简单的用户手机流量统计的MapReduce程序(一)》，现在我们有了新的需求：需要根据用户的总流量进行升序排列。


1.环境:Centos 6.5  32位, 在linux环境中开发。

2.核心代码如下:

2.1 Mapper类。

package com.npf.hadoop.sort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.npf.hadoop.FlowBean;


/**
 * Mapper for the flow-sort job. Reads one tab-separated line of the previous
 * job's output (phoneNumber \t upFlow \t downFlow \t sumFlow), populates a
 * FlowBean and emits it as the KEY (value is NullWritable) so that the
 * MapReduce shuffle sorts records by FlowBean.compareTo (i.e. by total flow).
 */
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

        // Reused across map() calls (standard Hadoop object-reuse pattern);
        // safe because the framework serializes the bean inside context.write().
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
                String line = value.toString();
                // Input fields: [0]=phoneNumber, [1]=upFlow, [2]=downFlow, [3]=sumFlow
                String[] fields = StringUtils.split(line, "\t");
                String phoneNumber = fields[0].trim();
                // parseLong returns a primitive long directly; Long.valueOf would
                // box to Long only to be auto-unboxed again.
                long upFlow = Long.parseLong(fields[1].trim());
                long downFlow = Long.parseLong(fields[2].trim());
                long sumFlow = Long.parseLong(fields[3].trim());
                flowBean.setPhoneNumber(phoneNumber);
                flowBean.setUpFlow(upFlow);
                flowBean.setDownFlow(downFlow);
                flowBean.setSumFlow(sumFlow);
                context.write(flowBean, NullWritable.get());
        }
}
2.2 Reducer类。

package com.npf.hadoop.sort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.npf.hadoop.FlowBean;

/**
 * Reducer for the flow-sort job. Keys (FlowBean) arrive already ordered by
 * the shuffle's sort; each one is simply re-emitted as (phoneNumber, bean).
 */
public class FlowCountSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{

        @Override
        protected void reduce(FlowBean bean, Iterable<NullWritable> values,Context context)throws IOException, InterruptedException {
                // Pull the phone number out as the output key; the bean itself
                // (upFlow \t downFlow \t sumFlow via toString) is the value.
                Text phoneKey = new Text(bean.getPhoneNumber());
                context.write(phoneKey, bean);
        }
}
2.3 FlowBean类。

package com.npf.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Writable key/value bean carrying one user's phone number and up/down/total
 * flow. Implements WritableComparable so it can be used as a MapReduce key;
 * the natural order is ascending by total flow (sumFlow).
 *
 * NOTE: Writable contract — readFields() must consume exactly the bytes that
 * write() produced, in the same order.
 */
public class FlowBean implements WritableComparable<FlowBean>{

        private String phoneNumber;

        private long upFlow;

        private long downFlow;

        private long sumFlow;

        public String getPhoneNumber() {
                return phoneNumber;
        }

        public void setPhoneNumber(String phoneNumber) {
                this.phoneNumber = phoneNumber;
        }

        public long getUpFlow() {
                return upFlow;
        }

        public void setUpFlow(long upFlow) {
                this.upFlow = upFlow;
        }

        public long getDownFlow() {
                return downFlow;
        }

        public void setDownFlow(long downFlow) {
                this.downFlow = downFlow;
        }

        public long getSumFlow() {
                return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
                this.sumFlow = sumFlow;
        }

        /** Tab-separated flows; used verbatim by TextOutputFormat for the value column. */
        @Override
        public String toString() {
                return upFlow+"\t"+downFlow+"\t"+sumFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
                out.writeUTF(phoneNumber);
                out.writeLong(upFlow);
                out.writeLong(downFlow);
                out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
                phoneNumber = in.readUTF();
                upFlow = in.readLong();
                downFlow = in.readLong();
                sumFlow = in.readLong();
        }

        /**
         * Ascending order by total flow.
         *
         * BUGFIX: the previous implementation ({@code sumFlow > o.sumFlow ? 1 : -1})
         * never returned 0 and returned -1 from BOTH sides when totals were equal,
         * violating the Comparable antisymmetry contract and making the sort order
         * undefined. We now use Long.compare, with the phone number as a tie-break
         * so two DIFFERENT users with equal totals never compare as 0 (which would
         * merge them into a single reduce() group and drop one record).
         */
        @Override
        public int compareTo(FlowBean o) {
                int cmp = Long.compare(this.sumFlow, o.sumFlow);
                if (cmp == 0) {
                        cmp = this.phoneNumber.compareTo(o.phoneNumber);
                }
                return cmp;
        }
}
2.4 runner主程序入口。

package com.npf.hadoop.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.npf.hadoop.FlowBean;


/**
 * Driver for the flow-sort job: FlowCountSortMapper -> shuffle sort by
 * FlowBean -> FlowCountSortReducer.
 *
 * Usage: hadoop jar flowsortcount.jar [inputPath [outputPath]]
 * When no arguments are given, the original hard-coded HDFS paths are used,
 * so existing invocations keep working unchanged.
 */
public class FlowCountSortRunner {

        // Original hard-coded paths, kept as backward-compatible defaults.
        private static final String DEFAULT_INPUT = "hdfs://devcitibank:9000/flowCountJob/outputdata";
        private static final String DEFAULT_OUTPUT = "hdfs://devcitibank:9000/flowCountJob/outputdatasort";

        public static void main(String[] args) throws Exception {

                // Allow input/output to be overridden from the command line.
                String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
                String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

                Configuration configuration = new Configuration();
                Job job = Job.getInstance(configuration);
                job.setJarByClass(FlowCountSortRunner.class);

                job.setMapperClass(FlowCountSortMapper.class);
                job.setMapOutputKeyClass(FlowBean.class);
                job.setMapOutputValueClass(NullWritable.class);

                job.setReducerClass(FlowCountSortReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(FlowBean.class);

                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);

                FileInputFormat.setInputPaths(job, new Path(inputPath));
                FileOutputFormat.setOutputPath(job, new Path(outputPath));

                // BUGFIX: the result of waitForCompletion was ignored, so the JVM
                // always exited 0 even when the job failed. Propagate success or
                // failure to the shell.
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

}
3. 我们通过Eclipse将我们的程序打成一个Jar包,打到/root目录下面。Jar包的名字我们命名为flowsortcount.jar。

4. ok, 我们来验证下在/root/目录下是否存在我们的Jar包。


5. 验证hadoop集群是否启动。

6. 验证我们在集群中的/flowCountJob/outputdata目录下面是否有我们需要处理的文件。


7.提交flowsortcount.jar到hadoop集群中去处理。

8. 执行成功后,我们去hadoop集群中去查看结果。


9. 源代码已托管到GitHub上面 : https://github.com/HadoopOrganization/HadoopMapReducerFlowCount

Logo

更多推荐