Flink中的拼接流connect
的使用其实非常简单,就是leftStream.connect(rightStream)
的方式,但是有一点我们需要清楚,使用connect
后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStream
和rightStream
,也可以使用不同的逻辑处理leftStream
和rightStream
.
如下图:
下面的演示代码也可以通过这个图结合来看,其实connect
算子最主要的作用就是共享状态,如常用的广播状态
.
- 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.CoMapFunction;import java.util.Arrays;/** * @Author: J * @Version: 1.0 * @CreateTime: 2023/8/7 * @Description: 多流操作-流连接 **/public class FlinkConnect {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 添加数据源1DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));// 添加数据源2DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));// 拼接数据流ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);// 这里使用map算子作为演示SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {/** * map1作为左流 **/@Overridepublic String map1(String value) throws Exception {return "字符串: " + value;}/** * map2作为右流 **/@Overridepublic String map2(Double value) throws Exception {return "数字: " + (value * 100);}});// 打印结果resultStream.print();env.execute("Connect Operator");}}
- 结果
3> 字符串: b1> 数字: 600.02> 字符串: a3> 数字: 1100.02> 数字: 2220.02> 字符串: d2> 数字: 9800.03> 数字: 10000.01> 字符串: c