python-sqlparse解析SQL表血缘
文章目录
- python-sqlparse解析SQL表血缘
- 前言
- 一、血缘是什么
- 二、准备工作
- 1、了解python-sqlparse库
- 2、python-sqlparse简单实战
- 2.1、直接查询sql解析
- 2.2、子查询sql解析
- 2.3、join 表名解析
- 3、python-sqlparse解析思路
- 3.1、insert_表名解析
- 3.2、from_表名解析
- 三、实操演练
- 1.实操脚本
- 2.实操结果
- 总结
前言
数据治理和分析的难度和阵痛总是伴随着数仓建设日益加剧。为了更好的治理数据和评估影响分析——血缘就是我们绕不过的抓手!
本文主旨:通过sqlparse解析sql获取血缘
一、血缘是什么
数据血缘也称为数据血统或谱系,是来描述数据的来源和派生关系。说白了就是这个数据是怎么来的,经过了哪些过程或阶段,从哪些表,哪些字段计算得来的。
按照血缘关系划分节点,主要有以下三类:流出节点->中间节点->流入节点
流出节点: 数据提供方,血缘关系的源端节点。
中间节点: 血缘关系中类型最多的节点,既承接流入数据,又对外流出数据。
流入节点: 血缘关系的终端节点,一般为应用层,例如可视化报表、仪表板或业务系统。
二、准备工作
当前数仓模型建设通常使用sql语言建设,而sql语言通过查表在插入表示着流出节点(from) 和 流入节点(insert)的关系。接下来就让我们开始着手准备解析sql
1、了解python-sqlparse库
内容引用:作者:fanstuck
Python-sqlparse解析SQL工具库一文详解(一)
Python-sqlparse解析SQL工具库一文详解(二)
2、python-sqlparse简单实战
2.1、直接查询sql解析
python脚本:
import sqlparseif __name__ == '__main__':sql = """insert table dwd_table_name_prod_info_df (ftime,prod_id,prod_name)select ftime,prod_id,prod_namefrom ods_table_name_prod_info_df t1_1where ftime = 20231223"""parsed = sqlparse.parse(sql)[0]count = 0for item in parsed.tokens:print(count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)count += 1
执行结果(上述sql中换行符 和 连续空格 仅仅保留一个空格,避免结果爆炸)
结论:通过from关键字可定位from_表名
2.2、子查询sql解析
python脚本:
import sqlparseif __name__ == '__main__':sql = """insert table dwd_table_name_prod_info_df (ftime,prod_id,prod_name,prod_price)selectt1.ftime,t1.prod_id,t1.prod_name,t2.prod_pricefrom (select ftime,prod_id,prod_namefrom ods_table_name_prod_info_df t1_1where ftime = 20231223) t1 left join (select prod_id,prod_pricefrom ods_table_name_prod_price_df t2_1where ftime = 20231223) t2 on t1.prod_id = t2.prod_id"""parsed = sqlparse.parse(sql)[0]count = 0for item in parsed.tokens:print(count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)count += 1
执行结果(上述sql中换行符 和 连续空格 仅仅保留一个空格,避免结果爆炸)
结论:需进入子查询后,按照2.1解析
2.3、join 表名解析
python脚本:
import sqlparseif __name__ == '__main__':sql = """insert table dwd_table_name_prod_info_df (ftime,prod_id,prod_name,prod_price)selectt1.ftime,t1.prod_id,t1.prod_name,t2.prod_pricefrom ods_table_name_prod_info_df t1 left join ods_table_name_prod_price_df t2on t1.prod_id = t2.prod_idwhere ftime = 20231223"""parsed = sqlparse.parse(sql)[0]count = 0for item in parsed.tokens:print(count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)count += 1
执行结果(上述sql中换行符 和 连续空格 仅仅保留一个空格,避免结果爆炸)
结论:通过join关键字也可直接解析from_表名
3、python-sqlparse解析思路
3.1、insert_表名解析
通过insert table 关键字解析 insert_表名
3.2、from_表名解析
通过from 和 join 关键字解析from_表名
三、实操演练
1.实操脚本
import reimport sqlparsefrom sqlparse.sql import IdentifierList, Identifierfrom sqlparse.tokens import Keyword, Punctuation# 支持的join方式ALL_JOIN_TYPE = ('JOIN', 'INNER JOIN', 'LEFT JOIN', 'LEFT OUTER JOIN', 'RIGHT JOIN', 'FULL OUTER JOIN', 'FULL JOIN', 'FULL OUT JOIN', 'LEFT SEMI JOIN', 'RIGHT SEMI JOIN')def format_sql(sql_str):"""规范sql,剔除备注信息:param sql_str::return:"""format_mid_sql = ""sql_line_list = sql_str.split('\n')for sql_line in sql_line_list:# print("=" * 100)# print(sql_line)while True:res_str = re_check(r'\"([^\"]*)\"|\'([^\']*)\'', sql_line)if res_str[0]:res_sub_sql = re_check(r'--', res_str[1])if res_sub_sql[0]:format_mid_sql += " " + res_sub_sql[1]breakelse:format_mid_sql += " " + res_str[1] + res_str[2]sql_line = res_str[3]else:res_sql = re_check(r'--', sql_line)if res_sql[0]:format_mid_sql += " " + res_sql[1]else:format_mid_sql += " " + sql_linebreakformat_sql = re.sub(r'\s+', ' ', format_mid_sql.replace('\t', ' ').replace('\n', ' '))return format_sqldef re_check(rule, check_str):"""正则模版:param rule::param check_str::return:"""pattern = re.compile(rule, re.I)re_result = pattern.search(check_str)flag = Falsematch_rule = Nonebefore_rule = Noneafter_rule = Noneif re_result:match_rule = check_str[re_result.span()[0]:re_result.span()[1]]before_rule = check_str[:re_result.span()[0]]after_rule = check_str[re_result.span()[1]:]flag = Truereturn flag, before_rule, match_rule, after_ruleclass BloodSqlparseAnalysis(object):def format_subselect(self, parsed):"""规范子查询"""str_par = str(parsed).strip()first_index = str_par.find('(')last_index = str_par.rfind(')')if first_index != -1 and last_index != -1:str_par = str_par[first_index+1:last_index]return str_pardef is_subselect(self, parsed):"""是否子查询:判断依据是否存在() 是否存在select:param parsed::return:"""no_token_str = str(parsed)if no_token_str.find('(') != -1 and no_token_str.find(')') != -1 and no_token_str.upper().find('SELECT') != -1:return Truereturn Falsedef extract_table_identifiers(self, token_stream):"""递归结果返回:param token_stream::return:"""for item in token_stream:if isinstance(item, IdentifierList):for identifier in item.get_identifiers():result_tb_name = identifier.value.split(' ')[0]yield result_tb_nameelif isinstance(item, Identifier):result_tb_name = item.value.split(' ')[0]yield result_tb_nameelif item.ttype is Keyword and item.value.upper() not in ALL_JOIN_TYPE:yield item.valuedef extract_tables(self, sql):"""提取sql中的from | join 后的表名:param sql::return:"""parsed = sqlparse.parse(sql)[0]from_seen = Falsecount = 1for item in parsed.tokens:# 定位问题备注# print(from_seen, '|', count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)if from_seen:if self.is_subselect(item):from_seen = Falseitem = self.format_subselect(item)for x in self.extract_tables(item):yield xelif (str(item).upper().find('WHERE') != -1) or (item.ttype is Keyword and item.value.upper() not in ALL_JOIN_TYPE) or item.ttype is Punctuation:from_seen = Falsecontinueelse:yield itemelif (item.ttype is Keyword and item.value.upper() == 'FROM') or (item.ttype is Keyword and item.value.upper() in ALL_JOIN_TYPE):from_seen = Truecount += 1def get_all_blood(self, sql):all_tb = self.extract_tables(sql)return list(self.extract_table_identifiers(all_tb))def analysis_sql_blood(self, sql_str):"""根据函数 estimate_sql_type 返回的类型,执行不同的sql解析操作,返回解析后所有的数据源表(剔除临时表):param sql_str: sql字符串:return: [数据源表]"""sql_str = format_sql(sql_str)res_sql_flag = re_check(r'insert\s+table\s+[^\,]*" />, sql_str)if res_sql_flag[0]:sql_flag = self.estimate_sql_type(sql_str)if sql_flag == 'no_insert':#TODO 返回非insertsql,不解析血缘insert_table = ''son_tables = []elif sql_flag == 'no_with':insert_table = self.analysis_insert_tb_name(sql_str)son_tables = self.get_all_blood(sql_str)else:insert_table = self.analysis_insert_tb_name(sql_str)tmp_tb_name_list, split_sql_str_list = self.analysis_with_sql(sql_str)sub_tables = []for sql_sub_str in split_sql_str_list:sub_son_tables = self.get_all_blood(sql_sub_str)sub_tables += sub_son_tablesson_tables = list(set(sub_tables).difference(set(tmp_tb_name_list)))son_tables = list(set(son_tables))else:insert_table = Noneson_tables = Nonereturn insert_table, son_tablesdef estimate_sql_type(self, sql_str):"""判断sql的具体类型:param sql_str: sql字符串:return: 返回三种类型:no_insert|with|no_with"""flag = 'no_insert'# 需要解析血缘的 insert sqlre_res = re_check(r'insert', sql_str)if re_res[0]:re_res_wiht = re_check(r'with\s+.*\s+as\s+\(', sql_str)if re_res_wiht[0]:flag = 'with'# sql中有临时表else:flag = 'no_with' # sql无有临时表return flagdef analysis_with_sql(self, sql_str):"""解析带with临时表的sql字符串,返回一个二维数组: 临时表 和 各临时表计算sql:param sql_str: sql字符串:return: [[临时表名],[计算sql(拆分后)]]"""pattern_tmp_tb_name = re.compile(r'with\s+[^\,]*?\s+?as\s*?\(|\)\s*\,[^\,]*?\s+?as\s*?\(|\)\s*?select\s+?[^\,]*?\s+ftime', flags=re.I)re_res_tb = pattern_tmp_tb_name.findall(sql_str)tmp_tb_name_list = []for have_tb_name_str in re_res_tb:re_res_tb = re_check(r'select', have_tb_name_str)if re_res_tb[0]:continuehave_tb_name_list = have_tb_name_str.split(' ')for index in range(len(have_tb_name_list)):have_tb_name_list[index] = re.sub(r'\,|^with$|^as$|\(|\)|\s|\n|\t', '', have_tb_name_list[index], flags=re.I)tmp_tb_name = ''.join(have_tb_name_list)tmp_tb_name_list.append(tmp_tb_name)split_sql_str_list = re.split(r'with\s+[^\,]*?\s+?as\s*?\(|\)\s*\,[^\,]*?\s+?as\s*?\(|\)\s*?select\s+?[^\,]*?\s+ftime', sql_str, flags=re.I)split_sql_str_list[-1] = 'select ftime' + split_sql_str_list[-1]return tmp_tb_name_list, split_sql_str_listdef analysis_insert_tb_name(self, sql_str):"""根据runsql获取insert后表名:param sql_str: sql字符串:return: [数据源表]"""pattern_tmp_tb_name = re.compile(r'insert\s+table\s+[^\,]*?\s*?\(|insert\s+into\s+table\s+[^\,]*?\s*?\(|insert\s+overwrite\s+table\s+[^\,]*?\s*?\(|insert\s+table\s+[^\,]*?\s+?select|insert\s+into\s+table\s+[^\,]*?\s+?select|insert\s+overwrite\s+table\s+[^\,]*?\s+?select', flags=re.I)re_res_tb_name = pattern_tmp_tb_name.findall(sql_str)table_name = Noneif len(re_res_tb_name) == 1:insert_sql = re_res_tb_name[0]insert_sql = insert_sql.replace(' :: ', "::")insert_list = insert_sql.split(' ')insert_table_name_flag = Falsefor insert_table in insert_list:if insert_table_name_flag:table_name = insert_tablebreakif insert_table.upper() == 'TABLE':insert_table_name_flag = Trueif table_name[-1] == "(":table_name = table_name[:-1]return table_nameif __name__ == '__main__':sql = """insert table dwd_table_name_prod_info_df (ftime,prod_id,prod_name,prod_price)selectt1.ftime,t1.prod_id,t1.prod_name,t2.prod_pricefrom (select ftime,prod_id,prod_namefrom ods_table_name_prod_info_df t1_1where ftime = 20231223) t1 left join (selectt2_1.prod_id,t2_1.prod_price,t2_2.prod_numberfrom ods_table_name_prod_price_df t2_1 left join ods_table_name_prod_number_df t2_2where ftime = 20231223) t2 on t1.prod_id = t2.prod_id"""bsa = BloodSqlparseAnalysis()insert_table, from_table_list = bsa.analysis_sql_blood(sql)print(insert_table)print(from_table_list)
2.实操结果
符合预期
总结
以上就是今天分享的内容,本文仅仅简单介绍了python-sqlparse解析sql的方式,欢迎大家一起讨论呀。