zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

跨脚本步骤 复杂 SQL血缘识别

2023-02-18 16:30:45 时间

一. 血缘抽取目标

  1. 强依赖关系:识别SQL执行结果写入[hive, mysql, hdfs]物理库表时的字段依赖链路,即入库字段由哪些物理库表字段产生(对应select部分)。
  2. 弱依赖关系:识别整个过程中参与计算的物理表字段列表(排除select部分),包括[partitionBy > joinOn > where > groupBy > having > orderBy]。

二. 血缘抽取对象

  • 公司内部POC脚本的层级结构:
{
    scriptMeta: 脚本基础信息
    jobs: {//任务列表
        jobid: [ //领域任务集合
            E:{
                stepMeta: //基础数据抽取步骤信息
                sql: //执行sql            
             },
            T:{
                stepMeta: //数据清洗合并步骤信息
                sql: //执行sql            
             },
            L:{
                stepMeta: //结果写库步骤
                sql: //执行sql            
            }
        ]
    } 
}
  • POC脚本示例(历史遗留):
# -*-coding:UTF-8 -*-
import numpy as np
# Example ETL template for a legacy POC script.
#
# Top-level keys (their exact semantics are owned by the POC runner, which is
# not shown here -- TODO confirm):
#   run_mode / delay_days / use_project_tz / interval_days
#       Run settings for the script.
#   extend_args
#       Extra template variables. Values contain `<!func(date)!>` placeholders,
#       presumably expanded by the runner from the run date. The SQL text refers
#       to them -- and to values not defined here, such as `project` and `tz`,
#       presumably supplied by the runner -- via Python-style `%(name)s`.
#   jobs
#       List of jobs. Each job has "analysis", "transform" and "loading" step
#       lists; another step's data is referenced as "<job_name>.<step_name>"
#       (e.g. "job_0.e_2").
etl_template = {"run_mode": "day",
                "delay_days": 1,
                "use_project_tz":True,
                "interval_days":1,
                "extend_args":{
                    "kid":"1,2,3,4",
                    "year":"<!getyear(date)!>",
                    "month":"<!getmonth(date)!>",
                    "sdate":"<!getyear(date)!><!getmonth(date)!>01",
                    "edate":"<!curmonthend(date)!>",
                    "sdateformat":"<!getyear(date)!>-<!getmonth(date)!>-01",
                    # NOTE(review): unlike "edate" (curmonthend), this hard-codes day 31,
                    # which is not a real calendar date for shorter months; step e_2
                    # compares `dates` against it -- confirm that still behaves as intended.
                    "edateformat":"<!getyear(date)!>-<!getmonth(date)!>-31",
                    "active_partition":"<!getyear(date)!><!getmonth(date)!>",
                    "online_partition": "<!getyear(date)!><!getmonth(date)!>"
                },
                "jobs": [
                    { # cost: ~6 minutes
                    "job_name": "job_0",
                    "analysis": [
                        {  # extract project info (gid, code, name) from MySQL table ad.if_project
                                "step_name": "e_0",
                                "db_type": "mysql",
                                "dbname": "ad",
                                "xy_user": "inf",
                                "sql": """
select gid, project_code, project_name from ad.if_project where project_code = '%(project)s'
                                """
                        },
                        {# cache online info: read (dates, deviceid) from l_activate_date for the month
                            # The SQL below uses <#if ...#> / <#endif#> template blocks: for project
                            # 'rcnco' deviceid is upper(concat(deviceid, '|', bundleid)), otherwise
                            # the raw deviceid column.
                            # sql_assemble / batch_id / batch_size presumably make the runner fetch
                            # the rows in batches keyed on `id` -- confirm against the runner.
                            #"etl_class_name": "XQLExtractionEtl",
                            "step_name": "e_2",
                            "pre_sql": ["set time_zone='%(tz)s:00'"],
                            "db_type": "mysql",
                            "dbname": "ad",
                            "xy_user": "inf",
                            "sql_assemble":"BatchAssemble",
                            "batch_id":"id",
                            "batch_size":50000,
                            #"data_save_location":"hdfs",
                            "sql": """
select dates,
<#if '%(project)s'=='rcnco' #>
upper(concat(deviceid, '|', bundleid)) as deviceid
<#endif#>
<#if '%(project)s'!='rcnco' #>
deviceid
<#endif#>
from l_activate_date WHERE (dates >= '%(sdateformat)s' AND dates <= '%(edateformat)s')
                            """
                        },
                    ],
                    "transform": [
                         {
                            # Guarded by a size check on job_0.e_2 (presumably skips the step when
                            # e_2 returned no rows). Reads job_0.e_2 and saves a delta table
                            # partitioned by year/month under a path containing e_0's gid
                            # (<!job_0.e_0.gid!>).
                            # NOTE(review): the XQL variable defined by `set gid=...` is not
                            # referenced by this SQL; the save path uses <!job_0.e_0.gid!> instead.
                            "if":"<!sizef(job_0.e_2)!> >0",
                            "step_name": "t_1",
                            "etl_class_name": "XQLTransformEtl",
                            "data_from": ["job_0.e_2"],
                            "data_save_location": "hdfs",
                            "data_merge": False,
                            "hive_file_num": 50,
                            "tag":"ad",
                            "sql": """
load delta.`/poc/cache/ad_cube/ods_conf_project/` as t_conf_project;
set gid="select gid from t_conf_project where project_code='%(project)s'" options type="sql";

select deviceid,dates,'%(year)s' as year,'%(month)s' as month
from job_0.e_2 as t1;

save overwrite t1 as delta.`/poc/cache/ad_cube/<!job_0.e_0.gid!>/ods_activate_date/` options `replaceWhere`="year='%(year)s' and month='%(month)s'" partitionBy 'year,month';
                        """,
                        },
                    ],
                    "loading": [
                        {
                            # Cache load of job_0.e_2's data into cache_name, with cache_amount
                            # "increment" and cache_by "deviceid|dates".
                            # NOTE(review): data_from is a plain string here but a list in t_1 --
                            # confirm the runner accepts both forms.
                            'db_type': 'cache',
                            'step_name': 'l_1',
                            'data_load_type': 'CacheLoad',
                            'xy_user': 'lq',
                            'dbname': 'ad',
                            'data_from': 'job_0.e_2',
                            'cache_name': 'ad_channel_v6/total/ad_channel_user_online_total',
                            'cache_amount': "increment",
                            "cache_by":"deviceid|dates",
                        }
                    ]},
                ],
          }

