跨脚本步骤的复杂 SQL 血缘识别
2023-02-18 16:30:45 时间
一. 血缘抽取目标
- 强依赖关系: 识别 SQL 执行结果写入 [hive, mysql, hdfs] 物理库表时的字段依赖链路, 即每个入库字段由哪些物理库表字段产生 (对应 select 部分)。
- 弱依赖关系: 识别整个处理过程中参与计算的物理表字段列表, 排除 select 部分, 包括 [partitionBy > joinOn > where > groupBy > having > orderBy] 中引用的字段。
二. 血缘抽取对象
- 公司内部POC脚本, 层级结构
{
scriptMeta: 脚本基础信息
jobs: {//任务列表
jobid: [ //领域任务集合
E:{
stepMeta: //基础数据抽取步骤信息
sql: //执行sql
},
T:{
stepMeta: //数据清洗合并步骤信息
sql: //执行sql
},
L:{
stepMeta: //结果写库步骤
sql: //执行sql
}
]
}
}
- poc脚本例子[历史遗留]
# -*-coding:UTF-8 -*-
# Legacy POC ETL script template. A single dict holds run settings, template
# arguments (extend_args) and a list of jobs; each job is split into
# "analysis" (extract), "transform" and "loading" steps, each with its own SQL.
# NOTE(review): numpy is imported but not used anywhere in this snippet.
import numpy as np
etl_template = {"run_mode": "day",
"delay_days": 1,
"use_project_tz":True,
"interval_days":1,
# Template arguments. Values use <!func(date)!> placeholders, presumably
# expanded by the scheduler for each run date -- confirm.
"extend_args":{
"kid":"1,2,3,4",
"year":"<!getyear(date)!>",
"month":"<!getmonth(date)!>",
"sdate":"<!getyear(date)!><!getmonth(date)!>01",
"edate":"<!curmonthend(date)!>",
"sdateformat":"<!getyear(date)!>-<!getmonth(date)!>-01",
# NOTE(review): the day is hard-coded to 31, which is not a real date for
# shorter months; it presumably only works as an upper bound in the string
# comparison used by step e_2 (unlike "edate", which uses curmonthend).
"edateformat":"<!getyear(date)!>-<!getmonth(date)!>-31",
"active_partition":"<!getyear(date)!><!getmonth(date)!>",
"online_partition": "<!getyear(date)!><!getmonth(date)!>"
},
"jobs": [
{ # cost: about 6 minutes
"job_name": "job_0",
"analysis": [
{ # extract project info from MySQL
"step_name": "e_0",
"db_type": "mysql",
"dbname": "ad",
"xy_user": "inf",
# %(project)s is not defined in extend_args; presumably supplied by the
# runner -- confirm.
"sql": """
select gid, project_code, project_name from ad.if_project where project_code = '%(project)s'
"""
},
{# cache the "online" (activation) records from l_activate_date
#"etl_class_name": "XQLExtractionEtl",
"step_name": "e_2",
# %(tz)s is likewise not defined in extend_args.
"pre_sql": ["set time_zone='%(tz)s:00'"],
"db_type": "mysql",
"dbname": "ad",
"xy_user": "inf",
# Presumably the query is fetched in batches of 50000 rows keyed by "id"
# -- confirm against BatchAssemble.
"sql_assemble":"BatchAssemble",
"batch_id":"id",
"batch_size":50000,
#"data_save_location":"hdfs",
# The SQL uses <#if ...#>/<#endif#> template conditionals: for project
# 'rcnco' deviceid is upper(concat(deviceid, '|', bundleid)); otherwise the
# raw deviceid is selected. Dates are bounded by sdateformat/edateformat.
"sql": """
select dates,
<#if '%(project)s'=='rcnco' #>
upper(concat(deviceid, '|', bundleid)) as deviceid
<#endif#>
<#if '%(project)s'!='rcnco' #>
deviceid
<#endif#>
from l_activate_date WHERE (dates >= '%(sdateformat)s' AND dates <= '%(edateformat)s')
"""
},
],
"transform": [
{
# Guard expression; presumably the step runs only when job_0.e_2 returned
# data -- confirm.
"if":"<!sizef(job_0.e_2)!> >0",
"step_name": "t_1",
"etl_class_name": "XQLTransformEtl",
"data_from": ["job_0.e_2"],
"data_save_location": "hdfs",
"data_merge": False,
"hive_file_num": 50,
"tag":"ad",
# XQL script: loads the project config table, tags job_0.e_2 rows with
# year/month, and overwrites the matching year/month partition of a delta
# table whose path contains the gid from step e_0.
# NOTE(review): the "gid" variable set via `set gid=...` is not referenced
# below; the save path uses <!job_0.e_0.gid!> instead.
"sql": """
load delta.`/poc/cache/ad_cube/ods_conf_project/` as t_conf_project;
set gid="select gid from t_conf_project where project_code='%(project)s'" options type="sql";
select deviceid,dates,'%(year)s' as year,'%(month)s' as month
from job_0.e_2 as t1;
save overwrite t1 as delta.`/poc/cache/ad_cube/<!job_0.e_0.gid!>/ods_activate_date/` options `replaceWhere`="year='%(year)s' and month='%(month)s'" partitionBy 'year,month';
"""
},
],
"loading": [
{
'db_type': 'cache',
'step_name': 'l_1',
'data_load_type': 'CacheLoad',
'xy_user': 'lq',
'dbname': 'ad',
# NOTE(review): data_from is a plain string here but a list in the
# transform step; confirm the loader accepts both forms.
'data_from': 'job_0.e_2',
'cache_name': 'ad_channel_v6/total/ad_channel_user_online_total',
'cache_amount': "increment",
"cache_by":"deviceid|dates",
}
]},
],
}
三. 血缘抽取技术方法
- Antlr4: 解析 SQL 语法结构, 通过 visitor 遍历 SQL 语法树
- 递归收敛:抽取模型通过递归最后收敛到 QueryDefault(sql依赖关系主体)与 TableName (表与字段信息主体)
四. 关键G4语法
// Entry rule for one query: a (possibly set-combined) query term, optionally
// followed by ORDER BY and LIMIT that apply to the whole term.
querySql
: queryTerm
orderPart?
limitPart?
;
// A query term is either a single primary query or two terms combined by a
// set operator. All three operators share one left-recursive alternative
// (labelled #unionQuery even for INTERSECT/EXCEPT, so visitors must read
// `operator` to tell them apart); ANTLR4 therefore gives them equal
// precedence with left associativity.
// NOTE(review): standard SQL binds INTERSECT tighter than UNION/EXCEPT;
// confirm this is acceptable for lineage extraction.
queryTerm
: queryPrimary #primaryQuery
| left=queryTerm
operator=(INTERSECT | UNION | EXCEPT) setQuantifier?
right=queryTerm #unionQuery
;
// Either a plain SELECT block or a parenthesized nested query.
queryPrimary
: queryDefault
| '(' querySql ')'
;
// One SELECT block -- the rule lineage extraction converges on as the carrier
// of SQL dependency relations. selectPart yields the strong (column-to-column)
// lineage; the columns referenced by the other clauses are weak-lineage inputs.
// Every clause except SELECT is optional, so FROM-less selects also parse.
queryDefault
: selectPart
fromPart?
lateralView*
wherePart?
groupPart?
havingPart?
;
// SELECT [setQuantifier] item, item, ...  (setQuantifier is defined elsewhere,
// presumably DISTINCT/ALL -- confirm).
selectPart : SELECT setQuantifier? selectItem (',' selectItem)* ;
// FROM with comma-separated relations, then optional lateral views and PIVOT.
// NOTE(review): a LATERAL VIEW can be matched either here or by the
// lateralView* in queryDefault (e.g. when it follows a PIVOT clause, or when
// there is no FROM), so visitors should collect lateral views from both places.
fromPart: FROM relation (',' relation)* lateralView* pivotClause?;
// LATERAL VIEW [OUTER] func(args) tableAlias [AS] col1, col2, ...
// e.g. `lateral view explode(aa) tf AS bid,lvl`.
lateralView : LATERAL VIEW (OUTER)? qualifiedName '(' (expression (',' expression)*)? ')' tblName=identifier (AS? colName+=identifier (',' colName+=identifier)*)?;
// WHERE condition; its referenced columns count as weak-lineage inputs.
wherePart : WHERE where=booleanExpression ;
// GROUP BY list. Items reuse the sortItem rule; groupWith is defined elsewhere
// (presumably WITH ROLLUP/CUBE -- confirm).
groupPart : GROUP BY setQuantifier? sortItem (',' sortItem)* groupWith?;
// HAVING condition.
havingPart : HAVING having=booleanExpression ;
// PIVOT ( aggregates FOR pivotColumn IN ( value, value, ... ) ).
pivotClause : PIVOT '(' aggregates=namedExpressionSeq FOR pivotColumn IN '(' pivotValues+=pivotValue (',' pivotValues+=pivotValue)* ')' ')';
// A relation is either a single sampled relation or a join chain built
// left-recursively: left=relation followed by one rightRelation.
relation
: left=relation right=rightRelation #joinRelation
| sampledRelation #defaultRelation
;
// Right-hand side of a join. CROSS and NATURAL joins take one sampled relation
// and no join criteria; the general form takes joinType JOIN plus joinCriteria
// (defined elsewhere, presumably ON/USING).
// NOTE(review): the general form accepts a full `relation` on the right, which
// may itself be a join, so some join chains can also parse right-nested;
// confirm visitors handle both tree shapes.
rightRelation
: CROSS JOIN sampledRelation #crossJoinRightRelation
| joinType JOIN relation joinCriteria #joinRightRelation
| NATURAL joinType JOIN sampledRelation #naturalJoinRightRelation
;
// A primary relation with an optional alias (`t` or `AS t`).
sampledRelation
: relationPrimary (AS? tableAlias = strictIdentifier)?
;
// Leaf of a FROM clause: a table name, a subquery, or a parenthesized relation.
// #tableName is where lineage extraction resolves physical table information.
relationPrimary
: qualifiedName #tableName
| '(' querySql ')' #subQueryRelation
| '(' relation ')' #parenthesizedRelation
;
五. 对象模型
- SQL树对象模型
- 血缘实体模型
六. 抽取效果
- 效果
HDFS.parquet.`/ad/city/build`: {
uid <- [
sd.mysql.city.citydata-uid
]
is_upgrade <- [
sd.mysql.inf.if_conf_building-bid1,
sd.mysql.inf.if_conf_building-blv1,
sd.mysql.inf.if_conf_building-bid2,
sd.mysql.inf.if_conf_building-blv2,
sd.mysql.inf.if_conf_building-bid3,
sd.mysql.inf.if_conf_building-blv3,
sd.mysql.inf.if_conf_building-bid4,
sd.mysql.inf.if_conf_building-blv4,
sd.mysql.city.citydata-citydata
]
}
- 测试用例
package com.onemt.da.datamap.flow.test;
import com.onemt.da.datamap.flow.api.bean.PocScript;
import com.onemt.da.datamap.flow.api.bean.PocStep;
import org.junit.Test;
import java.util.Arrays;
/**
 * Lineage test for a cross-job POC script whose transform SQL uses
 * {@code LATERAL VIEW explode(...)}, {@code str_to_map} and {@code PIVOT}.
 *
 * <p>Step chain exercised by {@link #test()}: {@code job_3.e_1 -> job_4.t_1 -> job_4.t_2};
 * {@code job_4.t_2} and {@code job_6.e_01} then feed {@code job_6.t_2}, which appends its
 * result to {@code parquet.`/ad/city/build`}.
 *
 * @author: 王金绍
 * @create: 2022-12-15 09:07
 **/
