概述

数仓选型对比

数据库 存储过程 性能 可扩展性 安全性 成本 支持度 数据一致性 数据压缩 数据备份和恢复 数据分析功能
PostgreSQL 支持 支持 支持 支持
Oracle 支持 支持 支持 支持
MySQL 支持 支持 支持 支持
MSSQL 支持 支持 支持 支持
Greenplum 支持 支持 支持
Starrocks 支持 支持 支持
GBase 支持 支持 支持 支持
Hive 不支持 支持 支持
Impala 不支持 支持 支持
GaussDB 支持 支持 支持 支持
  • 考虑数据规模、计算规模以及成本
  • 数据规模较小,计算能力要求不高,预算低,选型Postgresql主从
  • 数据规模较大,计算能力要求高,有一定预算,选型Greenplum

当前数仓架构问题

  • 无法保证数据一致性:没有严格的数据血缘依赖,导致数据在计算时可能出现不一致的情况
  • 不可见的任务调度:没有一站式界面管理调度
  • 无对外提供数据能力:数仓无法对外输出表
  • 无任务告警:任务报错对开发人员无感知
  • 长时间的锁等待:业务库直接查询物化视图,物化视图的刷新造成的长时间的锁等待
  • 重复工作较多:开发一套环境,生产一套环境,大量的重复开发工作
  • 大材小用:业务库Mysql,数仓Postgresql,使用kettle大材小用了
  • 任务调度不合理:配置调度任务需要java发版,不太合理;任务调度只能调度一条select语句,不合理;任务调度只能每15分钟跑一次,不合理
  • 任务调度不统一:宽表用java做任务调度,kettle用crontab做任务调度,其余还有的用pg_cron做任务调度
  • 数据安全性低:用户管理与权限控制不完善,敏感信息未做脱敏处理
  • 报表查询效率低:没有数仓分层,导致报表查询效率较低

解决方案

问题 解决方案 说明
无法保证数据一致性 改用海豚调度/等待与唤醒 海豚调度提供层级间的依赖,等待与唤醒提供表级别依赖
不可见的任务调度 改用海豚调度 海豚调度提供可视化的界面来管理任务
无对外提供数据能力 Magic-API Magic-API提供统一的接口平台,对外提供数据
无任务告警 改用海豚调度 海豚调度提供了钉钉、企业微信等告警接口
长时间的锁等待 弃用物化视图,改变etl方式 采用drop table和rename的方式做etl处理
重复工作较多 弃用开发环境,改用pg的用户模式映射 开发环境即在生产环境上,在生产环境中区分开发用户和集中用户,开发用户为个人开发(即开发环境),集中用户为集中跑批的用户(即生产环境)
大材小用 弃用kettle,改用外部数据包装器或datax 业务库只有Mysql,直接通过pg的插件mysql_fdw进行数据抽取;也可以通过海豚调度提供的datax组件进行数据抽取
调度不合理 改用海豚调度 海豚调度提供的sql组件可以执行sql脚本,海豚调度可以自定义调度时间、频率和并发度
任务调度不统一 改用海豚调度 海豚调度提供统一的任务调度平台
数据安全性低 严格控制用户的权限 用户模式一一对应,禁用public的create权限,回收函数的execute权限
报表查询效率低 数仓分层 ods、dw、dm、dim、app(ads)

架构设计

数据仓库设计

在这里插入图片描述

  • public:所有用户的根
  • dev:开发用户组
    • yuzhenchao和yuxiaotan:以开发人员的全拼命名,属于dev用户组,建立的所有的对象都在自己同名的模式下,对其他模式没有create权限,只有usage权限
  • pro:生产用户组
    • mdl:用来集中跑模型的用户,最终生成的模型在mdl模式下
    • apl:用来集中跑应用(报表)的用户,最终生成的结果表在apl模式下
    • jkfw:用于Magic-API连接数仓的用户,jkfw模式下存取magic-api的元数据
  • tool:工具用户,etl相关的工具存储过程将存放在tool模式下
  • readonly:只读用户组,在该组下面的用户拥有只读权限
    • finebi:用于finebi连接数仓的用户
    • dbselect:用于dblink和fdw连接数仓的用户

命名规范

对象类型 对象格式 例子
序列 seq_ seq_dblink_id
临时表 tmp_ tmp_jg624_rb1
ods层表 ods_原库名_原表名 ods_xkorder_order
dw层表 dwd_/dws_/dw_ dwd_order_stage/dws_order_stage/dw_order_stage
dm层表 dm_ dm_order_efficiency
dim层表 dim_ dim_dealer
app层表 接口的实时表:jk_需求号_real
接口的日表:jk_需求号_rb
接口的周表 :jk_需求号_zb
接口的月表:jk_需求号_yb
finebi的实时表:bi_需求号_real
finebi的日表:bi_需求号_rb
finebi的周表:bi_需求号_zb
finebi的月表:bi_需求号_yb
接口的实时表:jk_jg624_real
接口的日表:jk_jg624_rb
接口的周表 :jk_jg624_zb
接口的月表:jk_jg624_yb
finebi的实时表:bi_jg624_real
finebi的日表:bi_jg624_rb
finebi的周表:bi_jg624_zb
finebi的月表:bi_jg624_yb
主键约束 pk_表名_字段名 pk_order_order_no
索引 idx_表名_字段名 idx_order_order_id

模型设计

  • ods层(数据贴源层):用于存储从各个业务系统中提取的原始数据,保留数据的完整性和一致性,做一些简单的处理。比如,空字符串处理成null,is_delete字段统一处理成0和1
  • dwd层(数据明细层):用于存储经过清洗、加工、转换后的数据,保留数据的历史变化,为后续的数据分析和决策提供基础数据。比如,宽带表、移动表、itv表
  • dws层(数据汇总层):用于业务层面汇合的数据,提供更高效的数据查询和分析。比如,dwd层的宽带表、移动表、itv表就会在dws层合并成一个全业务表
  • dm层(数据集市层):存储高度聚合的数据。比如,按月汇总的应收表和实收表
  • dim层(公共维度层):用于存储与业务相关的维度信息,如时间、地域、产品等,为数据分析和决策提供维度支持。比如:代理商表,产品表等
  • app层(应用层):用于存储各种业务应用所需的数据,如报表、分析、可视化等,为业务应用提供数据支持

PostgreSQL的安装

Centos7.6安装postgresql15

【postgresql 数据库运维文档】

数据仓库的建立

创建数据库

create database etl;
\c etl postgres

创建用户组

create role dev;
create role pro;
create role readonly;

创建用户

create role yuzhenchao with login password '${YZC_PWD}' connection limit 20;
create role yuxiaotan with login password '${YXT_PWD}' connection limit 20;
create role mdl with login password '${MDL_PWD}' connection limit 250;
create role apl with login password '${APL_PWD}' connection limit 250;
create role tool with login password '${TOOL_PWD}' connection limit 20;
create role finebi with login password '${FINEBI_PWD}' connection limit 100;
create role dbselect with login password '${DBSELECT_PWD}' connection limit 100;
create role jkfw with login password '${JKFW_PWD}' connection limit 100;

用户加入到用户组

alter group dev add user yuzhenchao;
alter group dev add user yuxiaotan;
alter group pro add user mdl;
alter group pro add user apl;
alter group pro add user jkfw;
alter group readonly add user finebi;
alter group readonly add user dbselect;

创建模式

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw
create schema ${USERNAME};

模式授权用户

--用户同名模式授权所有权限
--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw
grant create,usage on schema ${USERNAME} to ${USERNAME};

--公开模式的usage权限
--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw
grant usage on schema ${USERNAME} to public;

--任何用户都拥有public模式的所有权限
--出于安全,回收任何用户在public的create权限
revoke create on schema public from public;

收回函数的执行权限

/*
 * pg中函数默认公开execute权限
 * 通过pg的基于schema和基于role的默认权限实现
 */
--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw
--在schema为yuzhenchao上创建的任何函数,除定义者外,其他人调用需要显式授权
alter default privileges for role ${USERNAME} revoke execute on functions from public;
--由yuzhenchao用户创建的任何函数,除定义者外,其他人调用需要显式授权
alter default privileges in schema ${USERNAME} revoke execute on functions from public;

公开表的select权限

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw
--在schema为yuzhenchao上创建的任何表默认公开select权限
alter default privileges in schema ${USERNAME} grant select on tables to public;
--由yuzhenchao用户创建的任何表默认公开select权限
alter default privileges for role ${USERNAME} grant select on tables to public;

动态sql函数