三. 血缘抽取技术方法

  • ANTLR4:解析SQL语法结构,通过Visitor遍历生成SQL树。
  • 递归收敛:抽取模型通过递归,最终收敛到 QueryDefault(SQL依赖关系主体)与 TableName(表与字段信息主体)。

四. 关键G4语法

// querySql: a query term (a single SELECT body or a set operation) followed by
// an optional ORDER part and an optional LIMIT part (orderPart / limitPart are
// defined elsewhere in the grammar).
querySql
    : queryTerm
      orderPart?
      limitPart?
    ;
    
// queryTerm: either a plain query (#primaryQuery) or a binary set operation
// (#unionQuery). The rule is left-recursive and INTERSECT, UNION and EXCEPT
// share a single alternative, so they have equal precedence and associate to
// the left.
// NOTE(review): standard SQL binds INTERSECT tighter than UNION/EXCEPT --
// confirm this simplification is acceptable for lineage extraction.
queryTerm
    : queryPrimary                                                                          #primaryQuery
    | left=queryTerm
      operator=(INTERSECT | UNION | EXCEPT) setQuantifier?
      right=queryTerm                                                                       #unionQuery
    ;

// queryPrimary: a single SELECT body, or a parenthesized full query (which may
// itself carry its own ORDER/LIMIT parts).
queryPrimary
    : queryDefault
    | '(' querySql  ')'
    ;