public class LaterViewTests extends BaseTest {

    /** Script shared by every step; its step list is populated in {@link #test()}. */
    PocScript pocScript = new PocScript();

    /**
     * Builds a step of {@link #pocScript} with the given identity, inputs and SQL.
     *
     * @param jobId       job the step belongs to, e.g. {@code "job_4"}
     * @param stepId      step id within the job, e.g. {@code "t_1"}
     * @param stepType    step phase ({@code "analysis"} or {@code "transform"} in this class)
     * @param executor    ETL executor name
     * @param sql         SQL executed by the step
     * @param dataSources inputs of the step: a datasource (e.g. {@code "mysql.inf"}) or an
     *                    upstream step written as {@code "job.step"}
     * @return the configured step
     */
    private PocStep newStep(String jobId, String stepId, String stepType, String executor,
                            String sql, String... dataSources) {
        PocStep pocStep = new PocStep(pocScript);
        pocStep.setJobId(jobId);
        pocStep.setStepId(stepId);
        pocStep.setDsList(Arrays.asList(dataSources));
        pocStep.setStepType(stepType);
        pocStep.setExecutor(executor);
        pocStep.setSql(sql);
        return pocStep;
    }

    /** Extraction step {@code job_6.e_01}: building config rows (pid=101) from {@code mysql.inf}. */
    public PocStep job6_e_01() {
        return newStep("job_6", "e_01", "analysis", "ExtractionEtl",
                "select pid,lvl as citylvl,food,wood,iron,mithril,crystal,bid1,bid2,bid3,bid4,blv1,blv2,blv3,blv4\n" +
                "from if_conf_building\n" +
                "where pid=101",
                "mysql.inf");
    }

