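Our recent project uses Spark for offline computation, so here are some quick notes on a few operations I ended up using.
1. Duplicating Dataset rows vertically
When a dataset has too few rows to be joined randomly against other datasets in a useful way, you need to duplicate its rows vertically to bulk up the data volume. There are two ways to do this:
Approach 1: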
userDataset.unionAll(userDataset).unionAll(userDataset)
This works for a handful of copies, but for many copies it creates a large number of Dataset objects, and the code is tedious to write.
Approach 2:
int[] array = IntStream.range(0, 30).toArray();
userDataset = userDataset
        .withColumn("row_num", functions.explode(functions.lit(array)))
        .drop(functions.col("row_num"));
Here range(0, 30) controls the number of copies (30 in this case), which makes this approach easier to work with.
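To make the second approach more concrete, here is a minimal plain-Java sketch of what explode over a constant array does to the row count, written so it runs without Spark. The class name ExplodeSketch and the method duplicate are hypothetical and only illustrate the idea: every input row is emitted once per array element, and once the helper column is dropped the copies are identical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class ExplodeSketch {

    /**
     * Emits every input row once per element of a constant array.
     * This mimics withColumn(..., explode(lit(array))) followed by drop(...),
     * but on a plain list instead of a Spark Dataset (an assumption made for illustration).
     */
    public static <T> List<T> duplicate(List<T> rows, int copies) {
        int[] array = IntStream.range(0, copies).toArray();
        List<T> out = new ArrayList<>();
        for (T row : rows) {
            // One output row per array element; the element value itself is discarded,
            // just like the dropped helper column.
            for (int ignored : array) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("7056", "7057");
        List<String> result = duplicate(rows, 3);
        System.out.println(result.size()); // prints 6
    }
}
```

The result size is always rows.size() * copies, and copies of the same row stay adjacent, which matches the ordering seen in the local-mode output of this post.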
Example:
// String env = "yarn";
String env = "local[*]";
SparkSession sparkSession = SparkSession.builder().appName(appName).master(env).getOrCreate();
hdfsPath = "file:///C:\\\\\\\\recall.csv";
// Pass the session created above (the original snippet referred to an undeclared "session")
Dataset<Row> dataset = createRealView(sparkSession, hdfsPath);
dataset.show(200);

// Add the bookkeeping columns
String randStr = String.valueOf((int) ((Math.random() * 9 + 1) * Math.pow(10, 5)));
dataset = dataset.withColumn("group_name", functions.lit(RecallTaskEnum.taskName(taskNum)));
dataset = dataset.withColumn("group_id", functions.lit(randStr));
dataset = dataset.withColumn("creater", functions.lit("sparkTask"));
dataset = dataset.withColumn("updater", functions.lit("sparkTask"));
dataset = dataset.withColumn("del_flag", functions.lit(1));
dataset = dataset.withColumn("created_time", functions.now());
dataset = dataset.withColumn("update_time", functions.now());
dataset.show(200);

// Duplicate every row 10 times
int[] array = IntStream.range(0, 10).toArray();
dataset = dataset.withColumn("dummy", functions.explode(functions.lit(array))).drop(functions.col("dummy"));
dataset.show(200);
Here recall.csv contains:
client_id,score
7056,2
7057,12
7058,1200
212121,1100
212122,100
212123,100
The three show() calls then print:
+---------+-----+|client_id|score|+---------+-----+| 7056|2|| 7057| 12|| 7058| 1200|| 212121| 1100|| 212122|100|| 212123|100|+---------+-----++---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+|client_id|score|group_name|group_id|creater|updater|del_flag|created_time| update_time|+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------++---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+|client_id|score|group_name|group_id|creater|updater|del_flag|created_time| update_time|+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 
7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7056|2|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7057| 12|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 
15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 7058| 1200|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212121| 1100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 
15:02:...|2023-12-07 15:02:...|| 212122|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|| 212123|100|同业存单人群圈选|728804|sparkTask|sparkTask| 1|2023-12-07 15:02:...|2023-12-07 15:02:...|+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
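2. Creating an empty Dataset with a specified schema
During development, reading a CSV file or a MySQL table to create a Dataset can throw an exception. To keep one failure from cascading through the service, you can substitute an empty Dataset with the same schema and let it take part in the computation instead.
For example: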
public static Dataset<Row> getDataByPartition(SparkSession session, String table, String partColumn,
                                              String selectColumns, String condition) {
    Map<String, String> dbOptions = new HashMap<>();
    dbOptions.put("driver", envMap.get("DRIVER"));
    dbOptions.put("url", envMap.get("URL"));
    dbOptions.put("user", envMap.get("USER"));
    dbOptions.put("password", envMap.get("PASSWORD"));
    // Build the query for the data we need
    String dataSelect = String.format("select %s from %s ", selectColumns, table);
    dataSelect += condition;
    Map<String, String> options = new HashMap<>();
    options.put("query", dataSelect);
    options.put("numPartitions", String.valueOf(EnvConstant.PARTITION_NUM));
    options.putAll(dbOptions);
    return session.read().format("jdbc").options(options).load();
}
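Reading the database by partition like this to build the view can throw an exception and crash the program, so add the following handling: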
// Comma-separated format:
// selectColumns = "client_id, score, group_name, group_id, creater, updater, del_flag, created_time, update_time";
public static Dataset<Row> generateEmptyDataset(SparkSession session, String selectColumns) {
    StructType schema = new StructType();
    if (StringUtils.isEmpty(selectColumns)) {
        // No columns given: fall back to a single placeholder column
        schema = schema.add(new StructField("emptyDataset", DataTypes.StringType, false, Metadata.empty()));
        return session.createDataset(Collections.emptyList(), Encoders.row(schema));
    }
    String[] split = selectColumns.split(",");
    for (String str : split) {
        String res = str.trim();
        // Every column is declared as a non-nullable string
        schema = schema.add(new StructField(res, DataTypes.StringType, false, Metadata.empty()));
    }
    return session.createDataset(Collections.emptyList(), Encoders.row(schema));
}
The schema is built dynamically from the column list passed in, so the empty Dataset is generated dynamically as well.
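The column-list handling can be tried out in isolation. Below is a minimal plain-Java sketch, assuming a hypothetical class SchemaNamesSketch with a method fieldNames, that reproduces only the string handling (split on commas, trim, placeholder column for empty input) and leaves out the Spark StructType itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SchemaNamesSketch {

    // Placeholder column name, mirroring the "emptyDataset" field used in the post.
    static final String PLACEHOLDER = "emptyDataset";

    /**
     * Turns a comma-separated column list into trimmed field names.
     * A null or empty input yields the single placeholder column.
     */
    public static List<String> fieldNames(String selectColumns) {
        if (selectColumns == null || selectColumns.isEmpty()) {
            return Collections.singletonList(PLACEHOLDER);
        }
        List<String> names = new ArrayList<>();
        for (String part : selectColumns.split(",")) {
            names.add(part.trim());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(fieldNames("client_id, score , group_id")); // prints [client_id, score, group_id]
    }
}
```

Because the names come straight from the select list, anything other than a plain column name (an alias or an expression, for example) would end up verbatim as a field name, so this kind of helper is probably best fed simple column lists.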
After the change:
public static Dataset<Row> getDataByPartition(SparkSession session, String table, String partColumn,
                                              String selectColumns, String condition) {
    Map<String, String> dbOptions = new HashMap<>();
    dbOptions.put("driver", envMap.get("DRIVER"));
    dbOptions.put("url", envMap.get("URL"));
    dbOptions.put("user", envMap.get("USER"));
    dbOptions.put("password", envMap.get("PASSWORD"));
    // Build the query for the data we need
    String dataSelect = String.format("select %s from %s ", selectColumns, table);
    dataSelect += condition;
    Map<String, String> options = new HashMap<>();
    options.put("query", dataSelect);
    options.put("numPartitions", String.valueOf(EnvConstant.PARTITION_NUM));
    options.putAll(dbOptions);
    try {
        int i = 1/0; // deliberately trigger an exception to test the fallback
        return session.read().format("jdbc").options(options).load();
    } catch (Exception e) {
        System.out.println("Partitioned query failed: " + e.getMessage());
        Dataset<Row> emptyDataset = generateEmptyDataset(session, selectColumns);
        emptyDataset.show();
        return emptyDataset;
    }
}
When I force an exception with int i = 1/0, the empty Dataset is created:
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|client_id|score|      group_name|group_id|  creater|  updater|del_flag|        created_time|         update_time|
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|         |     |                |        |         |         |        |                    |                    |
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
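The try/catch fallback used here can be factored into a small reusable helper. The following is only a sketch in plain Java: the class FallbackSketch and the method loadOrFallback are hypothetical names, and the suppliers stand in for the JDBC read and for generateEmptyDataset. One caveat to keep in mind: Spark evaluates lazily, so a try/catch around load() should only be expected to catch errors raised while the reader is being set up, not failures that occur later when an action actually runs.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class FallbackSketch {

    /**
     * Runs the loader and returns its result; if the loader throws,
     * logs the message and returns the fallback value instead.
     */
    public static <T> T loadOrFallback(Supplier<T> loader, Supplier<T> fallback) {
        try {
            return loader.get();
        } catch (Exception e) {
            System.out.println("Load failed, using fallback: " + e.getMessage());
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        int zero = 0;
        // The loader fails with an ArithmeticException, like the 1/0 test in the post.
        List<String> rows = loadOrFallback(
                () -> Collections.singletonList(String.valueOf(1 / zero)),
                () -> Collections.<String>emptyList());
        System.out.println(rows.size()); // prints 0
    }
}
```

Swallowing the exception keeps the job alive, but it also hides the failure from callers, so logging the error (or recording a metric) in the catch branch is worth keeping.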