1、 数据瓶颈 

数据孤岛是企业发展到一定阶段后普遍遇到的问题。各个部门、业务、产品,各自定义和存储其数据,使得这些数据间难以关联,变成孤岛一般的存在。

OneID的做法是通过统一的实体识别和连接,打破数据孤岛,实现数据通融。各个部门、业务、产品对业务实体的UID(唯一性ID)的定义和实现不一样,使得数据间无法直接关联,成为了数据孤岛。基于手机号、身份证、邮箱、设备ID等信息,结合业务规则、机器学习、图算法等算法,进行 ID-Mapping,将各种 UID 都映射到统一ID上,即ONEID。通过这个统一ID,便可关联起各个数据孤岛的数据,实现数据通融,以确保业务分析、用户画像等数据应用的准确和全面。

2、用户画像

用户画像是大数据的核心组成部分,在很多企业中有着重要作用,透过用户画像,企业可以做到 精准营销及个性服务。标签是用户画像的基础且极其重要,但是通过什么将标签打到用户上,同样被个业务系统的数据孤岛所累,所以实现oneid对公司、对业务系统、对产品营销都是最重要的。

3、图计算

spark 调优:可参考​​​​​​【总结】Spark任务的core,executor,memory资源配置方法_巧克力黒_51CTO博客​​​​​​t​​​​​​tt

        One ID的核心价值是打通数据孤岛,把不同时期孤立建设的系统,用统一的ID串联起来。One ID功能就像是在修桥梁,把各个数据孤岛贯通之后,这些孤岛就连成一片。数据孤岛被打破之后,我们就能更全面、更完整的了解我们的用户、产品、商家,能够更加精准的评价他们的价值,进行进一步的价值发现,为精细化运营夯实数据基础。One ID的核心技术是ID-Mapping,其原理是将各系统的关键要素抽象成图计算用的“点”和“边”,用图计算算法很轻易的判定同一个“对象”,从而构建一个个无向连通图,生成ID映射字典。这个ID映射字典就是一座座通往各个数据孤岛的桥梁。我们通过这些桥梁,可以把相同“对象”在不同孤岛中的数据串联起来。这样,我们就掌控了全局,而非局部。

        基于阿里云的Spark graph图计算实践:

①背景:公司为实现精准营销,短信营销,APP营销等诸多业务场景,准备搭建用户标签体系,设计用户360度用户画像,从各个业务系统中接入数据,在每个用户身上打用户标签。介于各系统唯一标识不一,需要通过图计算方式设计用户ONEID。

②数据情况:由于不同系统数据是离散的,松散的存储,数据量1亿,idmapping 原始表如下:

phone    idcard    ssoid    uid
139******06    \N    U641**********60384    \N
139******06    130**********3213     \N    \N
139******06    \N     \N   13638
139******60    \N    U894**********92416    \N
139******60   621**********7780     \N    \N
139******60    \N     \N    964

预期结果:

oneid    ids

uuid1        139******06,U641**********60384,130**********3213,13638

uuid2        139******60,U894**********92416,621**********7780,964

③逻辑实现

package com.gwm

import java.util.UUID

import org.apache.commons.lang.StringUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.spark_project.jetty.util.StringUtil

import scala.collection.mutable
//import org.apache.spark.aliyun.odps.OdpsOps
/**
 * @author yangyingchun
 * @date 2021/12/30 11:33
 * @version 1.0
 */
object OdpsGraphx {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("OdpsGraphx").setLevel(Level.INFO)

    val spark =
      SparkSession.builder()
        .appName("ReadAndWriteOdps.CaseClassSchema")
//        .config("spark.master", "local[4]") // 需设置spark.master为local[N]才能直接运行,N为并发数。
        //        .config("spark.hadoop.odps.access.id", "LTAI4FjsYeq4RkoaGqHSw9Af")
        //        .config("spark.hadoop.odps.access.key", "tXYc*****************T98EmS")
        //        .config("spark.hadoop.odps.end.point", "http://service.cn-zhangjiakou.maxcompute.aliyun.com/api")
        //        .config("spark.hadoop.odps.project.name", "yyc_odps")
        //        .config("spark.sql.catalogImplementation", "odps")
        .config(new SparkConf())
        .getOrCreate()