    /** Transform step {@code job_4.t_2}: pivots per-building levels of {@code job_4.t_1} into one column per building. */
    public PocStep job4_t_2() {
        return newStep("job_4", "t_2", "transform", "XQLTransformEtl",
                "select uid,type2_name,blv from job_4_t_1 as tmp;\n" +
                "select uid,ifnull(citylvl,0) citylvl,ifnull(b102, 0) b102,ifnull(b301,0) as b301,ifnull(b103,0) b103,ifnull(b104,0) b104,ifnull(b105,0) b105,ifnull(b111,0) b111,ifnull(b112,0) b112,ifnull(b113,0) b113,ifnull(b114,0) b114,ifnull(b115,0) b115,ifnull(b116,0) b116,ifnull(b117,0) b117,ifnull(b118,0) b118,ifnull(b119,0) b119,ifnull(b120,0) b120,ifnull(b121,0) b121,ifnull(b122,0) b122,ifnull(b201,0) b201,ifnull(b202,0) b202,ifnull(b203,0) b203,ifnull(b204,0) b204,ifnull(b205,0) b205,ifnull(b206,0) b206\n" +
                "from tmp\n" +
                "pivot\n" +
                "(\n" +
                " ifnull(max(blv),0) for\n" +
                " type2_name in (\"citylvl\",\"b102\",\"b301\",\"b103\",\"b104\",\"b105\",\"b111\",\"b112\",\"b113\",\"b114\",\"b115\",\"b116\",\"b117\",\"b118\",\"b119\",\"b120\",\"b121\",\"b122\",\"b201\",\"b202\",\"b203\",\"b204\",\"b205\",\"b206\")\n" +
                ") as job_4_t_2;",
                "job_4.t_1");
    }