/*
 * 为了方便各用户的管理
 * 需要用定义者权限创建动态sql函数
 * 最终由tool用户集中管理
 */
create or replace function tool.sp_exec(
    vsql character varying
)
    returns void
    language plpgsql
    security definer
as $function$ 
/*
 * 作者 : v-yuzhenc
 * 功能 : 以集定义者权限执行sql
 * vsql : 需要执行的sql语句
 * */
begin
    execute vsql;
end;
$function$
;
alter function tool.sp_exec(varchar) owner to tool;
grant all on function tool.sp_exec(varchar) to tool;

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、jkfw
create or replace function ${USERNAME}.sp_exec(
    vsql character varying
)
    returns void
    language plpgsql
    security definer
as $function$ 
/*
 * 作者 : v-yuzhenc
 * 功能 : 以集定义者权限执行sql
 * vsql : 需要执行的sql语句
 * */
begin
    execute vsql;
end;
$function$
;
alter function ${USERNAME}.sp_exec(varchar) owner to ${USERNAME};
grant all on function ${USERNAME}.sp_exec(varchar) to ${USERNAME};
grant execute on function ${USERNAME}.sp_exec(varchar) to tool;

集中处理函数

create or replace function tool.sp_execsql(
     exec_sql character varying
    ,exec_user character varying
)
    returns void
    language plpgsql
    security definer
as $function$ 
/* 作者 : v-yuzhenc
 * 功能 : 集中处理程序,以某用户的权限执行某条sql语句
 * exec_sql : 需要执行的sql语句
 * exec_user : 需要以哪个用户的权限执行该sql语句
 * */
declare 
    p_user varchar := exec_user;
    o_search_path varchar;
begin
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO '||p_user||',public,oracle';
    case p_user 
        when 'yuzhenchao' then perform yuzhenchao.sp_exec(exec_sql);
        when 'yuxiaotan' then perform yuxiaotan.sp_exec(exec_sql);
        when 'mdl' then perform mdl.sp_exec(exec_sql);
        when 'apl' then perform apl.sp_exec(exec_sql);
        when 'tool' then perform tool.sp_exec(exec_sql);
        when 'jkfw' then perform jkfw.sp_exec(exec_sql);
        else raise exception '未配置该用户:%',p_user;
    end case;
    --恢复模式搜索路径
    execute 'SET search_path TO '||o_search_path;

    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;

alter function tool.sp_execsql(varchar, varchar) owner to tool;
grant all on function tool.sp_execsql(varchar, varchar) to tool;

fdw实现数据抽取

安装mysql_fdw

mysql_fdw的安装与使用

安装postgres_fdw

create extension postgres_fdw;

授权tool用户fdw的使用

grant all on foreign data wrapper mysql_fdw to tool;
grant all on foreign data WRAPPER postgres_fdw to tool;

创建连接信息表

\c etl tool
create table tool.dblink_connection_info (
	 connname varchar(63) not null
	,conntype varchar(63) null
	,hostname varchar(15) null
	,port varchar(15) null
	,dbname varchar(63) null
	,username varchar(63) null
	,userpwd varchar(63) null
	,fdw_server varchar(63) null
	,createtime timestamp null default current_timestamp
	,constraint dblink_connection_info_pkey primary key (connname)
);

comment on table tool.dblink_connection_info is '存放用于dblink的连接信息,不公开';
comment on column tool.dblink_connection_info.connname is '连接名字,自定义即可,唯一';
comment on column tool.dblink_connection_info.conntype is '数据源类型';
comment on column tool.dblink_connection_info.hostname is '数据库所在主机ip地址';
comment on column tool.dblink_connection_info.port is '数据库所在端口';
comment on column tool.dblink_connection_info.dbname is '数据库名字';
comment on column tool.dblink_connection_info.username is '用于连接的用户名';
comment on column tool.dblink_connection_info.userpwd is '用户名对应的密码';
comment on column tool.dblink_connection_info.fdw_server is '对应的fdw_server的名字';
comment on column tool.dblink_connection_info.createtime is '创建时间';

alter table tool.dblink_connection_info owner to tool;
revoke select on tool.dblink_connection_info from public;
grant select(connname,conntype,hostname,port,dbname,username,fdw_server ,createtime) on tool.dblink_connection_info to public;

创建序列

create sequence tool.seq_tmp_fdw_id
	increment by 1
	minvalue 1
	maxvalue 9999
	start 1
	cache 1
	cycle;

创建fdw_server和用户映射

create server ${MYSQL_SERVER_NAME} foreign data wrapper mysql_fdw options (host '${MYSQL_HOSTNAME}', port '${MYSQL_PORT}');
create user mapping for public server ${MYSQL_SERVER_NAME} options (username '${MYSQL_USERNAME}', password '${MYSQL_USERPWD}');

create server ${PG_SERVER_NAME} foreign data wrapper postgres_fdw options (host '${PG_HOSTNAME}', port '${PG_PORT}',dbname '${PG_DATABASE}');
create user mapping for public server ${PG_SERVER_NAME} options (user '${PG_USERNAME}', password '${PG_USERPWD}');

辅助函数

get_ddl

create or replace function tool.get_ddl(
     schematable character varying
    ,getmode character varying default 'table'::character varying
    ,newtablename character varying default null::character varying
)
    returns text
    language plpgsql
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定表名(区分大小写),返回当前表名的建表语句,备注语句
 * 		默认当前模式,其他模式请加 模式.表名
 * schematable : schemaname.tablename或者tablename
 * getmode : 默认table(获取表的建表语句)
 * 		view(获取视图的建视图语句)
 * 		viewtable(获取视图对应的建表语句)
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * */
declare 
	p_tablename varchar;
	p_schemaname varchar := user::varchar(64);
	p_newtablename varchar := newtablename;
	p_result text := null;
	p_array varchar[];
