使用布隆过滤器的flink十亿级数据实时过滤实践一
百亿级数据实时过滤,布隆过滤器,scala实现,flink广播状态使用,BroadcastProcessFunction
·
1项目背景
1.1 需求
实时推荐项目需求如下:根据用户实时行为(如关注,播放,收藏)推荐该UP主(关注的up主,播放视频发布up主,收藏up主)或其相似UP主的作品,UP主及相似UP主下的作品是提前离线召回排序计算好了,存在redis中的数据。
由于是实时触发,有些作者的离线数据是没有生成的(如新的up主),实时推荐时需要将这部分用户下实时行为下的UP主给过滤掉。由于底层有20亿数据(用户+UP主为唯一id生成的数据)需要过滤,因此考虑使用布隆过滤器进行过滤。
1.2 布隆过滤器实时过滤实现思路
有以下两不需要考虑:
1 怎样构建布隆过滤器,即创建更新存储布隆过滤器
2 当布隆过滤器实时更新时,flink里的布隆过滤器怎样随之实时更新
(由于推荐数据是每天更新的,因此布隆过滤器数据也是每天更新,也就要求flink中使用的布隆过滤器也要实时更新)
2 布隆过滤器存入redis
此部分针对上面的问题1 实现
有以下三步:
1 使用com.google.common.hash.BloomFilter 构建布隆过滤器,写入数据
2 将布隆过滤器存入redis
3 从redis读取布隆过滤器数据,转换为com.google.common.hash.BloomFilter 过滤器进行使用
import com.google.common.hash.{BloomFilter, Funnels}
import jutil.JedisClient
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.Charset
object bloomfilterByGuavaTest {
val expectedSize=200000000
val falsePositiveRate=0.01
def main(args: Array[String]): Unit = {
// 1 创建布隆过滤器
val bloomKey="foryou_bloom_filter"
val bloomFilter:BloomFilter[String] = BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("UTF-8")), expectedSize, falsePositiveRate)
//1.1 写入数据
for(i <- 1 to 10) {
bloomFilter.put(s"num${i}")
}
// 2 redis连接并写入序列化的布隆过滤器
// 2.1 构建redis连接器
val redisClient = new JedisClient(Conf.redisHosts)
// 2.2 布隆过滤器写入redis
val outputStream = new ByteArrayOutputStream()
bloomFilter.writeTo(outputStream)
val bitSetByteArray = outputStream.toByteArray
redisClient.del(bloomKey)
redisClient.set(bloomKey, bitSetByteArray)
redisClient.expireBySecond(bloomKey,7*24*60*60)
// 3 从redis读取布隆过滤器,并进行过滤
val bitSetByteArray2:Array[Byte] = redisClient.get(bloomKey).asInstanceOf[Array[Byte]]
val bf = BloomFilter.readFrom[String](new ByteArrayInputStream(bitSetByteArray2), Funnels.stringFunnel(Charset.forName("UTF-8")))
println("是否包含num1",bf.mightContain("num1"))
println("是否包含num8",bf.mightContain("num8"))
println("是否包含num13",bf.mightContain("num13"))
redisClient.close()
}
打印结果如下
3 使用flink 的BroadcastProcessFunction实时更新布隆过滤器
此部分针对上面的问题2实现
思路:
1 构建一个流实定时读取redis中的布隆过滤器(有坑,需要注意Kryo序列化失败问题)
2 将业务流和布隆过滤器流使用connect相结合
3 自定义实现BroadcastProcessFunction方法
基于篇幅原因,此部分写于下篇播客
更多推荐
已为社区贡献1条内容
所有评论(0)