    /**
     * Transform step {@code job_4.t_1}: explodes the JSON building list in {@code citydata}
     * into (uid, bid, blv) rows and keeps the max level per (uid, bid) for selected buildings.
     */
    public PocStep job4_t_1() {
        return newStep("job_4", "t_1", "transform", "XQLTransformEtl",
                "select uid, get_json_object(citydata, '$.blist') as blist from job_3_e_1 as a1;\n" +
                "select uid, explode(split(jsonArr2String(blist, \"||\"),'\\\\|\\\\|')) explode_col from a1 as a2;\n" +
                "select uid,\n" +
                " cast(get_json_object(explode_col, '$.binfo.bid')as int) as bid,\n" +
                " cast(get_json_object(explode_col, '$.binfo.lv')as int) as blv\n" +
                "from a2 as a3;\n" +
                "select uid,bid,\n" +
                " case\n" +
                " when bid=101 then 'citylvl'\n" +
                " else concat ('b',bid)\n" +
                " end as type2_name,\n" +
                " max(blv) as blv\n" +
                "from a3\n" +
                "where bid in (101,102,301,103,104,105,111,112,113,114,115,116,117,118,119,120,121,122,201,202,203,204,205,206)\n" +
                "group by uid,bid",
                "job_3.e_1");
    }

    /** Extraction step {@code job_3.e_1}: city data for selected uids from {@code mysql.city}. */
    public PocStep job3_e_1() {
        return newStep("job_3", "e_1", "analysis", "ExtractionEtl",
                "select uid,citydata,UNIX_TIMESTAMP(updatetime) as createtime,'citydata' as `table`\n" +
                "from citydata\n" +
                "where uid in (112,434)",
                "mysql.city");
    }

