早在2010年中的时候,我就对Casandra特别感兴趣,那时候还是0.7版本,累计2个月每天晚上都花2小时来啃源代码,读写流程都调试N遍了,也整出来几篇小文,不过后台工作方向上最终没有它的用武之地,后面就慢慢没有再持续更进了。这个差不多5年以后的现在,有些场景觉得Cassandra很合适,就又把他拿出来溜一把。

         业务上有类似播放列表的需求,由于需要分析用户的行为数据(用户的播放与下载信息都有单条的记录信息),不光需要能进行数据清洗还要能查询指定用户按照时间维度的详细行为,已经收集到的数据累计有近80亿条。对于写入和读取都有很高的要求,如果每秒写入1000条的话,需要写入2222小时,每秒写入10000条,需要写入222小时,对于读取数据也是一样的,而我根据数据分析的结果不同可能需要把这个数据过滤很多遍,就算每次遍历速度为10000条/秒,那也要遍历222小时,如果就这个速度,这事基本也就不用做了。这个时候,Cassandra就又来到了我的面前了,虽说放下5年了,但是我还是不时的想起它来,多少还是知道它进展到哪里了。

         (一)关于Key的那些事

        直接进入正题了。我觉得composite-keys-in-apache-cassandra这篇文章所描述的概念如果不能理解透彻的话,真不应该去擅自去实验使用C*,当然更加不应该去在产品中使用了。

        (1)单字段作为分区键          

create table bite (
      id varchar PRIMARY KEY,
      feedid varchar,
      score bigint,
      data varchar
  );
 
create index bite_feedid on bite (feedid);
create index bite_score on bite (score);
这里id作为分区Key,就像我们经常在mysql中用到的按照Id维度来分表一样,id字段决定了该条数据到底落地在C*的哪一个节点上。至于后面的二级索引,基本可以当它不存在,号称是索引,真的派不上用场的,就像划船绣腿一样,只能用来吹吹,没有使用价值。我这里就不表了,如果你真的想明白了二级索引的原理,真的彻底知道它的优缺点之前,不要去用它是最好的选择。

           (2)复合key

create table Bite (
      partkey varchar,
      score bigint,
      id varchar,
      data varchar,
      PRIMARY KEY (partkey, score, id)
  ) with clustering order by (score desc);
          这里分区字段是partkey,而score和id字段就是辅助key,直白一点说就是partkey决定了数据在哪一个节点上,score和id字段则决定了在同一个分区key下,数据是按照score和id来排序的。

            (3)复合key增强版

CREATE TABLE t_play_download_event (
 userId bigint,
 opType int,
 entityType int,
 executeTime timestamp,
 entityId bigint,
 sEntityId bigint,
 PRIMARY KEY ((userId, opType, entityType ), executeTime, entityId)
);
这个例子中,userId、opType、entityType联合作为分区键,但是数据可以根据executeTime和entityId来排序,不过在按照entityId来排序的时候必须得指定具体的executeTime的值。
         (二)分区器

       为什么要大书特书分区器呢?个人认为这个至关重要了!我们用C*的时候,怎么也得想清楚我们的数据在Cassandra中是如何分布的,支持那些特性或者又不支持那些特性,否则事半功倍亦或是南辕北辙了,最后的大家都受伤了。比如使用者会说C*这个性能啊真是差啊,什么Benchmark都是吹的;那些C*的老手都难得搭理这些恼骚,一帮不懂的人在那里瞎嚷嚷些啥啊!

      C*中采用虚拟的vnode的方式把row key分散存储到不同vnode,由于vnode可以分得相对较多些,所以数据基本上都均匀分布到不同vnode中去了(或许有人不同意这个说法,但是至少比早期的一个节点上就是一个token范围那强的太多了)。这个强调了一个”均匀“,这个是所有分布式存储都面临的问题了,要实现起来就难了,不同存储由于特性各异,支持的分布特性也各不同,当然这个是核心问题,也会给这个存储带来决定性的影响了。

       话说这么多前凑了,那到底C*中有哪些法宝来分散数据呢?

       Murmur3Partitioner:这个是默认的分区器,也是最最符合C*设计者们的需要的一个分区器。一句话,这个是根据作为row key的字段的hashcode来分布的,它的分布基本是均匀的。不过要记住,row key : 100001 ,100002,100003这些看起来是连续的row key采用hash后就不知道相差几万里了,大家分散到不同机器,不同节点都是可能的。基本可以认为是随机分布好了,也就是说row key只支持精确匹配,不支持大小、范围、前后缀匹配等等你想象到的查询。不过话说回来,在同一个token范围以内,row key是按照hashcode的值有序排列的。这个特性看起来没啥用,不过在特定情况下就不是一般的有用了,那是唯一的大招了。

        RandomPartitioner:这个分区器也是随机分区器,基本特性和Murmur3Partitioner大同小异,在Murmur3Partitioner实现之前这个就是C*的默认分区器。具体细节不说了,反正现在有Murmur3Partitioner分区器了,直接用它就好。

       ByteOrderedPartitioner:这个分区器是支持row key范围查询的。它采用的是row key的字节数据来按照字节序排序。

       OrderPreservingPartitioner:这个分区器也是支持row key范围查询的。它采用的是row key的utf-8编码方式来排序。

       话说回来,我这里就只重点介绍了第一个分区器Murmur3Partitioner,其它都是一带而过。可以这样说吧,用C*的话,你就用Murmur3Partitioner分区器好了,别的分区器不用为好,如果非要用,就要却行自己搞清楚搞明白了。不过,可以说十有八九是不怎么靠谱的。

      (三)实战的故事

       在我存储近80亿条数据的时候,是用4台机器,4个C*的节点,由于这些机器上不是只在运行C*,所以内存根据各机器不同,有的分配8G,有的分配4,还有的只有3G,至于磁盘就没有什么好说的,都是commit log 和 data 存储到一个STAT的硬盘上,裸盘,空间足够大。我所能采用的资源就这么些,也只能勉为其难了,不过好在Cassandra的性能足够好,对于我的场景足够用了。

       直接给出我的事例好了。

        (1)建好keyspace,复制因子为2,其它一路默认(用的的是DevCenter)下去就好了。

        (2)建好column family(t_user_event),其中userId、opType、entityType为rowkey,而executeTime、entityId和作为聚合key,同一个用户对视频或者音频文件的播放或者下载数据全部共用一个rowKey,而具体的操作数据则按照行为时间和操作对象ID来区隔开来。