    // 导入sparksql的隐式转换
    import spark.implicits._

    //执行查询
    var frame: Dataset[Row] = spark.sql("select distinct phone,idcard,ssoid,uid from idmapping")


    //      var frame: Dataset[String] = spark.read.textFile("E:\\software\\workspace\\graph\\src\\main\\resources\\id-mapping.txt")

    var value: Dataset[Row] = frame.alias("phone,idcard,ssoid,uid").repartition(8)

    var schema: StructType = value.schema
    var names: Array[String] = schema.fieldNames


    //    value.foreach(x=>println(x))

    //通过graph 获取oneid
    //点
    //    println(
    //      """
    //        |yield的用法总结
    //        |针对每一次 for 循环的迭代, yield 会产生一个值,被循环记录下来 (内部实现上,像是一个缓冲区).
    //        |当循环结束后, 会返回所有 yield 的值组成的集合.
    //        |返回集合的类型与被遍历的集合类型是一致的.
    //      """.stripMargin)
    //去掉空 null
    var dset: Dataset[String] = value
      .map(row => {
        var str = ""
        for (i <- 0 to row.size - 1) {
          if (null != row.get(i)) {
            str += names(i) + ":" + row.get(i) + ","
          }
        }
        var res = ""
        if (StringUtils.isNotBlank(str.substring(0, str.length - 1))) {
          res = str.substring(0, str.length - 1)
        }
        res
      }
      )

    // 点
    val dian: RDD[(Long, String)] = dset.rdd.flatMap(row => {
      val arr = row.split(",")
      for (e <- arr if null != e && e.length > 0 && !"NULL".equals(e)) yield (e.hashCode.toLong, e)
    }
    )

    // 边 两个点构成边
    val bian = dset.rdd.flatMap(line => {
      val arr: mutable.ArrayOps[String] = line.split(",")
      for (i <- 0 to arr.length - 2 if null != arr(i) && arr(i).length > 0 && !"NULL".equals(arr(i))) yield Edge(arr(i).hashCode.toLong, arr(i + 1).hashCode.toLong, "")
    })

    //构建图  在同一个线路中会与一个唯一的值
    val tu = Graph(dian, bian)
    val gp: Graph[VertexId, String] = tu.connectedComponents()
    // 获取所有的点 将数据进行了分组
    //生成最大连通图 将最小图计算值替换成uuid

    var rdd: RDD[(String, Iterable[VertexId])] = gp.vertices //所有顶点
      .map(tp => (tp._2, tp._1)) //倒排
      .groupByKey()
      .map(tp => (UUID.randomUUID().toString, tp._2))

    //将数据收集到Driver端  然后广播出去
    var res: collection.Map[VertexId, String] = dian.collectAsMap()
    val bc = spark.sparkContext.broadcast(res)

    // 利用上面的bc数据加工所有的数据
    var rr: RDD[(String, String)] = rdd.mapPartitions(line => {
      var mp: collection.Map[VertexId, String] = bc.value
      line.map(tp => {
        //从广播变量中获取id值的信息并转换
        //        var value1: Iterable[VertexId] = tp._2
        val t2: Iterable[String] = for (ele <- tp._2) yield mp.get(ele).get
        var set: Set[String] = t2.toSet
        (tp._1, set.mkString(","))
      })
    })


    //由于maxcompute string类型最大存8M数据,所以将大于8M的识别过滤
    val result = rr
      .filter(x => {
        if (x._2.length > 8388608) {
          println(x._2)
        }
        x._2.length <= 8388608
      }) //odps string 类型最大存储8M
      .toDF().alias("oneid,ids")


    //result.write.insertInto("id_mapping")
    result.write.mode(SaveMode.Overwrite).insertInto("oneid_mapping")
    //    result.write.mode("overwrite").insertInto("oneid_mapping") // insertOverwrite语义
    spark.close()

  }
}

④提交测试

 ⑤各种问题

参考过:Spark程序运行常见错误解决方法以及优化 - double_kill - 博客园

