对于一些应用,需要特殊的数据结构来存储数据。比如运行基于MapReduce的进程,当存储数据时,将每个二进制数据块放入它自己的文件,这样做使得后期不容易扩展。为此,hadoop开发了一系列高级容器。

一、SequenceFile类

包为:org.apache.hadoop.io.SequenceFile

Hadoop的SequenceFile类为二进制键值对提供了一个持续化的数据结构。它提供了 WriterReader and SequenceFile.Sorter 类能独立执行读、写以及排序操作。

如果想应用于日志文件格式,需要选择一个键(如LongWritable表示时间戳)和一个值(如Writable表示日志记录的数量)。

用SequenceFile类作为小型文件的容器也不错。HDFS和MapReduce是大型文件的利器,当我们把文件打包到一个SequenceFile类中,我们能够高效对小型文件进程存储和处理。


(1)创建一个SequenceFile类

该类提供了一种静态方法创建SequenceFile.Writer实例。而且有几个重载方法。hadoop推荐使用静态构造方法如下:

public static org.apache.hadoop.io.SequenceFile.Writer createWriter(FileContext fc,      //文件上下文
                                                                    Configuration conf,   //配置信息
                                                                    Path name,            //文件path
                                                                    Class keyClass,      //键类
                                                                    Class valClass,      //值类
                                                                   CompressionType compressionType, //压缩类型
                                                                    CompressionCodec codec,   //压缩器
                                                                    Metadata metadata,   //文件头部metadata
                                                                    EnumSet<CreateFlag> createFlag, //给出创建的语义如overwrite
                                                                    org.apache.hadoop.fs.Options.CreateOpts... opts) //可选项
                                                             throws IOException

存储在SequenceFile类中的键和值不一定必须是Writable。可以被SequenceFile类序列化和反序列的任何类型都可以使用。

在SequencdFile.Writer之后,就用append()方法写入键/值对。然后在结束的时候调用close()方法。(SequenceFile.Write实现了java.io.Closeable)。

下面就是一个SequenceFile类的程序例子:

public static void main(String[] args) throws IOException(

  String uri = args[0];                                               //创建FileSystem的步骤已经在学习小结一中学过,这里就不再做介绍

  Configuration conf = new Configuration();

  FileSystem fs = FileSystem.gei(URI.create(uri), conf);

 Paht path = new Path(uri);


  IntWritable key = new IntWritable();          //创建键

  Text value = new Text();                           //创建值

  SequencdFile.Writer writer = null;

  try{

          writer = SequencdFile.createWriter(fs,conf,path,key.getClass(),value.getClass() );       //创建Writer

          key.set(data);

         value.set(data);

         writer.append(key,value);

       }finally{

         IOUtils.closeStream(writer);

       }

  }


(2)读取SequenceFile类

从头到尾读取序列文件,需要创建一个SequenceFile.Reader实例。反复调用next()方法之一遍历记录。使用哪一方法取决于所使用的序列化框架。比如Writable类型,可以为:

public boolean next(Writable key, Writable value);    //使用键值作为参数,如果读取是一个键值对,返回true。如果读取到文件末尾,返回false。

下面就是一个读取序列文件的例子:

public static void main(String[] args){

      String uri = args[0];

      Configuration conf = new Configuration();

      FileSystem fs = FileSystem.get(URI.create(uri),conf);

      Path path = new Path(uri);


       SequenceFile.Reader reader = null;

       try{

              reader = new SequenceFile.Reader(fs,path,conf);

              Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);  //使用Reflection工具类创建key的一个实例

              Writable key = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

              long position = reader.getPosition();           //取得reader当前位置

              while(reader.next(key,value)){                    //不断往下读取

                        String syncSeen = reader.syncSeen() ? " * " : " ";    //插入同步点

                        System.out.printf(position,syncSeen,key,value);

                         position = reader.getPosition();

                          }

                }finally{

                     IOUtils.closeStream(reader);

            }

}

此程序能显示序列文件中同步点的位置。同步点是流中的一个点,如果reader失去对位置的判断,同步点就可用于重新同步记录边界,例如在查找流中任意一个位置之后。同步点由SequenceFile.Reader来记录,当序列文件被写入的时候,它会每隔几个记录就插入一个特殊的项来标记此同步点。插入的开销非常小。上面我就使用星号*标记同步点。

有两种方法查找序列文件中指定的位置。第一种是seek()方法,第二种是sync(long position)方法。


三、序列文件的格式

序列文件由一个头部和一个或多个记录组成。

序列文件有三种类型,分别为1无压缩类型,2有压缩类型,3和块压缩类型。它们的记录组成不同,但序列文件头都相同。

SequenceFile 头格式:

  • version - 3 字节SEQ,后接1字节版本号
  • keyClassName -键类名
  • valueClassName - 值类名
  • compression - 布尔类型,标识是否对键值对启用压缩
  • blockCompression - 布尔类型,标识是否对键值对启用块压缩
  • compression codec - 编码解码器类型
  • metadata - SequenceFile的Metadata 
  • sync - 一个同步标记记录头结尾
记录的内部格式:
取决于是否启用压缩,如果是,要么是记录压缩,要么是块压缩。
1无压缩类型:如果没有启用压缩(默认设置)那么每个记录就由它的记录长度(字节数)、键的长度,键和值组成。长度字段为四字节。
2有压缩类型:记录压缩格式与无压缩格式基本相同,不同的是值字节是用定义在头部的编码器来压缩。注意,键是不压缩的。
3块压缩类型:块压缩一次压缩多个记录,因此它比记录压缩更紧凑,而且一般优先选择。当记录的字节数达到最小大小,才会添加到块。该最小值由io.seqfile.compress.blocksize中的属性定义。默认值是1000000字节。格式为记录数、键长度、键、值长度、值。



四、MapFile类

MapFile类是经过排序的带索引的SequenceFile,可以根据键进行查找。MapFile可以被认为是java.util.Map的一种持久化形式。它会无限增长,直至超过map在内存中占有的大小。

创建MapFile类的步骤和创建SequenceFile类差不多,这里我们就不再介绍。

(1)它有两个静态成员变量:

static String DATA_FILE_NAME;         //数据文件名

static String INDEX_FILE_NAME;        //索引文件名


(2)常用方法:

void rename(FileSystem fs, String oldName, String newName)   //对已存在的map目录重命名


long fix(FileSystem fs, Path dir, Class<? extends Writable> keyClass, Class<? extends Writable> valueClass, boolean dryrun, Configuration conf)        //该方法通过重新建立索引来定位出故障的MapFile,返回MapFile中无效的实体数

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