CREATE TABLE t_user_event (
 userId bigint,
 opType int,
 entityType int,
 executeTime timestamp,
 entityId bigint,
 sEntityId bigint,
 PRIMARY KEY ((userId, opType, entityType ), executeTime, entityId)
) WITH bloom_filter_fp_chance = 0.01
AND comment = ''
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE'
AND caching = {
 'keys' : 'ALL',
 'rows_per_partition' : 'NONE'
}
AND compression = {
 'chunk_length_kb' : 64,
 'crc_check_chance' : 1.0,
 'sstable_compression' : 'LZ4Compressor'
}
AND compaction = {
 'bucket_high' : 1.5,
 'bucket_low' : 0.5,
 'class' : 'SizeTieredCompactionStrategy',
 'cold_reads_to_omit' : 0.05,
 'enabled' : true,
 'max_threshold' : 32,
 'min_sstable_size' : 50,
 'min_threshold' : 4,
 'tombstone_compaction_interval' : 86400,
 'tombstone_threshold' : 0.2,
 'unchecked_tombstone_compaction' : false
};

      (3)我直接就用Java版本的客户端(cassandra-driver-mapping)来写入数据

       注意我们的写入节点是一个单点,就放在Cassandra所属节点的一台服务器上。嗮一下我的写入性能,连续写入50亿条数据,整体的平均速度差不多保持在每秒50000条数据。期间也碰到不少问题,由于Linux的内存不够,都使用到交换区间了,导致机器的负载很高的现象,不过在机器资源相对富裕的机器上则很淡定,有一个不精确的估计,一个靠谱Cassandra的节点,应该分给Heap的内存在4-8G比较靠谱稳定高效。

       话说在读取数据的时候就太费脑筋了。我需要把写入到Cassandra的数据都遍历出来,最开始是用1-2亿范围的数字ID来作为用户ID,至于opType则之后选择使用播放还是下载,至于entityType就只用音视频来区分,这样累计出来的可能rowkey就有4-8亿了。最开始使用的读取基本用到了Cassandra的随机读,结果让人崩溃了,单节点多线程的方式去读,每秒钟才读200-500个而已,我的最大8亿个rowkey不知道何年何月才能遍历完。由此可见,Cassandra的随机读,在内存不足的时候,简直就是直接考验磁盘IO了。我的80亿条数据占了200G*2的空间,而我的4个节点上才有可怜的18G的内存,我那种访问方式看似顺序,而对于Cassandra来说就等于彻底的随机访问了,这样不慢才是怪事。

       想来想去,几无可能改善情况了。

       要想提供速度只得想大招把磁盘随机读变成顺序读才有可能。后来,终于想到如果能顺序遍历token不就可以充分利用顺序读吗?这不可以试一试了,就有了类似下面的CQL出现:

select token(userId,opType,entityType),executeTime,opType,sEntityId,entityId,userId,entityType from lrts_data_center.t_play_download_event where token(userId,opType,entityType) > 0 limit ?10000;
     实验下来,终于靠谱了。 这次,我把[Long. MIN_VALUE,Long.MAX_VALUE]范围内的token分成等分的8个区间,用8个线程来扫描各自负责的范围内的token。最后的结果太惊人了,遍历数据的速度达到每秒种峰值超过40万条,就是80亿条数据遍历下来的平均速度也高达 371820条/秒。这回,我的数据几个小时就可以清理一遍,完全满足要求了。

       (四)那些在等着你的坑

        1、第一个就是内存来了。Java这东东什么都好,就是GC不好,小了限制写入速度,大了也有问题。建议在单节点4-8G。反正我是碰到不少问题了,这里只给出结论。

        2、最常见的异常

2015-11-08 01:31:02,225 ERROR [com.lazyaudio.test.cassandra.task.RowTextParserTask:60] [IO error]errorMsg=Cassandra timeout during write query at consistency ONE (1 replica w
ere required but only 0 acknowledged the write)
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
        at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73)
这个问题涉及面说多也多,说少也少,说白了还是内存问题。要把Java客户端的超时时间稍微调长点,或者在服务端的内存过小需要调整到4-8G范围,再不行还要能把断连得链接重试来重新链接。

           解决问题的过程才是最爽的,NB的人就是靠着一个有一个战胜困难才能达成的。


Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