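Flink supports both batch and stream processing. Since Flink 1.12, batch and stream processing share a unified API. Programs can be written in Java or Scala; this article uses Java. The walkthrough below uses WordCount as an example to explain the Flink programming flow.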
Prerequisites:
- IntelliJ IDEA
- Maven
- JDK 1.8
1. Maven dependencies
<properties>
    <flink.version>1.15.0</flink.version>
    <flink.scala.version>2.12</flink.scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${flink.scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Input data
Contents of wordcount.txt:
zhangsan,lisi,wangwu
ajdhaj,hdgaj,zhangsan
lisi,wangwu
3. WordCount over an offline file with the DataSet API
package com.first.example;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author xxxx
 * @date 2023-09-26 12:48
 */
public class DataSetWordCount {
    public static void main(String[] args) throws Exception {
        // Obtain the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the input file line by line
        DataSource<String> dataSource = env.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");

        // Split each line on commas and emit (word, 1) for every non-empty word
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    if (!StringUtils.isEmpty(word)) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                }
            }
        });

        // Group by the word (field 0) and sum the counts (field 1)
        AggregateOperator<Tuple2<String, Integer>> total = wordAndOne.groupBy(0).sum(1);

        // Print the result; in the DataSet API print() also triggers execution
        total.print();
    }
}
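The program starts by obtaining the execution environment env. readTextFile reads the file, flatMap splits each line into (word, 1) tuples, and groupBy followed by sum aggregates the counts per word.
Result: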
(ajdhaj,1)
(wangwu,2)
(lisi,2)
(hdgaj,1)
(zhangsan,2)
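To make the flatMap, groupBy, sum pattern easier to see in isolation, here is a minimal plain-Java sketch of the same logic. It does not use Flink at all; the class name WordCountSketch and the method countWords are hypothetical names chosen for illustration, and the input lines are the ones from wordcount.txt above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration only; this is not Flink code and the names are hypothetical.
class WordCountSketch {

    // Mirrors flatMap (split on commas, drop empty tokens) followed by groupBy(0).sum(1).
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "zhangsan,lisi,wangwu",
                "ajdhaj,hdgaj,zhangsan",
                "lisi,wangwu");
        // Prints one (word,count) pair per distinct word, sorted by word
        countWords(lines).forEach((word, count) -> System.out.println("(" + word + "," + count + ")"));
    }
}
```

The sketch sorts its output by word because it collects into a TreeMap; the Flink job gives no such ordering guarantee, which is why the result above is not sorted.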
4. WordCount in streaming execution mode with the DataStream API
package com.first.example;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @date 2023-09-26 13:21
 */
public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Obtain the streaming execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the file as a stream of lines
        DataStreamSource<String> textFile = senv.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");

        // Split each line on commas and emit (word, 1) for every non-empty word
        DataStream<Tuple2<String, Integer>> wordAndOne = textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    if (!StringUtils.isEmpty(word)) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            }
        });

        // Key by the word, keep a running sum of the counts, and print with the prefix ">>>"
        wordAndOne.keyBy(ele -> ele.f0).sum(1).print(">>>");

        // DataStream programs are lazy: nothing runs until execute() is called
        senv.execute("first streaming....");
    }
}
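Execution steps:
Obtain the execution environment, read the file as a stream, transform it with flatMap, then aggregate with the keyBy and sum operators.
Result: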
>>>:6> (lisi,1)
>>>:4> (wangwu,1)
>>>:4> (zhangsan,1)
>>>:6> (hdgaj,1)
>>>:6> (lisi,2)
>>>:4> (zhangsan,2)
>>>:4> (wangwu,2)
>>>:2> (ajdhaj,1)
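Comparing the two results: the batch job emits a single final count per word, while the streaming job emits an updated count each time a record arrives, so a word that occurs twice is printed twice (first with count 1, then with count 2).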
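The difference between the two output styles can be reproduced without Flink. The following plain-Java sketch is only an illustration under simplifying assumptions: it processes the words sequentially in file order, whereas the real streaming job runs in parallel and its output order can vary. The class and method names (RunningCountSketch, streamingUpdates, batchResult) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not Flink code and the names are hypothetical.
class RunningCountSketch {

    // Streaming style: keep per-word state and emit the updated count after every record.
    static List<String> streamingUpdates(List<String> words) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int count = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + count + ")");
        }
        return emitted;
    }

    // Batch style: consume the whole input first, then expose one final count per word.
    static Map<String, Integer> batchResult(List<String> words) {
        Map<String, Integer> totals = new HashMap<>();
        for (String word : words) {
            totals.merge(word, 1, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "zhangsan", "lisi", "wangwu", "ajdhaj", "hdgaj", "zhangsan", "lisi", "wangwu");
        System.out.println("streaming updates: " + streamingUpdates(words));
        System.out.println("batch result:      " + batchResult(words));
    }
}
```

For the eight words in wordcount.txt the streaming-style method produces eight updates, one per record, while the batch-style method produces five entries, one per distinct word. This matches the line counts of the two results shown above.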
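5. Batch WordCount with the unified batch/stream API
Since Flink 1.12, the same DataStream program can process bounded data in batch mode through the unified API.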
package com.first.example;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Unified batch and stream processing
 * @date 2023-09-26 15:38
 */
public class BatchStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        // The default runtime mode is STREAMING; switch to BATCH here
        senv.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Read the file as a bounded stream of lines
        DataStreamSource<String> textFile = senv.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");

        // Split each line on commas and emit (word, 1) for every non-empty word
        DataStream<Tuple2<String, Integer>> wordAndOne = textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    if (!StringUtils.isEmpty(word)) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            }
        });

        // Key by the word, sum the counts, and print with the prefix ">>>"
        wordAndOne.keyBy(ele -> ele.f0).sum(1).print(">>>");

        senv.execute("first streaming....");
    }
}
The program is essentially the same as the streaming version; the only addition is the line that sets the runtime mode:
senv.setRuntimeMode(RuntimeExecutionMode.BATCH);
Result:
>>>:2> (ajdhaj,1)
>>>:4> (wangwu,2)
>>>:4> (zhangsan,2)
>>>:6> (lisi,2)
>>>:6> (hdgaj,1)
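In both DataStream outputs, the number before `>` identifies the parallel subtask that printed the line, and a given word always appears under the same number because keyBy routes all records with the same key to the same subtask. The sketch below illustrates that idea in plain Java under a deliberate simplification: it maps a key to a subtask with a plain hashCode modulo, which is not Flink's actual assignment (Flink hashes keys into key groups first), so the indices it computes will not match the numbers printed above. The class name KeyRoutingSketch, the method subtaskFor, and the parallelism of 8 are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is not Flink's real key-to-subtask assignment.
class KeyRoutingSketch {

    // Simplified routing: the same key always yields the same subtask index in [0, parallelism).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 8; // assumed value, chosen only for the demonstration
        List<String> words = Arrays.asList(
                "zhangsan", "lisi", "wangwu", "ajdhaj", "hdgaj", "zhangsan", "lisi", "wangwu");
        for (String word : words) {
            // Repeated words print the same subtask index every time
            System.out.println(word + " -> subtask " + subtaskFor(word, parallelism));
        }
    }
}
```

Because every occurrence of a word lands on the same subtask, that subtask alone can hold the running count for the word, which is what makes the per-key sum possible in a parallel job.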
The next section covers installing and deploying Flink.