一.安装
# 步骤1:安装依赖
pip install thrift
pip install hbase-thrift
。。。依赖

# 步骤二:将链接上的hbase文件放到site-packages替换原有的hbase文件
https://github.com/aliyun/aliyun-apsaradb-hbase-demo/tree/master/hbase/thrift2/python
二.python脚本操作阿里云增强版hbase
# -*- coding: utf-8 -*-
# @Time    : 2020/4/26 20:13
# @Author  :

from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, \
    TNamespaceDescriptor, TGet, TPut, TScan


class HbaseHelper(object):
    """hbase读写封装"""

    def __init__(self, url="http://ld-xxxxx-xxxx-xxxxxxs.xxx.com:9190",
                 accesskeyid="root", accesssignature="root"):

        self.transport = THttpClient.THttpClient(url)
        headers = {}
        # 用户名
        headers["ACCESSKEYID"] = accesskeyid
        # 密码
        headers["ACCESSSIGNATURE"] = accesssignature
        self.transport.setCustomHeaders(headers)
        self.protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
        self.client = THBaseService.Client(self.protocol)

    def single_insert(self, namespace, column_family, family, **kwargs):
        """
        单条插入
        :param namespace:
        :param column_family:
        :param family:
        :param kwargs: {"row_key":row_key,"data":{"k1":"v1","k2":"v2","k3":"v3"}}
        :return:
        """
        self.transport.open()
        tableInbytes = "{}:{}".format(namespace, column_family).encode("utf-8")
        row_key = kwargs.get("row_key", "")
        if not row_key:
            raise Exception("row_key not exit exception")
        data = kwargs.get("data", {})
        self.client.put(table=tableInbytes, tput=TPut(row="{}".format(row_key).encode("utf8"), columnValues=[
            TColumnValue(family="{}".format(family).encode("utf8"), qualifier="{}".format(k).encode("utf8"),
                         value="{}".format(v).encode("utf8")) for k, v in data.items()]))

        self.transport.close()
        return True

    def bulk_insert(self, namespace, column_family, family, kwargs_list):
        """
        批量插入
        :param namespace:
        :param column_family:
        :param family:
        :param row_key:
        :param kwargs_list:[{"row_key":row_key,"data":{"k1":"v1","k2":"v2","k3":"v3"}}]
        :return:
        """
        self.transport.open()
        tableInbytes = "{}:{}".format(namespace, column_family).encode("utf-8")
        puts = [TPut(row="{}".format(v.get("row_key")).encode("utf-8"), columnValues=[
            TColumnValue(family="{}".format(family).encode("utf-8") , qualifier="{}".format( k).encode("utf-8"),
                         value="{}".format( v2).encode("utf-8")) for k, v2 in v.get('data',{}).items()]) for v in kwargs_list]
        self.client.putMultiple(table=tableInbytes, tputs=puts)
        self.transport.close()

    def get_by_row_key(self, namespace, column_family, row_key):
        """
        :param namespace:
        :param column_family:
        :param row_key:
        :return:
        """
        self.transport.open()
        tableInbytes = "{}:{}".format(namespace, column_family).encode("utf-8")
        result = self.client.get(tableInbytes, TGet(row="{}".format(row_key).encode("utf-8")))
        self.transport.close()
        return result

    def scan(self, namespace, column_family, start_row_key, stop_row_key, caching=2):
        """

        :param namespace:
        :param column_family:
        :param start_row_key:
        :param stop_row_key:
        :param caching:
        # # caching的大小为每次从服务器返回的行数,设置太大会导致服务器处理过久,太小会导致范围扫描与服务器做过多交互
        # # 根据每行的大小,caching的值一般设置为10到100之间
        :return:
        """
        self.transport.open()
        tableInbytes = "{}:{}".format(namespace, column_family).encode("utf-8")
        startRow = "{}".format(start_row_key).encode("utf-8")
        stopRow = "{}".format(stop_row_key).encode("utf-8")
        scan = TScan(startRow=startRow, stopRow=stopRow)
        # # 扫描的结果
        results = []

        # # 此函数可以找到比当前row大的最小row,方法是在当前row后加入一个0x00的byte
        # # 从比当前row大的最小row开始scan,可以保证中间不会漏扫描数据
        def createClosestRowAfter(row):
            array = bytearray(row)
            array.append(0x00)
            return bytes(array)

        while True:
            lastResult = None
            # getScannerResults会自动完成open,close 等scanner操作,HBase增强版必须使用此方法进行范围扫描
            currentResults = self.client.getScannerResults(tableInbytes, scan, caching)
            for result in currentResults:
                results.append(result)
                lastResult = result
            # 如果一行都没有扫描出来,说明扫描已经结束,我们已经获得startRow和stopRow之间所有的result
            if lastResult is None:
                break
            # 如果此次扫描是有结果的,我们必须构造一个比当前最后一个result的行大的最小row,继续进行扫描,以便返回所有结果
            else:
                nextStartRow = createClosestRowAfter(lastResult.row)
                scan = TScan(startRow=nextStartRow, stopRow=stopRow)
        self.transport.close()
        return results
        
    def scan_by_prefix(self, namespace, column_family, prefix_row_key, caching=2):
        """
        rk前缀查询
        :param namespace:
        :param column_family:
        :param prefix_row_key: 前缀查询 例如一个rk:000190c1c5b4da6_b48083aaf2bf98801 通过前缀:000190c1c5b4da6找到相应的rk
        :param caching:
        # # caching的大小为每次从服务器返回的行数,设置太大会导致服务器处理过久,太小会导致范围扫描与服务器做过多交互
        # # 根据每行的大小,caching的值一般设置为10到100之间
        :return:
        """
        self.transport.open()
        tableInbytes = "{}:{}".format(namespace, column_family).encode("utf-8")
        startRow = "{}".format(prefix_row_key).encode("utf-8")

        scan = TScan(startRow=startRow)
        # # 扫描的结果
        results = []

        # # 此函数可以找到比当前row大的最小row,方法是在当前row后加入一个0x00的byte
        # # 从比当前row大的最小row开始scan,可以保证中间不会漏扫描数据
        def createClosestRowAfter(row):
            array = bytearray(row)
            array.append(0x00)
            return bytes(array)

        while True:
            lastResult = None
            # getScannerResults会自动完成open,close 等scanner操作,HBase增强版必须使用此方法进行范围扫描
            currentResults = self.client.getScannerResults(tableInbytes, scan, caching)
            if currentResults and prefix_row_key in currentResults[0].row.decode('utf-8'):
                for result in currentResults:
                    results.append(result)
                    lastResult = result
            else:
                break
            # 如果一行都没有扫描出来,说明扫描已经结束,我们已经获得startRow和stopRow之间所有的result
            if lastResult is None:
                break
            # 如果此次扫描是有结果的,我们必须构造一个比当前最后一个result的行大的最小row,继续进行扫描,以便返回所有结果
            else:
                nextStartRow = createClosestRowAfter(lastResult.row)
                scan = TScan(startRow=nextStartRow, )
        self.transport.close()
        return results


