1项目背景

1.1 需求

实时推荐项目需求如下:根据用户实时行为(如关注,播放,收藏)推荐该UP主(关注的up主,播放视频发布up主,收藏up主)或其相似UP主的作品,UP主及相似UP主下的作品是提前离线召回排序计算好了,存在redis中的数据。
由于是实时触发,有些作者的离线数据是没有生成的(如新的up主),实时推荐时需要将这部分用户下实时行为下的UP主给过滤掉。由于底层有20亿数据(用户+UP主为唯一id生成的数据)需要过滤,因此考虑使用布隆过滤器进行过滤。

1.2 布隆过滤器实时过滤实现思路

有以下两不需要考虑:
1 怎样构建布隆过滤器,即创建更新存储布隆过滤器
2 当布隆过滤器实时更新时,flink里的布隆过滤器怎样随之实时更新
(由于推荐数据是每天更新的,因此布隆过滤器数据也是每天更新,也就要求flink中使用的布隆过滤器也要实时更新)

2 布隆过滤器存入redis

此部分针对上面的问题1 实现
有以下三步:
1 使用com.google.common.hash.BloomFilter 构建布隆过滤器,写入数据
2 将布隆过滤器存入redis
3 从redis读取布隆过滤器数据,转换为com.google.common.hash.BloomFilter 过滤器进行使用


import com.google.common.hash.{BloomFilter, Funnels}
import jutil.JedisClient
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.Charset


object bloomfilterByGuavaTest {
  val expectedSize=200000000
  val falsePositiveRate=0.01
  def main(args: Array[String]): Unit = {

   // 1 创建布隆过滤器
    val bloomKey="foryou_bloom_filter"
    val bloomFilter:BloomFilter[String] = BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("UTF-8")), expectedSize, falsePositiveRate)
    //1.1 写入数据
    for(i <- 1 to 10) {
      bloomFilter.put(s"num${i}")
       }

    // 2 redis连接并写入序列化的布隆过滤器
    // 2.1 构建redis连接器
    val redisClient = new JedisClient(Conf.redisHosts)
    // 2.2 布隆过滤器写入redis
    val outputStream = new ByteArrayOutputStream()
    bloomFilter.writeTo(outputStream)
    val bitSetByteArray = outputStream.toByteArray
    redisClient.del(bloomKey)
    redisClient.set(bloomKey, bitSetByteArray)
    redisClient.expireBySecond(bloomKey,7*24*60*60)

//   3 从redis读取布隆过滤器,并进行过滤
    val bitSetByteArray2:Array[Byte] = redisClient.get(bloomKey).asInstanceOf[Array[Byte]]
    val bf = BloomFilter.readFrom[String](new ByteArrayInputStream(bitSetByteArray2), Funnels.stringFunnel(Charset.forName("UTF-8")))

    println("是否包含num1",bf.mightContain("num1"))
    println("是否包含num8",bf.mightContain("num8"))
    println("是否包含num13",bf.mightContain("num13"))
    redisClient.close()

  }

打印结果如下
在这里插入图片描述

3 使用flink 的BroadcastProcessFunction实时更新布隆过滤器

此部分针对上面的问题2实现
思路:
1 构建一个流实定时读取redis中的布隆过滤器(有坑,需要注意Kryo序列化失败问题)
2 将业务流和布隆过滤器流使用connect相结合
3 自定义实现BroadcastProcessFunction方法
基于篇幅原因,此部分写于下篇播客

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