思想

  ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程间隔10s检查一次PG数据库是否有modify_time > 记录的最后修改时间。如果有,将最新的数据同步到ES数据库,并修改记录时间。如果没有,继续每隔10s检查一次。

最终的实现效果:

程序实现的主要代码

def update_pg2es():
    global last_modify_time
    print(type(last_modify_time), last_modify_time)
    print("检测PG数据库是否有数据更新...")
    #连接数据库
    pg_conn = psycopg2.connect(config.CONNECT_PG_DB_URL)
    pg_cursor = pg_conn.cursor()

    select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)
    pg_cursor.execute(select_sql)
    datas = pg_cursor.fetchall()
    # 关闭连接
    pg_conn.close
    pg_cursor.close
    es_data_list = []
    for row in datas:
        # 获取当前行数据的列与值得字典
        column_values = get_row_colum_info(pg_cursor, row)
        es_data = translate_entity_date_to_es(column_values)
        es_data_list.append(es_data)
        last_modify_time = column_values['modify_time']

    index_entity = config.entity_index
    type_entity = config.entity_type
    num = 10000  # 批量存入的个数
    while (len(es_data_list) / num >= 0):
        if (math.floor(len(es_data_list) / num) == 0):  # 最后了
            es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:])  # 取剩下的
            print("存入{}个数据到ES".format(len(es_data_list)))
            break
        print("存入{}个数据到ES".format(num))
        print("前五个数据:")
        print(index_entity, type_entity, es_data_list[:5])
        es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:num])  # 存储一定的数量
        es_data_list = es_data_list[num:]  # 取剩下的

        if "ok" != es_result:
            print("存储实体数据到es中失败")
            break  # 如果某一次存入有问题,直接退出

    # 线程定时执行
    t = threading.Timer(10, update_pg2es)
    t.start()
    
    
if __name__ == '__main__':
	update_pg2es()

  注意:

  • select now()::timestamp(6)without time zone SQL语句可以查询当前时间(不带时区,精度为秒后面保存6位)
  • select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)将sql语句赋值给select_sql,sql语句的意思为选择出修改时间大于记录时间的数据,并将数据按照修改时间进行排序。此处注意记录时间两边需要加单引号才能保证sql语句的正常运行。

更多推荐