// queryDefault: the body of one SELECT statement. The select list is where
// output columns come from; FROM / WHERE / GROUP BY / HAVING contribute the
// additional referenced fields.
// NOTE(review): lateralView* appears both here and at the end of fromPart, so
// LATERAL VIEW clauses right after FROM can match either rule. Presumably
// fromPart's greedy loop takes those before a PIVOT and this rule only sees
// ones after it -- confirm which context the visitor reads.
queryDefault
    : selectPart
      fromPart?
      lateralView*
      wherePart?
      groupPart?
      havingPart?
    ;

// SELECT [setQuantifier] item, item, ...  (setQuantifier is defined elsewhere; presumably DISTINCT/ALL)
selectPart : SELECT setQuantifier? selectItem (',' selectItem)* ;
// FROM rel, rel, ... followed by optional LATERAL VIEWs and an optional PIVOT.
fromPart: FROM relation (',' relation)* lateralView* pivotClause?;
// LATERAL VIEW [OUTER] fn(args) tblName [AS] col1, col2, ...
// The generator name is a qualifiedName; output column aliases are collected into colName.
lateralView : LATERAL VIEW (OUTER)? qualifiedName '(' (expression (',' expression)*)? ')' tblName=identifier (AS? colName+=identifier (',' colName+=identifier)*)?;
// WHERE predicate, exposed to the visitor through the `where` label.
wherePart : WHERE where=booleanExpression ;
// GROUP BY key, key, ... -- each key reuses sortItem; groupWith is defined elsewhere.
groupPart : GROUP BY setQuantifier? sortItem (',' sortItem)* groupWith?;
// HAVING predicate, exposed to the visitor through the `having` label.
havingPart : HAVING having=booleanExpression ;
// PIVOT ( aggregates FOR pivotColumn IN ( value, value, ... ) )
pivotClause : PIVOT '(' aggregates=namedExpressionSeq FOR pivotColumn IN '(' pivotValues+=pivotValue (',' pivotValues+=pivotValue)* ')' ')';


// relation: a single (possibly aliased) relation, or a join chain. The rule is
// left-recursive: each #joinRelation appends one rightRelation to the relation
// parsed so far.
relation
    : left=relation right=rightRelation                                                     #joinRelation
    | sampledRelation                                                                       #defaultRelation
    ;

// rightRelation: the right-hand side of one join.
//   CROSS JOIN and NATURAL JOIN take a single relation and no join criteria;
//   a typed JOIN takes a full relation (which may itself be a join) plus
//   joinCriteria (defined elsewhere).
rightRelation
    : CROSS JOIN sampledRelation                                                            #crossJoinRightRelation
    | joinType JOIN relation joinCriteria                                                   #joinRightRelation
    | NATURAL joinType JOIN sampledRelation                                                 #naturalJoinRightRelation
    ;

// sampledRelation: a primary relation with an optional alias (AS is optional).
sampledRelation
    : relationPrimary (AS? tableAlias = strictIdentifier)?
    ;

// relationPrimary: a table name (#tableName), a parenthesized subquery
// (#subQueryRelation), or a parenthesized relation / join (#parenthesizedRelation).
relationPrimary
    : qualifiedName                                                                         #tableName
    | '(' querySql ')'                                                                      #subQueryRelation
    | '(' relation ')'                                                                      #parenthesizedRelation
    ;

五. 对象模型

  • SQL树对象模型
SQL树对象
  • 血缘实体模型
血缘实体对象

六. 抽取效果

  • 效果
