该题解实现2023国赛样题实时计算的第一题(如果需要历届样题的题解可以添加小编企鹅:2815619722(有偿),可以提供思路以及各种计算或者安装时、运行时的bug,如有侵权联系删除),兄弟们如果有其他关于实时计算的问题 或者实时计算组件问题可以评论询问小编
离线处理地址:
全国职业技能大赛高职组(最新职业院校技能大赛_大数据应用开发样题解析-模块B:数据采集-任务一:离线数据采集-CSDN博客
编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail,同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此列,则使用create_time填充,允许数据延迟5s)。
子任务1描述:
使用Flink消费Kafka中的数据,统计商城实时订单实收金额,将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
任务分析:
- 使用的事件时间,且需要使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime
- 若operate_time为空值或无此列,则使用create_time填充
- 并且需要设置数据的延迟为5s
- 样卷任务说明中,没有给出订单状态分类信息。我们假设订单状态分别为“1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成”,说明:因为官方给出的样本数据只包含一种订单状态”1005″,因此本案例基于官方的样本数据,对订单状态随机重新设置,使其包含取消订单、申请 退回、退回完成等多种不同状的订单。另外,并不是所有订单字段在实时计算中都需要,因此我们实现的数据生成器所生成的订单记录的字段做了适当简化。
- 但是这个任务样例并没有说明支付规则。如何统计实时订单实收金额?需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加。但是,还需要仔细考虑一些业务规则相关的问题。
例如,1001创建订单 -> 1002支付订单,应该是一笔实收金额吧?所以遇到1001状态的订单,不应该累加。
再例如,1001创建订单 -> 1003取消订单,应该没有实收金额吧?
所以个人认为,应采用如下策略:
- 当订单状态为”1002:支付订单”时,则累加订单金额。
- 当订单状态为”1005:申请退回”时,则累减订单金额。
- 其他订单状态,不予理会。
- 如果当前订单下单后又申请退回则需要减去之前累加的支付订单
- 因此,只好按照题意(而不必考虑与业务实际不符的问题):“若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加”,即有效订单姑且认为是”1001″,”1002″,”1004″。
样例数据如下
3503,慕容裕,13243876595,161.00,1001,3003,第3大街第38号楼6单元682门,描述352977,454427428146258,北纯精制黄小米(小黄米 月子米 小米粥 粗粮杂粮 大米伴侣)2.18kg等1件商品,2020-04-26 18:48:15,2020-04-26 18:55:17,28,16.00 3504,华策腾,13866060841,345.00,1001,614,第1大街第4号楼9单元118门,描述167911,331657196776268,迪奥(Dior)烈艳蓝金唇膏/口红 珊瑚粉 ACTRICE 028号 3.5g等2件商品,2020-04-26 18:48:15,2020-04-26 23:10:20,8,18.00
Scala 代码:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}import org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.connector.kafka.source.KafkaSourceimport org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerimport org.apache.flink.streaming.api.functions.KeyedProcessFunctionimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.redis.RedisSinkimport org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfigimport org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import org.apache.flink.util.{Collector, StringUtils}import java.text.SimpleDateFormatimport java.time.Duration//flink需要手动添加隐式转换,implicit//import org.apache.flink.api.scala._object TaskJob11 {// 输入订单事件类型(只取订单编号和订单状态)case class OrderEvent(final_total_amount:Double, order_status: String)// watrmark 允许数据延迟时间val maxOutOfOrderness: Long = 5 // 秒def main(args: Array[String]): Unit = {// 设置流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置并行度,便于观察env.getConfig.setAutoWatermarkInterval(1000)// 设置水印周期env.enableCheckpointing(5000) // 启用检查点// kafka sourceval source = KafkaSource.builder[String].setBootstrapServers("192.168.190.133:9092")// 注意修改为自己的服务器IP.setTopics("order") //.setGroupId("group-ds").setStartingOffsets(OffsetsInitializer.latest())// .earliest().setValueOnlyDeserializer(new SimpleStringSchema).build// 定义redis sink的配置 (默认端口号6379)val conf = new FlinkJedisPoolConfig.Builder().setMaxTotal(1000).setMaxIdle(32).setTimeout(10*1000).setHost("192.168.190.133") // 注意修改为自己的服务器IP.setPort(6379).build()// 定义水印生成策略val watermarkStrategy = WatermarkStrategy// 处理无序数据,允许5s迟到.forBoundedOutOfOrderness[String](Duration.ofSeconds(maxOutOfOrderness))// 分配时间戳.withTimestampAssigner(new SerializableTimestampAssigner[String] {// 提取事件中的create_time和operate_time中较大的时间作为时间戳override def extractTimestamp(element: String, recordTimestamp: Long): Long = {// 2020-04-26 18:48:15,2020-04-26 18:55:17,val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // 日期解析器val arr = element.split(",") // 以逗号来分隔传入的字符串val create_time = fm.parse(arr(10)).getTime // 解析create_time字段为long值// 处理operate_time字段:如果为空,则用create_time填充,否则解析为long值val operate_time = if(StringUtils.isNullOrWhitespaceOnly(arr(11))) create_time else fm.parse(arr(11)).getTime// 取operate_time和create_time中的最大值返回scala.math.max(operate_time, create_time)}})// 流处理管道val orderStream = env// 指定Kafka数据源.fromSource(source, watermarkStrategy, "Kafka Source")// 分割字段.map(line => line.split(","))// 注意,有可能赛方不讲武德,数据流中混有不完整数据(比如,缺少字段).filter(arr => arr.length==14)// 转换为OrderEvent对象.map(arr => (OrderEvent(arr(3).toDouble,arr(4))) )// 分区(到同一个分区,以便计算累加).keyBy(r => 1)// 执行底层处理函数,累加金额.process(new OrderKeyedProcessFunction)orderStream.print()// data sinkorderStream.addSink(new RedisSink[Double](conf, new RedisSinkMapper))// execute programenv.execute("Flink Streaming Task01")}// 自定义KeyedProcessFunction函数,泛型类型[key, input event, output event]class OrderKeyedProcessFunction extends KeyedProcessFunction[Int, OrderEvent, Double] {// 定义一个状态变量来保存最后的累加金额(由此函数所维护的存储状态)private var lastOrderAmount: ValueState[Double] = _// 初始化override def open(parameters: Configuration): Unit = {lastOrderAmount = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastAmount", classOf[Double]))}// 处理每个元素时调用的方法override def processElement(value:OrderEvent, // 当前传入的事件元素(要被处理的元素)ctx: KeyedProcessFunction[Int, OrderEvent, Double]#Context,out: Collector[Double]): Unit = {// 访问状态值:当前累加总金额var accAmount:Double = lastOrderAmount.valueprintln("accAmount = " + accAmount + ", final_total_amount = " + value.final_total_amount)// 将当前订单记录的金额累加到上一次的总金额上// 假设有效订单为: Array("1001","1002","1004")if(Array("1001","1002","1004").contains(value.order_status)){accAmount = accAmount + value.final_total_amount// 将当前订单的实付金额累加到 累加总金额上out.collect(accAmount)// 发送到下游lastOrderAmount.update(accAmount) // 更新状态}}}// redisMap接口,设置key和value// Redis Sink 核心类是 RedisMappe 接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法class RedisSinkMapper extends RedisMapper[Double] {// getCommandDescription:设置数据使用的数据结构,并设置key的名称override def getCommandDescription: RedisCommandDescription = {// RedisCommand.SET 指定存储类型new RedisCommandDescription(RedisCommand.SET)}/** * 获取 value值 value的数据是键值对 * * @param data * @return *///指定key// 查看所有key:keys * 查看指定key:get totalcountoverride def getKeyFromData(event: Double): String = "totalprice"// 指定valueoverride def getValueFromData(event: Double): String = event.toString}}