if __name__ == '__main__':
    hb = HbaseHelper()
    # 创建一个名为facebook的命名空间,理解创建一个Facebook数据库
    namespace = "faceboook"
    # 创建一个数据库为Facebook的数据库,大表为jkt_client,表的所有信息都属于小表facebook_main
    column_family = "jkt_client"
    family = "facebook_main"
    kwargs = {
        "row_key": "2",
        "data": {
            "type": 2,
            "page_id": "-Elías-PCB-161219993935151",
            "home_id": "161219993935151",
            "obj_name": "(<-) Elías - PCB",
            "img_url": "http://spider-silicon.3935151.jpg",
            "c_url": "https://www.facebook.com/-Elías-PCB-161219993935151/?ref=br_rs",
            "company_url": "http://www.eliaspcb.com"
        }
    }
    # 插入单条数据
    # hb.single_insert(namespace, column_family, family, **kwargs)

    kwargs_list = [
        {
            "row_key": "3",
            "data": {
                "type": 2,
                "page_id": "-Hair-Accessories--416385828419351",
                "home_id": "416385828419351",
                "obj_name": "~{ Hair Accessories }~",
                "img_url": "http://spider-ccessories--416385828419351.jpg",
                "c_url": "https://www.facebook.com/-Hair-Accessories--416385828419351/?ref=br_rs",
                "company_url": ""
            }
        },
        {
            "row_key": "4",
            "data": {
                "type": 2,
                "page_id": "-Joc-Lyn-Bicycle-Parts--386837101395047",
                "home_id": "386837101395047",
                "obj_name": "-Joc-Lyn-Bicycle-Parts",
                "img_url": "",
                "c_url": "https://www.facebook.com/-Joc-Lyn-Bicycle-Parts--386837101395047/?ref=br_rs",
                "company_url": ""
            }
        }
    ]
    # 批量插入数据
    # hb.bulk_insert(namespace, column_family, family, kwargs_list)

    # 通过行键获取数据
    # row_key = "4"
    # result = hb.get_by_row_key(namespace, column_family, row_key)
    # print(result)

    # 扫表
    result = hb.scan(namespace, column_family, start_row_key="2", stop_row_key="3",caching=2)
    print(result)


三.控制台查看结果

在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