begin
	--校验getmode是否正确,不正确直接向外抛异常
	if getmode not in ('table','view','viewtable') then 
		raise exception '参数2必须为table、view或者viewtable!';
	end if;
	--如果传参为null直接抛出异常
	if schematable is null then 
		raise exception '表名或视图名不能为空!';
	end if;
	--含有多个点时,直接抛出异常
	--if instr(schematable,'.',1,2) <> 0 then 
	--	raise exception '表名或视图名输入不正确!';
	--end if;

	--解析schematable
	p_array := string_to_array(schematable,'.');
	if p_array[2] is null then
		p_tablename := p_array[1];
	else 
		p_tablename := trim(p_array[2]);
		p_schemaname := trim(p_array[1]);
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
	if getmode in ('table','viewtable') then
		if getmode = 'table' and not exists (select 1 from pg_tables where tablename = p_tablename and schemaname = p_schemaname) then 
			raise exception '%.%表不存在!',p_schemaname,p_tablename;
		elsif getmode = 'viewtable' and not exists (select 1 from pg_views where viewname = p_tablename and schemaname = p_schemaname) then
			raise exception '%.%视图不存在!',p_schemaname,p_tablename;
		end if;
		select 
			'drop table if exists "'||p_newtablename||'";'||
			chr(10)||'create table "'||p_newtablename||'" ('||
			chr(10)||
			string_agg(chr(9)||
				case when attnum = 1 then ' ' else ',' end||
				'"'||c.attname||'" '||  --字段名
				format_type(c.atttypid, c.atttypmod)||  --字段类型
				coalesce (' default '||substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid) for 128),'')||  --字段默认值
				case when c.attnotnull = true then ' not null' else ' null' end,chr(10) order by c.attnum
			)||
			--主键约束
			coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
			chr(10)||');'||
			--压缩信息
			--coalesce(' with ( '||chr(10)||chr(9)||' '||array_to_string(a.reloptions,chr(10)||chr(9)||',')||chr(10)||')','') ||
			--分布策略
			--case when e.policytype = 'r' then ' distributed replicated;' when e.policytype = 'p' then coalesce(' distributed by ('||
			--string_agg(case when array_position(string_to_array(array_to_string(e.distkey::int2[],','),',')::int[],c.attnum::int,1) <> 0 then '"'||c.attname||'"' end,',' order by string_to_array(array_to_string(e.distkey::int2[],','),',')::int[])||
			--');',' distributed randomly;') else ' distributed randomly;' end||
			--表备注(注释)
			coalesce(chr(10)||'comment on table "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||
			--字段备注(注释)
			coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
		into p_result
		from pg_class a 
		inner join pg_namespace b 
		on (a.relnamespace = b.oid)
		inner join pg_attribute c 
		on (a.oid = c.attrelid)
		left join pg_attrdef d 
		on (c.attrelid = d.adrelid and c.attnum = d.adnum)
		--left join gp_distribution_policy e 
		--on (a.oid = e.localoid)
		left join pg_description f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		left join (
		    select d.indrelid
		        ,string_agg('"'||c.attname||'"',',' order by c.attnum) prikey
		    from pg_class a, pg_namespace b, pg_attribute c, pg_index d 
		    where a.relnamespace = b.oid
		        and a.oid = c.attrelid
		        and a.oid = d.indrelid
		        and d.indisprimary = true
		        and c.attnum = any(d.indkey)
		        and a.relname = p_tablename
			    and b.nspname = p_schemaname  
			 group by d.indrelid
		) g 
		on (a.oid = g.indrelid)
		left join pg_description h 
		on (a.oid = h.objoid and h.objsubid = 0)
		where c.attnum > 0
			and not c.attisdropped
			and a.relname = p_tablename
			and b.nspname = p_schemaname
		group by b.nspname,a.relname,a.reloptions,h.description,g.prikey;
	else
		if getmode = 'view' and not exists (select 1 from pg_views where viewname = p_tablename and schemaname = p_schemaname) then
			raise exception '%.%视图不存在!',p_schemaname,p_tablename;
		end if;
		select 
			' CREATE OR REPLACE VIEW "'||p_newtablename||'" AS '||chr(10)||d.definition||
			--表备注(注释)
			coalesce(chr(10)||'comment on view "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||
			--字段备注(注释)
			coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
		into p_result
		from pg_class a 
		inner join pg_namespace b 
		on (a.relnamespace = b.oid)
		inner join pg_attribute c 
		on (a.oid = c.attrelid)
		left join pg_description f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		inner join pg_views d
		on (b.nspname = d.schemaname and a.relname = d.viewname)
		left join pg_description h 
		on (a.oid = h.objoid and h.objsubid = 0)
		where d.viewname = p_tablename
			and d.schemaname = p_schemaname
		group by a.relname,d.definition,h.description;
	end if;
	return p_result;
end;
$function$
;

grant execute on function tool.get_ddl(varchar, varchar, varchar) to public;

get_ddl_pg2mysql

create or replace function tool.get_ddl_pg2mysql(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying default null::character varying
)
    returns text
    language plpgsql
AS $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定本地pg数据库的表名、模式名,
 *        以mysql的语法返回指定模式下指定表的ddl语句
 * tablename : 指定pg的表名
 * schemaname : 指定pg的模式名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * */
declare 
	p_tablename varchar := tablename;
	p_schemaname varchar := schemaname;
	p_newtablename varchar := newtablename;
	p_result text := null;
    existbj int;
    v_sql varchar;
begin
	--如果传参为null直接抛出异常
	if p_tablename is null then 
		raise exception '表名或视图名不能为空!';
	end if;
    if p_schemaname is null then 
		raise exception '模式名不能为空!';
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
    
    --判断表或视图是否存在
    execute $v_sql$
        select 1 from pg_tables
        where tablename = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        union all 
        select 1 from pg_views
        where viewname = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        ;
    $v_sql$ into existbj; 
    
    if existbj is null then
        raise exception '表或视图不存在!';
    end if;
    v_sql := $v_sql$
        select 
			'drop table if exists `'||'$v_sql$||p_newtablename||$v_sql$'||'`;'||
			chr(10)||'create table `'||'$v_sql$||p_newtablename||$v_sql$'||'` ('||
			chr(10)||
			string_agg(chr(9)||
				case when c.attnum = 1 then ' ' else ',' end||
				'`'||c.attname||'` '||  --字段名
				case 
					when i.data_type = 'int' then 'int(11)'
			        when i.data_type = 'character varying' then case when regexp_substr(format_type(c.atttypid, c.atttypmod),'[1-9]+') is null or regexp_substr(format_type(c.atttypid, c.atttypmod),'[1-9]+')::int > 16341 then 'text' else replace (format_type(c.atttypid, c.atttypmod),'character varying','varchar') end
			        when i.data_type = 'character' then replace(format_type(c.atttypid, c.atttypmod),'character','char')
			        when i.data_type = 'date' then 'date'
			        when i.data_type = 'timestamp with time zone' then replace(replace (format_type(c.atttypid, c.atttypmod),' with time zone', ''),'timestamp','datetime')
			        when i.data_type = 'timestamp without time zone' then replace(replace (format_type(c.atttypid, c.atttypmod),' without time zone', ''),'timestamp','datetime')
			        when i.data_type = 'bigint' then 'bigint(20)'
			        when i.data_type = 'double precision' then 'double'
			        when i.data_type = 'smallint' then 'smallint(6)'
			        when i.data_type = 'text' then 'text'
			        when i.data_type = 'bytea' then 'blob'
			        when i.data_type = 'real' then 'float'
			        when i.data_type = 'numeric' then format_type(c.atttypid, c.atttypmod)
			        when i.data_type = 'time' then 'interval'
			        when i.data_type = 'json' then 'json'
			        else 'text'
			    end||  --字段类型
				case when c.attnotnull = true or ((d.typtype = 'd'::"char") AND d.typnotnull) then ' not null' else ' null' end||
				coalesce(' comment '''||replace(f.description,'''','''''')||'''','')
				,chr(10) order by c.attnum
				
			)||
			--主键约束
			coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
			chr(10)||')'||coalesce('comment '''||replace(h.description,'''','''''')||'''','')||
			';'
		from pg_class a 
		inner join pg_namespace b 
		on (a.relnamespace = b.oid)
		inner join pg_attribute c 
		on (a.oid = c.attrelid)
		left join pg_type d
		on (c.atttypid = d.oid)
		left join pg_description f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		left join (
		    select d.indrelid
		        ,string_agg('`'||c.attname||'`',',' order by c.attnum) prikey
		    from pg_class a, pg_namespace b, pg_attribute c, pg_index d 
		    where a.relnamespace = b.oid
		        and a.oid = c.attrelid
		        and a.oid = d.indrelid
		        and d.indisprimary = true
		        and c.attnum = any(d.indkey)
		        and a.relname = '$v_sql$||p_tablename||$v_sql$'
			    and b.nspname = '$v_sql$||p_schemaname||$v_sql$'  
			 group by d.indrelid
		) g 
		on (a.oid = g.indrelid)
		left join pg_description h 
		on (a.oid = h.objoid and h.objsubid = 0)
		left join information_schema.columns i
		on (a.relname = i.table_name and b.nspname = i.table_schema and c.attnum = i.ordinal_position)
		where c.attnum > 0
			and not c.attisdropped
			and a.relname = '$v_sql$||p_tablename||$v_sql$'
			and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
		group by b.nspname,a.relname,h.description,g.prikey;
    $v_sql$
    ;
    execute v_sql into p_result;
    
	return p_result;
end;
$function$
;
grant execute on function tool.get_ddl_pg2mysql(varchar, varchar, varchar) to public;

get_ddl_remote_mysql2pg

create or replace function tool.get_ddl_remote_mysql2pg(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying default null::character varying
    ,remote_connname character varying default '${CONNNAME}'::character varying
)
    returns text
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程mysql数据库的表名、库名和连接信息,
 *        以pg的语法返回指定库下指定表的ddl语句
 * tablename : 指定mysql的表名
 * schemaname : 指定mysql的库名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * remote_connname:远程连接名,有效值为 select connname from tool.dblink_connection_info;
 * */
declare 
	p_tablename varchar := tablename;
	p_schemaname varchar := schemaname;
	p_newtablename varchar := newtablename;
    p_remote_connname varchar := remote_connname;
	p_result text := null;
    tmp_fdw_id varchar := nextval('seq_tmp_fdw_id')::varchar;
    tbname_1 varchar := 'tmp_fdw_tables_'||tmp_fdw_id;
    tbname_2 varchar := 'tmp_fdw_views_'||tmp_fdw_id;
    tbname_3 varchar := 'tmp_fdw_columns_'||tmp_fdw_id;
    existbj int;
    v_sql varchar;
    o_search_path varchar;  --模式搜索路径
begin
	--如果传参为null直接抛出异常
	if p_tablename is null then 
		raise exception '表名或视图名不能为空!';
	end if;
    if p_schemaname is null then 
		raise exception '模式名(库名)不能为空!';
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO tool,'||o_search_path;

    --创建外部表
    select $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
        --创建tables映射表
        create foreign table $v_sql$||tbname_1||$v_sql$(
             table_name varchar(64)
			,table_schema varchar(64)
			,table_comment text
        ) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'tables');
        --创建views映射表
        create foreign table $v_sql$||tbname_2||$v_sql$(
             table_name varchar(64)
			,table_schema varchar(64)
        ) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'views');
        --创建columns映射表
        create foreign table $v_sql$||tbname_3||$v_sql$(
             table_schema varchar(64)
			,table_name varchar(64)
			,column_name varchar(64)
			,ordinal_position int
			,is_nullable varchar(3)
			,data_type text
			,column_type text
			,column_comment text
			,column_key varchar(3)
        ) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'columns');
    $v_sql$
    into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname
    ;
    execute v_sql;
    
    --判断表或视图是否存在
    execute $v_sql$
        select 1 from "$v_sql$||tbname_1||$v_sql$"
        where table_name = '$v_sql$||p_tablename||$v_sql$'
            and table_schema = '$v_sql$||p_schemaname||$v_sql$'
        union all 
        select 1 from "$v_sql$||tbname_2||$v_sql$"
        where table_name = '$v_sql$||p_tablename||$v_sql$'
            and table_schema = '$v_sql$||p_schemaname||$v_sql$'
        ;
    $v_sql$ into existbj; 
    
    if existbj is null then
        raise exception '表或视图不存在!';
    end if;
    v_sql := $v_sql$
        with tmp_a as (
		    (select
		         '$v_sql$||p_newtablename||$v_sql$' as table_name
		        ,table_schema
		        ,coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(case when table_comment = '' then null else table_comment end,'''','''''')||''';','') as table_comment
		    from $v_sql$||tbname_1||$v_sql$ 
		    where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
		        and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
		    limit 1)
		    union all 
		    (select
		         '$v_sql$||p_newtablename||$v_sql$' as table_name
		        ,table_schema
		        ,null as table_comment
		    from $v_sql$||tbname_2||$v_sql$ 
		    where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
		        and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
		    limit 1)
		), tmp_b as (
		    select 
		         '$v_sql$||p_newtablename||$v_sql$' as table_name
		        ,table_schema
		        ,string_agg(chr(9)||
		            case when ordinal_position = 1 then ' ' else ',' end||
		            --字段名
		            '"'||lower(column_name)||'"'||' '||
		            --数据类型
		            case 
			            when data_type = 'int' then data_type
				        when data_type = 'varchar' then replace (column_type,'varchar(0)','varchar(1)')
				        when data_type = 'char' then replace(replace(column_type,'char','varchar'),'varchar(0)','varchar(1)')
				        when data_type = 'date' then 'date'
				        when data_type = 'datetime' then replace (column_type, data_type, 'timestamp')
				        when data_type = 'timestamp' then 'timestamp'
				        when data_type = 'bigint' then 'bigint'
				        when data_type = 'double' then 'double precision'
				        when data_type = 'smallint' then 'smallint'
				        when data_type = 'decimal' then replace (column_type,'unsigned zerofill','')
				        when data_type = 'longtext' then 'text'
				        when data_type = 'text' then 'text'
				        when data_type = 'tinyint' then 'int'
				        when data_type = 'longblob' then 'bytea'
				        when data_type = 'blob' then 'bytea'
				        when data_type = 'float' then 'real'
				        when data_type = 'tinytext' then 'text'
				        when data_type = 'mediumtext' then 'text'
				        when data_type = 'numeric' then 'numeric'
				        when data_type = 'time' then 'interval'
				        when data_type = 'json' then 'json'
				        else 'varchar'
				    end||' '||
				    --空约束
				    case 
				    	when is_nullable = 'NO' then 'not null'
				    	when is_nullable = 'NO' then 'null'
				    	else 'null'
				    end
				    ,chr(10) order by ordinal_position
		         )||
		         --主键约束
		         coalesce(chr(10)||chr(9)||',primary key ('||string_agg(case when column_key = 'PRI' then '"'||lower(column_name)||'"' end,',' order by ordinal_position)||')','') 
		        as column_info
		        --字段备注
		        ,coalesce(chr(10)||string_agg('comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||lower(column_name)||'" is '''||replace(case when column_comment = '' then null else column_comment end,'''','''''')||''';',chr(10) order by ordinal_position),'') column_comment
		    from $v_sql$||tbname_3||$v_sql$
		    where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
		        and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
		    group by 
		         table_schema
		    limit 1
		)
		select 
		    'drop table if exists "'||a.table_name||'";'||chr(10)||
			'create table "'||a.table_name||'" ('||chr(10)||
			b.column_info||chr(10)||
			');'||a.table_comment||b.column_comment
		from tmp_a a
		inner join tmp_b b 
		on (a.table_schema = b.table_schema and a.table_name = b.table_name)
		;
    $v_sql$
    ;
    execute v_sql into p_result;
    
    --删除临时外部表
    execute $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
    $v_sql$
    ;
    
    --恢复模式搜索路径
	execute 'SET search_path TO '||o_search_path;
	return p_result;
    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.get_ddl_remote_mysql2pg(varchar,varchar,varchar,varchar) to public;

get_ddl_remote_pg2pg

create or replace function tool.get_ddl_remote_pg2pg(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying default null::character varying
    ,remote_connname character varying default '${CONNNAME}'::character varying
)
    returns text
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程pg数据库的表名、模式名和连接信息,
 *        以pg的语法返回指定模式下指定表的ddl语句
 * tablename : 指定pg的表名
 * schemaname : 指定pg的模式名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * remote_connname:远程连接名,有效值为 select connname from tool.dblink_connection_info;
 * */
declare 
	p_tablename varchar := tablename;
	p_schemaname varchar := schemaname;
	p_newtablename varchar := newtablename;
    p_remote_connname varchar := remote_connname;
	p_result text := null;
    tmp_fdw_id varchar := nextval('seq_tmp_fdw_id')::varchar;
    tbname_1 varchar := 'tmp_fdw_pg_class_'||tmp_fdw_id;
    tbname_2 varchar := 'tmp_fdw_pg_namespace_'||tmp_fdw_id;
    tbname_3 varchar := 'tmp_fdw_pg_attribute_'||tmp_fdw_id;
    tbname_5 varchar := 'tmp_fdw_pg_description_'||tmp_fdw_id;
    tbname_6 varchar := 'tmp_fdw_pg_index_'||tmp_fdw_id;
    tbname_8 varchar := 'tmp_fdw_pg_tables_'||tmp_fdw_id;
    tbname_9 varchar := 'tmp_fdw_pg_views_'||tmp_fdw_id;
    existbj int;
    v_sql varchar;
    o_search_path varchar;  --模式搜索路径
begin
	--如果传参为null直接抛出异常
	if p_tablename is null then 
		raise exception '表名或视图名不能为空!';
	end if;
    if p_schemaname is null then 
		raise exception '模式名不能为空!';
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO tool,'||o_search_path;

    --创建外部表
    select $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_5||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_6||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_8||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_9||$v_sql$;
        --创建pg_class映射表
        create foreign table $v_sql$||tbname_1||$v_sql$(
             "oid" oid not null
			,"relname" name not null
			,"relnamespace" oid not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_class');
        create foreign table $v_sql$||tbname_2||$v_sql$(
             "oid" oid not null
			,"nspname" name not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_namespace');
        create foreign table $v_sql$||tbname_3||$v_sql$(
             "attrelid" oid not null
			,"attname" name not null
			,"atttypid" oid not null
			,"attnum" smallint not null
			,"atttypmod" integer not null
			,"attnotnull" boolean not null
			,"attisdropped" boolean not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_attribute');
        create foreign table $v_sql$||tbname_5||$v_sql$(
             "objoid" oid not null
			,"classoid" oid not null
			,"objsubid" integer not null
			,"description" text not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_description');
        create foreign table $v_sql$||tbname_6||$v_sql$(
			 "indrelid" oid not null
			,"indisprimary" boolean not null
			,"indkey" int2vector not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_index');
        create foreign table $v_sql$||tbname_8||$v_sql$(
             "schemaname" name null
			,"tablename" name null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_tables');
        create foreign table $v_sql$||tbname_9||$v_sql$(
             "schemaname" name null
			,"viewname" name null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_views');
    $v_sql$
    into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname
    ;
    execute v_sql;
    
    --判断表或视图是否存在
    execute $v_sql$
        select 1 from "$v_sql$||tbname_8||$v_sql$"
        where tablename = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        union all 
        select 1 from "$v_sql$||tbname_9||$v_sql$"
        where viewname = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        ;
    $v_sql$ into existbj; 
    
    if existbj is null then
        raise exception '表或视图不存在!';
    end if;
    v_sql := $v_sql$
        select 
			'drop table if exists "'||'$v_sql$||p_newtablename||$v_sql$'||'";'||
			chr(10)||'create table "'||'$v_sql$||p_newtablename||$v_sql$'||'" ('||
			chr(10)||
			string_agg(chr(9)||
				case when c.attnum = 1 then ' ' else ',' end||
				'"'||c.attname||'" '||  --字段名
				format_type(c.atttypid, c.atttypmod)||  --字段类型
				case when c.attnotnull = true then ' not null' else ' null' end,chr(10) order by c.attnum
			)||
			--主键约束
			coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
			chr(10)||');'||
			--表备注(注释)
			coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(h.description,'''','''''')||''';','')||
			--字段备注(注释)
			coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
		from $v_sql$||tbname_1||$v_sql$ a 
		inner join $v_sql$||tbname_2||$v_sql$ b 
		on (a.relnamespace = b.oid)
		inner join $v_sql$||tbname_3||$v_sql$ c 
		on (a.oid = c.attrelid)
		left join $v_sql$||tbname_5||$v_sql$ f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		left join (
		    select d.indrelid
		        ,string_agg('"'||c.attname||'"',',' order by c.attnum) prikey
		    from $v_sql$||tbname_1||$v_sql$ a, $v_sql$||tbname_2||$v_sql$ b, $v_sql$||tbname_3||$v_sql$ c, $v_sql$||tbname_6||$v_sql$ d 
		    where a.relnamespace = b.oid
		        and a.oid = c.attrelid
		        and a.oid = d.indrelid
		        and d.indisprimary = true
		        and c.attnum = any(d.indkey)
		        and a.relname = '$v_sql$||p_tablename||$v_sql$'
			    and b.nspname = '$v_sql$||p_schemaname||$v_sql$'  
			 group by d.indrelid
		) g 
		on (a.oid = g.indrelid)
		left join $v_sql$||tbname_5||$v_sql$ h 
		on (a.oid = h.objoid and h.objsubid = 0)
		where c.attnum > 0
			and not c.attisdropped
			and a.relname = '$v_sql$||p_tablename||$v_sql$'
			and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
		group by b.nspname,a.relname,h.description,g.prikey;
    $v_sql$
    ;
    execute v_sql into p_result;
    
    --删除临时外部表
    execute $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_5||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_6||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_8||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_9||$v_sql$;
    $v_sql$
    ;
    
    --恢复模式搜索路径
    execute 'SET search_path TO '||o_search_path;
    return p_result;
    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.get_ddl_remote_pg2pg(varchar, varchar, varchar, varchar) to public;

sp_extract_mapping

create or replace function tool.sp_extract_mapping(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying
    ,remote_connname character varying default '${CONNNAME}'::character varying
)
    returns void
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程(远程连接)表的表名,模式名(mysql为库名),
 *        在tool用户下创建一个指定新表名的外部映射表,
 *        访问该映射表,相当于直接访问远程表。
 *        select * from tool.newtablename;
 * tablename : 指定远程表的表名
 * schemaname : 指定远程表的模式名(mysql为库名)
 * newtablename : 在tool用户下创建的外部表表名
 * remote_connname : 远程连接名,来自tool.dblink_connection_info.connname
 * */
declare 
    p_tablename varchar := tablename;
    p_schemaname varchar := schemaname;
    p_newtablename varchar := newtablename;
    p_remote_connname tool.dblink_connection_info.connname%type := remote_connname;
    v_datasource_type tool.dblink_connection_info.conntype%type;
    v_sql varchar;
    o_search_path varchar;  --模式搜索路径
begin 
    --判断表是否为空
	if p_tablename is null then 
	    raise exception '参数tablename不能为空!';
	end if;

    if p_schemaname is null then 
        raise exception '参数schemaname不能为空!';
    end if;
   
    if p_newtablename not like 'tmp\_%' then 
        raise exception '参数newtablename必须以tmp_开头!';
    end if;
   
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO tool,'||o_search_path;
    
    v_datasource_type := conntype from tool.dblink_connection_info where connname = p_remote_connname limit 1;
    case v_datasource_type 
        when 'pg' then v_sql := tool.get_ddl_remote_pg2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);
        when 'mysql' then v_sql := tool.get_ddl_remote_mysql2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);
        else v_sql := null;
    end case;
    
    if v_sql is null then 
        raise exception '不支持的数据源类型!目前只支持mysql和pg!';
    end if;
    
    --拼接外部表ddl
    select replace(replace(replace(replace(replace(regexp_replace(v_sql
		,' not null| null','','g'),
		'drop table ','drop foreign table '
		),
		'create table ','create foreign table '
		),
		'comment on ','--comment on '
		),
		',primary key ','--,primary key '
		),
		');',') server '||fdw_server||' options('||case conntype when 'pg' then 'schema_name ' when 'mysql' then 'dbname' end||''''||p_schemaname||''''||',table_name '||''''||p_tablename||''''||');'
		)
	into v_sql
	from tool.dblink_connection_info
	where connname = p_remote_connname;
    execute v_sql;
    --execute 'show search_path;' into o_search_path;
    --raise notice '%',v_sql;
    --raise notice '%',o_search_path;
    --恢复模式搜索路径
	execute 'SET search_path TO '||o_search_path;
    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.sp_extract_mapping(varchar, varchar, varchar, varchar) to public;

等待与唤醒

创建dblink插件

\c etl postgres
create extension dblink;

ETL任务通知表

\c etl tool
CREATE TABLE tool.etl_task_notice (
	table_name varchar(64) NULL, -- 表名
	schema_name varchar(64) NULL, -- 模式名
	update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP -- 表的更新时间
);
COMMENT ON TABLE tool.etl_task_notice IS 'etl任务通知,etl任务完成时,得到结果表后向该表中插入一条数据';
COMMENT ON COLUMN tool.etl_task_notice.table_name IS '表名';
COMMENT ON COLUMN tool.etl_task_notice.schema_name IS '模式名';
COMMENT ON COLUMN tool.etl_task_notice.update_time IS '表的更新时间';
ALTER TABLE tool.etl_task_notice OWNER TO tool;

创建序列

create sequence tool.seq_dblink_sessionid
	increment by 1
	minvalue 1
	maxvalue 9999
	start 1
	cache 1
	cycle;

辅助函数

is_return_result

create or replace function tool.is_return_result(
     select_statement character varying
    ,retrytimes integer default 60
)
    returns integer
    language plpgsql
    security definer
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:执行动态select语句,并且该执行过程是自治的,
 *      判断是否有结果返回,有则返回1,否则返回0
 * vsql:执行的动态sql
 * retrytimes:拿不到连接时拿连接重试次数,默认重试60次
 * */
declare
    p_select_statement varchar := select_statement;
    v_sql varchar; --动态sql
    p_retrytimes int := retrytimes;
    p_count int := 0;
    p_session_id varchar := nextval('seq_dblink_sessionid')::varchar;
    p_session_name varchar := 'dblink_'||p_session_id;
    p_result int := 0;
    dblink_conn varchar;
begin
	
    --尝试拿连接
    while true loop
    begin
	    --获取dblink连接
	    select 'host='||hostname||' port='||port||' dbname='||dbname||' user='||username||' password='||userpwd 
	    into dblink_conn
	    from tool.dblink_connection_info
	    where connname = '101.34.75.200-pg-etl';
	    perform dblink_connect(p_session_name,dblink_conn);
	    exit;
	    exception when others then 
	        if p_count >= p_retrytimes then 
	            exit;
	        end if;
	    p_count := p_count + 1;
	    --睡眠1s再拿连接
	    perform pg_sleep(1);
	    continue;
    end;
    end loop;
    v_sql :=  'select 1 from (' || p_select_statement || ') a limit 1';
    --执行动态sql语句
    select count(1) into p_result from dblink(p_session_name,v_sql) as (id int);
    --关闭dblink连接
    perform dblink_disconnect(p_session_name);
    return p_result;
    
    --报错时得先把连接关掉再把错误抛出来
    exception when others then 
	    begin
	        perform dblink_disconnect(p_session_name);
	        exception when others then
	            null;
	    end;
	    raise exception '%',SQLERRM;
end;
$function$
;
grant all on function tool.is_return_result(varchar, int4) to public;

get_15_interval_time

create or replace function tool.get_15_interval_time(
     v_time timestamp with time zone default current_timestamp
    ,time_type character varying default 'before'::character varying
)
    returns timestamp with time zone
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:获取某个时间所在分钟0-15,15-30,30-45,45-60的先后时间
 * v_time:指定时间
 * time_type:时间类型,时间段的前一段则BEFORE(默认),时间段的后一段则AFTER
 * */
declare
    p_v_time timestamptz := v_time;
    p_time_type varchar := upper(time_type);
    v_result timestamptz;
begin
	if p_time_type not in ('BEFORE','AFTER') then 
	    raise exception 'p_time_type必须是BEFORE或者AFTER!';
	end if;
    if p_time_type = 'BEFORE' then 
        v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||
		    case 
		        when to_char(p_v_time,'mi')::numeric-0 >= 0 and to_char(p_v_time,'mi')::numeric-0 < 15 then '00'
		        when to_char(p_v_time,'mi')::numeric-15 >= 0 and to_char(p_v_time,'mi')::numeric-15 < 15 then '15'
		        when to_char(p_v_time,'mi')::numeric-30 >= 0 and to_char(p_v_time,'mi')::numeric-30 < 15 then '30'
		        when to_char(p_v_time,'mi')::numeric-45 >= 0 and  to_char(p_v_time,'mi')::numeric-45 < 15 then '45'
		    end||'00','yyyymmddhh24miss');
    elsif p_time_type = 'AFTER' then 
        v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||
		    case 
		        when to_char(p_v_time,'mi')::numeric-0 >= 0 and to_char(p_v_time,'mi')::numeric-0 < 15 then '00'
		        when to_char(p_v_time,'mi')::numeric-15 >= 0 and to_char(p_v_time,'mi')::numeric-15 < 15 then '15'
		        when to_char(p_v_time,'mi')::numeric-30 >= 0 and to_char(p_v_time,'mi')::numeric-30 < 15 then '30'
		        when to_char(p_v_time,'mi')::numeric-45 >= 0 and  to_char(p_v_time,'mi')::numeric-45 < 15 then '45'
		    end||'00','yyyymmddhh24miss') + interval '15 minutes';
    end if; 
    return v_result;
end;
$function$
;
grant execute on function tool.get_15_interval_time(timestamptz, varchar) to public;

wait_table

create or replace function tool.wait_table(
     sql_statement character varying
    ,check_freq integer default 30
    ,check_time integer default 300
)
    returns integer
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:等待某个sql执行返回结果集,只能是select语句,等待成功返回1,等待失败返回0
 * sql_statement:sql语句
 * check_freq:检测频率,默认每30s检测一次
 * check_time:最多检测多少秒,默认5分钟
 * */
declare 
    p_sql_statement varchar := sql_statement;
    p_check_freq numeric := check_freq;
    p_check_time numeric := check_time;
    v_result int := 0;  --返回结果
    v_sql varchar(32767);  --动态sql
    v_id numeric;
    check_begin numeric := 0;
    v_hint varchar(400);
begin 
	--只能select语句
	if (trim(lower(p_sql_statement)) ~ '^select') = false then
	    raise exception 'sql_statement参数必须以select开头!';
	end if;
    
    --开始检测
    raise notice '----------开始检测----------';
    --动态sql
    v_sql :=  'select 1 from (' || p_sql_statement || ') a limit 1';
    raise notice '检测语句为:%',v_sql;
    loop
        v_hint := '当前检测时间为:'||to_char(clock_timestamp(),'yyyy-mm-dd hh24:mi:ss');
        raise notice '%',v_hint;
        begin
	        v_id := tool.is_return_result(v_sql);
	        --判断v_id是否为空
	        if v_id = 1 then
	            --等表标识置为1
	            v_result := 1;
	        else 
	            --当前检测没有检测通过,则初始时间后移
	            check_begin := check_begin + p_check_freq;
	        end if;
	        
            --若动态sql因为某表的不存在而产生异常,则不退出,继续下一次等表
            exception when others then
                check_begin := check_begin + p_check_freq;
	    end;
	    --退出循环的条件就是:等表超时或者等表标识为1
        exit when check_begin > p_check_time or v_result = 1; 
        perform pg_sleep(p_check_freq);
    end loop;
   
    if v_result = 1 then 
        raise notice '----------等表成功----------';
    else
        raise notice '----------等表超时----------';
        raise exception '----------等表超时----------';
    end if;
    return v_result;
end;
$function$
;
grant execute on function tool.wait_table(varchar, int4, int4) to public;

sp_wait

create or replace function tool.sp_wait(
     tablename character varying
    ,schemaname character varying
    ,waittype character varying default 'real'::character varying
    ,checkfreq integer default 20
    ,checktime integer default 600
)
    returns void
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:等待某张表被sp_notify存过唤醒。
 * tablename : 等待的表名
 * schemaname : 表名对应的模式名
 * waittype : 等待类型 year month day hour real
 *     year:当年被唤醒过
 *     month:当月被唤醒过
 *     day:当天被唤醒过
 *     hour:当前所在的小时被唤醒过
 *     real:伪实时,0-15,15-30,30-45,45-60,当前分钟所在的分钟范围被唤醒过
 * checkfreq:检测频率,默认每隔20s检测一次
 * checktime:最多检测多少秒,默认检测600s
 * */
declare 
    p_tablename varchar := lower(tablename);
    p_schemaname varchar := lower(schemaname);
    p_waittype varchar := lower(waittype);
    p_checkfreq int := checkfreq;
    p_checktime int := checktime;
    v_sql varchar;
begin 
	if p_tablename is null then 
	    raise exception '参数tablename不能为空!';
	end if;
    if p_schemaname is null then 
        raise exception '参数schemaname不能为空!';
    end if;
    if p_waittype not in ('year','month','day','hour','real') then 
        raise exception '参数waittype范围[''year'',''month'',''day'',''hour'',''real'']!';
    end if;
    if p_checkfreq >= 61 or p_checkfreq <= 0 then 
        raise exception '参数checkfreq范围 1 ~ 60 !';
    end if;
    if p_checktime >= 1801 or p_checktime <= 0 then 
        raise exception '参数checktime范围 1 ~ 1800 !';
    end if;
   
    v_sql := $v_sql$select 1 from tool.etl_task_notice
        where table_name = '$v_sql$||p_tablename||$v_sql$'
            and schema_name = '$v_sql$||p_schemaname||$v_sql$'
            and $v_sql$||
            case p_waittype
	            when 'year' then $v_sql$to_char(update_time,'yyyy') = to_char(current_timestamp,'yyyy')$v_sql$
	            when 'month' then $v_sql$to_char(update_time,'yyyymm') = to_char(current_timestamp,'yyyymm')$v_sql$
	            when 'hour' then $v_sql$to_char(update_time,'yyyymmddhh24') = to_char(current_timestamp,'yyyymmddhh24')$v_sql$
	            when 'real' then $v_sql$update_time >= tool.get_15_interval_time(current_timestamp,'BEFORE') and update_time <= tool.get_15_interval_time(current_timestamp,'AFTER')$v_sql$
	        end
    ;
    --raise notice '%',v_sql;
    perform tool.wait_table(v_sql,p_checkfreq,p_checktime);
end;
$function$
;
grant all on function tool.sp_wait(varchar, varchar, varchar, int4, int4) to public;

sp_notify

create or replace function tool.sp_notify(
     tablename character varying
    ,schemaname character varying
)
    returns void
    language plpgsql
    security definer
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:唤醒某个模式下的某个表,
 *       即通知该表已经etl完成
 * tablename:表名
 * schemaname:模式名
 * */
declare 
    p_tablename tool.etl_task_notice.table_name%type := lower(tablename);
    p_schemaname tool.etl_task_notice.schema_name%type := lower(schemaname);
begin 
    if p_tablename is null then 
        raise exception '参数tablename不能为空';
    end if;
   
    if p_schemaname is null then 
        raise exception '参数schemaname不能为空';
    end if;
   
    insert into tool.etl_task_notice(
         table_name 
        ,schema_name
    ) values (
         p_tablename
        ,p_schemaname
    );
end;
$function$
;
grant all on function tool.sp_notify(varchar, varchar) to public;

海豚调度

安装

Centos7.6集群部署海豚调度3.1.5

架构

在这里插入图片描述

搭建

对象 对象实例 说明
租户 root 操作系统执行用户
普通用户 yuzhenchao 姓名全拼
普通用户 yuxiaotan 姓名全拼
数据源 dbselect@223.242.39.75:5432/dp 海豚调度只有select权限的数据源
数据源 mdl@101.34.75.200:5432/etl etl数据库中模型开发的数据源
数据源 apl@101.34.75.200:5432/etl etl数据库中应用层开发的数据源
项目 海豚调度元数据建模 对海豚调度的元数据进行etl处理
项目 海豚调度元数据应用开发——yuzhenchao yuzhenchao的专属应用层项目,自己的报表或者应用自己维护,一个工作流一个应用一个调度
项目 海豚调度元数据应用开发——yuxiaotan yuxiaotan的专属应用层项目,自己的报表或者应用自己维护,一个工作流一个应用一个调度
告警实例 模型告警 模型不管成功还是失败,都需要进行告警
告警实例 应用告警-yuzhenchao 应用告警通知指定的人yuzhenchao,一般只告警失败的任务
告警实例 应用告警-yuxiaotan 应用告警通知指定的人yuxiaotan,一般只告警失败的任务
告警组 模型告警 直接与告警实例一一对应
告警组 应用告警-yuzhenchao 直接与告警实例一一对应
告警组 应用告警-yuxiaotan 直接与告警实例一一对应
环境管理 datax-exec datax的执行环境配置

创建租户root

在这里插入图片描述

创建普通用户

在这里插入图片描述
在这里插入图片描述

创建数据源

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建项目

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建告警实例(钉钉告警)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建告警组

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建datax环境

在这里插入图片描述

注:

  • 我这里选择的默认分组,所以需要在每台worker的机器上安装datax,并且让DATAX_HOME=/usr/local/datax
  • 数据库版本过高时,需要在对应的插件目录将旧版本的驱动删除,否则会连接失败

DataX / userGuid.md

授权管理

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

ETL过程

可回溯的etl过程——替代变量

变量名 变量值 变量说明
today $[yyyyMMdd] 今天
yesterday $[yyyyMMdd-1] 昨天
day2before $[yyyyMMdd-2] 2天前
day3before $[yyyyMMdd-3] 3天前
day4before $[yyyyMMdd-4] 4天前
day5before $[yyyyMMdd-5] 5天前
day6before $[yyyyMMdd-6] 6天前
day7before $[yyyyMMdd-7] 7天前

模型层工作流

ods层

  • ods层抽取数据,为了避免表产生死锁等待以及读脏数据,我们采用临时表方式进行数据抽取
  • 先创建临时表,将数据抽取到临时表,最终将临时表重命名为目标表
源库名 源表名 目标模式 目标表名 表说明
dp t_ds_project mdl ods_dp_t_ds_project 项目表
dp t_ds_process_definition mdl ods_dp_t_ds_process_definition 工作流定义表
dp t_ds_tenant mdl ods_dp_t_ds_tenant 租户表
dp t_ds_schedules mdl ods_dp_t_ds_schedules 调度表
dp t_ds_process_task_relation mdl ods_dp_t_ds_process_task_relation 工作流任务关系表
dp t_ds_task_definition mdl ods_dp_t_ds_task_definition 任务定义表
dp t_ds_process_instance mdl ods_dp_t_ds_process_instance 工作流实例表
dp t_ds_task_instance mdl ods_dp_t_ds_task_instance 任务实例表
dp t_ds_relation_process_instance mdl ods_dp_t_ds_relation_process_instance 存放的数据用于处理流程定义中含有子流程的情况
dp t_ds_session mdl ods_dp_t_ds_session 会话表
dp t_ds_user mdl ods_dp_t_ds_user 用户表
dp t_ds_datasource mdl ods_dp_t_ds_datasource 数据源表
dp t_ds_access_token mdl ods_dp_t_ds_access_token 访问令牌表
dp t_ds_relation_datasource_user mdl ods_dp_t_ds_relation_datasource_user 用户数据源关系表
dp t_ds_queue mdl ods_dp_t_ds_queue 队列表
fdw实现抽数(不建议)
  • 先选择项目
    在这里插入图片描述
  • 选择工作流定义
    在这里插入图片描述
  • 选择创建工作流
    在这里插入图片描述
  • 拖拉拽sql图标
    在这里插入图片描述
  • 在开发用户下开发ods的脚本,然后在海豚调度上配置任务,我们以表 t_ds_project举例
--通过该函数获取建表语句
--select tool.get_ddl_remote_pg2pg('t_ds_project','public','tmp_ods_dp_t_ds_project');
drop table if exists "tmp_ods_dp_t_ds_project";
create table "tmp_ods_dp_t_ds_project" (
	 "id" integer not null
	,"name" character varying(100) null
	,"code" bigint not null
	,"description" character varying(255) null
	,"user_id" integer null
	,"flag" integer null
	,"create_time" timestamp without time zone null
	,"update_time" timestamp without time zone null
	,primary key ("id")
);
comment on table "tmp_ods_dp_t_ds_project" is '项目表';
comment on column "tmp_ods_dp_t_ds_project"."id" is '项目id';
comment on column "tmp_ods_dp_t_ds_project"."name" is '项目名称';
comment on column "tmp_ods_dp_t_ds_project"."code" is '项目编码';
comment on column "tmp_ods_dp_t_ds_project"."description" is '项目描述';
comment on column "tmp_ods_dp_t_ds_project"."user_id" is '用户id,对应t_ds_user.id';
comment on column "tmp_ods_dp_t_ds_project"."create_time" is '项目创建时间';
comment on column "tmp_ods_dp_t_ds_project"."update_time" is '项目最近更新时间';
--创建映射表
do $$
begin
    perform tool.sp_extract_mapping('t_ds_project','public','tmp_ods_dp_t_ds_project');
end$$;

--抽取数据
insert into tmp_ods_dp_t_ds_project
select * from tool.tmp_ods_dp_t_ds_project
;

do $$
begin
	--增加last_pg_time时间字段
	alter table tmp_ods_dp_t_ds_project add last_pg_time timestamp default current_timestamp;
    --空字符串处理成null
    perform tool.replace_to_null('tmp_ods_dp_t_ds_project');
end$$;

do $$
begin
    --如果ods表存在就删除
	drop table if exists ods_dp_t_ds_project;
    --表重命名
	alter table tmp_ods_dp_t_ds_project rename to ods_dp_t_ds_project;
end$$;

do $$
begin
    --ods_dp_t_ds_project
    perform tool.sp_notify('ods_dp_t_ds_project','mdl');
    --表分析
    analyze ods_dp_t_ds_project;
end$$;

在这里插入图片描述

  • 点击保存
    在这里插入图片描述
    在这里插入图片描述
  • 点击上线然后运行工作流
    在这里插入图片描述
    在这里插入图片描述
  • 查看日志
    在这里插入图片描述
  • 其他同理,连线可以控制并发度
    在这里插入图片描述
datax实现抽数(建议)
  • 创建工作流,拖拉拽sql组件和datax组件
    在这里插入图片描述
  • 编写脚本,任务配置
  • 前置sql组件
drop table if exists "tmp_ods_dp_t_ds_project";
create table "tmp_ods_dp_t_ds_project" (
	 "id" integer not null
	,"name" character varying(100) null
	,"code" bigint not null
	,"description" character varying(255) null
	,"user_id" integer null
	,"flag" integer null
	,"create_time" timestamp without time zone null
	,"update_time" timestamp without time zone null
	,primary key ("id")
);
comment on table "tmp_ods_dp_t_ds_project" is '项目表';
comment on column "tmp_ods_dp_t_ds_project"."id" is '项目id';
comment on column "tmp_ods_dp_t_ds_project"."name" is '项目名称';
comment on column "tmp_ods_dp_t_ds_project"."code" is '项目编码';
comment on column "tmp_ods_dp_t_ds_project"."description" is '项目描述';
comment on column "tmp_ods_dp_t_ds_project"."user_id" is '用户id,对应t_ds_user.id';
comment on column "tmp_ods_dp_t_ds_project"."create_time" is '项目创建时间';
comment on column "tmp_ods_dp_t_ds_project"."update_time" is '项目最近更新时间';

在这里插入图片描述

  • datax的select语句
--数据抽取select语句
select 
     "id"
	,"name"
	,"code"
	,"description"
	,"user_id"
	,"flag"
	,"create_time"
	,"update_time"
from t_ds_project
;

--后置操作
do $$
begin
	--增加last_pg_time时间字段
	alter table tmp_ods_dp_t_ds_project add last_pg_time timestamp default current_timestamp;
    --空字符串处理成null
    perform tool.replace_to_null('tmp_ods_dp_t_ds_project');
end$$;
do $$
begin
    --如果ods表存在就删除
	drop table if exists ods_dp_t_ds_project;
    --表重命名
	alter table tmp_ods_dp_t_ds_project rename to ods_dp_t_ds_project;
end$$;

do $$
begin
    --ods_dp_t_ds_project
    perform tool.sp_notify('ods_dp_t_ds_project','mdl');
    
    --表分析
    analyze ods_dp_t_ds_project;
end$$;

在这里插入图片描述

  • 其他同理

在这里插入图片描述

  • 效率对比
    在这里插入图片描述

dw层——sql脚本

dim层——sql脚本

dm层——sql脚本

总控

应用层工作流

Magic-API统一接口平台

其他函数

array_position

create or replace function tool.array_position(
     arrayint integer[]
    ,elementint integer
    ,times integer default 1
)
    returns integer
    language plpgsql
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 返回数组指定元素所在的位置,未匹配到返回0
 * arrayint : 数组
 * element : 指定元素
 * times  : 第几次出现的位置
 * */
declare 
	p_times int := 0;
	p_result int := 0;
begin
	if array_length(arrayint,1) is null then 
		return p_result;
	end if;
	for i in 1..array_length(arrayint,1) loop  
		if arrayint[i] = elementint then 
			p_times := p_times + 1;
		end if;
		if p_times = times then
			return i;
		end if;
	end loop;
	return p_result;
end;
$function$
;
grant execute on function tool.array_position(_int4, int4, int4) to public;

replace_to_null

create or replace function tool.replace_to_null(
     tablename character varying
    ,schemaname character varying default ("current_user"())::character varying(64)
    ,execuser varchar default current_user::varchar
)
    returns void
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能:扫描指定表的所有varchar和text类型的字段,将字段值为''替换成null
 * tablename : 需要扫描的表名
 * schemaname : 需要扫描的模式名
 * */
declare 
	p_tablename varchar := lower(tablename);
	p_schemaname varchar := lower(schemaname);
	p_execuser varchar(64) := execuser;--调用者
	existbj int := 0;  --存在标记
	v_sql varchar;  --动态sql
begin
	if p_schemaname <> p_execuser then 
	    raise exception '你只有权限操作自己模式下的表!';
	end if;
	--扫描varchar和text字段
	select count(1)
	into existbj
	from pg_class a
	inner join pg_namespace b
	on (a.relnamespace = b.oid)
	inner join pg_attribute c
	on (a.oid = c.attrelid)
	inner join pg_type d
	on (c.atttypid = d.oid)
	where c.attnum > 0
		and d.typname in ('varchar','text')
		and a.relname = p_tablename
		and b.nspname = p_schemaname;
	--若不存在varchar或者text字段,则不做处理
	if existbj = 0 then
		raise notice '%.%表不存在或者不需要处理空字符串!',p_schemaname,p_tablename;
		return;
	end if;
	--拼接处理空字符串语句
	select 
		string_agg('update '||p_tablename||' 
	set '||c.attname||' = null where '||c.attname||' = '''';',chr(10))
	into v_sql
	from pg_class a
	inner join pg_namespace b
	on (a.relnamespace = b.oid)
	inner join pg_attribute c
	on (a.oid = c.attrelid)
	inner join pg_type d
	on (c.atttypid = d.oid)
	where c.attnum > 0
		and d.typname in ('varchar','text')
		and a.relname = p_tablename
		and b.nspname = p_schemaname;
	--通过集中处理程序执行动态sql
	perform tool.sp_execsql(v_sql,p_schemaname);
end;
$function$
;
grant execute on function tool.replace_to_null(varchar, varchar,varchar) to public;

sp_jzdb

create or replace function tool.sp_jzdb(
     tablename character varying
    ,oldschema character varying
    ,newschema character varying
    ,tablename_new character varying default null::character varying
    ,execuser varchar default current_user::varchar
)
    returns void
    language plpgsql
    security definer
as $function$ 
/* 
 * 作者 : v-yuzhenc
 * 功能:集中导表,将指定模式下的表以新的表名导入到新的模式下
 * tablename : 指定模式下的表,不区分大小写
 * oldschema : 指定模式,不区分大小写
 * newschema : 新的模式,不区分大小写
 * tablename_new : 新的表名,不区分大小写,默认与旧表名相同
 * execuser : 调用者用户名,无需传参,默认值即可
 * */
declare 
	p_tablename varchar(64) := lower(tablename);
	p_tablename_new varchar(64) := coalesce (lower(tablename_new),p_tablename);
	p_oldschema varchar(64) := lower(oldschema);
	p_newschema varchar(64) := lower(newschema);
    p_execuser varchar(64) := execuser;  --调用者用户名,无需人工传参
    jzdb_tname varchar(64) := 'jzdb_'||p_tablename_new;
    existbj numeric;
    v_sql varchar;
    bak_tname varchar;
begin
	
	--调用者可以将自己的表导入到其他模式
	--也可以将其他模式的表导入到自己模式
	if p_execuser <> p_oldschema and p_execuser <> p_newschema then 
	    raise exception 'oldschema和newschema参数之一必须与当前用户名一致,因为只允许操作与自己相关的表';
	end if;
	--建表
    v_sql := tool.get_ddl(p_oldschema||'.'||p_tablename,'table',jzdb_tname);
    perform tool.sp_execsql(v_sql,p_newschema);

	--插入数据
	perform tool.sp_execsql('insert into "'||jzdb_tname||'" select * from "'||p_oldschema||'"."'||p_tablename||'";',p_newschema);
	
	--判断新模式下的新表是否有重名的
	select count(1)
	into existbj
	from pg_tables a 
	where a.tablename = p_tablename_new
		and a.schemaname = p_newschema;
	if existbj <> 0 then 
	    --如果有重名的表存在
	    --则做备份,加前缀 o_模式名_表名_版本号
	    select 
	        count(1)+1 v_no  --版本号
	    into existbj
	    from pg_tables a
	    where substr(a.tablename,4+length(p_newschema),length(p_tablename_new)) = p_tablename_new
	        and a.schemaname = 'tool'
	    ;
	    --新表名建在tool下,加前缀 o_模式名_表名_版本号
	    bak_tname := 'o_'||p_newschema||'_'||p_tablename_new||'_'||existbj::varchar;
	    --建表
	    v_sql := tool.get_ddl(p_newschema||'.'||p_tablename_new,'table',bak_tname);
	    perform tool.sp_execsql(v_sql,'tool');
	    --插入数据
	    perform tool.sp_execsql('insert into "'||bak_tname||'" select * from "'||p_newschema||'"."'||p_tablename_new||'";','tool');
	    --分析表
	    perform tool.sp_execsql('analyze "'||bak_tname||'";','tool');
		perform tool.sp_execsql('drop table "'||p_tablename_new||'";',p_newschema);
	end if;

	--重命名表
	perform tool.sp_execsql('alter table "'||jzdb_tname||'" rename to '||p_tablename_new,p_newschema);

	--分析表
	perform tool.sp_execsql('analyze "'||p_tablename_new||'";',p_newschema);
    
end;
$function$
;
grant execute on function tool.sp_jzdb(varchar, varchar, varchar, varchar,varchar) to public;

sp_sqlexec_efficient

create or replace function tool.sp_sqlexec_efficient(
     sqlexec character varying
    ,exectimes integer default 1
)
    returns character varying
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:sql执行效率检测,事物不会提交,
 *       返回每次执行时间和平均执行时间
 * sqlexec:需要检测的sql
 * exectimes:需要检测的次数,范围1-10
 * */
declare 
    begin_time timestamp;
    end_time timestamp;
    exec_duration interval;
    v_result varchar := '';
    p_sqlexec varchar := sqlexec;
    p_exectimes int := exectimes;
    exec_begin int := 0;
    sum_sqlexec interval := '00:00:00.000000'::interval;
    sqlexec_avg interval;
begin
	if p_exectimes <= 0 or p_exectimes >= 11 then 
	    raise exception 'exectimes参数值范围为1~10';
	end if;
    while exec_begin >= 0 and exec_begin <= p_exectimes-1 loop
	    begin
		    begin_time := clock_timestamp();
		    execute p_sqlexec;
		    end_time = clock_timestamp();
		    raise exception '回滚' using errcode = '12345';
		    exception when sqlstate '12345' then 
		        null;
		end;
	    exec_duration = end_time-begin_time;
	    v_result := v_result||to_char(begin_time,'yyyy-mm-dd hh24:mi:ss.us')||'~'||to_char(end_time,'yyyy-mm-dd hh24:mi:ss.us')||':'||exec_duration::varchar||';'||chr(10);
	    sum_sqlexec := sum_sqlexec + exec_duration;
	    exec_begin := exec_begin + 1;
	end loop;
    sqlexec_avg := sum_sqlexec/p_exectimes;
    v_result := v_result||'avg:'||sqlexec_avg::varchar||';';
    return v_result;
end;
$function$
;
grant execute on function tool.sp_sqlexec_efficient(varchar, int4) to public;
Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