PG数据库同步数据到ES数据库
思想 ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程间隔10s检查一次PG数据库是否有modify_time > 记录的最后修改时间。如果有,将最新的数据同步到ES数据库,并修改记录时间。如果没有,继续每隔10s检查一次。
·
思想
ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程间隔10s检查一次PG数据库是否有modify_time > 记录的最后修改时间
。如果有,将最新的数据同步到ES数据库,并修改记录时间。如果没有,继续每隔10s检查一次。
最终的实现效果:
程序实现的主要代码
def update_pg2es():
global last_modify_time
print(type(last_modify_time), last_modify_time)
print("检测PG数据库是否有数据更新...")
#连接数据库
pg_conn = psycopg2.connect(config.CONNECT_PG_DB_URL)
pg_cursor = pg_conn.cursor()
select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)
pg_cursor.execute(select_sql)
datas = pg_cursor.fetchall()
# 关闭连接
pg_conn.close
pg_cursor.close
es_data_list = []
for row in datas:
# 获取当前行数据的列与值得字典
column_values = get_row_colum_info(pg_cursor, row)
es_data = translate_entity_date_to_es(column_values)
es_data_list.append(es_data)
last_modify_time = column_values['modify_time']
index_entity = config.entity_index
type_entity = config.entity_type
num = 10000 # 批量存入的个数
while (len(es_data_list) / num >= 0):
if (math.floor(len(es_data_list) / num) == 0): # 最后了
es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:]) # 取剩下的
print("存入{}个数据到ES".format(len(es_data_list)))
break
print("存入{}个数据到ES".format(num))
print("前五个数据:")
print(index_entity, type_entity, es_data_list[:5])
es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:num]) # 存储一定的数量
es_data_list = es_data_list[num:] # 取剩下的
if "ok" != es_result:
print("存储实体数据到es中失败")
break # 如果某一次存入有问题,直接退出
# 线程定时执行
t = threading.Timer(10, update_pg2es)
t.start()
if __name__ == '__main__':
update_pg2es()
注意:
select now()::timestamp(6)without time zone
SQL语句可以查询当前时间(不带时区,精度为秒后面保存6位)select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)
将sql语句赋值给select_sql,sql语句的意思为选择出修改时间大于记录时间的数据,并将数据按照修改时间进行排序。此处注意记录时间两边需要加单引号才能保证sql语句的正常运行。
更多推荐
已为社区贡献1条内容
所有评论(0)