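Distributed grep with MapReduce

There are many MapReduce examples online; this post presents an implementation of a distributed grep.

Note: the Hadoop version used here is 2.7.1.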


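Introduction to grep

On Linux, grep is a powerful text search tool: it searches text using regular expressions and prints the lines that match. The name stands for Global Regular Expression Print, that is, search globally for a regular expression and print the matching lines. Any user may run it. For more on regular expressions, see http://www.runoob.com/regexp/regexp-tutorial.html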
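To make the "print the matching lines" behaviour concrete, here is a minimal single-machine sketch in plain Java. The class name LocalGrep, the method grep, and the sample lines are made up for illustration; it uses only java.util.regex and is not part of the Hadoop job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class LocalGrep {
    // Returns the lines that contain at least one match of the regular expression.
    static List<String> grep(String regex, List<String> lines) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String line : lines) {
            // find() succeeds if the pattern matches anywhere in the line,
            // which is how grep treats a line; matches() would require the
            // whole line to match.
            if (pattern.matcher(line).find()) {
                matched.add(line);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical sample input.
        List<String> lines = Arrays.asList("error: disk full", "all good", "fatal error");
        for (String line : grep("err.r", lines)) {
            System.out.println(line);
        }
    }
}
```

Run as is, this prints the first and third sample lines, since both contain "error".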

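Goal

Implement a distributed grep on top of MapReduce: find which text files contain lines that match a pattern, and print the number of matching lines for each file.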

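Approach

The algorithm closely resembles WordCount and can be seen as a variant of it: the map side performs the matching, emitting the file name with a count of 1 for every matching line, and the reduce side sums those counts per file. The full implementation is given in the code section below.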
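The map-then-sum idea can be tried out without a cluster. The following is a rough in-memory sketch, not the Hadoop job itself: GrepSketch, countMatchingLines, and the sample file contents are hypothetical, and a plain Map stands in for the shuffle between map and reduce.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class GrepSketch {
    // Simulates the job in memory: the "map" step emits (file name, 1) for
    // every matching line, and the "reduce" step sums the 1s per file name.
    static Map<String, Integer> countMatchingLines(String regex,
                                                   Map<String, List<String>> files) {
        Pattern pattern = Pattern.compile(regex);
        // TreeMap keeps keys sorted, similar to how reducers see sorted keys.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            for (String line : file.getValue()) {
                if (pattern.matcher(line).find()) {
                    // Emit 1 and fold it into the running sum for this file.
                    counts.merge(file.getKey(), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input: file name -> lines.
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("a.txt", Arrays.asList("hello world", "goodbye", "hello again"));
        files.put("b.txt", Arrays.asList("nothing here"));
        files.put("c.txt", Arrays.asList("say hello"));
        System.out.println(countMatchingLines("hello", files)); // {a.txt=2, c.txt=1}
    }
}
```

Files with no matching line never emit a pair, so they are absent from the result, just as they would be absent from the job's output.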

Code

import java.io.IOException;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class testGrep {
    public static class GrepMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);
        private final Text filename = new Text();
        private Pattern pattern;

        @Override
        protected void setup(Context context) {
            // Compile the regular expression once per task rather than once per line.
            pattern = Pattern.compile(context.getConfiguration().get("grep"));
            // With file-based input, every split belongs to exactly one file.
            FileSplit split = (FileSplit) context.getInputSplit();
            filename.set(split.getPath().getName());
        }

        @Override
        public void map(Object obj, Text text, Context context)
                throws IOException, InterruptedException {
            Matcher m = pattern.matcher(text.toString());
            // find() matches anywhere in the line; emit (file name, 1) per matching line.
            if (m.find()) {
                context.write(filename, ONE);
            }
        }
    }

    public static class GrepReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text text, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable t : values) {
                sum += t.get();
            }
            context.write(text, new IntWritable(sum));
        }
    }


    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
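        String pattern = ".22*.";// any character, one or more '2', then any character; change this to the pattern you need
        conf.set("grep", pattern);// pass the regular expression to the mappers through the job configuration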
        Job job = Job.getInstance(conf, "grep");
        job.setJarByClass(testGrep.class);
        job.setMapperClass(GrepMapper.class);
        job.setReducerClass(GrepReducer.class);
        job.setCombinerClass(GrepReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
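        // Adjust the following paths to match your cluster.
        String args1 = "hdfs://172.16.47.128:9000/user/grep_input";
        String args2 = "hdfs://172.16.47.128:9000/user/grep_output";
        // Delete any previous output directory; the job fails if the output path already exists.
        try (FileSystem fs = FileSystem.newInstance(URI.create(args1), conf)) {
            fs.delete(new Path(args2), true);
        }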
        FileInputFormat.addInputPath(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
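
Before submitting the job it can help to check locally which lines the configured pattern accepts. The sketch below does this for the pattern ".22*." from main() above; PatternCheck, matchesLine, and the sample strings are invented for illustration.

```java
import java.util.regex.Pattern;

public class PatternCheck {
    // Mirrors the mapper's test: true if the regex matches anywhere in the line.
    static boolean matchesLine(String regex, String line) {
        return Pattern.compile(regex).matcher(line).find();
    }

    public static void main(String[] args) {
        String regex = ".22*."; // any character, one or more '2', any character
        // Hypothetical sample lines.
        String[] samples = {"a2b", "x2222y", "222", "22", "2b", "abc"};
        for (String s : samples) {
            System.out.println(s + " -> " + matchesLine(regex, s));
        }
    }
}
```

The first three samples match. "22" and "2b" do not, because the pattern needs at least three characters with a '2' that has a character on each side, and "abc" contains no '2' at all.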