HDFS.parquet.`/ad/city/build`: {
    uid <- [
        sd.mysql.city.citydata-uid
        ]
    is_upgrade <- [
        sd.mysql.inf.if_conf_building-bid1,
        sd.mysql.inf.if_conf_building-blv1,
        sd.mysql.inf.if_conf_building-bid2,
        sd.mysql.inf.if_conf_building-blv2,
        sd.mysql.inf.if_conf_building-bid3,
        sd.mysql.inf.if_conf_building-blv3,
        sd.mysql.inf.if_conf_building-bid4,
        sd.mysql.inf.if_conf_building-blv4,
        sd.mysql.city.citydata-citydata
    ]
}

  • 测试用例
package com.onemt.da.datamap.flow.test;

import com.onemt.da.datamap.flow.api.bean.PocScript;
import com.onemt.da.datamap.flow.api.bean.PocStep;
import org.junit.Test;

import java.util.Arrays;

/**
 * Lineage test for a multi-step POC script that exercises LATERAL VIEW,
 * PIVOT and cross-step table references.
 *
 * <p>Each {@code jobN_x_y()} method builds one {@link PocStep}. {@link #test()}
 * registers all of them on {@link #pocScript} and passes the script to
 * {@code testScript} (inherited from {@code BaseTest}). The final step saves its
 * result as {@code parquet.`/ad/city/build`}.
 *
 * @author: 王金绍
 * @create: 2022-12-15 09:07
 **/
public class LaterViewTests extends BaseTest {

    /** Script that every step built by this test instance is attached to. */
    PocScript pocScript = new PocScript();

    /**
     * Creates a {@link PocStep} attached to {@link #pocScript} and sets its job id,
     * step id, data-source list, step type and executor. Callers set the SQL.
     *
     * @param jobId    job identifier, e.g. {@code "job_4"}
     * @param stepId   step identifier within the job, e.g. {@code "t_1"}
     * @param stepType step type; {@code "analysis"} or {@code "transform"} in this test
     * @param executor executor name, e.g. {@code "ExtractionEtl"}
     * @param dsList   upstream sources: datasource names such as {@code "mysql.inf"}
     *                 or step references such as {@code "job_4.t_1"}
     * @return the step with everything except its SQL configured
     */
    private PocStep newStep(String jobId, String stepId, String stepType, String executor, String... dsList) {
        PocStep pocStep = new PocStep(pocScript);
        pocStep.setJobId(jobId);
        pocStep.setStepId(stepId);
        pocStep.setDsList(Arrays.asList(dsList));
        pocStep.setStepType(stepType);
        pocStep.setExecutor(executor);
        return pocStep;
    }

    /** job_6.e_01: extraction from datasource {@code mysql.inf}, table {@code if_conf_building} (pid = 101). */
    public PocStep job6_e_01(){
        PocStep pocStep = newStep("job_6", "e_01", "analysis", "ExtractionEtl", "mysql.inf");
        pocStep.setSql("select pid,lvl as citylvl,food,wood,iron,mithril,crystal,bid1,bid2,bid3,bid4,blv1,blv2,blv3,blv4\n" +
                "from if_conf_building\n" +
                "where pid=101");
        return pocStep;
    }

    /**
     * job_4.t_2: XQL transform over {@code job_4.t_1} that pivots rows of
     * (uid, type2_name, blv) into one column per building key, defaulting to 0.
     */
    public PocStep job4_t_2(){
        PocStep pocStep = newStep("job_4", "t_2", "transform", "XQLTransformEtl", "job_4.t_1");
        pocStep.setSql("select uid,type2_name,blv from job_4_t_1 as tmp;\n" +
                "select uid,ifnull(citylvl,0) citylvl,ifnull(b102, 0) b102,ifnull(b301,0) as b301,ifnull(b103,0) b103,ifnull(b104,0) b104,ifnull(b105,0) b105,ifnull(b111,0) b111,ifnull(b112,0) b112,ifnull(b113,0) b113,ifnull(b114,0) b114,ifnull(b115,0) b115,ifnull(b116,0) b116,ifnull(b117,0) b117,ifnull(b118,0) b118,ifnull(b119,0) b119,ifnull(b120,0) b120,ifnull(b121,0) b121,ifnull(b122,0) b122,ifnull(b201,0) b201,ifnull(b202,0) b202,ifnull(b203,0) b203,ifnull(b204,0) b204,ifnull(b205,0) b205,ifnull(b206,0) b206\n" +
                "from tmp\n" +
                "pivot\n" +
                "(\n" +
                "  ifnull(max(blv),0) for\n" +
                "  type2_name in (\"citylvl\",\"b102\",\"b301\",\"b103\",\"b104\",\"b105\",\"b111\",\"b112\",\"b113\",\"b114\",\"b115\",\"b116\",\"b117\",\"b118\",\"b119\",\"b120\",\"b121\",\"b122\",\"b201\",\"b202\",\"b203\",\"b204\",\"b205\",\"b206\")\n" +
                ") as job_4_t_2;");
        return pocStep;
    }

