1.数据

订单表,分别是店铺id、用户id和支付金额

"店铺id,用户id,支付金额","shop-1,user-1,1","shop-1,user-2,1","shop-1,user-2,1","shop-1,user-3,1","shop-1,user-3,1","shop-1,user-1,1","shop-1,user-2,1","shop-1,user-4,1","shop-2,user-4,1","shop-2,user-4,1","shop-2,user-2,1"

2.可运行案例

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class Test03 {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.创建表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 3.读取数据源SingleOutputStreamOperator jsonStream = env.fromElements("shop-1,user-1,1","shop-1,user-2,1","shop-1,user-2,1","shop-1,user-3,1","shop-1,user-3,1","shop-1,user-1,1","shop-1,user-2,1","shop-1,user-4,1","shop-2,user-4,1","shop-2,user-4,1","shop-2,user-2,1");// 4.流转换为表Table table = tableEnv.fromDataStream(jsonStream);// 5. 把注册为一个临时视图tableEnv.createTemporaryView("tableTmp", table);// 6.求每个商店的用户数Table table1 = tableEnv.sqlQuery("select shop_id,sum(num) as num,sum(gmv) as gmv from (select shop_id,user_id, 1 as num,sum(gmv) as gmv from (select SPLIT_INDEX(f0,',',0) as shop_id,SPLIT_INDEX(f0,',',1) as user_id,cast(SPLIT_INDEX(f0,',',2) as bigint) as gmv from tableTmp) t1 group by shop_id,user_id) t2 group by shop_id");// 7.打印tableEnv.toRetractStream(table1, Row.class).print(">>>>>>");// 8.执行env.execute("test");}}

sql:

selectshop_id,sum(num) as num,sum(gmv) as gmvfrom(selectshop_id,user_id,1 as num,sum(gmv) as gmvfrom(selectSPLIT_INDEX(f0, ',', 0) as shop_id,SPLIT_INDEX(f0, ',', 1) as user_id,cast(SPLIT_INDEX(f0, ',', 2) as bigint) as gmvfromtableTmp) t1group byshop_id,user_id) t2group byshop_id

3.运行结果

>>>>>>:7> (true,+U[shop-2, 2, 3])

>>>>>>:1> (true,+U[shop-1, 4, 8])

>>>>>>:7> (true,+I[shop-2, 1, 1])>>>>>>:1> (true,+I[shop-1, 1, 1])>>>>>>:1> (false,-U[shop-1, 1, 1])>>>>>>:7> (false,-U[shop-2, 1, 1])>>>>>>:1> (true,+U[shop-1, 2, 2])>>>>>>:7> (true,+U[shop-2, 2, 2])>>>>>>:1> (false,-U[shop-1, 2, 2])>>>>>>:7> (false,-U[shop-2, 2, 2])>>>>>>:1> (true,+U[shop-1, 1, 1])>>>>>>:7> (true,+U[shop-2, 1, 1])>>>>>>:1> (false,-U[shop-1, 1, 1])>>>>>>:7> (false,-U[shop-2, 1, 1])>>>>>>:7> (true,+U[shop-2, 2, 3])>>>>>>:1> (true,+U[shop-1, 2, 3])>>>>>>:1> (false,-U[shop-1, 2, 3])>>>>>>:1> (true,+U[shop-1, 3, 4])>>>>>>:1> (false,-U[shop-1, 3, 4])>>>>>>:1> (true,+U[shop-1, 2, 3])>>>>>>:1> (false,-U[shop-1, 2, 3])>>>>>>:1> (true,+U[shop-1, 3, 5])>>>>>>:1> (false,-U[shop-1, 3, 5])>>>>>>:1> (true,+U[shop-1, 2, 3])>>>>>>:1> (false,-U[shop-1, 2, 3])>>>>>>:1> (true,+U[shop-1, 3, 6])>>>>>>:1> (false,-U[shop-1, 3, 6])>>>>>>:1> (true,+U[shop-1, 4, 7])>>>>>>:1> (false,-U[shop-1, 4, 7])>>>>>>:1> (true,+U[shop-1, 3, 6])>>>>>>:1> (false,-U[shop-1, 3, 6])>>>>>>:1> (true,+U[shop-1, 4, 8])

4.原理

Flink回撤流原理