1、java.lang.OutOfMemoryError: Java heap space

22/01/10 14:06:38 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: Java heap space
Exception in thread "task-result-getter-0" java.lang.OutOfMemoryError: Java heap space
22/01/10 14:08:41 ERROR Utils: Uncaught exception in thread task-result-getter-6
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "task-result-getter-6" java.lang.OutOfMemoryError: GC overhead limit exceeded
22/01/10 14:09:20 WARN TransportChannelHandler: Exception in connection from 201c09209.cloud.c11.qm64/10.252.0.83:26402
java.lang.OutOfMemoryError: Java heap space

解决:属性spark.yarn.executor.memoryOverhead用于配置预留的这部分内存,MemoryOverhead是JVM进程中除Java堆以外占用的空间大小,包括方法区(永久代)、Java虚拟机栈、本地方法栈、JVM进程本身所用的内存、直接内存。如果没有配置该属性,则默认值由一个公式math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))进行计算,其中,MEMORY_OVERHEAD_FACTOR=0.1,MEMORY_OVERHEAD_MIN=384m,executorMemory=args.executor-memory。
 

2、java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

解决:

3、AbstractChannel$AnnotatedConnectException: Connection refused: 云ip/ip:36644

Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: 云ip/ip:36644
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    ... 2 more

⑥ID强打通

作者:Treant

出处:Treant - 博客园

1. 背景

在构建精准用户画像时,面临着这样一个问题:日志采集不能成功地收集用户的所有ID,且每条业务线有各自定义的UID用来标识用户,从而造成了用户ID的零碎化。因此,为了做用户标签的整合,用户ID之间的强打通(亦称为ID-Mapping)成了迫切的需求。大概三年前,在知乎上有这样一个与之相类似的问题:如何用MR实现并查集以对海量数据pair做聚合;目前为止还无人解答。本文将提供一个可能的基于MR计算框架的解决方案,以实现大数据下的ID强打通。

首先,简要地介绍下Android设备常见的ID:

  • IMEI(International Mobile Equipment Identity),即通常所说的手机序列号、手机“串号”,用于在移动电话网络中识别每一部独立的手机等行动通讯装置;序列号共有15位数字,前6位(TAC)是型号核准号码,代表手机类型。接着2位(FAC)是最后装配号,代表产地。后6位(SNR)是串号,代表生产顺序号。最后1位(SP)一般为0,是检验码,备用。
  • MAC(Media Access Control)一般代指MAC位址,为网卡的标识,用来定义网络设备的位置。
  • IMSI(International Mobile SubscriberIdentification Number),储存在SIM卡中,可用于区别移动用户的有效信息;其总长度不超过15位,同样使用0~9的数字。其中MCC是移动用户所属国家代号,占3位数字,中国的MCC规定为460;MNC是移动网号码,最多由两位数字组成,用于识别移动用户所归属的移动通信网;MSIN是移动用户识别码,用以识别某一移动通信网中的移动用户。
  • Android ID是系统随机生成的设备ID 为一串64位的编码(十六进制的字符串),通过它可以知道设备的寿命(在设备恢复出厂设置或刷机后,该值可能会改变)。
  • IDFA (Identifier for Advertisers) 是苹果推出来的用于广告标识的设备ID,同一设备上的不同APP所获取的IDFA是一致的;但是用户可以自主更改IDFA,所以IDFA并不是和设备一一绑定的。

2. 设计

从图论的角度出发,ID强打通更像是将小连通图合并成一个大连通图;比如,在日志中出现如下三条记录,分别表示三个ID集合(小连通图):

A   B   C
        C   D
            D   E

通过将三个小连通图合并,便可得到一个大连通图——完整的ID集合列表A B C D E淘宝明风介绍了如何用Spark GraphX通过outerJoinVertices等运算符来做大数据下的多图合并;针对ID强打通的场景,也可采用类似的思路:日志数据构建大的稀疏图,然后采用自join的方式做打通。但是,我并没有选用GraphX,理由如下:

  • GraphX只支持有向图,而不支持无向图,而ID之间的关联关系是一个无向连通图;
  • GraphX的join操作不完全可控,“不完全可控”是指在做图合并时我们需要做过滤山寨设备、一对多的ID等操作,而在GraphX封装好的join算子上实现过滤操作则成本过高。

