Building a Traditional Data Warehouse on PostgreSQL
Tags: traditional data warehouse, PostgreSQL, Greenplum, warehouse layering, ods, dw, dm, dim, ads, app, DolphinScheduler, DataX, Magic-API
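Overview
Comparison of warehouse options
Database | Stored procedures | Performance | Scalability | Security | Cost | Support level | Data consistency | Data compression | Backup and recovery | Analytics features |
---|---|---|---|---|---|---|---|---|---|---|
PostgreSQL | Yes | High | High | High | Low | High | High | Yes | Yes | Yes |
Oracle | Yes | High | High | High | High | High | High | Yes | Yes | Yes |
MySQL | Yes | Medium | Medium | Medium | Low | Medium | Medium | Yes | Yes | Yes |
MSSQL | Yes | High | High | High | High | High | High | Yes | Yes | Yes |
Greenplum | Yes | High | High | High | High | High | High | Yes | Yes | Strong |
StarRocks | Yes | High | High | High | Medium | Medium | High | Yes | Yes | Strong |
GBase | Yes | High | High | High | Medium | Medium | High | Yes | Yes | Yes |
Hive | No | Low | High | Medium | Low | Medium | Medium | Yes | Yes | Strong |
Impala | No | High | High | High | Medium | Medium | High | Yes | Yes | Strong |
GaussDB | Yes | High | High | High | Medium | Medium | High | Yes | Yes | Yes |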
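- Choose based on data volume, compute requirements, and cost:
  - Small data volume, modest compute needs, and a low budget: PostgreSQL with primary/standby replication
  - Large data volume, heavy compute needs, and some budget: Greenplum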
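Problems with the current warehouse architecture
- No guarantee of data consistency: without strict lineage-based dependencies between jobs, data can end up inconsistent when it is computed
- Invisible scheduling: there is no single interface for managing schedules
- No way to serve data externally: the warehouse cannot expose its tables to outside consumers
- No task alerting: developers are not notified when a task fails
- Long lock waits: business systems query materialized views directly, and refreshing those views causes long lock waits
- Lots of duplicated work: there is one environment for development and another for production, so much of the development work is done twice
- Overkill tooling: the business database is MySQL and the warehouse is PostgreSQL, so Kettle is more tool than the job needs
- Unreasonable scheduling: configuring a scheduled task requires a Java release; a task can run only a single SELECT statement; and tasks can only run every 15 minutes
- Inconsistent scheduling: wide tables are scheduled from Java, Kettle jobs from crontab, and some other jobs from pg_cron
- Weak data security: user management and access control are incomplete, and sensitive data is not masked
- Slow report queries: the warehouse has no layering, so report queries are inefficient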
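Solutions
Problem | Solution | Notes |
---|---|---|
No guarantee of data consistency | Switch to DolphinScheduler; wait-and-wake | DolphinScheduler provides dependencies between layers; wait-and-wake provides table-level dependencies |
Invisible scheduling | Switch to DolphinScheduler | DolphinScheduler provides a visual interface for managing tasks |
No way to serve data externally | Magic-API | Magic-API provides a unified API platform for serving data |
No task alerting | Switch to DolphinScheduler | DolphinScheduler provides alert channels such as DingTalk and WeCom |
Long lock waits | Drop materialized views and change the ETL approach | Build each result into a new table, then DROP TABLE the old one and RENAME the new one into place |
Lots of duplicated work | Drop the separate development environment and use per-user schemas in PostgreSQL | Development happens in the production database, which separates developer users from batch users: each developer user works in a personal schema (the development environment), and the batch users run the scheduled jobs (the production environment) |
Overkill tooling | Replace Kettle with a foreign data wrapper or DataX | The only business database is MySQL, so data can be extracted directly through the PostgreSQL extension mysql_fdw, or through the DataX task type in DolphinScheduler |
Unreasonable scheduling | Switch to DolphinScheduler | DolphinScheduler's SQL task type runs SQL scripts, and the schedule time, frequency, and concurrency are all configurable |
Inconsistent scheduling | Switch to DolphinScheduler | DolphinScheduler provides a single task-scheduling platform |
Weak data security | Strictly control user privileges | One schema per user; revoke CREATE on schema public from PUBLIC; revoke EXECUTE on functions from PUBLIC |
Slow report queries | Warehouse layering | ods, dw, dm, dim, app (ads) |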
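The drop-and-rename ETL approach in the table above can be sketched as follows. This is a minimal example under stated assumptions: the schema `demo_dw` and the table names are hypothetical, and the script is meant to run in psql as a role that can create schemas. The new result is built in a staging table while reports keep reading the old one, and only the final swap needs a lock.

```sql
-- Demo objects: the schema and table names are hypothetical
create schema if not exists demo_dw;
drop table if exists demo_dw.dm_order_efficiency, demo_dw.dm_order_efficiency_new;

-- The live table that reports read from
create table demo_dw.dm_order_efficiency (dealer_id int, order_cnt int);
insert into demo_dw.dm_order_efficiency values (1, 10);

-- 1. Build the new result in a staging table; readers of the live table are not blocked
create table demo_dw.dm_order_efficiency_new as
select 1 as dealer_id, 12 as order_cnt
union all
select 2, 7;

-- 2. Swap inside one short transaction; DDL is transactional in PostgreSQL,
--    so other sessions never observe the state between the DROP and the RENAME
begin;
drop table demo_dw.dm_order_efficiency;
alter table demo_dw.dm_order_efficiency_new rename to dm_order_efficiency;
commit;
```

The swap still takes an ACCESS EXCLUSIVE lock on the old table, so it waits for queries that are already reading it. However, that lock is held only for the two DDL statements rather than for a whole refresh.

Two side effects need planning:
- Views that depend on the old table block the DROP.
- Explicit grants on the old table are lost, so the new table has to receive its privileges again, for example through default privileges.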
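Architecture design
Warehouse design
- public: the root of all users
- dev: developer group
- yuzhenchao and yuxiaotan: developer users named with each developer's full pinyin name and members of dev; every object they create goes into the schema of the same name, and they have only USAGE (no CREATE) on other schemas
- pro: production group
- mdl: user that runs the models centrally; the final models live in the mdl schema
- apl: user that runs the applications (reports) centrally; the final result tables live in the apl schema
- jkfw: user that Magic-API connects to the warehouse as; the jkfw schema stores Magic-API's metadata
- tool: utility user; the ETL utility functions (stored procedures) live in the tool schema
- readonly: read-only group; its members have read-only access
- finebi: user that FineBI connects to the warehouse as
- dbselect: user that dblink and FDW connections from other databases use to connect to the warehouse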
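Naming conventions
Object type | Format | Example |
---|---|---|
Sequence | seq_ | seq_dblink_id |
Temporary table | tmp_ | tmp_jg624_rb1 |
ODS-layer table | ods_<source database>_<source table> | ods_xkorder_order |
DW-layer table | dwd_/dws_/dw_ | dwd_order_stage/dws_order_stage/dw_order_stage |
DM-layer table | dm_ | dm_order_efficiency |
DIM-layer table | dim_ | dim_dealer |
App-layer table, API, real-time | jk_<requirement no.>_real | jk_jg624_real |
App-layer table, API, daily | jk_<requirement no.>_rb | jk_jg624_rb |
App-layer table, API, weekly | jk_<requirement no.>_zb | jk_jg624_zb |
App-layer table, API, monthly | jk_<requirement no.>_yb | jk_jg624_yb |
App-layer table, FineBI, real-time | bi_<requirement no.>_real | bi_jg624_real |
App-layer table, FineBI, daily | bi_<requirement no.>_rb | bi_jg624_rb |
App-layer table, FineBI, weekly | bi_<requirement no.>_zb | bi_jg624_zb |
App-layer table, FineBI, monthly | bi_<requirement no.>_yb | bi_jg624_yb |
Primary key constraint | pk_<table>_<column> | pk_order_order_no |
Index | idx_<table>_<column> | idx_order_order_id |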
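One way to enforce the table-name rules above is a periodic catalog check. The sketch below is an assumption and not part of the original setup:
- It creates a throwaway schema `demo_naming` containing hypothetical tables.
- It then lists the tables whose names match none of the table formats.

In practice, the `schemaname` filter would point at schemas such as mdl and apl.

```sql
-- Throwaway schema and tables for the demo (names are hypothetical)
create schema if not exists demo_naming;
create table if not exists demo_naming.dim_dealer (id int);
create table if not exists demo_naming.jk_jg624_rb (id int);
create table if not exists demo_naming.order_report (id int);  -- does not follow the convention

-- Tables whose names match none of the table formats in the naming table
select schemaname, tablename
from pg_tables
where schemaname = 'demo_naming'
  and tablename !~ '^(tmp|ods|dwd|dws|dw|dm|dim)_'
  and tablename !~ '^(jk|bi)_[a-z0-9]+_(real|rb|zb|yb)$'
order by tablename;
```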
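Model design
- ODS layer (operational data store, source-aligned): stores the raw data extracted from each business system, preserving its completeness and consistency with only light processing, for example converting empty strings to NULL and normalizing the is_delete column to 0 and 1
- DWD layer (data warehouse detail): stores cleansed, processed, and transformed data, keeps its history of changes, and provides the base data for later analysis and decision-making; for example, broadband, mobile, and ITV tables
- DWS layer (data warehouse summary): stores data merged at the business level for more efficient querying and analysis; for example, the DWD broadband, mobile, and ITV tables are merged into a single all-services table in DWS
- DM layer (data mart): stores highly aggregated data, for example monthly receivables and receipts tables
- DIM layer (shared dimensions): stores business-related dimensions such as time, region, and product, providing dimensional support for analysis and decision-making; for example, dealer and product tables
- APP layer (application): stores the data that business applications such as reports, analyses, and visualizations need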
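The ODS-layer cleansing described above converts empty strings to NULL and normalizes is_delete to 0/1. It could look like the following sketch. Several names here are hypothetical:
- the stand-in source table `demo_dw.src_xkorder_order`;
- its columns;
- the set of values treated as "deleted".

In the real pipeline, the source would be a foreign table over the MySQL business database.

```sql
create schema if not exists demo_dw;
drop table if exists demo_dw.src_xkorder_order, demo_dw.ods_xkorder_order;

-- Stand-in for a raw business table (hypothetical columns and values)
create table demo_dw.src_xkorder_order (order_no varchar(32), remark varchar(200), is_delete varchar(5));
insert into demo_dw.src_xkorder_order values
  ('A001', '',       'N'),
  ('A002', 'urgent', '1'),
  ('A003', null,     null);

-- ODS table named ods_<source database>_<source table>
create table demo_dw.ods_xkorder_order as
select order_no
      ,nullif(trim(remark), '') as remark  -- empty or blank string -> NULL
      ,case when upper(trim(is_delete)) in ('1', 'Y', 'TRUE') then 1 else 0 end as is_delete  -- normalize to 0/1; NULL counts as not deleted
from demo_dw.src_xkorder_order;
```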
Installing PostgreSQL
Setting up the warehouse
Create the database
create database etl;
\c etl postgres
Create the groups
create role dev;
create role pro;
create role readonly;
Create the users
create role yuzhenchao with login password '${YZC_PWD}' connection limit 20;
create role yuxiaotan with login password '${YXT_PWD}' connection limit 20;
create role mdl with login password '${MDL_PWD}' connection limit 250;
create role apl with login password '${APL_PWD}' connection limit 250;
create role tool with login password '${TOOL_PWD}' connection limit 20;
create role finebi with login password '${FINEBI_PWD}' connection limit 100;
create role dbselect with login password '${DBSELECT_PWD}' connection limit 100;
create role jkfw with login password '${JKFW_PWD}' connection limit 100;
Add the users to their groups
alter group dev add user yuzhenchao;
alter group dev add user yuxiaotan;
alter group pro add user mdl;
alter group pro add user apl;
alter group pro add user jkfw;
alter group readonly add user finebi;
alter group readonly add user dbselect;
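PostgreSQL documents ALTER GROUP ... ADD USER as equivalent to granting membership in the group role, and recommends GRANT as the preferred form. The sketch below shows that form plus a catalog query that checks memberships. A few notes on running it:
- It uses hypothetical demo roles (`demo_dev`, `demo_alice`) so that it can run on its own.
- It needs a role with CREATEROLE, or a superuser.
- For the real setup, the filter would be `dev`, `pro`, and `readonly`.

```sql
-- Hypothetical demo roles, created only if missing
do $$
begin
  if not exists (select 1 from pg_roles where rolname = 'demo_dev') then
    create role demo_dev nologin;
  end if;
  if not exists (select 1 from pg_roles where rolname = 'demo_alice') then
    create role demo_alice login;
  end if;
end
$$;

-- Same effect as: alter group demo_dev add user demo_alice;
grant demo_dev to demo_alice;

-- List the members of the group
select g.rolname as group_name, m.rolname as member_name
from pg_auth_members am
join pg_roles g on g.oid = am.roleid
join pg_roles m on m.oid = am.member
where g.rolname = 'demo_dev'
order by 1, 2;
```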
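Create the schemas
--Run once for each ${USERNAME}: yuzhenchao, yuxiaotan, mdl, apl, tool, jkfw
create schema ${USERNAME};
Grant schema privileges
--Give each user all privileges on the schema of the same name
--Run once for each ${USERNAME}: yuzhenchao, yuxiaotan, mdl, apl, tool, jkfw
grant create,usage on schema ${USERNAME} to ${USERNAME};
--Give everyone USAGE on each of these schemas
--Run once for each ${USERNAME}: yuzhenchao, yuxiaotan, mdl, apl, tool, jkfw
grant usage on schema ${USERNAME} to public;
--Before PostgreSQL 15, every user has all privileges (USAGE and CREATE) on the public schema by default
--For security, revoke CREATE on the public schema from everyone
revoke create on schema public from public;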
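The privilege-inquiry functions can confirm the effect of the REVOKE above: they accept `public` as a role name and report what the PUBLIC pseudo-role can do. The minimal check below assumes it runs as a superuser or as the owner of schema public. On PostgreSQL 15 and later this REVOKE is already the default, so it changes nothing there.

```sql
-- Same statement as above; harmless if CREATE was already revoked
revoke create on schema public from public;

-- PUBLIC should keep USAGE on schema public but no longer have CREATE
select has_schema_privilege('public', 'public', 'USAGE')  as public_usage,
       has_schema_privilege('public', 'public', 'CREATE') as public_create;
```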
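Revoke EXECUTE on functions
/*
* In PostgreSQL, EXECUTE on a newly created function is granted to PUBLIC by default
* This is handled with PostgreSQL's per-role and per-schema default privileges
* Default privileges apply only to objects created later by the named role; they do not cover objects that another role (such as postgres) creates and then hands over with ALTER ... OWNER TO
*/
--Run once for each ${USERNAME}: yuzhenchao, yuxiaotan, mdl, apl, tool, jkfw
--Any function created by user ${USERNAME} must be granted explicitly before anyone other than its owner can call it
alter default privileges for role ${USERNAME} revoke execute on functions from public;
--Per-schema form: a per-schema REVOKE can only undo an earlier per-schema GRANT and cannot remove the global default EXECUTE for PUBLIC, so the FOR ROLE statement above is the one that takes effect
alter default privileges in schema ${USERNAME} revoke execute on functions from public;
Make SELECT on tables public
--Run once for each ${USERNAME}: yuzhenchao, yuxiaotan, mdl, apl, tool, jkfw
--Tables that user ${USERNAME} creates in schema ${USERNAME} are readable by everyone by default (without FOR ROLE, the rule would apply only to tables created by the role running this statement)
alter default privileges for role ${USERNAME} in schema ${USERNAME} grant select on tables to public;
--Any table created by user ${USERNAME} is readable by everyone by default
alter default privileges for role ${USERNAME} grant select on tables to public;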
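The `pg_default_acl` catalog stores default privileges per creating role, and optionally per schema, so it shows exactly what the statements above registered. The sketch below does two things:
- It adds one hypothetical rule for the current role in a demo schema `demo_acl`.
- It then lists every rule.

A row whose creating role is `postgres` only affects objects that postgres itself creates. It does not affect objects created by tool or mdl.

```sql
create schema if not exists demo_acl;
-- Hypothetical rule: tables that the current role creates in demo_acl become readable by PUBLIC
alter default privileges in schema demo_acl grant select on tables to public;

-- Every default-privilege rule in the current database
select pg_get_userbyid(d.defaclrole)         as creating_role,
       coalesce(n.nspname, '(all schemas)')  as schema_name,
       case d.defaclobjtype
         when 'r' then 'tables'
         when 'S' then 'sequences'
         when 'f' then 'functions'
         when 'T' then 'types'
         when 'n' then 'schemas'
         else d.defaclobjtype::text
       end                                   as object_type,
       d.defaclacl                           as privileges
from pg_default_acl d
left join pg_namespace n on n.oid = d.defaclnamespace  -- defaclnamespace = 0 means "all schemas"
order by 1, 2, 3;
```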
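Dynamic SQL functions
/*
* To make the individual users easier to manage,
* create dynamic SQL functions that run with their definer's privileges (SECURITY DEFINER),
* all of which are ultimately managed centrally by the tool user
*/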
create or replace function tool.sp_exec(
vsql character varying
)
returns void
language plpgsql
security definer
as $function$
/*
* Author  : v-yuzhenc
* Purpose : execute SQL with the definer's privileges
* vsql    : the SQL statement to execute
* */
begin
execute vsql;
end;
$function$
;
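alter function tool.sp_exec(varchar) owner to tool;
grant all on function tool.sp_exec(varchar) to tool;
--This function is created while connected as postgres, so tool's default privileges do not apply and PUBLIC could execute it: revoke that explicitly
revoke execute on function tool.sp_exec(varchar) from public;
--Run once for each ${USERNAME}: yuzhenchao, yuxiaotan, mdl, apl, jkfw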
create or replace function ${USERNAME}.sp_exec(
vsql character varying
)
returns void
language plpgsql
security definer
as $function$
/*
* Author  : v-yuzhenc
* Purpose : execute SQL with the definer's privileges
* vsql    : the SQL statement to execute
* */
begin
execute vsql;
end;
$function$
;
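alter function ${USERNAME}.sp_exec(varchar) owner to ${USERNAME};
grant all on function ${USERNAME}.sp_exec(varchar) to ${USERNAME};
--Only the owner and tool may call it; otherwise anyone could run arbitrary SQL as ${USERNAME}
revoke execute on function ${USERNAME}.sp_exec(varchar) from public;
grant execute on function ${USERNAME}.sp_exec(varchar) to tool;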
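A new function is executable by PUBLIC unless the creating role's default privileges say otherwise. It is therefore worth auditing which SECURITY DEFINER functions PUBLIC can still call. The sketch below is illustrative:
- `demo_secdef.sp_exec` is a hypothetical copy of the pattern above.
- The `set search_path` clause follows the PostgreSQL documentation's advice for writing SECURITY DEFINER functions safely: pin search_path and put pg_temp last.

Pinning search_path is an optional hardening, not part of the original design.

```sql
create schema if not exists demo_secdef;

-- Hypothetical copy of the sp_exec pattern, with search_path pinned
create or replace function demo_secdef.sp_exec(vsql varchar)
returns void
language plpgsql
security definer
set search_path = demo_secdef, pg_temp
as $function$
begin
  execute vsql;
end;
$function$;

-- Without this, any role could run arbitrary SQL with the owner's privileges
revoke execute on function demo_secdef.sp_exec(varchar) from public;

-- Audit: SECURITY DEFINER functions outside the system schemas, and whether PUBLIC can call them
select p.oid::regprocedure                                as function_signature,
       pg_get_userbyid(p.proowner)                        as owner,
       has_function_privilege('public', p.oid, 'EXECUTE') as public_can_execute
from pg_proc p
join pg_namespace n on n.oid = p.pronamespace
where p.prosecdef
  and n.nspname not in ('pg_catalog', 'information_schema')
order by n.nspname, p.proname;
```

With a pinned search_path, a caller's `SET search_path` no longer applies inside the function. The target schema would therefore be fixed per function, for example `set search_path = mdl, public` for mdl.sp_exec.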
Central execution function
create or replace function tool.sp_execsql(
exec_sql character varying
,exec_user character varying
)
returns void
language plpgsql
security definer
as $function$
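/* Author       : v-yuzhenc
* Purpose       : central execution routine; runs a SQL statement with a given user's privileges
* exec_sql      : the SQL statement to execute
* exec_user     : the user whose privileges the statement runs with
* */
declare
p_user varchar := exec_user;
o_search_path varchar;
begin
--Save the original search_path
execute 'show search_path;' into o_search_path;
--Temporarily switch search_path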
execute 'SET search_path TO '||p_user||',public,oracle';
case p_user
when 'yuzhenchao' then perform yuzhenchao.sp_exec(exec_sql);
when 'yuxiaotan' then perform yuxiaotan.sp_exec(exec_sql);
when 'mdl' then perform mdl.sp_exec(exec_sql);
when 'apl' then perform apl.sp_exec(exec_sql);
when 'tool' then perform tool.sp_exec(exec_sql);
when 'jkfw' then perform jkfw.sp_exec(exec_sql);
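else raise exception 'User not configured: %',p_user;
end case;
--Restore the search_path
execute 'SET search_path TO '||o_search_path;
exception when others then
--Restore the search_path
execute 'SET search_path TO '||o_search_path;
raise exception '%',sqlerrm;
end;
$function$
;
alter function tool.sp_execsql(varchar, varchar) owner to tool;
grant all on function tool.sp_execsql(varchar, varchar) to tool;
--Revoke the default PUBLIC EXECUTE so that only tool can run SQL as another user
revoke execute on function tool.sp_execsql(varchar, varchar) from public;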
Data extraction with FDWs
Install mysql_fdw
Install postgres_fdw
create extension postgres_fdw;
Allow the tool user to use the FDWs
grant all on foreign data wrapper mysql_fdw to tool;
grant all on foreign data wrapper postgres_fdw to tool;
Create the connection-info table
\c etl tool
create table tool.dblink_connection_info (
connname varchar(63) not null
,conntype varchar(63) null
,hostname varchar(15) null
,port varchar(15) null
,dbname varchar(63) null
,username varchar(63) null
,userpwd varchar(63) null
,fdw_server varchar(63) null
,createtime timestamp null default current_timestamp
,constraint dblink_connection_info_pkey primary key (connname)
);
comment on table tool.dblink_connection_info is 'Connection info used by dblink/FDW; not public';
comment on column tool.dblink_connection_info.connname is 'Connection name; any unique name';
comment on column tool.dblink_connection_info.conntype is 'Data source type';
comment on column tool.dblink_connection_info.hostname is 'IP address of the database host';
comment on column tool.dblink_connection_info.port is 'Database port';
comment on column tool.dblink_connection_info.dbname is 'Database name';
comment on column tool.dblink_connection_info.username is 'User name for the connection';
comment on column tool.dblink_connection_info.userpwd is 'Password for that user name';
comment on column tool.dblink_connection_info.fdw_server is 'Name of the corresponding FDW server';
comment on column tool.dblink_connection_info.createtime is 'Creation time';
alter table tool.dblink_connection_info owner to tool;
revoke select on tool.dblink_connection_info from public;
grant select(connname,conntype,hostname,port,dbname,username,fdw_server ,createtime) on tool.dblink_connection_info to public;
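The column-level GRANT above hides `userpwd` from PUBLIC while exposing the other columns. The following self-contained example shows the effect. It uses a hypothetical demo table, `demo_dw.conn_info_demo`, with a reduced column list.

```sql
create schema if not exists demo_dw;
drop table if exists demo_dw.conn_info_demo;
create table demo_dw.conn_info_demo (
  connname varchar(63) primary key,
  hostname varchar(15),
  userpwd  varchar(63)
);

-- Remove any table-level SELECT (e.g. from default privileges), then grant only the safe columns
revoke select on demo_dw.conn_info_demo from public;
grant select (connname, hostname) on demo_dw.conn_info_demo to public;

-- Which columns can PUBLIC read?
select column_name,
       has_column_privilege('public', 'demo_dw.conn_info_demo', column_name::text, 'SELECT') as public_can_select
from information_schema.columns
where table_schema = 'demo_dw' and table_name = 'conn_info_demo'
order by ordinal_position;
```

A role that has only column-level SELECT must name the columns explicitly. A `select *` on such a table fails with a permission error, because it would also read `userpwd`.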
Create the sequence
create sequence tool.seq_tmp_fdw_id
increment by 1
minvalue 1
maxvalue 9999
start 1
cache 1
cycle;
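The sequence is declared with CYCLE, so after 9999 it starts again at 1. That suits suffixes for temporary foreign table names, which are dropped at the end of each call. The only limit is that no more than 9999 calls should overlap. A hypothetical demo sequence with a maximum of 3 makes the wrap-around easy to see:

```sql
create schema if not exists demo_dw;
drop sequence if exists demo_dw.seq_demo_cycle;
create sequence demo_dw.seq_demo_cycle
increment by 1
minvalue 1
maxvalue 3
start 1
cache 1
cycle;

-- Returns 1, 2, 3 and then wraps back to 1
select nextval('demo_dw.seq_demo_cycle') from generate_series(1, 4);
```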
Create the FDW servers and user mappings
create server ${MYSQL_SERVER_NAME} foreign data wrapper mysql_fdw options (host '${MYSQL_HOSTNAME}', port '${MYSQL_PORT}');
create user mapping for public server ${MYSQL_SERVER_NAME} options (username '${MYSQL_USERNAME}', password '${MYSQL_USERPWD}');
create server ${PG_SERVER_NAME} foreign data wrapper postgres_fdw options (host '${PG_HOSTNAME}', port '${PG_PORT}',dbname '${PG_DATABASE}');
create user mapping for public server ${PG_SERVER_NAME} options (user '${PG_USERNAME}', password '${PG_USERPWD}');
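The catalogs show which servers and user mappings exist. The sketch below makes some assumptions:
- It runs as a superuser on a PostgreSQL instance with the postgres_fdw contrib module available.
- It creates a hypothetical server `demo_pg_srv` with placeholder host and credentials.
- Creating the server and the mapping opens no connection.

```sql
create extension if not exists postgres_fdw;

-- Hypothetical server and PUBLIC mapping with placeholder values
create server if not exists demo_pg_srv foreign data wrapper postgres_fdw
  options (host '127.0.0.1', port '5432', dbname 'postgres');
create user mapping if not exists for public server demo_pg_srv
  options (user 'demo_reader', password 'change_me');

-- Servers and the wrapper each one uses
select s.srvname, w.fdwname, s.srvoptions
from pg_foreign_server s
join pg_foreign_data_wrapper w on w.oid = s.srvfdw
order by s.srvname;

-- User mappings; usename is 'public' for a mapping created FOR PUBLIC
select srvname, usename
from pg_user_mappings
order by srvname, usename;
```

A mapping created `for public` means every local role that can use the server connects with the same remote credentials. To protect passwords, `pg_user_mappings` shows the `umoptions` column (where the password is stored) only to:
- the mapped user;
- the server owner, for a PUBLIC mapping;
- a superuser.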
Helper functions
get_ddl
create or replace function tool.get_ddl(
schematable character varying
,getmode character varying default 'table'::character varying
,newtablename character varying default null::character varying
)
returns text
language plpgsql
as $function$
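/* Author       : v-yuzhenc
* Purpose       : given a table name (case-sensitive), return its CREATE TABLE statement and COMMENT statements
*                 Without a schema, the schema named after the current user is assumed; for other schemas pass schema.table
* schematable   : schemaname.tablename or tablename
* getmode       : table (default): return the table's CREATE TABLE statement
*                 view: return the view's CREATE VIEW statement
*                 viewtable: return a CREATE TABLE statement matching the view's columns
* newtablename  : return the statement under this new table name; defaults to the original table name
* */
declare
p_tablename varchar;
p_schemaname varchar := user::varchar(64);
p_newtablename varchar := newtablename;
p_result text := null;
p_array varchar[];
begin
--Validate getmode; raise an exception if it is invalid
if getmode not in ('table','view','viewtable') then
raise exception 'Argument 2 must be table, view, or viewtable!';
end if;
--Raise an exception if the name is null
if schematable is null then
raise exception 'Table or view name must not be empty!';
end if;
--Raise an exception if the name contains more than one dot
--if instr(schematable,'.',1,2) <> 0 then
-- raise exception 'Invalid table or view name!';
--end if;
--Parse schematable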
p_array := string_to_array(schematable,'.');
if p_array[2] is null then
p_tablename := p_array[1];
else
p_tablename := trim(p_array[2]);
p_schemaname := trim(p_array[1]);
end if;
p_newtablename := coalesce (p_newtablename,lower(p_tablename));
if getmode in ('table','viewtable') then
if getmode = 'table' and not exists (select 1 from pg_tables where tablename = p_tablename and schemaname = p_schemaname) then
raise exception 'Table %.% does not exist!',p_schemaname,p_tablename;
elsif getmode = 'viewtable' and not exists (select 1 from pg_views where viewname = p_tablename and schemaname = p_schemaname) then
raise exception 'View %.% does not exist!',p_schemaname,p_tablename;
end if;
select
'drop table if exists "'||p_newtablename||'";'||
chr(10)||'create table "'||p_newtablename||'" ('||
chr(10)||
chr(9)||' '||
string_agg(
'"'||c.attname||'" '|| --column name
format_type(c.atttypid, c.atttypmod)|| --column type
coalesce (' default '||substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid) for 128),'')|| --column default
case when c.attnotnull = true then ' not null' else ' null' end
--separate columns with newline+tab+comma instead of testing attnum = 1, which no longer exists once the first column has been dropped
,chr(10)||chr(9)||',' order by c.attnum
)||
--Primary key constraint
coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
chr(10)||');'||
--Storage options such as compression (Greenplum only)
--coalesce(' with ( '||chr(10)||chr(9)||' '||array_to_string(a.reloptions,chr(10)||chr(9)||',')||chr(10)||')','') ||
--Distribution policy (Greenplum only)
--case when e.policytype = 'r' then ' distributed replicated;' when e.policytype = 'p' then coalesce(' distributed by ('||
--string_agg(case when array_position(string_to_array(array_to_string(e.distkey::int2[],','),',')::int[],c.attnum::int,1) <> 0 then '"'||c.attname||'"' end,',' order by string_to_array(array_to_string(e.distkey::int2[],','),',')::int[])||
--');',' distributed randomly;') else ' distributed randomly;' end||
--Table comment
coalesce(chr(10)||'comment on table "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||
--Column comments
coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
into p_result
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
left join pg_attrdef d
on (c.attrelid = d.adrelid and c.attnum = d.adnum)
--left join gp_distribution_policy e
--on (a.oid = e.localoid)
left join pg_description f
on (a.oid = f.objoid and c.attnum = f.objsubid)
left join (
select d.indrelid
,string_agg('"'||c.attname||'"',',' order by c.attnum) prikey
from pg_class a, pg_namespace b, pg_attribute c, pg_index d
where a.relnamespace = b.oid
and a.oid = c.attrelid
and a.oid = d.indrelid
and d.indisprimary = true
and c.attnum = any(d.indkey)
and a.relname = p_tablename
and b.nspname = p_schemaname
group by d.indrelid
) g
on (a.oid = g.indrelid)
left join pg_description h
on (a.oid = h.objoid and h.objsubid = 0)
where c.attnum > 0
and not c.attisdropped
and a.relname = p_tablename
and b.nspname = p_schemaname
group by b.nspname,a.relname,a.reloptions,h.description,g.prikey;
else
if getmode = 'view' and not exists (select 1 from pg_views where viewname = p_tablename and schemaname = p_schemaname) then
raise exception 'View %.% does not exist!',p_schemaname,p_tablename;
end if;
select
' CREATE OR REPLACE VIEW "'||p_newtablename||'" AS '||chr(10)||d.definition||
--View comment
coalesce(chr(10)||'comment on view "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||
--Column comments
coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
into p_result
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
left join pg_description f
on (a.oid = f.objoid and c.attnum = f.objsubid)
inner join pg_views d
on (b.nspname = d.schemaname and a.relname = d.viewname)
left join pg_description h
on (a.oid = h.objoid and h.objsubid = 0)
where d.viewname = p_tablename
and d.schemaname = p_schemaname
group by a.relname,d.definition,h.description;
end if;
return p_result;
end;
$function$
;
grant execute on function tool.get_ddl(varchar, varchar, varchar) to public;
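get_ddl assembles each column line from pg_attribute, pg_attrdef, and format_type. The catalog query below is a minimal sketch of that core logic, run against a hypothetical demo table. It also shows why locating the first column with `attnum = 1` is fragile. After the first column is dropped, its pg_attribute row remains with `attisdropped = true`, and the first live column has `attnum = 2`.

```sql
create schema if not exists demo_dw;
drop table if exists demo_dw.dim_dealer;
create table demo_dw.dim_dealer (
  legacy_code varchar(10),
  dealer_id   int primary key,
  dealer_name varchar(100) not null,
  created_at  timestamp default current_timestamp
);
-- The dropped column keeps attnum = 1, so the first live column now has attnum = 2
alter table demo_dw.dim_dealer drop column legacy_code;

select a.attnum,
       a.attname,
       format_type(a.atttypid, a.atttypmod) as column_type,    -- e.g. character varying(100)
       pg_get_expr(d.adbin, d.adrelid)      as column_default,
       a.attnotnull
from pg_attribute a
left join pg_attrdef d on d.adrelid = a.attrelid and d.adnum = a.attnum
where a.attrelid = 'demo_dw.dim_dealer'::regclass
  and a.attnum > 0
  and not a.attisdropped
order by a.attnum;
```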
get_ddl_pg2mysql
create or replace function tool.get_ddl_pg2mysql(
tablename character varying
,schemaname character varying
,newtablename character varying default null::character varying
)
returns text
language plpgsql
AS $function$
/* Author       : v-yuzhenc
* Purpose       : given a table name and schema name in the local PostgreSQL database,
*                 return the table's DDL in MySQL syntax
* tablename     : the PostgreSQL table name
* schemaname    : the PostgreSQL schema name
* newtablename  : return the statement under this new table name; defaults to the original table name
* */
declare
p_tablename varchar := tablename;
p_schemaname varchar := schemaname;
p_newtablename varchar := newtablename;
p_result text := null;
existbj int;
v_sql varchar;
begin
--Raise an exception if an argument is null
if p_tablename is null then
raise exception 'Table or view name must not be empty!';
end if;
if p_schemaname is null then
raise exception 'Schema name must not be empty!';
end if;
p_newtablename := coalesce (p_newtablename,lower(p_tablename));
--Check that the table or view exists
execute $v_sql$
select 1 from pg_tables
where tablename = '$v_sql$||p_tablename||$v_sql$'
and schemaname = '$v_sql$||p_schemaname||$v_sql$'
union all
select 1 from pg_views
where viewname = '$v_sql$||p_tablename||$v_sql$'
and schemaname = '$v_sql$||p_schemaname||$v_sql$'
;
$v_sql$ into existbj;
if existbj is null then
raise exception 'Table or view does not exist!';
end if;
v_sql := $v_sql$
select
'drop table if exists `'||'$v_sql$||p_newtablename||$v_sql$'||'`;'||
chr(10)||'create table `'||'$v_sql$||p_newtablename||$v_sql$'||'` ('||
chr(10)||
chr(9)||' '||
string_agg(
'`'||c.attname||'` '|| --column name
case
when i.data_type = 'integer' then 'int(11)'
when i.data_type = 'character varying' then case when substring(format_type(c.atttypid, c.atttypmod) from '[0-9]+') is null or substring(format_type(c.atttypid, c.atttypmod) from '[0-9]+')::int > 16341 then 'text' else replace (format_type(c.atttypid, c.atttypmod),'character varying','varchar') end
when i.data_type = 'character' then replace(format_type(c.atttypid, c.atttypmod),'character','char')
when i.data_type = 'date' then 'date'
when i.data_type = 'timestamp with time zone' then replace(replace (format_type(c.atttypid, c.atttypmod),' with time zone', ''),'timestamp','datetime')
when i.data_type = 'timestamp without time zone' then replace(replace (format_type(c.atttypid, c.atttypmod),' without time zone', ''),'timestamp','datetime')
when i.data_type = 'bigint' then 'bigint(20)'
when i.data_type = 'double precision' then 'double'
when i.data_type = 'smallint' then 'smallint(6)'
when i.data_type = 'text' then 'text'
when i.data_type = 'bytea' then 'blob'
when i.data_type = 'real' then 'float'
when i.data_type = 'numeric' then format_type(c.atttypid, c.atttypmod)
when i.data_type in ('time without time zone', 'time with time zone') then 'time'
when i.data_type = 'json' then 'json'
else 'text'
end|| --column type (information_schema spells types out in full, e.g. integer, time without time zone)
case when c.attnotnull = true or ((d.typtype = 'd'::"char") AND d.typnotnull) then ' not null' else ' null' end||
coalesce(' comment '''||replace(f.description,'''','''''')||'''','')
--separate columns with newline+tab+comma instead of testing attnum = 1, which no longer exists once the first column has been dropped
,chr(10)||chr(9)||',' order by c.attnum
)||
--Primary key constraint
coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
chr(10)||')'||coalesce('comment '''||replace(h.description,'''','''''')||'''','')||
';'
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
left join pg_type d
on (c.atttypid = d.oid)
left join pg_description f
on (a.oid = f.objoid and c.attnum = f.objsubid)
left join (
select d.indrelid
,string_agg('`'||c.attname||'`',',' order by c.attnum) prikey
from pg_class a, pg_namespace b, pg_attribute c, pg_index d
where a.relnamespace = b.oid
and a.oid = c.attrelid
and a.oid = d.indrelid
and d.indisprimary = true
and c.attnum = any(d.indkey)
and a.relname = '$v_sql$||p_tablename||$v_sql$'
and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
group by d.indrelid
) g
on (a.oid = g.indrelid)
left join pg_description h
on (a.oid = h.objoid and h.objsubid = 0)
left join information_schema.columns i
on (a.relname = i.table_name and b.nspname = i.table_schema and c.attnum = i.ordinal_position)
where c.attnum > 0
and not c.attisdropped
and a.relname = '$v_sql$||p_tablename||$v_sql$'
and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
group by b.nspname,a.relname,h.description,g.prikey;
$v_sql$
;
execute v_sql into p_result;
return p_result;
end;
$function$
;
grant execute on function tool.get_ddl_pg2mysql(varchar, varchar, varchar) to public;
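The CASE in get_ddl_pg2mysql depends on two things:
- It keys on `information_schema.columns.data_type`, whose values are spelled out in full (`integer`, not `int`; `time without time zone`, not `time`).
- It reads the varchar length out of the format_type text.

A quick probe on a hypothetical table shows both points. It also shows that the pattern `[1-9]+` stops at the first zero, while `[0-9]+` captures the whole length.

```sql
create schema if not exists demo_dw;
drop table if exists demo_dw.dm_type_probe;
create table demo_dw.dm_type_probe (
  c_int     integer,
  c_varchar varchar(20000),
  c_time    time,
  c_ts      timestamp(3)
);

-- data_type values that a CASE on information_schema.columns has to match
select column_name, data_type, character_maximum_length
from information_schema.columns
where table_schema = 'demo_dw' and table_name = 'dm_type_probe'
order by ordinal_position;

-- Extracting the length from format_type output
select substring('character varying(20000)' from '[1-9]+') as digits_1_to_9,  -- '2': stops at the first zero
       substring('character varying(20000)' from '[0-9]+') as digits_0_to_9;  -- '20000'
```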
get_ddl_remote_mysql2pg
create or replace function tool.get_ddl_remote_mysql2pg(
tablename character varying
,schemaname character varying
,newtablename character varying default null::character varying
,remote_connname character varying default '${CONNNAME}'::character varying
)
returns text
language plpgsql
security definer
as $function$
/* Author          : v-yuzhenc
* Purpose          : given a remote MySQL table name, database name, and connection,
*                    return the table's DDL in PostgreSQL syntax
* tablename        : the MySQL table name
* schemaname       : the MySQL database name
* newtablename     : return the statement under this new table name; defaults to the original table name
* remote_connname  : remote connection name; valid values come from select connname from tool.dblink_connection_info;
* */
declare
p_tablename varchar := tablename;
p_schemaname varchar := schemaname;
p_newtablename varchar := newtablename;
p_remote_connname varchar := remote_connname;
p_result text := null;
--Schema-qualified: this default is evaluated with the caller's search_path, before it is switched to tool below
tmp_fdw_id varchar := nextval('tool.seq_tmp_fdw_id')::varchar;
tbname_1 varchar := 'tmp_fdw_tables_'||tmp_fdw_id;
tbname_2 varchar := 'tmp_fdw_views_'||tmp_fdw_id;
tbname_3 varchar := 'tmp_fdw_columns_'||tmp_fdw_id;
existbj int;
v_sql varchar;
o_search_path varchar; --original search_path
begin
--Raise an exception if an argument is null
if p_tablename is null then
raise exception 'Table or view name must not be empty!';
end if;
if p_schemaname is null then
raise exception 'Schema (database) name must not be empty!';
end if;
p_newtablename := coalesce (p_newtablename,lower(p_tablename));
--Save the original search_path
execute 'show search_path;' into o_search_path;
--Temporarily switch search_path
execute 'SET search_path TO tool,'||o_search_path;
--Create the foreign tables
select $v_sql$
--Drop the temporary foreign tables if they already exist
drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
--Foreign table over information_schema.tables
create foreign table $v_sql$||tbname_1||$v_sql$(
table_name varchar(64)
,table_schema varchar(64)
,table_comment text
) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'tables');
--Foreign table over information_schema.views
create foreign table $v_sql$||tbname_2||$v_sql$(
table_name varchar(64)
,table_schema varchar(64)
) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'views');
--Foreign table over information_schema.columns
create foreign table $v_sql$||tbname_3||$v_sql$(
table_schema varchar(64)
,table_name varchar(64)
,column_name varchar(64)
,ordinal_position int
,is_nullable varchar(3)
,data_type text
,column_type text
,column_comment text
,column_key varchar(3)
) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'columns');
$v_sql$
into v_sql
from tool.dblink_connection_info
where connname = p_remote_connname
;
execute v_sql;
--Check that the table or view exists
execute $v_sql$
select 1 from "$v_sql$||tbname_1||$v_sql$"
where table_name = '$v_sql$||p_tablename||$v_sql$'
and table_schema = '$v_sql$||p_schemaname||$v_sql$'
union all
select 1 from "$v_sql$||tbname_2||$v_sql$"
where table_name = '$v_sql$||p_tablename||$v_sql$'
and table_schema = '$v_sql$||p_schemaname||$v_sql$'
;
$v_sql$ into existbj;
if existbj is null then
raise exception 'Table or view does not exist!';
end if;
v_sql := $v_sql$
with tmp_a as (
(select
'$v_sql$||p_newtablename||$v_sql$' as table_name
,table_schema
,coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(case when table_comment = '' then null else table_comment end,'''','''''')||''';','') as table_comment
from $v_sql$||tbname_1||$v_sql$
where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
limit 1)
union all
(select
'$v_sql$||p_newtablename||$v_sql$' as table_name
,table_schema
,null as table_comment
from $v_sql$||tbname_2||$v_sql$
where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
limit 1)
), tmp_b as (
select
'$v_sql$||p_newtablename||$v_sql$' as table_name
,table_schema
,string_agg(chr(9)||
case when ordinal_position = 1 then ' ' else ',' end||
--column name
'"'||lower(column_name)||'"'||' '||
--data type
case
when data_type = 'int' then data_type
when data_type = 'varchar' then replace (column_type,'varchar(0)','varchar(1)')
when data_type = 'char' then replace(replace(column_type,'char','varchar'),'varchar(0)','varchar(1)')
when data_type = 'date' then 'date'
when data_type = 'datetime' then replace (column_type, data_type, 'timestamp')
when data_type = 'timestamp' then 'timestamp'
when data_type = 'bigint' then 'bigint'
when data_type = 'double' then 'double precision'
when data_type = 'smallint' then 'smallint'
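when data_type = 'decimal' then trim(replace(replace(column_type,' zerofill',''),' unsigned','')) --strip MySQL-only attributes: 'decimal(10,2) unsigned zerofill' -> 'decimal(10,2)'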
when data_type = 'longtext' then 'text'
when data_type = 'text' then 'text'
when data_type = 'tinyint' then 'int'
when data_type = 'longblob' then 'bytea'
when data_type = 'blob' then 'bytea'
when data_type = 'float' then 'real'
when data_type = 'tinytext' then 'text'
when data_type = 'mediumtext' then 'text'
when data_type = 'numeric' then 'numeric'
when data_type = 'time' then 'interval'
when data_type = 'json' then 'json'
else 'varchar'
end||' '||
--NOT NULL constraint
case
when is_nullable = 'NO' then 'not null'
else 'null'
end
,chr(10) order by ordinal_position
)||
--Primary key constraint
coalesce(chr(10)||chr(9)||',primary key ('||string_agg(case when column_key = 'PRI' then '"'||lower(column_name)||'"' end,',' order by ordinal_position)||')','')
as column_info
--Column comments
,coalesce(chr(10)||string_agg('comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||lower(column_name)||'" is '''||replace(case when column_comment = '' then null else column_comment end,'''','''''')||''';',chr(10) order by ordinal_position),'') column_comment
from $v_sql$||tbname_3||$v_sql$
where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
group by
table_schema
limit 1
)
select
'drop table if exists "'||a.table_name||'";'||chr(10)||
'create table "'||a.table_name||'" ('||chr(10)||
b.column_info||chr(10)||
');'||a.table_comment||b.column_comment
from tmp_a a
inner join tmp_b b
on (a.table_schema = b.table_schema and a.table_name = b.table_name)
;
$v_sql$
;
execute v_sql into p_result;
--Drop the temporary foreign tables
execute $v_sql$
--Drop the temporary foreign tables if they still exist
drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
$v_sql$
;
--Restore the search_path
execute 'SET search_path TO '||o_search_path;
return p_result;
exception when others then
--Restore the search_path
execute 'SET search_path TO '||o_search_path;
raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.get_ddl_remote_mysql2pg(varchar,varchar,varchar,varchar) to public;
get_ddl_remote_pg2pg
create or replace function tool.get_ddl_remote_pg2pg(
tablename character varying
,schemaname character varying
,newtablename character varying default null::character varying
,remote_connname character varying default '${CONNNAME}'::character varying
)
returns text
language plpgsql
security definer
as $function$
/* Author          : v-yuzhenc
* Purpose          : given a remote PostgreSQL table name, schema name, and connection,
*                    return the table's DDL in PostgreSQL syntax
* tablename        : the PostgreSQL table name
* schemaname       : the PostgreSQL schema name
* newtablename     : return the statement under this new table name; defaults to the original table name
* remote_connname  : remote connection name; valid values come from select connname from tool.dblink_connection_info;
* */
declare
p_tablename varchar := tablename;
p_schemaname varchar := schemaname;
p_newtablename varchar := newtablename;
p_remote_connname varchar := remote_connname;
p_result text := null;
--Schema-qualified: this default is evaluated with the caller's search_path, before it is switched to tool below
tmp_fdw_id varchar := nextval('tool.seq_tmp_fdw_id')::varchar;
tbname_1 varchar := 'tmp_fdw_pg_class_'||tmp_fdw_id;
tbname_2 varchar := 'tmp_fdw_pg_namespace_'||tmp_fdw_id;
tbname_3 varchar := 'tmp_fdw_pg_attribute_'||tmp_fdw_id;
tbname_5 varchar := 'tmp_fdw_pg_description_'||tmp_fdw_id;
tbname_6 varchar := 'tmp_fdw_pg_index_'||tmp_fdw_id;
tbname_8 varchar := 'tmp_fdw_pg_tables_'||tmp_fdw_id;
tbname_9 varchar := 'tmp_fdw_pg_views_'||tmp_fdw_id;
existbj int;
v_sql varchar;
o_search_path varchar; --original search_path
begin
--Raise an exception if an argument is null
if p_tablename is null then
raise exception 'Table or view name must not be empty!';
end if;
if p_schemaname is null then
raise exception 'Schema name must not be empty!';
end if;
p_newtablename := coalesce (p_newtablename,lower(p_tablename));
--Save the original search_path
execute 'show search_path;' into o_search_path;
--Temporarily switch search_path
execute 'SET search_path TO tool,'||o_search_path;
--Create the foreign tables
select $v_sql$
--Drop the temporary foreign tables if they already exist
drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
drop foreign table if exists $v_sql$||tbname_5||$v_sql$;
drop foreign table if exists $v_sql$||tbname_6||$v_sql$;
drop foreign table if exists $v_sql$||tbname_8||$v_sql$;
drop foreign table if exists $v_sql$||tbname_9||$v_sql$;
--Foreign tables over the remote pg_catalog tables, starting with pg_class
create foreign table $v_sql$||tbname_1||$v_sql$(
"oid" oid not null
,"relname" name not null
,"relnamespace" oid not null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_class');
create foreign table $v_sql$||tbname_2||$v_sql$(
"oid" oid not null
,"nspname" name not null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_namespace');
create foreign table $v_sql$||tbname_3||$v_sql$(
"attrelid" oid not null
,"attname" name not null
,"atttypid" oid not null
,"attnum" smallint not null
,"atttypmod" integer not null
,"attnotnull" boolean not null
,"attisdropped" boolean not null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_attribute');
create foreign table $v_sql$||tbname_5||$v_sql$(
"objoid" oid not null
,"classoid" oid not null
,"objsubid" integer not null
,"description" text not null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_description');
create foreign table $v_sql$||tbname_6||$v_sql$(
"indrelid" oid not null
,"indisprimary" boolean not null
,"indkey" int2vector not null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_index');
create foreign table $v_sql$||tbname_8||$v_sql$(
"schemaname" name null
,"tablename" name null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_tables');
create foreign table $v_sql$||tbname_9||$v_sql$(
"schemaname" name null
,"viewname" name null
) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_views');
$v_sql$
into v_sql
from tool.dblink_connection_info
where connname = p_remote_connname
;
execute v_sql;
--Check that the table or view exists
execute $v_sql$
select 1 from "$v_sql$||tbname_8||$v_sql$"
where tablename = '$v_sql$||p_tablename||$v_sql$'
and schemaname = '$v_sql$||p_schemaname||$v_sql$'
union all
select 1 from "$v_sql$||tbname_9||$v_sql$"
where viewname = '$v_sql$||p_tablename||$v_sql$'
and schemaname = '$v_sql$||p_schemaname||$v_sql$'
;
$v_sql$ into existbj;
if existbj is null then
raise exception 'Table or view does not exist!';
end if;
v_sql := $v_sql$
select
'drop table if exists "'||'$v_sql$||p_newtablename||$v_sql$'||'";'||
chr(10)||'create table "'||'$v_sql$||p_newtablename||$v_sql$'||'" ('||
chr(10)||
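chr(9)||' '||
string_agg(
'"'||c.attname||'" '|| --column name
format_type(c.atttypid, c.atttypmod)|| --column type; format_type runs locally, so only built-in type OIDs are guaranteed to match the remote server
case when c.attnotnull = true then ' not null' else ' null' end
--separate columns with newline+tab+comma instead of testing attnum = 1, which no longer exists once the first column has been dropped
,chr(10)||chr(9)||',' order by c.attnum
)||
--Primary key constraint
coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
chr(10)||');'||
--Table comment
coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(h.description,'''','''''')||''';','')||
--Column comments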
coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
from $v_sql$||tbname_1||$v_sql$ a
inner join $v_sql$||tbname_2||$v_sql$ b
on (a.relnamespace = b.oid)
inner join $v_sql$||tbname_3||$v_sql$ c
on (a.oid = c.attrelid)
left join $v_sql$||tbname_5||$v_sql$ f
on (a.oid = f.objoid and c.attnum = f.objsubid)
left join (
select d.indrelid
,string_agg('"'||c.attname||'"',',' order by c.attnum) prikey
from $v_sql$||tbname_1||$v_sql$ a, $v_sql$||tbname_2||$v_sql$ b, $v_sql$||tbname_3||$v_sql$ c, $v_sql$||tbname_6||$v_sql$ d
where a.relnamespace = b.oid
and a.oid = c.attrelid
and a.oid = d.indrelid
and d.indisprimary = true
and c.attnum = any(d.indkey)
and a.relname = '$v_sql$||p_tablename||$v_sql$'
and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
group by d.indrelid
) g
on (a.oid = g.indrelid)
left join $v_sql$||tbname_5||$v_sql$ h
on (a.oid = h.objoid and h.objsubid = 0)
where c.attnum > 0
and not c.attisdropped
and a.relname = '$v_sql$||p_tablename||$v_sql$'
and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
group by b.nspname,a.relname,h.description,g.prikey;
$v_sql$
;
execute v_sql into p_result;
--drop the temporary foreign tables
execute $v_sql$
--drop each temporary foreign table if it exists
drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
drop foreign table if exists $v_sql$||tbname_5||$v_sql$;
drop foreign table if exists $v_sql$||tbname_6||$v_sql$;
drop foreign table if exists $v_sql$||tbname_8||$v_sql$;
drop foreign table if exists $v_sql$||tbname_9||$v_sql$;
$v_sql$
;
--restore the original search_path
execute 'SET search_path TO '||o_search_path;
return p_result;
exception when others then
--restore the original search_path
execute 'SET search_path TO '||o_search_path;
raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.get_ddl_remote_pg2pg(varchar, varchar, varchar, varchar) to public;
sp_extract_mapping
create or replace function tool.sp_extract_mapping(
tablename character varying
,schemaname character varying
,newtablename character varying
,remote_connname character varying default '${CONNNAME}'::character varying
)
returns void
language plpgsql
security definer
as $function$
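/* Author : v-yuzhenc
* Purpose : given the table name and schema name (database name for MySQL) of a remote table,
* create a foreign (mapping) table with the given new name under the tool schema;
* querying that table is equivalent to querying the remote table directly:
* select * from tool.newtablename;
* tablename : name of the remote table
* schemaname : schema of the remote table (database name for MySQL)
* newtablename : name of the foreign table created under tool (must start with tmp_)
* remote_connname : remote connection name, from tool.dblink_connection_info.connname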
* */
declare
p_tablename varchar := tablename;
p_schemaname varchar := schemaname;
p_newtablename varchar := newtablename;
p_remote_connname tool.dblink_connection_info.connname%type := remote_connname;
v_datasource_type tool.dblink_connection_info.conntype%type;
v_sql varchar;
o_search_path varchar; --模式搜索路径
begin
--validate the parameters
if p_tablename is null then
raise exception 'Parameter tablename must not be null!';
end if;
if p_schemaname is null then
raise exception 'Parameter schemaname must not be null!';
end if;
if p_newtablename is null or p_newtablename not like 'tmp\_%' then
raise exception 'Parameter newtablename must start with tmp_!';
end if;
--save the original search_path
execute 'show search_path;' into o_search_path;
--temporarily put tool first on the search_path
execute 'SET search_path TO tool,'||o_search_path;
v_datasource_type := conntype from tool.dblink_connection_info where connname = p_remote_connname limit 1;
case v_datasource_type
when 'pg' then v_sql := tool.get_ddl_remote_pg2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);
when 'mysql' then v_sql := tool.get_ddl_remote_mysql2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);
else v_sql := null;
end case;
if v_sql is null then
raise exception 'Unsupported data source type! Only mysql and pg are supported!';
end if;
--turn the regular-table DDL into foreign-table DDL
select replace(replace(replace(replace(replace(regexp_replace(v_sql
,' not null| null','','g'),
'drop table ','drop foreign table '
),
'create table ','create foreign table '
),
'comment on ','--comment on '
),
',primary key ','--,primary key '
),
');',') server '||fdw_server||' options('||case conntype when 'pg' then 'schema_name ' when 'mysql' then 'dbname' end||''''||p_schemaname||''''||',table_name '||''''||p_tablename||''''||');'
)
into v_sql
from tool.dblink_connection_info
where connname = p_remote_connname;
execute v_sql;
--execute 'show search_path;' into o_search_path;
--raise notice '%',v_sql;
--raise notice '%',o_search_path;
--restore the original search_path
execute 'SET search_path TO '||o_search_path;
exception when others then
--restore the original search_path
execute 'SET search_path TO '||o_search_path;
raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.sp_extract_mapping(varchar, varchar, varchar, varchar) to public;
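To make the replace() chain above concrete, this is approximately the DDL that sp_extract_mapping would execute for the pg table t_ds_project used in the ODS example. It is a hand-written illustration, not captured output. fdw_dp is a hypothetical value of fdw_server in tool.dblink_connection_info, and the comment text is made up. In the generated DDL, the NULL/NOT NULL markers are stripped, the primary key and COMMENT statements are commented out, and the closing `);` is extended with the server and options clauses.

```sql
-- Illustrative result of sp_extract_mapping('t_ds_project','public','tmp_ods_dp_t_ds_project')
-- for a pg connection; fdw_dp is a hypothetical foreign server name.
drop foreign table if exists "tmp_ods_dp_t_ds_project";
create foreign table "tmp_ods_dp_t_ds_project" (
	 "id" integer
	,"name" character varying(100)
	,"code" bigint
	,"description" character varying(255)
	,"user_id" integer
	,"flag" integer
	,"create_time" timestamp without time zone
	,"update_time" timestamp without time zone
	--,primary key ("id")
) server fdw_dp options(schema_name 'public',table_name 't_ds_project');
--comment on table "tmp_ods_dp_t_ds_project" is 'project table';
--comment on column "tmp_ods_dp_t_ds_project"."id" is 'project id';
```

For a mysql connection, the only difference in the options clause is that dbname replaces schema_name.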
Wait and notify
Create the dblink extension
\c etl postgres
create extension dblink;
ETL task notification table
\c etl tool
CREATE TABLE tool.etl_task_notice (
table_name varchar(64) NULL, -- table name
schema_name varchar(64) NULL, -- schema name
update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP -- time the table was updated
);
COMMENT ON TABLE tool.etl_task_notice IS 'ETL task notifications: when an ETL task finishes and its result table is ready, it inserts one row here';
COMMENT ON COLUMN tool.etl_task_notice.table_name IS 'table name';
COMMENT ON COLUMN tool.etl_task_notice.schema_name IS 'schema name';
COMMENT ON COLUMN tool.etl_task_notice.update_time IS 'time the table was updated';
ALTER TABLE tool.etl_task_notice OWNER TO tool;
Create the sequence (used to name dblink sessions)
create sequence tool.seq_dblink_sessionid
increment by 1
minvalue 1
maxvalue 9999
start 1
cache 1
cycle;
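The sequence is declared with CYCLE, so nextval wraps back to minvalue after reaching maxvalue instead of raising an error. As a result, the session names used by is_return_result (dblink_1 to dblink_9999) are recycled rather than exhausted. The scratch sequence below (demo_seq, a hypothetical name with maxvalue 3) shows the wrap-around in a form that runs on any PostgreSQL database:

```sql
-- Scratch sequence with a tiny range, only to demonstrate CYCLE
create temporary sequence demo_seq
    increment by 1
    minvalue 1
    maxvalue 3
    start 1
    cache 1
    cycle;

-- Four calls return 1, 2, 3 and then wrap back to 1
select nextval('demo_seq') as n
from generate_series(1, 4);
```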
Helper functions
is_return_result
create or replace function tool.is_return_result(
select_statement character varying
,retrytimes integer default 60
)
returns integer
language plpgsql
security definer
as $function$
/*
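* Author: v-yuzhenc
* Purpose: run a dynamic SELECT autonomously (in a separate dblink session)
* and check whether it returns any row: 1 if it does, otherwise 0
* select_statement: the dynamic SQL to run
* retrytimes: how many times to retry when no connection can be obtained, default 60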
* */
declare
p_select_statement varchar := select_statement;
v_sql varchar; --dynamic sql
p_retrytimes int := retrytimes;
p_count int := 0;
p_session_id varchar := nextval('tool.seq_dblink_sessionid')::varchar;
p_session_name varchar := 'dblink_'||p_session_id;
p_result int := 0;
dblink_conn varchar;
begin
--try to obtain a connection
while true loop
begin
--build the dblink connection string
select 'host='||hostname||' port='||port||' dbname='||dbname||' user='||username||' password='||userpwd
into dblink_conn
from tool.dblink_connection_info
where connname = '101.34.75.200-pg-etl';
perform dblink_connect(p_session_name,dblink_conn);
exit;
exception when others then
if p_count >= p_retrytimes then
raise exception 'Could not obtain a dblink connection after % retries', p_retrytimes;
end if;
p_count := p_count + 1;
--sleep 1s before retrying
perform pg_sleep(1);
continue;
end;
end loop;
v_sql := 'select 1 from (' || p_select_statement || ') a limit 1';
--run the dynamic SQL
select count(1) into p_result from dblink(p_session_name,v_sql) as (id int);
--close the dblink connection
perform dblink_disconnect(p_session_name);
return p_result;
--on error, close the connection first, then re-raise the error
exception when others then
begin
perform dblink_disconnect(p_session_name);
exception when others then
null;
end;
raise exception '%',SQLERRM;
end;
$function$
;
grant all on function tool.is_return_result(varchar, int4) to public;
get_15_interval_time
create or replace function tool.get_15_interval_time(
v_time timestamp with time zone default current_timestamp
,time_type character varying default 'before'::character varying
)
returns timestamp with time zone
language plpgsql
as $function$
/*
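* Author: v-yuzhenc
* Purpose: return the start or end of the quarter-hour window (minutes 0-15, 15-30, 30-45, 45-60) that contains a given time
* v_time: the given time
* time_type: BEFORE (default) returns the start of the window, AFTER returns its end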
* */
declare
p_v_time timestamptz := v_time;
p_time_type varchar := upper(time_type);
v_result timestamptz;
begin
if p_time_type not in ('BEFORE','AFTER') then
raise exception 'time_type must be BEFORE or AFTER!';
end if;
if p_time_type = 'BEFORE' then
v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||
case
when to_char(p_v_time,'mi')::numeric-0 >= 0 and to_char(p_v_time,'mi')::numeric-0 < 15 then '00'
when to_char(p_v_time,'mi')::numeric-15 >= 0 and to_char(p_v_time,'mi')::numeric-15 < 15 then '15'
when to_char(p_v_time,'mi')::numeric-30 >= 0 and to_char(p_v_time,'mi')::numeric-30 < 15 then '30'
when to_char(p_v_time,'mi')::numeric-45 >= 0 and to_char(p_v_time,'mi')::numeric-45 < 15 then '45'
end||'00','yyyymmddhh24miss');
elsif p_time_type = 'AFTER' then
v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||
case
when to_char(p_v_time,'mi')::numeric-0 >= 0 and to_char(p_v_time,'mi')::numeric-0 < 15 then '00'
when to_char(p_v_time,'mi')::numeric-15 >= 0 and to_char(p_v_time,'mi')::numeric-15 < 15 then '15'
when to_char(p_v_time,'mi')::numeric-30 >= 0 and to_char(p_v_time,'mi')::numeric-30 < 15 then '30'
when to_char(p_v_time,'mi')::numeric-45 >= 0 and to_char(p_v_time,'mi')::numeric-45 < 15 then '45'
end||'00','yyyymmddhh24miss') + interval '15 minutes';
end if;
return v_result;
end;
$function$
;
grant execute on function tool.get_15_interval_time(timestamptz, varchar) to public;
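The same window boundaries can be computed arithmetically with date_trunc and integer division, which may be easier to read than the four CASE branches. This is an alternative sketch, not the author's function. It uses a timestamp literal so that the result does not depend on the session time zone. For timestamptz input, date_trunc truncates in the session time zone, just as to_char does in the original.

```sql
-- Alternative sketch: quarter-hour window around a point in time.
-- (minute / 15) * 15 uses integer division, giving 0, 15, 30 or 45.
select
    date_trunc('hour', ts)
        + make_interval(mins => (extract(minute from ts)::int / 15) * 15)      as window_start, -- like 'BEFORE'
    date_trunc('hour', ts)
        + make_interval(mins => (extract(minute from ts)::int / 15) * 15 + 15) as window_end    -- like 'AFTER'
from (values (timestamp '2024-01-01 10:37:12')) as v(ts);
-- returns 2024-01-01 10:30:00 | 2024-01-01 10:45:00
```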
wait_table
create or replace function tool.wait_table(
sql_statement character varying
,check_freq integer default 30
,check_time integer default 300
)
returns integer
language plpgsql
as $function$
/*
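* Author: v-yuzhenc
* Purpose: wait until a SQL statement (SELECT only) returns a result set; returns 1 on success and raises an exception on timeout
* sql_statement: the SQL statement
* check_freq: check interval, default every 30s
* check_time: maximum total checking time in seconds, default 5 minutes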
* */
declare
p_sql_statement varchar := sql_statement;
p_check_freq numeric := check_freq;
p_check_time numeric := check_time;
v_result int := 0; --return value
v_sql varchar(32767); --dynamic sql
v_id numeric;
check_begin numeric := 0;
v_hint varchar(400);
begin
--only SELECT statements are allowed (a null statement is rejected as well)
if (trim(lower(p_sql_statement)) ~ '^select') is not true then
raise exception 'sql_statement must start with select!';
end if;
--start checking
raise notice '----------start checking----------';
--dynamic sql
v_sql := 'select 1 from (' || p_sql_statement || ') a limit 1';
raise notice 'check statement: %',v_sql;
loop
v_hint := 'checking at: '||to_char(clock_timestamp(),'yyyy-mm-dd hh24:mi:ss');
raise notice '%',v_hint;
begin
v_id := tool.is_return_result(v_sql);
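--check whether a row was returned
if v_id = 1 then
--mark the wait as satisfied
v_result := 1;
else
--not satisfied yet: advance the elapsed-time counter
check_begin := check_begin + p_check_freq;
end if;
--if the dynamic SQL fails (e.g. a table does not exist yet), keep waiting instead of exiting
exception when others then
check_begin := check_begin + p_check_freq;
end;
--exit the loop on timeout or once the wait is satisfied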
exit when check_begin > p_check_time or v_result = 1;
perform pg_sleep(p_check_freq);
end loop;
if v_result = 1 then
raise notice '----------wait succeeded----------';
else
raise notice '----------wait timed out----------';
raise exception '----------wait timed out----------';
end if;
return v_result;
end;
$function$
;
grant execute on function tool.wait_table(varchar, int4, int4) to public;
sp_wait
create or replace function tool.sp_wait(
tablename character varying
,schemaname character varying
,waittype character varying default 'real'::character varying
,checkfreq integer default 20
,checktime integer default 600
)
returns void
language plpgsql
as $function$
/*
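* Author: v-yuzhenc
* Purpose: wait until a table has been notified (woken up) by the sp_notify procedure.
* tablename : table to wait for
* schemaname : schema of that table
* waittype : wait type, one of year month day hour real
* year: notified during the current year
* month: notified during the current month
* day: notified during the current day
* hour: notified during the current hour
* real: pseudo-real-time; notified within the current quarter-hour window (0-15, 15-30, 30-45, 45-60)
* checkfreq: check interval, default every 20s
* checktime: maximum total checking time in seconds, default 600s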
* */
declare
p_tablename varchar := lower(tablename);
p_schemaname varchar := lower(schemaname);
p_waittype varchar := lower(waittype);
p_checkfreq int := checkfreq;
p_checktime int := checktime;
v_sql varchar;
begin
if p_tablename is null then
raise exception 'Parameter tablename must not be null!';
end if;
if p_schemaname is null then
raise exception 'Parameter schemaname must not be null!';
end if;
if p_waittype not in ('year','month','day','hour','real') then
raise exception 'Parameter waittype must be one of [''year'',''month'',''day'',''hour'',''real'']!';
end if;
if p_checkfreq >= 61 or p_checkfreq <= 0 then
raise exception 'Parameter checkfreq must be between 1 and 60!';
end if;
if p_checktime >= 1801 or p_checktime <= 0 then
raise exception 'Parameter checktime must be between 1 and 1800!';
end if;
v_sql := $v_sql$select 1 from tool.etl_task_notice
where table_name = '$v_sql$||p_tablename||$v_sql$'
and schema_name = '$v_sql$||p_schemaname||$v_sql$'
and $v_sql$||
case p_waittype
when 'year' then $v_sql$to_char(update_time,'yyyy') = to_char(current_timestamp,'yyyy')$v_sql$
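when 'month' then $v_sql$to_char(update_time,'yyyymm') = to_char(current_timestamp,'yyyymm')$v_sql$
when 'day' then $v_sql$to_char(update_time,'yyyymmdd') = to_char(current_timestamp,'yyyymmdd')$v_sql$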
when 'hour' then $v_sql$to_char(update_time,'yyyymmddhh24') = to_char(current_timestamp,'yyyymmddhh24')$v_sql$
when 'real' then $v_sql$update_time >= tool.get_15_interval_time(current_timestamp,'BEFORE') and update_time <= tool.get_15_interval_time(current_timestamp,'AFTER')$v_sql$
end
;
--raise notice '%',v_sql;
perform tool.wait_table(v_sql,p_checkfreq,p_checktime);
end;
$function$
;
grant all on function tool.sp_wait(varchar, varchar, varchar, int4, int4) to public;
sp_notify
create or replace function tool.sp_notify(
tablename character varying
,schemaname character varying
)
returns void
language plpgsql
security definer
as $function$
/*
* Author: v-yuzhenc
* Purpose: wake up (notify) a table in a given schema,
* i.e. record that its ETL has finished
* tablename: table name
* schemaname: schema name
* */
declare
p_tablename tool.etl_task_notice.table_name%type := lower(tablename);
p_schemaname tool.etl_task_notice.schema_name%type := lower(schemaname);
begin
if p_tablename is null then
raise exception 'Parameter tablename must not be null';
end if;
if p_schemaname is null then
raise exception 'Parameter schemaname must not be null';
end if;
insert into tool.etl_task_notice(
table_name
,schema_name
) values (
p_tablename
,p_schemaname
);
end;
$function$
;
grant all on function tool.sp_notify(varchar, varchar) to public;
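At the database level, sp_notify and sp_wait come down to an insert into tool.etl_task_notice and a repeated existence check on it. The self-contained sketch below reproduces that handshake on a scratch temporary table. etl_task_notice_demo is a hypothetical name and is not part of the tool schema. The check uses the condition sp_wait builds for waittype 'hour'.

```sql
-- Scratch stand-in for tool.etl_task_notice (temporary, demo only)
create temporary table etl_task_notice_demo (
    table_name  varchar(64),
    schema_name varchar(64),
    update_time timestamp default current_timestamp
);

-- Upstream task finished: the equivalent of tool.sp_notify('ods_dp_t_ds_project','mdl')
insert into etl_task_notice_demo (table_name, schema_name)
values ('ods_dp_t_ds_project', 'mdl');

-- Downstream check: the condition tool.sp_wait(..., 'hour') polls for
select exists (
    select 1
    from etl_task_notice_demo
    where table_name  = 'ods_dp_t_ds_project'
      and schema_name = 'mdl'
      and to_char(update_time, 'yyyymmddhh24') = to_char(current_timestamp, 'yyyymmddhh24')
) as notified_this_hour;
```

In a real workflow, a downstream SQL task would start with `select tool.sp_wait('ods_dp_t_ds_project','mdl','hour');` and finish by calling tool.sp_notify for its own result table. This keeps table-level dependencies intact even across separate workflows.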
DolphinScheduler
Installation
Architecture
Setup
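Object | Instance | Description |
---|---|---|
Tenant | root | OS user that executes the tasks |
Regular user | yuzhenchao | full name in pinyin |
Regular user | yuxiaotan | full name in pinyin |
Data source | dbselect@223.242.39.75:5432/dp | SELECT-only data source on the DolphinScheduler database |
Data source | mdl@101.34.75.200:5432/etl | data source for model development in the etl database |
Data source | apl@101.34.75.200:5432/etl | data source for application-layer development in the etl database |
Project | DolphinScheduler metadata modeling | ETL processing of DolphinScheduler's metadata |
Project | DolphinScheduler metadata app development - yuzhenchao | yuzhenchao's own application-layer project: each person maintains their own reports and applications, with one workflow and one schedule per application |
Project | DolphinScheduler metadata app development - yuxiaotan | yuxiaotan's own application-layer project: each person maintains their own reports and applications, with one workflow and one schedule per application |
Alert instance | Model alerts | model workflows alert on both success and failure |
Alert instance | App alerts-yuzhenchao | notifies yuzhenchao only; usually alerts on failed tasks only |
Alert instance | App alerts-yuxiaotan | notifies yuxiaotan only; usually alerts on failed tasks only |
Alert group | Model alerts | maps one-to-one to the alert instance |
Alert group | App alerts-yuzhenchao | maps one-to-one to the alert instance |
Alert group | App alerts-yuxiaotan | maps one-to-one to the alert instance |
Environment | datax-exec | DataX execution environment configuration |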
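Create the tenant root
Create regular users
Create data sources
Create projects
Create alert instances (DingTalk alerts)
Create alert groups
Create the DataX environment
Notes:
- I use the default worker group here, so DataX must be installed on every worker machine, with DATAX_HOME=/usr/local/datax
- If the database version is newer than the JDBC driver bundled with a DataX plugin, remove the old driver jar from that plugin's directory (and put a matching driver there), otherwise the connection fails
Permission management
ETL process
Rerunnable ETL: substitution variables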
Variable | Value | Description |
---|---|---|
today | $[yyyyMMdd] | today |
yesterday | $[yyyyMMdd-1] | yesterday |
day2before | $[yyyyMMdd-2] | 2 days ago |
day3before | $[yyyyMMdd-3] | 3 days ago |
day4before | $[yyyyMMdd-4] | 4 days ago |
day5before | $[yyyyMMdd-5] | 5 days ago |
day6before | $[yyyyMMdd-6] | 6 days ago |
day7before | $[yyyyMMdd-7] | 7 days ago |
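In DolphinScheduler, these variables are usually declared as custom parameters on the SQL task (for example name yesterday, value $[yyyyMMdd-1]) and referenced in the script as ${yesterday}. The $[...] expressions are evaluated against the instance's scheduled time. Rerunning an old instance or running a backfill (complement) for a past date therefore reproduces that date's values rather than today's. Below is a minimal sketch of a rerunnable daily load built on that assumption. The table dw_dp_task_instance_d and its columns are hypothetical, and the script only runs after DolphinScheduler has substituted the placeholders.

```sql
-- DolphinScheduler SQL task template: ${yesterday} and ${today} are custom parameters
-- declared on the task as $[yyyyMMdd-1] and $[yyyyMMdd]; DolphinScheduler substitutes
-- them before running the script. dw_dp_task_instance_d is a hypothetical target table.

-- Delete first so that a rerun or backfill of the same day does not duplicate rows
delete from dw_dp_task_instance_d
where stat_date = to_date('${yesterday}', 'yyyymmdd');

insert into dw_dp_task_instance_d (stat_date, task_cnt)
select to_date('${yesterday}', 'yyyymmdd'), count(1)
from ods_dp_t_ds_task_instance
where start_time >= to_date('${yesterday}', 'yyyymmdd')
  and start_time <  to_date('${today}', 'yyyymmdd');
```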
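Model-layer workflows
ODS layer
- The ODS layer extracts data through temporary tables, so that the target tables do not sit in long lock waits and readers never see a partially loaded table
- First create the temporary table, load the extracted data into it, and finally rename the temporary table to the target table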
Source DB | Source table | Target schema | Target table | Description |
---|---|---|---|---|
dp | t_ds_project | mdl | ods_dp_t_ds_project | project table |
dp | t_ds_process_definition | mdl | ods_dp_t_ds_process_definition | workflow definition table |
dp | t_ds_tenant | mdl | ods_dp_t_ds_tenant | tenant table |
dp | t_ds_schedules | mdl | ods_dp_t_ds_schedules | schedule table |
dp | t_ds_process_task_relation | mdl | ods_dp_t_ds_process_task_relation | workflow-task relation table |
dp | t_ds_task_definition | mdl | ods_dp_t_ds_task_definition | task definition table |
dp | t_ds_process_instance | mdl | ods_dp_t_ds_process_instance | workflow instance table |
dp | t_ds_task_instance | mdl | ods_dp_t_ds_task_instance | task instance table |
dp | t_ds_relation_process_instance | mdl | ods_dp_t_ds_relation_process_instance | parent/sub-workflow instance relations, used when a workflow definition contains sub-workflows |
dp | t_ds_session | mdl | ods_dp_t_ds_session | session table |
dp | t_ds_user | mdl | ods_dp_t_ds_user | user table |
dp | t_ds_datasource | mdl | ods_dp_t_ds_datasource | data source table |
dp | t_ds_access_token | mdl | ods_dp_t_ds_access_token | access token table |
dp | t_ds_relation_datasource_user | mdl | ods_dp_t_ds_relation_datasource_user | user-data source relation table |
dp | t_ds_queue | mdl | ods_dp_t_ds_queue | queue table |
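Extracting data with FDW (not recommended)
- First select the project
- Select workflow definitions
- Choose Create Workflow
- Drag the SQL task icon onto the canvas
- Develop the ODS script under your development user, then configure the task in DolphinScheduler; the table t_ds_project serves as the example
--get the CREATE TABLE statement with this function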
--select tool.get_ddl_remote_pg2pg('t_ds_project','public','tmp_ods_dp_t_ds_project');
drop table if exists "tmp_ods_dp_t_ds_project";
create table "tmp_ods_dp_t_ds_project" (
"id" integer not null
,"name" character varying(100) null
,"code" bigint not null
,"description" character varying(255) null
,"user_id" integer null
,"flag" integer null
,"create_time" timestamp without time zone null
,"update_time" timestamp without time zone null
,primary key ("id")
);
comment on table "tmp_ods_dp_t_ds_project" is 'project table';
comment on column "tmp_ods_dp_t_ds_project"."id" is 'project id';
comment on column "tmp_ods_dp_t_ds_project"."name" is 'project name';
comment on column "tmp_ods_dp_t_ds_project"."code" is 'project code';
comment on column "tmp_ods_dp_t_ds_project"."description" is 'project description';
comment on column "tmp_ods_dp_t_ds_project"."user_id" is 'user id, references t_ds_user.id';
comment on column "tmp_ods_dp_t_ds_project"."create_time" is 'project creation time';
comment on column "tmp_ods_dp_t_ds_project"."update_time" is 'project last update time';
--create the mapping (foreign) table
do $$
begin
perform tool.sp_extract_mapping('t_ds_project','public','tmp_ods_dp_t_ds_project');
end$$;
--extract the data
insert into tmp_ods_dp_t_ds_project
select * from tool.tmp_ods_dp_t_ds_project
;
do $$
begin
--add the last_pg_time timestamp column
alter table tmp_ods_dp_t_ds_project add last_pg_time timestamp default current_timestamp;
--convert empty strings to NULL
perform tool.replace_to_null('tmp_ods_dp_t_ds_project');
end$$;
do $$
begin
--drop the ODS table if it exists
drop table if exists ods_dp_t_ds_project;
--rename the temporary table to the target name
alter table tmp_ods_dp_t_ds_project rename to ods_dp_t_ds_project;
end$$;
do $$
begin
--ods_dp_t_ds_project
perform tool.sp_notify('ods_dp_t_ds_project','mdl');
--analyze the table
analyze ods_dp_t_ds_project;
end$$;
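- Click Save
- Bring the workflow online, then run it
- Check the logs
- The other tables follow the same pattern; the edges between tasks control how many of them run in parallel
Extracting data with DataX (recommended)
- Create a workflow and drag in SQL tasks and a DataX task
- Write the scripts and configure the tasks
- Upstream SQL task (runs before DataX)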
drop table if exists "tmp_ods_dp_t_ds_project";
create table "tmp_ods_dp_t_ds_project" (
"id" integer not null
,"name" character varying(100) null
,"code" bigint not null
,"description" character varying(255) null
,"user_id" integer null
,"flag" integer null
,"create_time" timestamp without time zone null
,"update_time" timestamp without time zone null
,primary key ("id")
);
comment on table "tmp_ods_dp_t_ds_project" is 'project table';
comment on column "tmp_ods_dp_t_ds_project"."id" is 'project id';
comment on column "tmp_ods_dp_t_ds_project"."name" is 'project name';
comment on column "tmp_ods_dp_t_ds_project"."code" is 'project code';
comment on column "tmp_ods_dp_t_ds_project"."description" is 'project description';
comment on column "tmp_ods_dp_t_ds_project"."user_id" is 'user id, references t_ds_user.id';
comment on column "tmp_ods_dp_t_ds_project"."create_time" is 'project creation time';
comment on column "tmp_ods_dp_t_ds_project"."update_time" is 'project last update time';
- SELECT statement for the DataX task
--SELECT statement used for extraction
select
"id"
,"name"
,"code"
,"description"
,"user_id"
,"flag"
,"create_time"
,"update_time"
from t_ds_project
;
--post-processing (SQL task that runs after DataX)
do $$
begin
--add the last_pg_time timestamp column
alter table tmp_ods_dp_t_ds_project add last_pg_time timestamp default current_timestamp;
--convert empty strings to NULL
perform tool.replace_to_null('tmp_ods_dp_t_ds_project');
end$$;
do $$
begin
--drop the ODS table if it exists
drop table if exists ods_dp_t_ds_project;
--rename the temporary table to the target name
alter table tmp_ods_dp_t_ds_project rename to ods_dp_t_ds_project;
end$$;
do $$
begin
--ods_dp_t_ds_project
perform tool.sp_notify('ods_dp_t_ds_project','mdl');
--analyze the table
analyze ods_dp_t_ds_project;
end$$;
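- The other tables follow the same pattern
- Efficiency comparison
DW layer: SQL scripts
DIM layer: SQL scripts
DM layer: SQL scripts
Master control workflow
Application-layer workflows
Magic-API unified interface platform
Other functions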
array_position
create or replace function tool.array_position(
arrayint integer[]
,elementint integer
,times integer default 1
)
returns integer
language plpgsql
as $function$
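/* Author : v-yuzhenc
* Purpose : return the position of the given element in the array, or 0 if it is not found
* arrayint : the array
* elementint : the element to look for
* times : which occurrence to locate (1 = first)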
* */
declare
p_times int := 0;
p_result int := 0;
begin
if array_length(arrayint,1) is null then
return p_result;
end if;
for i in 1..array_length(arrayint,1) loop
if arrayint[i] = elementint then
p_times := p_times + 1;
end if;
if p_times = times then
return i;
end if;
end loop;
return p_result;
end;
$function$
;
grant execute on function tool.array_position(_int4, int4, int4) to public;
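PostgreSQL 9.5 and later ship a built-in array_position, which returns the first match or NULL, and array_positions, which returns every matching index. If the nth-occurrence behaviour of tool.array_position is all that is needed, the built-ins can cover it as sketched below. The coalesce(..., 0) reproduces the "0 when not found" convention. The calls are schema-qualified with pg_catalog so that a tool.array_position on the search path is not picked up instead.

```sql
-- Built-in equivalents (PostgreSQL 9.5+), schema-qualified to avoid tool.array_position
select
    pg_catalog.array_position(array[1, 2, 1, 3], 1)                    as first_pos,   -- 1
    coalesce((pg_catalog.array_positions(array[1, 2, 1, 3], 1))[2], 0) as second_pos,  -- 3
    coalesce((pg_catalog.array_positions(array[1, 2, 1, 3], 9))[1], 0) as missing_pos; -- 0
```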
replace_to_null
create or replace function tool.replace_to_null(
tablename character varying
,schemaname character varying default ("current_user"())::character varying(64)
,execuser varchar default current_user::varchar
)
returns void
language plpgsql
security definer
as $function$
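/* Author : v-yuzhenc
* Purpose: scan all varchar and text columns of the given table and replace '' values with NULL
* tablename : table to scan
* schemaname : schema of the table (must be the caller's own schema)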
* */
declare
p_tablename varchar := lower(tablename);
p_schemaname varchar := lower(schemaname);
p_execuser varchar(64) := execuser;--caller
existbj int := 0; --number of varchar/text columns found
v_sql varchar; --dynamic sql
begin
if p_schemaname <> p_execuser then
raise exception 'You may only operate on tables in your own schema!';
end if;
--count the varchar and text columns
select count(1)
into existbj
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
inner join pg_type d
on (c.atttypid = d.oid)
where c.attnum > 0
and d.typname in ('varchar','text')
and a.relname = p_tablename
and b.nspname = p_schemaname;
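--if there are no varchar or text columns, there is nothing to do
if existbj = 0 then
raise notice '%.% does not exist or has no varchar/text columns to process!',p_schemaname,p_tablename;
return;
end if;
--build one UPDATE per column; quote_ident keeps reserved-word column names valid
select
string_agg('update '||quote_ident(p_tablename)||'
set '||quote_ident(c.attname)||' = null where '||quote_ident(c.attname)||' = '''';',chr(10))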
into v_sql
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
inner join pg_type d
on (c.atttypid = d.oid)
where c.attnum > 0
and d.typname in ('varchar','text')
and a.relname = p_tablename
and b.nspname = p_schemaname;
--execute the dynamic SQL through the central executor tool.sp_execsql
perform tool.sp_execsql(v_sql,p_schemaname);
end;
$function$
;
grant execute on function tool.replace_to_null(varchar, varchar,varchar) to public;
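replace_to_null generates one UPDATE per varchar/text column, so a table with n such columns is scanned n times. An alternative, which is not what the function currently does, is a single UPDATE that wraps every such column in nullif(col, ''). The sketch below shows the pattern on a hypothetical scratch table, replace_demo. Inside the function, the SET list would be generated with string_agg from the same pg_attribute query.

```sql
-- Scratch table with empty strings in two text-like columns (demo only)
create temporary table replace_demo (id int, name varchar(20), note text);
insert into replace_demo values (1, '', 'a'), (2, 'b', ''), (3, 'c', 'd');

-- One UPDATE for all columns: nullif(x, '') yields NULL when x = '' and x otherwise
update replace_demo
set name = nullif(name, ''),
    note = nullif(note, '')
where name = '' or note = '';
```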
sp_jzdb
create or replace function tool.sp_jzdb(
tablename character varying
,oldschema character varying
,newschema character varying
,tablename_new character varying default null::character varying
,execuser varchar default current_user::varchar
)
returns void
language plpgsql
security definer
as $function$
/*
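* Author : v-yuzhenc
* Purpose: centralized table import; copy a table from a given schema into a new schema under a new name
* tablename : table in the given schema, case-insensitive
* oldschema : the given (source) schema, case-insensitive
* newschema : the new (target) schema, case-insensitive
* tablename_new : new table name, case-insensitive; defaults to the old table name
* execuser : caller's user name; no need to pass it, keep the default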
* */
declare
p_tablename varchar(64) := lower(tablename);
p_tablename_new varchar(64) := coalesce (lower(tablename_new),p_tablename);
p_oldschema varchar(64) := lower(oldschema);
p_newschema varchar(64) := lower(newschema);
p_execuser varchar(64) := execuser; --caller's user name; no need to pass it manually
jzdb_tname varchar(64) := 'jzdb_'||p_tablename_new;
existbj numeric;
v_sql varchar;
bak_tname varchar;
begin
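--the caller may import their own table into another schema,
--or import a table from another schema into their own schema
if p_execuser <> p_oldschema and p_execuser <> p_newschema then
raise exception 'Either oldschema or newschema must equal the current user name, because you may only operate on tables related to yourself';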
end if;
--create the table
v_sql := tool.get_ddl(p_oldschema||'.'||p_tablename,'table',jzdb_tname);
perform tool.sp_execsql(v_sql,p_newschema);
--insert the data
perform tool.sp_execsql('insert into "'||jzdb_tname||'" select * from "'||p_oldschema||'"."'||p_tablename||'";',p_newschema);
--check whether the target schema already has a table with the new name
select count(1)
into existbj
from pg_tables a
where a.tablename = p_tablename_new
and a.schemaname = p_newschema;
if existbj <> 0 then
--if a table with the same name already exists,
--back it up under the name o_<schema>_<table>_<version>
select
count(1)+1 v_no --version number
into existbj
from pg_tables a
where substr(a.tablename,4+length(p_newschema),length(p_tablename_new)) = p_tablename_new
and a.schemaname = 'tool'
;
--the backup table is created under tool, named o_<schema>_<table>_<version>
bak_tname := 'o_'||p_newschema||'_'||p_tablename_new||'_'||existbj::varchar;
--create the table
v_sql := tool.get_ddl(p_newschema||'.'||p_tablename_new,'table',bak_tname);
perform tool.sp_execsql(v_sql,'tool');
--insert the data
perform tool.sp_execsql('insert into "'||bak_tname||'" select * from "'||p_newschema||'"."'||p_tablename_new||'";','tool');
--analyze the table
perform tool.sp_execsql('analyze "'||bak_tname||'";','tool');
perform tool.sp_execsql('drop table "'||p_tablename_new||'";',p_newschema);
end if;
--rename the table
perform tool.sp_execsql('alter table "'||jzdb_tname||'" rename to '||p_tablename_new,p_newschema);
--analyze the table
perform tool.sp_execsql('analyze "'||p_tablename_new||'";',p_newschema);
end;
$function$
;
grant execute on function tool.sp_jzdb(varchar, varchar, varchar, varchar,varchar) to public;
sp_sqlexec_efficient
create or replace function tool.sp_sqlexec_efficient(
sqlexec character varying
,exectimes integer default 1
)
returns character varying
language plpgsql
as $function$
/*
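* Author: v-yuzhenc
* Purpose: measure SQL execution time; the transaction is never committed (each run is rolled back),
* returns the duration of each run and the average
* sqlexec: the SQL to measure
* exectimes: number of runs, range 1-10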
* */
declare
begin_time timestamp;
end_time timestamp;
exec_duration interval;
v_result varchar := '';
p_sqlexec varchar := sqlexec;
p_exectimes int := exectimes;
exec_begin int := 0;
sum_sqlexec interval := '00:00:00.000000'::interval;
sqlexec_avg interval;
begin
if p_exectimes <= 0 or p_exectimes >= 11 then
raise exception 'exectimes参数值范围为1~10';
end if;
while exec_begin >= 0 and exec_begin <= p_exectimes-1 loop
begin
begin_time := clock_timestamp();
execute p_sqlexec;
end_time = clock_timestamp();
raise exception 'rollback' using errcode = '12345';
exception when sqlstate '12345' then
null;
end;
exec_duration = end_time-begin_time;
v_result := v_result||to_char(begin_time,'yyyy-mm-dd hh24:mi:ss.us')||'~'||to_char(end_time,'yyyy-mm-dd hh24:mi:ss.us')||':'||exec_duration::varchar||';'||chr(10);
sum_sqlexec := sum_sqlexec + exec_duration;
exec_begin := exec_begin + 1;
end loop;
sqlexec_avg := sum_sqlexec/p_exectimes;
v_result := v_result||'avg:'||sqlexec_avg::varchar||';';
return v_result;
end;
$function$
;
grant execute on function tool.sp_sqlexec_efficient(varchar, int4) to public;
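sp_sqlexec_efficient relies on a PL/pgSQL detail. When a block's EXCEPTION clause catches an error, everything the block changed in the database is rolled back, but plain variable assignments keep their values. The standalone sketch below uses a hypothetical scratch table, efficient_demo. It times an insert and then discards the insert the same way.

```sql
-- Scratch table (temporary, demo only)
create temporary table efficient_demo (id int);

do $$
declare
    t0      timestamptz;
    elapsed interval;
begin
    begin
        t0 := clock_timestamp();
        insert into efficient_demo select g from generate_series(1, 100000) as g;
        elapsed := clock_timestamp() - t0;  -- plain variables keep their values after the rollback
        raise exception 'rollback' using errcode = '12345';
    exception when sqlstate '12345' then
        null;  -- catching the error rolls back everything this inner block wrote
    end;
    raise notice 'insert took %', elapsed;
end $$;
```

Side effects outside the transaction are not undone this way. Sequence values consumed by nextval, for example, stay consumed, and work sent over dblink stays done.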