PythonSDK

安装

快速安装$ sudo pip install pydatahub

源码安装$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git

$ cd aliyun-datahub-sdk-python

$ sudo python setup.py install

常见问题

1.如果安装过程中出现错误信息’Python.h: No such file or directory’,常用的操作系统安装方式如下

$ sudo apt-get install python-dev# for python2.x installs

$ sudo apt-get install python3-dev# for python3.x installs

$ sudo yum install python-devel# for python2.x installs

$ sudo yum install python34-devel# for python3.4 installs

2.如果使用windows操作系统,根据提示信息可到 此处 下载安装对应版本的 Visual C++ SDK。

Windows 10 安装cprotobuf依赖时如果报类似如下错误,也表示需要安装Visual C++ 生成工具:

bulding'cprotobuf.internal'extention

error:[WinError2]Thesystem cannot find the file specified

推荐使用python3.6或以上,会明确提示所需版本及链接信息。

3.Windows 下如果安装依赖时报类似如下错误,是环境问题所致,请搜索相关错误,根据具体情况,拷贝所需文件,或是直接使用 developer command prompt 工具进行安装:

LINK:fatal error LNK1158:cannot run'rc.exe'

4.Windows 7 如果提示如下错误,可安装此 build tools:

error:MicrosoftVisualC++14.0is required.Getit with"Microsoft Visual C++ Build Tools":https://visualstudio.microsoft.com/downloads/

安装验证$ python-c"from datahub import DataHub"

如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!

基本概念

准备工作访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。

创建Project或使用SDK接口进行创建

初始化Datahub

importsys

importtraceback

fromdatahubimportDataHub

fromdatahub.exceptionsimportResourceExistException

fromdatahub.modelsimportFieldType,RecordSchema,TupleRecord,BlobRecord,CursorType,RecordType

access_id=***your access id***

access_key=***your access key***

endpoint=***your datahub server endpoint***

dh=DataHub(access_id,access_key,endpoint)

Project操作创建示例

project_name='project'

comment='comment'

try:

dh.create_project(project_name,comment)

print("create project success!")

print("=======================================\n\n")

exceptResourceExistException:

print("project already exist!")

print("=======================================\n\n")

exceptExceptionase:

print(traceback.format_exc())

sys.exit(-1)

Topic操作

Tuple TopicTuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:

类型

含义

值域

Bigint

8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。

-9223372036854775807 ~ 9223372036854775807

String

字符串,只支持UTF-8编码。

单个String列最长允许1MB。

Boolean

布尔型。

可以表示为True/False,true/false, 0/1

Double

8字节双精度浮点数。

-1.0 10308 ~ 1.0 10308

TimeStamp

时间戳类型

表示到微秒的时间戳类型

创建示例

topic_name="tuple_topic"

shard_count=3

life_cycle=7

record_schema=RecordSchema.from_lists(

['bigint_field','string_field','double_field','bool_field','time_field'],

[FieldType.BIGINT,FieldType.STRING,FieldType.DOUBLE,FieldType.BOOLEAN,FieldType.TIMESTAMP])

try:

dh.create_tuple_topic(project_name,topic_name,shard_count,life_cycle,record_schema,comment)

print("create tuple topic success!")

print("=======================================\n\n")

exceptResourceExistException:

print("topic already exist!")

print("=======================================\n\n")

exceptExceptionase:

print(traceback.format_exc())

sys.exit(-1)

Blob TopicBlob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。

topic_name="blob_topic"

shard_count=3

life_cycle=7

try:

dh.create_blob_topic(project_name,topic_name,shard_count,life_cycle,comment)

print("create blob topic success!")

print("=======================================\n\n")

exceptResourceExistException:

print("topic already exist!")

print("=======================================\n\n")

exceptExceptionase:

print(traceback.format_exc())

sys.exit(-1)

数据发布/订阅

获取Shard列表list_shards接口获取topic下的所有shard

shard_result=dh.list_shard(project_name,topic_name)

shards=shard_result.shards

print(len(shards))

返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

发布数据put_records接口向一个topic发布数据

put_result=dh.put_records(project_name,topic_name,records)

print(put_result.failed_record_count)

