HBase version 2.2.2, deployed as a 3-node cluster; SNAPPY is used as the compression algorithm.
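
Before relying on SNAPPY, it is worth confirming that the native Snappy codec is actually loadable on the nodes. HBase ships a small checker for this; the file path below is just an arbitrary scratch location the tool writes to:

hbase org.apache.hadoop.hbase.util.CompressionTest file:///tmp/snappy-check snappy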

Create two tables: one uses the compression algorithm, the other does not.

The test table uses SNAPPY compression:

create 'test', { NAME => 'info', COMPRESSION => 'SNAPPY' }

The demo table does not use compression:

create 'demo', { NAME => 'info' }

Use the list command to view the existing tables.

Use the desc command to inspect a table's schema:

desc 'test'

Note the difference in the compression setting between the two tables.
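
In the desc output, the difference shows up in the COMPRESSION attribute of the info family, roughly like this (other attributes trimmed; the exact layout varies by version):

test: {NAME => 'info', COMPRESSION => 'SNAPPY', ...}
demo: {NAME => 'info', COMPRESSION => 'NONE', ...}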

Now use a hadoop command to check how much space each table occupies. No namespace was specified at creation, so both tables live under the default namespace. In the -du output, the first column is the file size and the second is the space consumed across all replicas:

hadoop fs -du -h /hbase/data/default

[root@aims02 ~]# hadoop fs -du -h /hbase/data/default
235  235  /hbase/data/default/demo
244  244  /hbase/data/default/test

Data is inserted using the following Java utility class:

package com.aims.nuclear.power.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;


/**
 * @Package: com.aims.nuclear.power.config
 * @Description: <>
 * @Author: MILLA
 * @CreateDate: 2019/12/17 14:12
 * @UpdateUser: MILLA
 * @UpdateDate: 2019/12/17 14:12
 * @UpdateRemark: <>
 * @Version: 1.0
 */
public class HbaseUtil {
    public static final String c = "info"; // column family name

    public static void main(String[] args) throws Exception {
        insert();
    }

    public static void insert() throws Exception {
        Connection connection = getConnection();
//        Admin admin = connection.getAdmin();
        Table test = connection.getTable(TableName.valueOf("test"));
        Table demo = connection.getTable(TableName.valueOf("demo"));

        new Thread(() -> {
            try {
                int i = 0;
                while (true) {
                    Thread.sleep(1L);
                    String rowKey = getRandStr() + System.currentTimeMillis();
                    insertList(test, demo, i++, rowKey);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    // Inserts one row with the same rowkey into both the compressed and uncompressed tables
    public static void insertList(Table test, Table demo, int i, String rowKey) {
        User user = new User();
//        String rowKey = getRandStr() + System.currentTimeMillis();
        Put put = new Put((rowKey).getBytes());
        byte[] name = Bytes.toBytes(user.getName());
        byte[] age = Bytes.toBytes(user.getAge());
        byte[] sex = Bytes.toBytes(user.getSex());
//        byte[] part = user.getPart();

        put.addColumn(c.getBytes(), "name".getBytes(), name);
        put.addColumn(c.getBytes(), "Age".getBytes(), age);
        put.addColumn(c.getBytes(), "Sex".getBytes(), sex);
//        put.addColumn(c.getBytes(), "Part".getBytes(), part);
        try {
            test.put(put);
            demo.put(put);
            if (i % 10000 == 0) {
                // Progress log; the "volume" figure is derived from the row count, not measured bytes
                System.out.println("test table --> rowKey: " + rowKey + " inserted; count: " + i + "; approx volume: " + (i > 1024 * 1024 ? (double) i / (1024 * 1024) + " M rows" : (double) i / 1024 + " K rows"));
                System.out.println("demo table --> rowKey: " + rowKey + " inserted; count: " + i + "; approx volume: " + (i > 1024 * 1024 ? (double) i / (1024 * 1024) + " M rows" : (double) i / 1024 + " K rows"));
            }
        } catch (IOException e) {
//            System.out.println("test表-->rowKey: " + rowKey + " 插入数据异常");
//            System.out.println("demo表-->rowKey: " + rowKey + " 插入数据异常");
            e.printStackTrace();
        }
    }

    // Creates a table with one column family (not called in this test; uses the HTableDescriptor API, deprecated in 2.x but still functional)
    private static void createTable(Admin admin, String tableName) throws Exception {
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        HColumnDescriptor descriptor2 = new HColumnDescriptor(c);
        descriptor.addFamily(descriptor2);
        admin.createTable(descriptor);
    }

    // Obtains a connection to HBase
    private static Connection getConnection() throws Exception {
        Configuration configuration = HBaseConfiguration.create();
//        configuration.set("hbase.zookeeper.quorum", "aims03");
        configuration.set("hbase.zookeeper.quorum", "aims02");
//        configuration.set("hbase.zookeeper.quorum", "192.168.16.32");
        configuration.set("hbase.zookeeper.port", "2181");
        Connection connection = ConnectionFactory.createConnection(configuration);
        return connection;
    }

    // Returns a single random uppercase letter, used as the rowkey prefix
    public static String getRandStr() {
        String letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
        return String.valueOf(letters.charAt((int) (Math.random() * 26)));
    }
}
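
Note that insertList issues one RPC per row per table, which suits this slow, steady-drip test. If raw write throughput mattered instead, Puts could be buffered and flushed in batches through Table.put(List<Put>). A minimal sketch, with an arbitrary batch size of 1000 (an alternative approach, not the method used in the test above):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class BufferedInsert {
    private static final int BATCH_SIZE = 1000; // arbitrary assumption

    private final List<Put> buffer = new ArrayList<>();

    public void put(Table table, Put put) throws IOException {
        buffer.add(put);
        if (buffer.size() >= BATCH_SIZE) {
            table.put(buffer); // one batched RPC instead of one RPC per row
            buffer.clear();
        }
    }

    public void flush(Table table) throws IOException {
        if (!buffer.isEmpty()) {
            table.put(buffer); // push the remainder before closing the table
            buffer.clear();
        }
    }
}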

Then keep tracking the space occupied by the two tables:

[root@aims02 ~]# hadoop fs -du -h /hbase/data/default
28.3 G  28.3 G  /hbase/data/default/demo
5.0 G   5.0 G   /hbase/data/default/test
   // Compression ratio at this point: 5.0 / 28.3 ≈ 17.67%

Queries can be run with the following Java utility class:

package com.aims.nuclear.power.config;

/**
 * @Package: com.aims.nuclear.power.config
 * @Description: <>
 * @Author: MILLA
 * @CreateDate: 2019/12/18 12:10
 * @UpdateUser: MILLA
 * @UpdateDate: 2019/12/18 12:10
 * @UpdateRemark: <>
 * @Version: 1.0
 */

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.aims.nuclear.power.config.HbaseUtil.insertList;

@Slf4j
public class HBasePoolUtils {


    public static void main(String[] args) {
//        testOne();
        testList();
    }

    private static void testList() {
        long demoStart = System.currentTimeMillis();
        List<User> users = queryData("test", "Y1576732287729", "Y1576744250744");
        long demoEnd = System.currentTimeMillis();


//        long testStart = System.currentTimeMillis();
//        List<User> users1 = queryData("test", "1576749130724", "A" + System.currentTimeMillis());
//        long testEnd = System.currentTimeMillis();
//        System.out.println("test ------ rows: " + users1.size() + " | elapsed: " + (testEnd - testStart) + " ms");

        System.out.println("test ------ rows: " + users.size() + " | elapsed: " + (demoEnd - demoStart) + " ms");
    }

    private static void testOne() {


        long testStart = System.currentTimeMillis();
        queryData("test", "Q1576749130546");
        long testEnd = System.currentTimeMillis();
        long demoStart = System.currentTimeMillis();
        queryData("demo", "Q1576749130546");
        long demoEnd = System.currentTimeMillis();
        System.out.println("test--------------: " + (testEnd - testStart));
        System.out.println("demo--------------: " + (demoEnd - demoStart));
    }

    private static Configuration conf = null;
    // Fixed-size thread pool: caps the number of concurrent threads; excess tasks wait in the queue
    private static ExecutorService executor = null;
    private static Connection conn = null;

    static {
        try {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "aims02");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.defaults.for.version.skip", "true");
            executor = Executors.newFixedThreadPool(20);
            conn = ConnectionFactory.createConnection(conf, executor);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConn() {
        return conn;
    }

    /**
     * Point query: fetches a single row and prints its cells.
     *
     * @param tableName table name
     * @param index     rowkey
     * @author zhouzp
     * @date 2019-01-08
     */
    public static void queryData(String tableName, String index) {
        Table table = null;
        try {
            table = conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(index.getBytes());
            Result rs = table.get(get);
            List<Cell> cells = rs.listCells();
            if (cells == null) { // no row exists for this rowkey
                System.out.println("no row for rowkey: " + index);
                return;
            }
            for (Cell cell : cells) {
                byte[] bytes = CellUtil.cloneValue(cell);
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                System.out.println("family: " + family + ", qualifier:" + qualifier);
                if (StringUtils.equals("Part", qualifier)) {
                    System.out.println("value: " + Arrays.toString((double[]) SerializeUtil.bytes2Object(bytes)));
                } else if (StringUtils.equals("Age", qualifier)) {
                    System.out.println("value: " + Bytes.toDouble(bytes));
                } else {
                    System.out.println("value: " + Bytes.toString(bytes));
                }
                System.out.println("-----------------------");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static List<User> queryData(String tableName, String start, String end) {
        Table table = null;
        List<User> userList = Lists.newArrayList();
        try {
            table = conn.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("info"));
            scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("Age"));
//            QualifierFilter age = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("Sex")));
//            scan.setFilter(age);
            scan.withStartRow(Bytes.toBytes(start));
            scan.withStopRow(Bytes.toBytes(end));
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                userList.addAll(printRs(result));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return userList;
    }

    private static List<User> printRs(Result rs) {
        List<User> userList = Lists.newArrayList();
        List<Cell> cells = rs.listCells();
        for (Cell cell : cells) {
            User user = new User();
            byte[] bytes = CellUtil.cloneValue(cell);
//            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            if (StringUtils.equals("Part", qualifier)) {
                user.setPart(bytes);
            } else if (StringUtils.equals("Age", qualifier)) {
                user.setAge(Bytes.toDouble(bytes));
            } else if (StringUtils.equals("Sex", qualifier)) {
                user.setSex(Bytes.toString(bytes));
            } else if (StringUtils.equals("name", qualifier)) {
                user.setName(Bytes.toString(bytes));
            }
            userList.add(user);
        }
        return userList;
    }

    public static Map<String, Object> queryDataDouble(String tableName, String start, String end) {
        long currentTimeMillis = System.currentTimeMillis();
        Table table = null;
        List<Double> doubleList = Lists.newArrayList();
        List<Long> times = Lists.newArrayList();
        List<String> rowKeys = Lists.newArrayList();
        try {
            table = conn.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("info"));
            scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("Age"));
//            QualifierFilter age = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("Age")));
//            scan.setFilter(age);
            scan.withStartRow(Bytes.toBytes(start));
            scan.withStopRow(Bytes.toBytes(end));
            ResultScanner scanner = table.getScanner(scan);
            for (Result rs : scanner) {
                List<Cell> cells = rs.listCells();
                for (Cell cell : cells) {
                    byte[] bytes = CellUtil.cloneValue(cell);
//                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
//                    if (StringUtils.equals("Age", qualifier)) {
                    long timestamp = cell.getTimestamp();
                    doubleList.add((Bytes.toDouble(bytes) + Math.random()));
                    times.add(timestamp);
                    rowKeys.add(rowKey);
//                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        log.info("获取数据总数为:{},共用时:{} 毫秒", doubleList.size(), (System.currentTimeMillis() - currentTimeMillis));
        Map<String, Object> data = Maps.newHashMap();
        data.put("rowKeys", rowKeys);
        data.put("values", doubleList);
        data.put("times", times);
        return data;
    }

    private static volatile boolean starting = true;

    public static void savingData(String code) throws IOException {
        Table test = conn.getTable(TableName.valueOf("test"));
        Table demo = conn.getTable(TableName.valueOf("demo"));
        new Thread(() -> {
            try {
                int i = 0;
                while (starting) {
                    Thread.sleep(1L);
                    String rowKey = code.toUpperCase() + System.currentTimeMillis();
                    insertList(test, demo, i++, rowKey);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    public static void stoppingCode() {
        starting = false;
    }
}
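
For ad-hoc runs, the savingData/stoppingCode pair above can drive a timed write from a caller. A minimal usage sketch (the 60-second duration is an arbitrary choice):

public static void main(String[] args) throws Exception {
    HBasePoolUtils.savingData("y"); // writer thread starts; rowkeys become "Y" + timestamp
    Thread.sleep(60_000L);          // let it run; the duration is arbitrary
    HBasePoolUtils.stoppingCode();  // clears the volatile flag so the write loop exits
}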

The number of rows returned and the elapsed time of each query were then recorded.

The above is a performance test at the tens-of-millions-of-rows scale, and there is presumably still room for optimization. In practice, though, downsampling is generally not applied here, and it is unlikely that this much data would be fetched just to draw a trend chart.
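
One knob that often helps range scans of this size is scanner caching, i.e. how many rows each RPC to the RegionServer returns. A hedged sketch of adding it to the range query above (1000 is an assumed value, not a measured optimum):

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("Age"));
scan.withStartRow(Bytes.toBytes(start));
scan.withStopRow(Bytes.toBytes(end));
scan.setCaching(1000); // rows per scanner RPC: fewer round-trips at the cost of client memory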

In a couple of days, once a few hundred million rows have been written, I will run the queries again at the hundreds-of-millions scale and see whether performance can be optimized further.