    /**
     * job_4.t_1: XQL transform over {@code job_3.e_1}. It extracts the
     * {@code $.blist} JSON from {@code citydata}, explodes it into one row per
     * entry, parses bid/lv, and emits (uid, bid, type2_name, max(blv)) for the
     * listed bids.
     */
    public PocStep job4_t_1(){
        PocStep pocStep = newStep("job_4", "t_1", "transform", "XQLTransformEtl", "job_3.e_1");
        // NOTE(review): unlike job_4.t_2, the last select has no "as <table>;" suffix;
        // presumably the engine names the step's final output itself -- confirm.
        pocStep.setSql("select uid, get_json_object(citydata, '$.blist') as blist from job_3_e_1 as a1;\n" +
                "select uid, explode(split(jsonArr2String(blist, \"||\"),'\\\\|\\\\|')) explode_col  from  a1 as a2;\n" +
                "select uid,\n" +
                "    cast(get_json_object(explode_col, '$.binfo.bid')as int) as bid,\n" +
                "    cast(get_json_object(explode_col, '$.binfo.lv')as int) as blv\n" +
                "from a2 as a3;\n" +
                "select uid,bid,\n" +
                "    case\n" +
                "        when bid=101 then 'citylvl'\n" +
                "        else concat ('b',bid)\n" +
                "    end as type2_name,\n" +
                "    max(blv) as blv\n" +
                "from a3\n" +
                "where bid in (101,102,301,103,104,105,111,112,113,114,115,116,117,118,119,120,121,122,201,202,203,204,205,206)\n" +
                "group by uid,bid");
        return pocStep;
    }

    /** job_3.e_1: extraction from datasource {@code mysql.city}, table {@code citydata}, for two sample uids. */
    public PocStep job3_e_1(){
        PocStep pocStep = newStep("job_3", "e_1", "analysis", "ExtractionEtl", "mysql.city");
        pocStep.setSql("select uid,citydata,UNIX_TIMESTAMP(updatetime) as createtime,'citydata' as `table`\n" +
                "from citydata\n" +
                "where uid in (112,434)");
        return pocStep;
    }