print(put_result.failed_records)

其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为PutRecordsResult对象,包含failed_record_count和failed_records成员,failed_records是一个FailedRecord对象的list,FailedRecord对象包含成员index,error_code和error_message

写入Tuple类型Record示例

try:

# block等待所有shard状态ready

dh.wait_shards_ready(project_name,topic_name)

print("shards all ready!!!")

print("=======================================\n\n")

topic_result=dh.get_topic(project_name,topic_name)

print(topic_result)

iftopic_result.record_type!=RecordType.TUPLE:

print("topic type illegal!")

sys.exit(-1)

print("=======================================\n\n")

record_schema=topic_result.record_schema

records0=[]

record0=TupleRecord(schema=record_schema,values=[1,'yc1',10.01,True,1455869335000000])

record0.shard_id='0'

record0.put_attribute('AK','47')

records0.append(record0)

record1=TupleRecord(schema=record_schema)

record1.set_value('bigint_field',2)

record1.set_value('string_field','yc2')

record1.set_value('double_field',None)

record1.set_value('bool_field',False)

record1.set_value('time_field',1455869335000011)

record1.hash_key='4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'

records0.append(record1)

record2=TupleRecord(schema=record_schema)

record2.set_value(0,3)

record2.set_value(1,'yc3')

record2.set_value(2,1.1)

record2.set_value(3,False)

record2.set_value(4,1455869335000011)

record2.attributes={'key':'value'}

record2.partition_key='TestPartitionKey'

records0.append(record2)

put_result=dh.put_records(project_name,topic_name,records0)

print(put_result)

print("put tuple %d records, failed count: %d"%(len(records0),put_result.failed_record_count))

# failed_record_count如果大于0最好对failed record再进行重试

print("=======================================\n\n")

exceptDatahubExceptionase:

print(e)

sys.exit(-1)

写入BLOB类型Record示例

try:

records1=[]

record3=BlobRecord(blob_data='data')

record3.shard_id='0'

record3.put_attribute('a','b')

records1.append(record3)

put_result=dh.put_records(project_name,topic_name,records1)

print(put_result)

exceptDatahubExceptionase:

print(e)

sys.exit(-1)

获取cursor获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIMEOLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record

LATEST: 表示获取的cursor指向当前最新的record

SYSTEM_TIME: 表示获取的cursor指向大于等于该时间(单位毫秒)的第一条record

shard_id='0'

time_stamp=0

cursor_result0=dh.get_cursor(project_name,topic_name,shard_id,CursorType.OLDEST)

cursor_result1=dh.get_cursor(project_name,topic_name,shard_id,CursorType.LATEST)

cursor_result2=dh.get_cursor(project_name,topic_name,shard_id,CursorType.SYSTEM_TIME,time_stamp)

cursor=cursor_result0.cursor

通过get_cursor接口获取用于读取指定位置之后数据的cursor

订阅数据从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。

project_name='project'

shard_id="0"

limit=10

# 读取blob topic的record

topic_name='blob_topic'

get_result=dh.get_blob_records(project_name,topic_name,shard_id,cursor,limit)

# 读取tuple topic的record

topic_name='tuple_topic'

get_result=dh.get_tuple_records(project_name,topic_name,shard_id,record_schema,cursor,limit)

消费Tuple类型Record示例try:

# block等待所有shard状态ready

dh.wait_shards_ready(project_name,topic_name)

print("shards all ready!!!")

print("=======================================\n\n")

topic_result=dh.get_topic(project_name,topic_name)

print(topic_result)

iftopic_result.record_type!=RecordType.TUPLE:

print("topic type illegal!")

sys.exit(-1)

print("=======================================\n\n")

shard_id='0'

limit=10

cursor_result=dh.get_cursor(project_name,topic_name,shard_id,CursorType.OLDEST)

cursor=cursor_result.cursor

whileTrue:

get_result=dh.get_tuple_records(project_name,topic_name,shard_id,record_schema,cursor,limit)

forrecordinget_result.records:

print(record)

if0==get_result.record_count:

time.sleep(1)

cursor=get_result.next_cursor

exceptDatahubExceptionase:

print(e)

sys.exit(-1)

结尾

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