在 cassandra 中使用 map reduce 执行批量加载
问题:在 cassandra 中使用 map reduce 执行批量加载
我没有太多使用 cassandra 的经验,所以如果我采取了错误的方法,请原谅。
我正在尝试使用 map reduce 在 cassandra 中进行批量加载
基本上是字数示例
参考:http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/
我已经放置了简单的 Hadoop Wordcount Mapper 示例,并按照上面的示例稍微修改了驱动程序代码和减速器。
我也成功生成了输出文件。现在我的疑问是如何执行加载到 cassandra 部分?我的方法有什么不同吗?
请指教。
这是驱动程序代码的一部分
Job job = new Job();
job.setJobName(getClass().getName());
job.setJarByClass(CassaWordCountJob.class);
Configuration conf = job.getConfiguration();
conf.set("cassandra.output.keyspace", "test");
conf.set("cassandra.output.columnfamily", "words");
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160"); // default
conf.set("cassandra.output.thrift.address", "localhost");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
job.setMapperClass(CassaWordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(CassaWordCountReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra"));
MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
return job.waitForCompletion(true) ? 0 : 1;
Mapper 与普通的 wordcount 映射器相同,只是标记并发出 Word,1
减速器类的形式是
public class CassaWordCountReducer extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Mutation> columnsToAdd = new ArrayList<Mutation>();
Integer wordCount = 0;
for(IntWritable value : values) {
wordCount += value.get();
}
Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
countCol.setTimestamp(new Date().getTime());
ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
wordCosc.setColumn(countCol);
Mutation countMut = new Mutation();
countMut.column_or_supercolumn = wordCosc;
columnsToAdd.add(countMut);
context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
}
}
解答
要将批量加载到 Cassandra 中,我建议查看来自 DataStax](http://www.datastax.com/dev/blog/bulk-loading)的[这篇文章。基本上你需要为批量加载做两件事:
-
您的输出数据本身不适合 Cassandra,您需要将其转换为 SSTables。
-
拥有 SSTables 后,您需要能够将它们流式传输到 Cassandra。当然你不只是想把每个 SSTable 复制到每个节点,你只想把数据的相关部分复制到每个节点
在您使用BulkOutputFormat的情况下,它应该在幕后使用sstableloader来完成所有这些工作。我从未将它与MultipleOutputs一起使用,但它应该可以正常工作。
我认为您的情况的错误是您没有正确使用MultipleOutputs:当您真正应该写入MultipleOutputs对象时,您仍在执行context.write。你现在这样做的方式,因为你正在写入常规的Context,它将被TextOutputFormat的默认输出格式而不是你在MultipleOutputs中定义的格式拾取。有关如何在减速器中使用MultipleOutputs的更多信息,请参见。
一旦您按照您的定义写入正确的BulkOutputFormat输出格式,您的 SSTables 应该被创建并从集群中的每个节点流式传输到 Cassandra - 您不需要任何额外的步骤,输出格式将为您处理。
另外,我建议您查看这篇文章,其中他们还解释了如何使用BulkOutputFormat,但他们使用的是ConfigHelper,您可能希望查看它以更轻松地配置您的 Cassandra 端点。
更多推荐
所有评论(0)