Hbase工具类(scala版本)
操作Hbase 工具类,scala版本+k8s版
·
操作Hbase 工具类,scala版本+k8s版
object HBaseHelper {
private val logger = LoggerFactory.getLogger(getClass)
val QUORUM: String = PropertiesUtils.getString("hbase.zookeeper.quorum")
val PORT: String = PropertiesUtils.getString("hbase.zookeeper.port")
val ZNODE: String = PropertiesUtils.getString("hbase.zookeeper.zNode")
val IS_KERBEROS: Boolean = PropertiesUtils.getBoolean("hbase.kerberos")
val KRB5_CONF_PATH: String = PropertiesUtils.getString("java.security.krb5.conf")
val KEYTAB_PATH: String = PropertiesUtils.getString("kerberos.keytab.path")
val KERBEROS_USER: String = PropertiesUtils.getString("kerberos.user")
val HBASE_SITE_FILE: String = PropertiesUtils.getString("hbase.site.file")
val CORE_SITE_FILE: String = PropertiesUtils.getString("core.site.file")
val HDFS_SITE_FILE: String = PropertiesUtils.getString("hdfs.site.file")
var connection: Connection = _
var ugi: UserGroupInformation = _
var conf: Configuration = _
def login(): UserGroupInformation = {
if (ugi == null) {
if (conf == null) {
conf = HBaseConfiguration.create()
}
//kerberos
try {
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH)
conf.set("hadoop.security.authentication", "Kerberos")
conf.set("keytab.file", KEYTAB_PATH)
conf.set("kerberos.principal", KERBEROS_USER)
UserGroupInformation.setConfiguration(conf)
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(KERBEROS_USER, KEYTAB_PATH)
} catch {
case e: IOException =>
logger.error(s"login hbase from keytab error,Cause:$e")
}
}
ugi
}
def getConnection: Connection = {
if (connection == null) {
if (conf == null) {
conf = HBaseConfiguration.create()
}
if (IS_KERBEROS) {
conf.addResource(new Path(CORE_SITE_FILE))
conf.addResource(new Path(HDFS_SITE_FILE))
conf.addResource(new Path(HBASE_SITE_FILE))
} else {
conf.set(HConstants.ZOOKEEPER_QUORUM, QUORUM)
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, PORT)
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZNODE)
}
connection = ConnectionFactory.createConnection(conf)
}
connection
}
def createNamespace(namespace: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = createHBaseNamespace(namespace)
})
} else {
createHBaseNamespace(namespace)
}
}
def createHBaseNamespace(namespace: String): Boolean = {
try {
getConnection.getAdmin.createNamespace(NamespaceDescriptor.create(namespace).build())
true
} catch {
case e: Exception =>
logger.error(s"create hbase namespace $namespace error: $e")
false
}
}
def existsNamespace(namespace: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = existsHBaseNamespace(namespace)
})
} else {
existsHBaseNamespace(namespace)
}
}
def existsHBaseNamespace(namespace: String): Boolean = {
try {
getConnection.getAdmin.getNamespaceDescriptor(namespace)
true
} catch {
case e: Exception =>
logger.error(s"hbase namespace $namespace is not exists")
false
}
}
def dropNamespace(namespace: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = dropHBaseNamespace(namespace)
})
} else {
dropHBaseNamespace(namespace)
}
}
def dropHBaseNamespace(namespace: String): Boolean = {
try {
getConnection.getAdmin.deleteNamespace(namespace)
true
} catch {
case e: Exception =>
logger.error(s"drop hbase namespace $namespace error: $e")
false
}
}
def checkAndCreateNameSpace(namespace: String): Unit = {
if (!existsNamespace(namespace)) {
createNamespace(namespace)
}
}
def createTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = createHBaseTable(tableName, columnFamilyName, versions, timeToLive: Int)
})
} else {
createHBaseTable(tableName, columnFamilyName, versions, timeToLive: Int)
}
}
def createHBaseTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
try {
val hTable = new HTableDescriptor(TableName.valueOf(tableName))
val hColumn = new HColumnDescriptor(columnFamilyName)
hColumn.setMaxVersions(versions)
hTable.addFamily(hColumn)
//设置过期时间,-1为永久
if(timeToLive > -1){
hColumn.setTimeToLive(timeToLive)
}
getConnection.getAdmin.createTable(hTable)
true
} catch {
case e: Exception =>
logger.info(s"create hbase table $tableName error: $e")
false
}
}
def existsTable(tableName: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = existsHBaseTable(tableName)
})
} else {
existsHBaseTable(tableName)
}
}
def existsHBaseTable(tableName: String): Boolean = {
try {
val bool: Boolean = getConnection.getAdmin.tableExists(TableName.valueOf(tableName))
bool
} catch {
case e: Exception =>
logger.info(s"hbase table $tableName is not exists")
false
}
}
def checkAndCreateNameTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Unit = {
if (!existsTable(tableName)) {
createTable(tableName, columnFamilyName, versions, timeToLive: Int)
}
}
def getTable(connection: Connection, tableName: String): Option[Table] = {
try {
Some(connection.getTable(TableName.valueOf(tableName)))
} catch {
case e: Exception =>
logger.error(s"hbase getTable error,Cause:$e")
None
}
}
def getData(tableName: String, get: Get): Option[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Option[JSONObject]] {
override def run(): Option[JSONObject] = getHBaseData(tableName, get)
})
} else {
getHBaseData(tableName, get)
}
}
private def getHBaseData(tableName: String, get: Get): Option[JSONObject] = {
try {
val connection: Connection = getConnection
val result: Result = getTable(connection, tableName).get.get(get)
var obj: JSONObject = null
if (!result.isEmpty) {
val cellArray: Array[Cell] = result.rawCells()
obj = new JSONObject
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
obj.put(cellName, cellValue)
})
Some(obj)
} else {
None
}
} catch {
case e: Exception =>
logger.error(s"hbase getData error,Cause:$e")
None
}
}
def getOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
override def run(): ArrayBuffer[JSONObject] = getHBaseOneVersionData(tableName, getList)
})
} else {
getHBaseOneVersionData(tableName, getList)
}
}
def getHBaseOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
try {
val connection: Connection = getConnection
val resultArray: Array[Result] = getTable(connection, tableName).get.get(getList)
resultArray.foreach(result => {
if (!result.isEmpty) {
val cellArray: Array[Cell] = result.rawCells()
val obj = new JSONObject
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
obj.put(cellName, cellValue)
})
arrayBuffer += obj
}
})
} catch {
case e: Exception =>
logger.error(s"hbase getOneVersionData error,Cause:$e")
None
}
arrayBuffer
}
def getMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
override def run(): ArrayBuffer[JSONObject] = getHBaseMultiVersionData(tableName, getList)
})
} else {
getHBaseMultiVersionData(tableName, getList)
}
}
def getHBaseMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
try {
val connection: Connection = getConnection
val searchArray: Array[Result] = getTable(connection, tableName).get.get(getList)
searchArray.foreach(bk => {
val groupMap: Map[Long, Array[Cell]] = bk.rawCells().groupBy(_.getTimestamp)
for (key <- groupMap.keySet) {
val cellArray: Array[Cell] = groupMap.apply(key)
val obj = new JSONObject
obj.put("timestamp", key)
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
obj.put(cellName, cellValue)
})
arrayBuffer += obj
}
})
} catch {
case e: Exception =>
logger.error(s"HBase getMultiVersionData error:$e")
}
arrayBuffer
}
def scanOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
override def run(): ArrayBuffer[JSONObject] = scanHBaseOneVersionData(tableName, scan)
})
} else {
scanHBaseOneVersionData(tableName, scan)
}
}
def scanHBaseOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
try {
val connection: Connection = getConnection
val resultScanner: ResultScanner = getTable(connection, tableName).get.getScanner(scan)
val it: util.Iterator[Result] = resultScanner.iterator()
while (it.hasNext) {
val bk: Result = it.next()
val cellArray: Array[Cell] = bk.rawCells()
val obj = new JSONObject
obj.put("rowKey", Bytes.toString(bk.getRow))
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.cloneValue(cell))
obj.put(cellName, cellValue)
})
arrayBuffer += obj
}
} catch {
case e: Exception =>
logger.error(s"HBase getMultiVersionData error:$e")
}
arrayBuffer
}
def putData(tableName: String, put: Put): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = putHBaseData(tableName, put)
})
} else {
putHBaseData(tableName, put)
}
}
def putHBaseData(tableName: String, put: Put): Boolean = {
var result = false
try {
val connection: Connection = getConnection
val table: Table = getTable(connection, tableName).get
table.put(put)
result = true
} catch {
case e: Exception =>
logger.error(s"HBase putData error:$e")
result = false
}
result
}
def putDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = putHBaseDataList(tableName, putList)
})
} else {
putHBaseDataList(tableName, putList)
}
}
def putHBaseDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
var result = false
try {
val connection: Connection = getConnection
val table: Table = getTable(connection, tableName).get
table.put(putList)
result = true
} catch {
case e: Exception =>
logger.error(s"HBase putDataList error:$e")
result = false
}
result
}
def delData(tableName: String, delete: Delete): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = delHBaseData(tableName, delete)
})
} else {
delHBaseData(tableName, delete)
}
}
def delHBaseData(tableName: String, delete: Delete): Boolean = {
var result = false
try {
val connection: Connection = getConnection
val table: Table = getTable(connection, tableName).get
table.delete(delete)
result = true
} catch {
case e: Exception =>
logger.error(s"HBase delData error:$e")
result = false
}
result
}
def buildScan(rowKeyPrefix: Array[Byte]): Scan = {
val scan = new Scan()
val prefixFilter = new PrefixFilter(rowKeyPrefix)
scan.setFilter(prefixFilter)
scan.addFamily(HbaseConfig.FAMILY_NAME.getBytes())
scan
}
def buildGet(rowKey: Array[Byte], versions: Int): Get = {
val get = new Get(rowKey)
get.addFamily(HbaseConfig.FAMILY_NAME.getBytes())
//setMaxVersions被弃用了 新版本用readVersions(versions)
//get.setMaxVersions(versions)
get.readVersions(versions)
get
}
def buildPut(rowKey: Array[Byte], gjValue: JSONObject): Put = {
import scala.collection.JavaConversions._
val put = new Put(rowKey)
for (key <- gjValue.keySet()) {
put.addColumn(HbaseConfig.FAMILY_NAME.getBytes, key.getBytes, String.valueOf(gjValue.get(key)).getBytes)
}
put
}
}
参考:https://blog.csdn.net/qq_45669302/article/details/122431873
更多推荐
已为社区贡献1条内容
所有评论(0)