最近写了几个简单的spark structured streaming 的代码实例。 目的是熟悉spark 开发环境搭建, spark 代码开发流程。
开发环境:
系统:win 11
java : 1.8
scala:2.13
工具:idea 2022.2 ,maven 3, git 2.37
spark : 3.3.2
一, 使用 spark 结构化流读取文件数据,并做单词统计。
代码:
package org.example;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.streaming.OutputMode;import org.apache.spark.sql.streaming.StreamingQuery;import org.apache.spark.sql.streaming.StreamingQueryException;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructType;import java.util.concurrent.TimeoutException;public class Main { /* 例子:从文件中读取流, 被定义模式,生成dataset ,使用sql api 进行分析。 */ public static void main(String[] args) throws TimeoutException, StreamingQueryException { System.out.println("Hello world!"); SparkSession spark = SparkSession.builder().appName("spark streaming").config("spark.master", "local") .config("spark.sql.warehouse.dir", "file:///app/") .getOrCreate(); spark.sparkContext().setLogLevel("ERROR"); StructType schema = new StructType().add("empId", DataTypes.StringType).add("empName", DataTypes.StringType) .add("department", DataTypes.StringType); Dataset rawData = spark.readStream().option("header", false).format("csv").schema(schema) .csv("D:/za/spark_data/*.csv"); rawData.createOrReplaceTempView("empData"); Dataset result = spark.sql("select count(*), department from empData group by department"); StreamingQuery query = result.writeStream().outputMode("complete").format("console").start(); // 每次触发,全表输出 query.awaitTermination(); }}
输出:
二,使用 spark 结构化流读取socket流,做单词统计,使用Java编程
代码:
package org.example;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Encoders;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.streaming.StreamingQuery;import org.apache.spark.sql.streaming.StreamingQueryException;import java.util.Arrays;import java.util.concurrent.TimeoutException;public class SocketStreaming_wordcount { /* * 从socket 读取字符流,并做word count分析 * * */ public static void main(String[] args) throws TimeoutException, StreamingQueryException { SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .config("spark.master", "local") .getOrCreate(); // dataframe 表示 socket 字符流 Dataset lines = spark .readStream() .format("socket") .option("host", "localhost") .option("port", 9999) .load();// 把一行字符串切分为 单词 Dataset words = lines .as(Encoders.STRING()) .flatMap((FlatMapFunction) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());// 对单词分组计数 Dataset wordCounts = words.groupBy("value").count();// 开始查询并打印输出到console StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination(); }}
输出:
二,使用 spark 结构化流读取socket流,做单词统计,使用scala 编程
代码:
package org.exampleimport org.apache.spark.sql.SparkSessionobject Main { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("streaming_socket_scala") .config("spark.master", "local") .getOrCreate() import spark.implicits._ // 创建datafram 象征从网络socket 接收流 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 切分一行成单词 val words = lines.as[String].flatMap(_.split(" ")) // 进行单词统计 val wordCounts = words.groupBy("value").count() // 开始查询并输出 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() }}
输出:
功能比较简单,代码比较简单,可以在网络上找到很多。 但是也是一个完整的spark结构流代码开发流程。权当熟悉下开发流程。
—一——步—–一 ——个—–脚——–印———-