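Contents
Version information
Procedure
Install datahub v0.10.0
datahub quick deployment
Metadata ingestion
Use sqllineage to extract the column-level lineage of the HiveSQL in a given SQL file and submit the result to datahub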
Version information
python 3.8.16
datahub v0.10.0
Procedure
Install datahub v0.10.0
For details, see the official datahub site: A Metadata Platform for the Modern Data Stack | DataHub
Run the following commands:
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub==0.10.0
Check the installed version:
python3 -m datahub version
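datahub quick deployment
Download the docker-compose-without-neo4j.quickstart.yml file from the v0.10.0 branch of the datahub repository to the local machine:
datahub/docker-compose-without-neo4j.quickstart.yml at v0.10.0 · datahub-project/datahub · GitHub
Make sure the following ports are not already in use: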
3306 for MySQL
9200 for Elasticsearch
9092 for the Kafka broker
8081 for Schema Registry
2181 for ZooKeeper
9002 for the DataHub Web Application (datahub-frontend)
8080 for the DataHub Metadata Service (datahub-gms)
If a port is taken, override it with the corresponding command-line option, for example:
datahub docker quickstart --mysql-port 53306
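Before starting the deployment, the port list above can be checked with a short script. The following is only a minimal sketch: it assumes the check runs on the same host that will run the containers, and it treats a port as busy when something accepts a TCP connection on it, which is a heuristic rather than a complete test. The names REQUIRED_PORTS, port_is_free and busy_ports are made up for this illustration.

```python
import socket

# Ports from the list above; the service names are only labels for the report.
REQUIRED_PORTS = {
    3306: "MySQL",
    9200: "Elasticsearch",
    9092: "Kafka broker",
    8081: "Schema Registry",
    2181: "ZooKeeper",
    9002: "datahub-frontend",
    8080: "datahub-gms",
}


def port_is_free(port, host="127.0.0.1"):
    """Return True if nothing accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 only when a listener accepted the connection.
        return sock.connect_ex((host, port)) != 0


def busy_ports(ports=REQUIRED_PORTS, host="127.0.0.1"):
    """Return the subset of ports that already have a listener."""
    return {p: name for p, name in ports.items() if not port_is_free(p, host)}


if __name__ == "__main__":
    for port, name in busy_ports().items():
        print(f"port {port} ({name}) is already in use")
```

Any port reported here is a candidate for an override such as the --mysql-port option shown above.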
Start the deployment:
python3 -m datahub docker quickstart -f ./docker-compose-without-neo4j.quickstart.yml --version v0.10.0
The command starts by pulling the images.
Once the containers have been created, datahub has started successfully.
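The containers can take a while to become reachable after they are created. As a rough sketch, a script can poll the web application until it answers. The address http://hadoop105:9002 is the one used in this article; the helper names http_ok and wait_until_up, the retry counts, and the assumption that the frontend root page answers with HTTP 200 are choices made for this illustration only.

```python
import http.client
import time
import urllib.error
import urllib.request


def http_ok(url, timeout=3.0):
    """Return True if a GET request to url ends in an HTTP 200 response."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, http.client.HTTPException, OSError):
        # Connection refused, timeout, HTTP error status, malformed reply, ...
        return False


def wait_until_up(url, attempts=30, delay=5.0, probe=http_ok):
    """Call probe(url) up to `attempts` times, sleeping `delay` seconds after each failure."""
    for _ in range(attempts):
        if probe(url):
            return True
        time.sleep(delay)
    return False


if __name__ == "__main__":
    # Assumed address: the datahub-frontend host and port used in this article.
    if wait_until_up("http://hadoop105:9002"):
        print("datahub-frontend is reachable")
    else:
        print("datahub-frontend did not answer in time")
```

The probe is passed in as a parameter so that a different check can be swapped in without touching the retry loop.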
Open hadoop105:9002 in a browser.
Log in with datahub as both the username and the password.
Metadata ingestion
Install the hive plugin:
python3 -m pip install 'acryl-datahub[hive]'
The installation fails with an error.
Try installing the build dependencies first:
yum -y install gcc gcc-c++ python-devel.x86_64 cyrus-sasl-devel.x86_64 gcc-c++.x86_64
Install the hive plugin again with the same pip command.
Check the datahub plugins:
python3 -m datahub check plugins
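The hive plugin is now listed as installed.
Write the configuration file for ingesting hive metadata (saved here as hive-metadata-ingestion.yml):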
source:
  type: "hive"
  config:
    host_port: "hadoop102:10000" # hiveserver2
sink:
  type: "datahub-rest"
  config:
    server: "http://hadoop105:8080" # datahub gms server
Start ingesting the hive metadata:
python3 -m datahub ingest -c ./hive-metadata-ingestion.yml
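As an alternative to the CLI, the same recipe can be run from Python through the Pipeline class of the datahub package. This is a sketch under the assumption that acryl-datahub with the hive plugin is installed; the function names build_hive_recipe and run_recipe are made up for this illustration, and the host names are the ones used in this article.

```python
def build_hive_recipe(host_port, gms_server):
    """Return the ingestion recipe as a dict with the same keys as the YAML file."""
    return {
        "source": {
            "type": "hive",
            "config": {"host_port": host_port},  # hiveserver2 address
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": gms_server},  # datahub gms server
        },
    }


def run_recipe(recipe):
    """Run an ingestion recipe and raise if the run reported failures."""
    # Imported here so that building a recipe does not require acryl-datahub.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


if __name__ == "__main__":
    run_recipe(build_hive_recipe("hadoop102:10000", "http://hadoop105:8080"))
```

Building the recipe in code is mainly useful when the host names differ between environments; for a one-off run the YAML file and the CLI are simpler.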
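The metadata ingestion completes.
Open the web page to check the result.
Use sqllineage to extract the column-level lineage of the HiveSQL in a given SQL file and submit the result to datahub
Reference: the script for submitting fine-grained lineage from the official datahub documentation, datahub/lineage_emitter_dataset_finegrained.py at master · datahub-project/datahub · GitHub
Reference: the Getting Started page of the sqllineage 1.3.7 documentation
The approach is to use sqllineage to obtain the column-level lineage of the given SQL, then call the datahub REST API to submit the result to datahub.
The Python code is as follows: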
import sys

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)
from sqllineage.runner import LineageRunner

'''
Parse the HiveSQL in the target SQL file, build its column-level lineage,
and submit it to datahub.
The SQL file path is passed to the script as a command-line argument.
The platform submitted to datahub is hive.
'''


# Build the dataset URN for a table (platform = hive)
def datasetUrn(tableName):
    return builder.make_dataset_urn("hive", tableName)


# Build the schema field URN for a column of a table
def fieldUrn(tableName, fieldName):
    return builder.make_schema_field_urn(datasetUrn(tableName), fieldName)


# Path of the target SQL file
sqlFilePath = sys.argv[1]
with open(sqlFilePath, mode='r', encoding='utf-8') as sqlFile:
    sql = sqlFile.read()

# Parse the lineage of the SQL
result = LineageRunner(sql)

# Name of the downstream (target) table in the SQL
targetTableName = str(result.target_tables[0])

print(result)
print('===============')

# Print the column-level lineage
result.print_column_lineage()
print('===============')

# Fine-grained (column-level) lineage entries
fineGrainedLineageList = []

# Upstream datasets, used for conflict checking
upStreamsList = []

# Iterate over the column-level lineage paths
for columnTuples in result.get_column_lineage():
    # Upstream field URNs of this path
    upStreamStrList = []
    # Downstream field URNs of this path
    downStreamStrList = []
    # The last element of the tuple is the downstream table and column;
    # every other element is an upstream table and column.
    # enumerate is used instead of tuple.index so that a column appearing
    # twice in one path is still classified by its position.
    for position, column in enumerate(columnTuples):
        fieldName = str(column.raw_name)
        tableName = str(column).replace('.' + fieldName, '')
        if position == len(columnTuples) - 1:
            # print('downstream table: ' + tableName)
            # print('downstream field: ' + fieldName)
            downStreamStrList.append(fieldUrn(tableName, fieldName))
        else:
            # print('upstream table: ' + tableName)
            # print('upstream field: ' + fieldName)
            upStreamStrList.append(fieldUrn(tableName, fieldName))
            # Used to check whether the upstream lineage conflicts
            upStreamsList.append(
                Upstream(dataset=datasetUrn(tableName),
                         type=DatasetLineageType.TRANSFORMED))

    fineGrainedLineage = FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.DATASET,
        upstreams=upStreamStrList,
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=downStreamStrList)
    fineGrainedLineageList.append(fineGrainedLineage)

fieldLineages = UpstreamLineage(
    upstreams=upStreamsList,
    fineGrainedLineages=fineGrainedLineageList)

lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn(targetTableName),  # downstream table
    aspect=fieldLineages)

# Call the datahub REST API
emitter = DatahubRestEmitter('http://datahub-gms:8080')  # datahub gms server

# Emit metadata!
emitter.emit_mcp(lineageMcp)
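The key step in the script is turning each lineage path into upstream and downstream field URNs. The sketch below isolates that step with plain strings so it can be tried without datahub or sqllineage installed. It assumes each path element is written as table.column and that the environment is PROD (the default that make_dataset_urn uses); the helper names and the sample table names are invented for this illustration.

```python
def dataset_urn(table, platform="hive", env="PROD"):
    """Build a dataset URN string in the layout datahub uses."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{table},{env})"


def field_urn(table, field):
    """Build a schema field URN for one column of a table."""
    return f"urn:li:schemaField:({dataset_urn(table)},{field})"


def split_column(qualified):
    """Split 'db.table.column' into ('db.table', 'column') at the last dot."""
    table, _, field = qualified.rpartition(".")
    return table, field


def path_to_urns(path):
    """Map one lineage path to (upstream field URNs, downstream field URN).

    The last element of the path is the downstream column; all earlier
    elements are upstream columns.
    """
    *upstreams, downstream = path
    upstream_urns = [field_urn(*split_column(c)) for c in upstreams]
    return upstream_urns, field_urn(*split_column(downstream))


if __name__ == "__main__":
    # Hypothetical path: ods.orders.id feeds dwd.orders_clean.id
    ups, down = path_to_urns(("ods.orders.id", "dwd.orders_clean.id"))
    print(ups)
    print(down)
```

Splitting at the last dot avoids the string replacement used in the script, which could also remove an earlier occurrence of the same text inside the table name.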
Run the Python script:
python3 sql-lineage-to-datahub.py target.sql
Check the result in the web UI.