1. Purpose of the Flink HistoryServer
The HistoryServer lets you query statistics of completed jobs even after they have finished running and the Flink cluster has been shut down. It also exposes a REST API that accepts HTTP requests and responds with JSON. When a job ends, the JobManager archives the finished job's statistics; the HistoryServer process can then serve queries against that archive, for example the last completed checkpoint or the configuration the job ran with.
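As a rough illustration of that REST API, once the HistoryServer deployed later in this article is reachable through the Ingress host flink-hs.k8s.io, its archive can be queried with plain HTTP. The endpoints below mirror the JobManager REST paths that get archived; the job ID is a placeholder you would take from the first response.

# List all archived (completed) jobs known to the HistoryServer
curl http://flink-hs.k8s.io/jobs/overview

# Inspect the checkpoint statistics of one archived job
# (replace <job-id> with an ID returned by the call above)
curl http://flink-hs.k8s.io/jobs/<job-id>/checkpoints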

2. Deploying the Flink HistoryServer
1. Create the flink historyserver PVC that will hold the archived Flink job data.

[root@k8s-demo001 ~]# cat flink-historyserver-pvc.yaml
# PVC backing the Flink HistoryServer's persistent storage
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-historyserver-pvc   # name of the HistoryServer PVC
  namespace: flink                # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage   # StorageClass name; change to the one used in your cluster
  accessModes:
    - ReadWriteMany               # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi                # storage capacity; adjust as needed
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-pvc.yaml
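A quick sanity check, assuming the nfs-storage StorageClass provisions volumes dynamically, is to confirm the claim is bound:

kubectl -n flink get pvc flink-historyserver-pvc
# STATUS should show "Bound" once the NFS provisioner has created the volume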

2. Configure the flink historyserver by creating its ConfigMap.

[root@k8s-demo001 ~]# cat flink-historyserver-conf.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: flink-historyserver-conf
  namespace: flink
  annotations:
    kubesphere.io/creator: admin
data:
  flink-conf.yaml: |
    blob.server.port: 6124
    kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2
    kubernetes.jobmanager.replicas: 1
    kubernetes.jobmanager.cpu: 1.0
    $internal.flink.version: v1_13
    kubernetes.taskmanager.cpu: 1.0
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    kubernetes.service-account: flink
    kubernetes.cluster-id: flink-historyserver
    kubernetes.container.image: flink-hdfs:1.13.6
    parallelism.default: 2
    kubernetes.namespace: flink
    taskmanager.numberOfTaskSlots: 2
    kubernetes.rest-service.exposed.type: ClusterIP
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    jobmanager.memory.process.size: 1024m
    taskmanager.memory.process.size: 1024m
    kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
    kubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_17272077926352838674.yaml
    execution.target: kubernetes-session
    jobmanager.archive.fs.dir: file:///opt/flink/flink_history
    historyserver.archive.fs.dir: file:///opt/flink/flink_history
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.web.port: 8082
    web.tmpdir: /opt/flink/webupload
    web.upload.dir: /opt/flink/webupload
    web.cancel.enable: false
    internal.cluster.execution-mode: NORMAL
    queryable-state.proxy.ports: 6125
    state.checkpoints.dir: file:///opt/flink/checkpoints
  log4j.properties: |
    # Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
    monitorInterval=30
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = MainAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
    logger.shaded_zookeeper.level = INFO
    # Log all infos in the given file
    appender.main.name = MainAppender
    appender.main.type = RollingFile
    appender.main.append = true
    appender.main.fileName = ${sys:log.file}
    appender.main.filePattern = ${sys:log.file}.%i
    appender.main.layout.type = PatternLayout
    appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.main.policies.type = Policies
    appender.main.policies.size.type = SizeBasedTriggeringPolicy
    appender.main.policies.size.size = 100MB
    appender.main.policies.startup.type = OnStartupTriggeringPolicy
    appender.main.strategy.type = DefaultRolloverStrategy
    appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
  log4j-console.properties: |
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    # Flink Deployment Logging Overrides
    # rootLogger.level = DEBUG
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-conf.yaml

Check that the ConfigMap was created.
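For example, a minimal check (output omitted) could look like this:

kubectl -n flink get configmap flink-historyserver-conf
# Optionally dump the rendered keys to make sure flink-conf.yaml and the log4j files are present
kubectl -n flink describe configmap flink-historyserver-conf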

3. Create the HistoryServer service (Deployment, Service, and Ingress).

[root@k8s-demo001 ~]# cat flink-historyserver.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink
  labels:
    app: flink-historyserver
    name: flink-historyserver
  name: flink-historyserver
spec:
  replicas: 1
  selector:
    matchLabels:
      name: flink-historyserver
  template:
    metadata:
      namespace: flink
      labels:
        app: flink-historyserver
        name: flink-historyserver
    spec:
      hostAliases:                 # hosts entries
        - ip: "172.16.252.129"
          hostnames:
            - "Kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "Kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "Kafka-03"
      containers:
        - name: flink-historyserver
          env:
            - name: TZ
              value: Asia/Shanghai
          image: flink:1.13.6
          command: [ 'sh', '-c', '/docker-entrypoint.sh history-server' ]
          ports:
            - containerPort: 8082
          volumeMounts:
            - name: flink-historyserver-conf
              mountPath: /opt/flink/conf/flink-conf.yaml
              subPath: flink-conf.yaml
            - name: flink-historyserver-conf
              mountPath: /opt/flink/conf/log4j.properties
              subPath: log4j.properties
            - name: flink-historyserver-conf
              mountPath: /opt/flink/conf/log4j-console.properties
              subPath: log4j-console.properties
            - name: flink-historyserver
              mountPath: /opt/flink/flink_history
      volumes:                     # volume definitions
        - name: flink-historyserver-conf
          configMap:
            name: flink-historyserver-conf
        - name: flink-historyserver
          persistentVolumeClaim:
            claimName: flink-historyserver-pvc
# ---
# kind: Service
# apiVersion: v1
# metadata:
#   namespace: flink
#   name: flink-historyserver
# spec:
#   type: NodePort
#   ports:
#     - port: 8082
#       nodePort: 31082
#   selector:
#     name: flink-historyserver
# Configure the Ingress below according to your environment
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-historyserver
    name: flink-historyserver
  name: flink-historyserver
  namespace: flink
spec:
  selector:
    app: flink-historyserver
  ports:
    - port: 8082
      protocol: TCP
      targetPort: 8082
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  namespace: flink
  name: flink-historyserver
  annotations:
    nginx.ingress.kubernetes.io/default-backend: ingress-nginx-controller
    nginx.ingress.kubernetes.io/use-regex: 'true'
spec:
  ingressClassName: nginx
  rules:
    - host: "flink-hs.k8s.io"
      http:
        paths:
          - pathType: Prefix
            path: "/"
            backend:
              service:
                name: flink-historyserver
                port:
                  number: 8082
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver.yaml
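A quick way to confirm the HistoryServer pod came up and started scanning the archive directory (pod names and log output will differ per cluster):

kubectl -n flink get pods -l app=flink-historyserver
# Tail the HistoryServer log; it should report the web port and the archive directory it watches
kubectl -n flink logs deploy/flink-historyserver --tail=20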

Verification:

Open the Flink HistoryServer UI:
http://flink-hs.k8s.io/
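Note that flink-hs.k8s.io is just the Ingress host defined above, not a public DNS name. Assuming no internal DNS record exists for it, point the name at a node where the NGINX ingress controller is reachable; the IP below is a placeholder for your environment:

# /etc/hosts on the machine running the browser
<ingress-node-ip>   flink-hs.k8s.io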

3. Submitting a Flink Job
1. Write the YAML for submitting the job.

The job must mount the HistoryServer PVC and point the archive directory options at the PVC mount path, so the JobManager writes its archives to the same location the HistoryServer reads from.

[root@k8s-demo001 ~]# cat application-deployment-checkpoint-ha-hs.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint-ha-hs   # Flink cluster name
spec:
  image: flink:1.13.6             # Flink base image
  flinkVersion: v1_13             # Flink version, 1.13 here
  imagePullPolicy: IfNotPresent   # pull policy: use the local image if present, otherwise pull
  ingress:                        # Ingress config for reaching the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
    high-availability.type: kubernetes
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory   # JobManager HA
    high-availability.storageDir: file:///opt/flink/flink_recovery    # where JobManager HA data is stored
    jobmanager.archive.fs.dir: file:///opt/flink/flink_history        # JobManager archive directory
    historyserver.archive.fs.dir: file:///opt/flink/flink_history     # HistoryServer archive directory
    historyserver.archive.fs.refresh-interval: "10000"                # HistoryServer refresh interval
  serviceAccount: flink
  jobManager:
    replicas: 2                   # with HA enabled, run more than one JobManager
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "Kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "Kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "Kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar              # jar hosted on NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints      # checkpoint PVC
              mountPath: /opt/flink/checkpoints
            - name: flink-log              # log PVC
              mountPath: /opt/flink/log
            - name: flink-ha               # HA PVC
              mountPath: /opt/flink/flink_recovery
            - name: flink-historyserver    # HistoryServer archive PVC
              mountPath: /opt/flink/flink_history
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
        - name: flink-log
          persistentVolumeClaim:
            claimName: flink-log-pvc
        - name: flink-ha
          persistentVolumeClaim:
            claimName: flink-ha-pvc
        - name: flink-historyserver
          persistentVolumeClaim:
            claimName: flink-historyserver-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar   # jar mounted via the PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:                         # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    parallelism: 1
    upgradeMode: stateless
[root@k8s-demo001 ~]# kubectl apply -f application-deployment-checkpoint-ha-hs.yaml
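After applying the manifest, the Flink Kubernetes Operator should bring up the JobManager and TaskManager pods; a simple way to watch this (output is illustrative):

kubectl -n flink get flinkdeployment application-deployment-checkpoint-ha-hs
kubectl -n flink get pods | grep application-deployment-checkpoint-ha-hs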

After the job is submitted, you can manually write some data to Kafka and then stop the job.
While the job is running, the HistoryServer shows nothing for it; only after the job terminates does the HistoryServer pick up its archive and make the information queryable.
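As a sketch of that workflow (broker addresses and topic name taken from the manifest above; the Kafka console producer is assumed to be available on a host that can reach the brokers):

# Produce a few test records to the job's input topic
kafka-console-producer.sh \
  --broker-list 172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092 \
  --topic flink_test

# Stop the job, e.g. by deleting the FlinkDeployment (cancelling it from the Flink web UI also works)
kubectl -n flink delete flinkdeployment application-deployment-checkpoint-ha-hs

# Once the job reaches a terminal state, an archive file named after the job ID
# should appear in the shared history directory, inspected here via the HistoryServer pod
kubectl -n flink exec deploy/flink-historyserver -- ls /opt/flink/flink_history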