因而,基于MR计算模型(Spark框架)我设计新的ID打通算法;算法流程如下:打通的map阶段将ID集合id_set中每一个Id做key然后进行打散(id_set.map(id -> id_set))),Reduce阶段按key做id_set的合并。通过观察发现:仅需要两步MR便可完成上述打通的操作。以上面的例子做说明,第一步MR完成后,打通ID集合为:A B C D、 C D E,第二步MR完成后便得到完整的ID集合列表A B C D E。但是,在两步MR过程中,所有的key都会对应一个聚合结果,而其中一些聚合结果只是中间结果。故而引入了key_set用于保存聚合时的key值,加入了第三步MR,通过比较key_setid_set来对中间聚合结果进行过滤。算法的伪代码如下:

MR step1:
    Map: 
        input: id_set
        process: flatMap id_set;
        output: id -> (id_set, 1)
    Rduce:
        process: reduceByKey
        output: id -> (id_set, empty key_set, int_value)
        
MR step2:
    Map:
        input: id -> (id_set, empty key_set, int_value)
        process: flatMap id_set, if have id_aggregation, then add key to key_set
        output: id -> (id_set, key_set, int_value)
    Reduce: 
        process: reduceByKey
        output: id -> (id_set, key_set, int_value)
        
MR step3:
    Map:
        input: id -> (id_set, empty key_set, int_value)
        process: flatMap id_set, if have id_aggregation, then add key to key_set
        output: id -> (id_set, key_set, int_value)
    Reduce: 
        process: reduceByKey
        output: id -> (id_set, key_set, int_value)
        
Filters:
    process: if have id_aggregation, then add key to key_set
    filter: if no id_aggregation or key_set == id_set
    distinct

3. 实现

针对上述ID强打通算法,Spark实现代码如下:

case class DvcId(id: String, value: String)

val log: RDD[mutable.Set[DvcId]]
// MR1
val rdd1: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = log
  .flatMap { set =>
    set.map(t => (t, (set, 1)))
  }.reduceByKey { (t1, t2) =>
    t1._1 ++= t2._1
    val added = t1._2 + t2._2
    (t1._1, added)
  }.map { t =>
    (t._1, (t._2._1, mutable.Set.empty[DvcId], t._2._2))
  }
// MR2
val rdd2: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = rdd1
  .flatMap(flatIdSet).reduceByKey(tuple3Add)
// MR3
val rdd3: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = rdd2
  .flatMap(flatIdSet).reduceByKey(tuple3Add)
// filter
val rdd4 = rdd3.filter { t =>
  t._2._2 += t._1
  t._2._3 == 1 || (t._2._1 -- t._2._2).isEmpty
}.map(_._2._1).distinct()

// flat id_set
def flatIdSet(row: (DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))) = {
  row._2._3 match {
    case 1 =>
      Array((row._1, (row._2._1, row._2._2, row._2._3)))
    case _ =>
      row._2._2 += row._1 // add key to keySet
      row._2._1.map(d => (d, (row._2._1, row._2._2, row._2._3))).toArray
  }
}

def tuple3Add(t1: (mutable.Set[DvcId], mutable.Set[DvcId], Int),
              t2: (mutable.Set[DvcId], mutable.Set[DvcId], Int)) = {
  t1._1 ++= t2._1
  t1._2 ++= t2._2
  val added = t1._3 + t2._3
  (t1._1, t1._2, added)
}

其中,引入常量1是为了标记该条记录是否发生了ID聚合的情况。

ID强打通算法实现起来比较简单,但是在实际的应用时,日志数据往往是带噪声的:

  • 有山寨设备;
  • ID之间存在着一对多的情况,比如,各业务线的UID的靠谱程度不一,有的UID会对应到多个设备。

另外,ID强打通后是HDFS的离线数据,为了提供线上服务、保证ID之间的一一对应关系,应选择何种分布式数据库、表应如何设计、如何做到数据更新时而不影响线上服务等等,则是另一个需要思考的问题。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