    /**
     * Transform step {@code job_6.t_2}: turns the building levels of {@code job_4.t_2} and the
     * building rows of {@code job_6.e_01} into (citylvl, bid, lvl) rows via {@code str_to_map} +
     * {@code lateral view explode}, left-joins them, and appends a per-uid {@code is_upgrade}
     * flag to {@code parquet.`/ad/city/build`}.
     */
    public PocStep job6_t_2() {
        // Ids follow the "job_N" / method-name convention used by every other step
        // (previously "job6" / "t_1", which matched neither job6_e_01 nor this method).
        return newStep("job_6", "t_2", "transform", "XQLTransformEtl",
                "load json.`job_4_t_2` as job_4_t_2;load json.`job_6.e_01` as job_6_e_01;" +
                "select uid,\n" +
                " citylvl+1 as citylvl,\n" +
                " ifnull(b102,0) as b102," +
                " ifnull(b301,0) as b301,\n" +
                " ifnull(b204,0) as b204," +
                " ifnull(b103,0) as b103,\n" +
                " ifnull(b104,0) as b104,\n" +
                " ifnull(b105,0) as b105,\n" +
                " ifnull(b111,0) as b111,\n" +
                " ifnull(b112,0) as b112,\n" +
                " ifnull(b113,0) as b113,\n" +
                " ifnull(b114,0) as b114,\n" +
                " ifnull(b115,0) as b115,\n" +
                " ifnull(b116,0) as b116,\n" +
                " ifnull(b117,0) as b117,\n" +
                " ifnull(b118,0) as b118,\n" +
                " ifnull(b119,0) as b119,\n" +
                " ifnull(b120,0) as b120,\n" +
                " ifnull(b121,0) as b121,\n" +
                " ifnull(b122,0) as b122,\n" +
                " ifnull(b201,0) as b201,\n" +
                " ifnull(b202,0) as b202,\n" +
                " ifnull(b203,0) as b203,\n" +
                " ifnull(b205,0) as b205,\n" +
                " ifnull(b206,0) as b206\n" +
                " from job_4_t_2 as t1;\n" +
                "\n" +
                "select citylvl,concat(bid1,':',blv1,',',bid2,':',blv2,',',bid3,':',blv3,',',bid4,':',blv4) upgrade_detail from job_6_e_01 as u1;\n" +
                "select citylvl,str_to_map(upgrade_detail,',',':') aa from u1 as u2;\n" +
                "select citylvl,bid,lvl from u2 t lateral view explode(aa) tf AS bid,lvl as u3;\n" +
                "\n" +
                // Exactly one entry per building (23 distinct ids): str_to_map fails on duplicate
                // keys by default on Spark 3, and the "=23" check in last1 expects one exploded
                // row per building (301 and 204 used to be listed twice).
                "select uid,citylvl,concat('102',':',b102,',','301',':',b301,',','204',':',b204,',','103',':',b103,',','104',':',b104,',','105',':',b105,',','111',':',b111,',','112',':',b112,',','113',':',b113,',','114',':',b114,',','115',':',b115,',','116',':',b116,',','117',':',b117,',','118',':',b118,',','119',':',b119,',','120',':',b120,',','121',':',b121,',','122',':',b122,',','201',':',b201,',','202',':',b202,',','203',':',b203,',','205',':',b205,',','206',':',b206) build_detail from t1 as d1;\n" +
                "select uid,citylvl,str_to_map(build_detail,',',':') aa from d1 as d2;\n" +
                "select uid,citylvl,bid,lvl from d2 t lateral view explode(aa) tf AS bid,lvl as d3;\n" +
                "\n" +
                "select a.uid,if(sum(if(a.lvl=b.lvl,1,if(b.lvl is null,1,0)))=23,1,0) is_upgrade from d3 a left join u3 b on a.citylvl=b.citylvl and a.bid=b.bid group by uid as last1;" +
                "save append last1 as parquet.`/ad/city/build`;\n",
                "job_4.t_2", "job_6.e_01");
    }

    /** Registers all steps on the script and runs lineage extraction over it. */
    @Test
    public void test() {
        pocScript.setScriptName("all/pool_user_pack_push_item");
        pocScript.setProject("sd");
        pocScript.setStepList(Arrays.asList(job6_e_01(), job3_e_1(), job4_t_1(), job4_t_2(), job6_t_2()));
        testScript(pocScript);
    }
}
相关文章
- Mac安装Navicat for MySQL(数据库管理工具)教程支持M2M1
- 【MySQL】深入分析 锁机制(一)行锁 加锁规则 之 等值查询
- MYSQL数据库-索引
- MYSQL数据库-复合查询
- MYSQL数据库-内外连接
- [android] 安卓消息推送的几种实现方式
- [android] 安卓进程优先级&为什么使用服务
- MySQL 日志之 binlog 格式 → 关于 MySQL 默认隔离级别的探讨
- [android] 安卓自定义样式和主题
- [javaEE] 开源数据库连接池
- [javaEE] 数据库连接池和动态代理
- [android] 安卓消息推送的几种实现方式
- [android] 安卓进程优先级&为什么使用服务
- Mysql join left查询无法命中索引一例
- 2022-12-11:行程和用户。以下为输出结果,请问sql语句如何写? +------------+-------------------+ | Day |
- Valentina Studio Pro mac/win(数据库管理器)最新版
- 10个人9个答错,另外1个只对一半:数据库的锁,到底锁的是什么?
- (一)Mysql基础
- (二)Mysql基本操作
- (三)SQL语言基础