【数据平台】之Cassandra大数据利器-大规模数据迁移sstableloader
过去,在Cassandra中批量加载数据一直很困难。尽管Cassandra从一开始就具有BinaryMemtable接口,但是BinaryMemtable难以使用,并且与普通客户端写入相比,吞吐量有了较小的提高。Cassandra 0.8.1引入了解决此问题的新工具: sstableloader使用 sstableloader有关最新信息,请参见 DataStax社区文档。sstableloade
过去,在Cassandra中批量加载数据一直很困难。尽管Cassandra从一开始就具有BinaryMemtable接口,但是BinaryMemtable难以使用,并且与普通客户端写入相比,吞吐量有了较小的提高。
Cassandra 0.8.1引入了解决此问题的新工具: sstableloader
使用 sstableloader
有关最新信息,请参见 DataStax社区文档。
sstableloader 是一种稳定的数据文件处理工具,将已经生成好的数据流式传输到整个群集。它不是简单地将sstables复制到每一个节点,按照集群复制策略将文件分发给不同节点。
从生产经验来看,此工具使用主要两个主要步骤:
1、将外部数据批量加载到集群中:为此,您必须首先生成sstables来加载数据,正如我们将在本文后面看到的那样。
如下加载csv文件(也可以是其他类型文件),解析后生成SSTable文件,注意:生成的文件路径必须按照一定规范存放
:…/keyspace/tableName/各种元数据文件。注意:生成表数据的主键列不允许重复
,以就是说相同主键的数据不允许重复addRow()。
// Prepare SSTable writer
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
// set output directory
builder.inDirectory(outputDir)
// set target schema
.forTable(SCHEMA)
// set CQL statement to put data
.using(INSERT_STMT)
// set partitioner if needed
// default is Murmur3Partitioner so set if you use different one.
.withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();
// ...snip...
while ((line = csvReader.read()) != null){
// We use Java types here based on
// https://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29
writer.addRow(ticker,
DATE_FORMAT.parse(line.get(0)),
new BigDecimal(line.get(1)),
new BigDecimal(line.get(2)),
new BigDecimal(line.get(3)),
new BigDecimal(line.get(4)),
Long.parseLong(line.get(5)),
new BigDecimal(line.get(6)));
}
writer.close();
可在github查看完整的事例,生成SSTable之后,您只需使用sstableloader以集群为目标,如前所述。cqlstablewriter仍然有一些限制,比如不能并行使用,或者还不支持用户定义的类型。版本不断更新迭代,请继续关注ApacheJira
2、将预先(生成)存在的sstable(通常是快照)加载到具有不同节点数或复制策略的另一个群集中。
进入cassandra的bin目录,找到sstableloader工具:
bin/sstableloader <dir_path>
详细的选项可以参考官方文档的介绍,一般常见的选项有:
-d, –nodes 目标集群的nodes
-u, –username 用户名
-pw, –password 密码
-t, –throttle 限速,单位Mbits/s (默认不限制)
-cph, –connections-per-host 和每个节点建立多少连接
- 手动执行命令,将第一步生产的sstable数据传给集群
sstable文件存放路径为:/home/appuser/output/keyspace/tableName,注意文件路径结尾必须是keyspace/tableName。
./sstableloader -d 192.168.0.1,192.168.0.2,192.168.0.3 -u cassuser -pw upassword -t 100 -cph 100 /home/appuser/output/keyspace/tableName - 代码方式执行命令,实际开发可基于Java进程命令调用
调用:
main(String[] args){
cmdExecute("/home/appuser/apache-cassandra-3.6.x/bin/sstableloader -d 192.168.0.1,192.168.0.2,192.168.0.3 -u cassuser -pw upassword -t 100 -cph 100 /home/output/keyspace/tableName");
}
private String cmdExecute(String commandLine){
Process process;
try { ?process=Runtime.getRuntime().exec(commandLine);
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = br.readLine()) != null) {
log.info(line);
}
int exitCode = process.waitFor();
log.info("exitCode = "+ exitCode);
process.destoryForcibly();
}catch(Exception ex){
log. error("异常", ex);
}
return result;
}
- 批次调度,可通过调度系统,采用
代码方式执行命令
方式进行调用。
更多推荐
所有评论(0)