操作Hbase 工具类,scala版本+k8s版

object HBaseHelper {

  private val logger = LoggerFactory.getLogger(getClass)

  val QUORUM: String = PropertiesUtils.getString("hbase.zookeeper.quorum")
  val PORT: String = PropertiesUtils.getString("hbase.zookeeper.port")
  val ZNODE: String = PropertiesUtils.getString("hbase.zookeeper.zNode")

  val IS_KERBEROS: Boolean = PropertiesUtils.getBoolean("hbase.kerberos")
  val KRB5_CONF_PATH: String = PropertiesUtils.getString("java.security.krb5.conf")
  val KEYTAB_PATH: String = PropertiesUtils.getString("kerberos.keytab.path")
  val KERBEROS_USER: String = PropertiesUtils.getString("kerberos.user")
  val HBASE_SITE_FILE: String = PropertiesUtils.getString("hbase.site.file")
  val CORE_SITE_FILE: String = PropertiesUtils.getString("core.site.file")
  val HDFS_SITE_FILE: String = PropertiesUtils.getString("hdfs.site.file")

  var connection: Connection = _
  var ugi: UserGroupInformation = _
  var conf: Configuration = _

  def login(): UserGroupInformation = {
    if (ugi == null) {
      if (conf == null) {
        conf = HBaseConfiguration.create()
      }
      //kerberos
      try {
        System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH)
        conf.set("hadoop.security.authentication", "Kerberos")
        conf.set("keytab.file", KEYTAB_PATH)
        conf.set("kerberos.principal", KERBEROS_USER)
        UserGroupInformation.setConfiguration(conf)
        ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(KERBEROS_USER, KEYTAB_PATH)
      } catch {
        case e: IOException =>
          logger.error(s"login hbase from keytab error,Cause:$e")
      }
    } 
    ugi
  }

  def getConnection: Connection = {
    if (connection == null) {
      if (conf == null) {
        conf = HBaseConfiguration.create()
      }
      if (IS_KERBEROS) {
        conf.addResource(new Path(CORE_SITE_FILE))
        conf.addResource(new Path(HDFS_SITE_FILE))
        conf.addResource(new Path(HBASE_SITE_FILE))
      } else {
        conf.set(HConstants.ZOOKEEPER_QUORUM, QUORUM)
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, PORT)
        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZNODE)
      }
      connection = ConnectionFactory.createConnection(conf)
    }
    connection
  }


  def createNamespace(namespace: String): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = createHBaseNamespace(namespace)
      })
    } else {
      createHBaseNamespace(namespace)
    }
  }

  def createHBaseNamespace(namespace: String): Boolean = {
    try {
      getConnection.getAdmin.createNamespace(NamespaceDescriptor.create(namespace).build())
      true
    } catch {
      case e: Exception =>
        logger.error(s"create hbase namespace $namespace error: $e")
        false
    }
  }

  def existsNamespace(namespace: String): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = existsHBaseNamespace(namespace)
      })
    } else {
      existsHBaseNamespace(namespace)
    }
  }

  def existsHBaseNamespace(namespace: String): Boolean = {
    try {
      getConnection.getAdmin.getNamespaceDescriptor(namespace)
      true
    } catch {
      case e: Exception =>
        logger.error(s"hbase namespace $namespace is not exists")
        false
    }
  }

  def dropNamespace(namespace: String): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = dropHBaseNamespace(namespace)
      })
    } else {
      dropHBaseNamespace(namespace)
    }
  }

  def dropHBaseNamespace(namespace: String): Boolean = {
    try {
      getConnection.getAdmin.deleteNamespace(namespace)
      true
    } catch {
      case e: Exception =>
        logger.error(s"drop hbase namespace $namespace error: $e")
        false
    }
  }

  def checkAndCreateNameSpace(namespace: String): Unit = {
    if (!existsNamespace(namespace)) {
      createNamespace(namespace)
    }
  }

  def createTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = createHBaseTable(tableName, columnFamilyName, versions, timeToLive: Int)
      })
    } else {
      createHBaseTable(tableName, columnFamilyName, versions, timeToLive: Int)
    }
  }

  def createHBaseTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
    try {
      val hTable = new HTableDescriptor(TableName.valueOf(tableName))
      val hColumn = new HColumnDescriptor(columnFamilyName) 
      hColumn.setMaxVersions(versions)
      hTable.addFamily(hColumn)
      //设置过期时间,-1为永久
      if(timeToLive > -1){
        hColumn.setTimeToLive(timeToLive)
      }
      getConnection.getAdmin.createTable(hTable)
      true
    } catch {
      case e: Exception =>
        logger.info(s"create hbase table $tableName error: $e")
        false
    }
  }

  def existsTable(tableName: String): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = existsHBaseTable(tableName)
      })
    } else {
      existsHBaseTable(tableName)
    }
  }

  def existsHBaseTable(tableName: String): Boolean = {
    try {
      val bool: Boolean = getConnection.getAdmin.tableExists(TableName.valueOf(tableName))
      bool
    } catch {
      case e: Exception =>
        logger.info(s"hbase table $tableName is not exists")
        false
    }
  }

  def checkAndCreateNameTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Unit = {
    if (!existsTable(tableName)) {
      createTable(tableName, columnFamilyName, versions, timeToLive: Int)
    }
  }

  def getTable(connection: Connection, tableName: String): Option[Table] = {
    try {
      Some(connection.getTable(TableName.valueOf(tableName)))
    } catch {
      case e: Exception =>
        logger.error(s"hbase getTable error,Cause:$e")
        None
    }
  }

  def getData(tableName: String, get: Get): Option[JSONObject] = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Option[JSONObject]] {
        override def run(): Option[JSONObject] = getHBaseData(tableName, get)
      })
    } else {
      getHBaseData(tableName, get)
    }
  }

  private def getHBaseData(tableName: String, get: Get): Option[JSONObject] = {
    try {
      val connection: Connection = getConnection
      val result: Result = getTable(connection, tableName).get.get(get)
      var obj: JSONObject = null
      if (!result.isEmpty) {
        val cellArray: Array[Cell] = result.rawCells()
        obj = new JSONObject
        cellArray.foreach(cell => {
          val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
          val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
          obj.put(cellName, cellValue)
        })
        Some(obj)
      } else {
        None
      }
    } catch {
      case e: Exception =>
        logger.error(s"hbase getData error,Cause:$e")
        None
    }
  }

  def getOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
        override def run(): ArrayBuffer[JSONObject] = getHBaseOneVersionData(tableName, getList)
      })
    } else {
      getHBaseOneVersionData(tableName, getList)
    }
  }

  def getHBaseOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
    try {
      val connection: Connection = getConnection
      val resultArray: Array[Result] = getTable(connection, tableName).get.get(getList)
      resultArray.foreach(result => {
        if (!result.isEmpty) {
          val cellArray: Array[Cell] = result.rawCells()
          val obj = new JSONObject
          cellArray.foreach(cell => {
            val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
            val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
            obj.put(cellName, cellValue)
          })
          arrayBuffer += obj
        }
      })
    } catch {
      case e: Exception =>
        logger.error(s"hbase getOneVersionData error,Cause:$e")
        None
    }
    arrayBuffer
  }

  def getMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
        override def run(): ArrayBuffer[JSONObject] = getHBaseMultiVersionData(tableName, getList)
      })
    } else {
      getHBaseMultiVersionData(tableName, getList)
    }
  }


  def getHBaseMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
    try {
      val connection: Connection = getConnection
      val searchArray: Array[Result] = getTable(connection, tableName).get.get(getList)
      searchArray.foreach(bk => {
        val groupMap: Map[Long, Array[Cell]] = bk.rawCells().groupBy(_.getTimestamp)
        for (key <- groupMap.keySet) {
          val cellArray: Array[Cell] = groupMap.apply(key)
          val obj = new JSONObject
          obj.put("timestamp", key)
          cellArray.foreach(cell => {
            val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
            val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
            obj.put(cellName, cellValue)
          })
          arrayBuffer += obj
        }
      })
    } catch {
      case e: Exception =>
        logger.error(s"HBase getMultiVersionData error:$e")
    }
    arrayBuffer
  }

  def scanOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
        override def run(): ArrayBuffer[JSONObject] = scanHBaseOneVersionData(tableName, scan)
      })
    } else {
      scanHBaseOneVersionData(tableName, scan)
    }
  }

  def scanHBaseOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
    val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
    try {
      val connection: Connection = getConnection

      val resultScanner: ResultScanner = getTable(connection, tableName).get.getScanner(scan)
      val it: util.Iterator[Result] = resultScanner.iterator()
      while (it.hasNext) {
        val bk: Result = it.next()
        val cellArray: Array[Cell] = bk.rawCells()

        val obj = new JSONObject
        obj.put("rowKey", Bytes.toString(bk.getRow))
        cellArray.foreach(cell => {
          val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
          val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
          obj.put(cellName, cellValue)
        })
        arrayBuffer += obj
      }
    } catch {
      case e: Exception =>
        logger.error(s"HBase getMultiVersionData error:$e")
    }
    arrayBuffer
  }

  def putData(tableName: String, put: Put): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = putHBaseData(tableName, put)
      })
    } else {
      putHBaseData(tableName, put)
    }
  }

  def putHBaseData(tableName: String, put: Put): Boolean = {
    var result = false
    try {
      val connection: Connection = getConnection
      val table: Table = getTable(connection, tableName).get
      table.put(put)
      result = true
    } catch {
      case e: Exception =>
        logger.error(s"HBase putData error:$e")
        result = false
    }
    result
  }

  def putDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = putHBaseDataList(tableName, putList)
      })
    } else {
      putHBaseDataList(tableName, putList)
    }
  }

  def putHBaseDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
    var result = false
    try {
      val connection: Connection = getConnection
      val table: Table = getTable(connection, tableName).get
      table.put(putList)
      result = true
    } catch {
      case e: Exception =>
        logger.error(s"HBase putDataList error:$e")
        result = false
    }
    result
  }

  def delData(tableName: String, delete: Delete): Boolean = {
    if (IS_KERBEROS) {
      val ugi: UserGroupInformation = login()
      ugi.doAs(new PrivilegedExceptionAction[Boolean] {
        override def run(): Boolean = delHBaseData(tableName, delete)
      })
    } else {
      delHBaseData(tableName, delete)
    }
  }

  def delHBaseData(tableName: String, delete: Delete): Boolean = {
    var result = false
    try {
      val connection: Connection = getConnection
      val table: Table = getTable(connection, tableName).get
      table.delete(delete)
      result = true
    } catch {
      case e: Exception =>
        logger.error(s"HBase delData error:$e")
        result = false
    }
    result
  }

  def buildScan(rowKeyPrefix: Array[Byte]): Scan = {
    val scan = new Scan()
    val prefixFilter = new PrefixFilter(rowKeyPrefix)
    scan.setFilter(prefixFilter)
    scan.addFamily(HbaseConfig.FAMILY_NAME.getBytes())
    scan
  }

  def buildGet(rowKey: Array[Byte], versions: Int): Get = {
    val get = new Get(rowKey)
    get.addFamily(HbaseConfig.FAMILY_NAME.getBytes())
    //setMaxVersions被弃用了 新版本用readVersions(versions)
    //get.setMaxVersions(versions)
    get.readVersions(versions)
    get
  }

  def buildPut(rowKey: Array[Byte], gjValue: JSONObject): Put = {
    import scala.collection.JavaConversions._
    val put = new Put(rowKey)
    for (key <- gjValue.keySet()) {
      put.addColumn(HbaseConfig.FAMILY_NAME.getBytes, key.getBytes, String.valueOf(gjValue.get(key)).getBytes)
    }
    put
  }

}

参考:https://blog.csdn.net/qq_45669302/article/details/122431873

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