    /**
     * job_6.t_2: XQL transform that reads {@code job_4.t_2} and {@code job_6.e_01}.
     * It turns both into (citylvl, bid, lvl) rows via str_to_map + LATERAL VIEW
     * explode, left-joins them and computes {@code is_upgrade} per uid. The result
     * is appended to {@code parquet.`/ad/city/build`}.
     *
     * <p>The {@code =23} check in the final select equals the number of distinct
     * keys in {@code build_detail}, i.e. the number of d3 rows per uid.
     */
    public PocStep job6_t_2(){
        // Ids follow the "job_N" convention of the other steps and the method name
        // (was "job6" / "t_1", which matched neither).
        PocStep pocStep = newStep("job_6", "t_2", "transform", "XQLTransformEtl", "job_4.t_2", "job_6.e_01");
        // build_detail lists each building key once. Keys 301 and 204 used to appear
        // twice, and duplicate map keys make str_to_map fail on Spark 3.x under the
        // default spark.sql.mapKeyDedupPolicy=EXCEPTION.
        // NOTE(review): job_4_t_2 is loaded by its underscore name but job_6.e_01 keeps
        // the dot form -- confirm which naming the loader expects.
        pocStep.setSql("load json.`job_4_t_2` as job_4_t_2;load json.`job_6.e_01` as job_6_e_01;" +
                "select uid,\n" +
                "        citylvl+1 as citylvl,\n" +
                "        ifnull(b102,0) as b102," +
                "       ifnull(b301,0) as b301,\n" +
                "        ifnull(b204,0) as b204," +
                "       ifnull(b103,0) as b103,\n" +
                "        ifnull(b104,0) as b104,\n" +
                "        ifnull(b105,0) as b105,\n" +
                "        ifnull(b111,0) as b111,\n" +
                "        ifnull(b112,0) as b112,\n" +
                "        ifnull(b113,0) as b113,\n" +
                "        ifnull(b114,0) as b114,\n" +
                "        ifnull(b115,0) as b115,\n" +
                "        ifnull(b116,0) as b116,\n" +
                "        ifnull(b117,0) as b117,\n" +
                "        ifnull(b118,0) as b118,\n" +
                "        ifnull(b119,0) as b119,\n" +
                "        ifnull(b120,0) as b120,\n" +
                "        ifnull(b121,0) as b121,\n" +
                "        ifnull(b122,0) as b122,\n" +
                "        ifnull(b201,0) as b201,\n" +
                "        ifnull(b202,0) as b202,\n" +
                "        ifnull(b203,0) as b203,\n" +
                "        ifnull(b205,0) as b205,\n" +
                "        ifnull(b206,0) as b206\n" +
                "    from job_4_t_2 as t1;\n" +
                "\n" +
                "select citylvl,concat(bid1,':',blv1,',',bid2,':',blv2,',',bid3,':',blv3,',',bid4,':',blv4) upgrade_detail from job_6_e_01 as u1;\n" +
                "select citylvl,str_to_map(upgrade_detail,',',':') aa from u1 as u2;\n" +
                "select citylvl,bid,lvl from u2 t lateral view explode(aa) tf AS bid,lvl as u3;\n" +
                "\n" +
                "select uid,citylvl,concat('102',':',b102,',','301',':',b301,',','204',':',b204,',','103',':',b103,',','104',':',b104,',','105',':',b105,',','111',':',b111,',','112',':',b112,',','113',':',b113,',','114',':',b114,',','115',':',b115,',','116',':',b116,',','117',':',b117,',','118',':',b118,',','119',':',b119,',','120',':',b120,',','121',':',b121,',','122',':',b122,',','201',':',b201,',','202',':',b202,',','203',':',b203,',','205',':',b205,',','206',':',b206) build_detail from t1 as d1;\n" +
                "select uid,citylvl,str_to_map(build_detail,',',':') aa from d1 as d2;\n" +
                "select uid,citylvl,bid,lvl from d2 t lateral view explode(aa) tf AS bid,lvl as d3;\n" +
                "\n" +
                "select a.uid,if(sum(if(a.lvl=b.lvl,1,if(b.lvl is null,1,0)))=23,1,0) is_upgrade from d3 a left join u3 b on a.citylvl=b.citylvl and a.bid=b.bid group by uid as last1;" +
                "save append last1 as parquet.`/ad/city/build`;\n");
        return pocStep;
    }

    /** Registers every step on {@link #pocScript} and hands the script to {@code testScript}. */
    @Test
    public void test(){
        // NOTE(review): this script name looks copied from another test -- confirm it is intended here.
        pocScript.setScriptName("all/pool_user_pack_push_item");
        pocScript.setProject("sd");
        pocScript.setStepList(Arrays.asList(job6_e_01(), job3_e_1(), job4_t_1(), job4_t_2(), job6_t_2()));
        testScript(pocScript);
    }
}