过去,在Cassandra中批量加载数据一直很困难。尽管Cassandra从一开始就具有BinaryMemtable接口,但是BinaryMemtable难以使用,并且与普通客户端写入相比,吞吐量有了较小的提高。
Cassandra 0.8.1引入了解决此问题的新工具: sstableloader

使用 sstableloader

有关最新信息,请参见 DataStax社区文档
sstableloader 是一种稳定的数据文件处理工具,将已经生成好的数据流式传输到整个群集。它不是简单地将sstables复制到每一个节点,按照集群复制策略将文件分发给不同节点。

从生产经验来看,此工具使用主要两个主要步骤:

1、将外部数据批量加载到集群中:为此,您必须首先生成sstables来加载数据,正如我们将在本文后面看到的那样。

如下加载csv文件(也可以是其他类型文件),解析后生成SSTable文件,注意:生成的文件路径必须按照一定规范存放:…/keyspace/tableName/各种元数据文件。注意:生成表数据的主键列不允许重复,以就是说相同主键的数据不允许重复addRow()。

// Prepare SSTable writer 
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
// set output directory 
builder.inDirectory(outputDir)
       // set target schema 
       .forTable(SCHEMA)
       // set CQL statement to put data 
       .using(INSERT_STMT)
       // set partitioner if needed 
       // default is Murmur3Partitioner so set if you use different one. 
       .withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();

// ...snip... 

while ((line = csvReader.read()) != null){
    // We use Java types here based on 
    // https://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29 
    writer.addRow(ticker,
                  DATE_FORMAT.parse(line.get(0)),
                  new BigDecimal(line.get(1)),
                  new BigDecimal(line.get(2)),
                  new BigDecimal(line.get(3)),
                  new BigDecimal(line.get(4)),
                  Long.parseLong(line.get(5)),
                  new BigDecimal(line.get(6)));
}
writer.close();

可在github查看完整的事例,生成SSTable之后,您只需使用sstableloader以集群为目标,如前所述。cqlstablewriter仍然有一些限制,比如不能并行使用,或者还不支持用户定义的类型。版本不断更新迭代,请继续关注ApacheJira

2、将预先(生成)存在的sstable(通常是快照)加载到具有不同节点数或复制策略的另一个群集中。

进入cassandra的bin目录,找到sstableloader工具:
bin/sstableloader <dir_path>
详细命令
详细的选项可以参考官方文档的介绍,一般常见的选项有:
-d, –nodes 目标集群的nodes
-u, –username 用户名
-pw, –password 密码
-t, –throttle 限速,单位Mbits/s (默认不限制)
-cph, –connections-per-host 和每个节点建立多少连接

  • 手动执行命令,将第一步生产的sstable数据传给集群
    sstable文件存放路径为:/home/appuser/output/keyspace/tableName,注意文件路径结尾必须是keyspace/tableName。
    ./sstableloader -d 192.168.0.1,192.168.0.2,192.168.0.3 -u cassuser -pw upassword -t 100 -cph 100 /home/appuser/output/keyspace/tableName
  • 代码方式执行命令,实际开发可基于Java进程命令调用
调用:
main(String[] args){
   cmdExecute("/home/appuser/apache-cassandra-3.6.x/bin/sstableloader -d 192.168.0.1,192.168.0.2,192.168.0.3 -u cassuser -pw upassword -t 100 -cph 100 /home/output/keyspace/tableName");
}
private String cmdExecute(String commandLine){
		Process process;
		try {		                               ?process=Runtime.getRuntime().exec(commandLine);           
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;
            while ((line = br.readLine()) != null) {
                log.info(line);
            }
            int exitCode = process.waitFor();
            log.info("exitCode = "+ exitCode);
            process.destoryForcibly();
		}catch(Exception ex){
		  log. error("异常", ex);
		}
		return result;
	}
  • 批次调度,可通过调度系统,采用代码方式执行命令方式进行调用。

更多推荐