Writing Spark WordCount in Scala
Create a Maven project
Add the dependencies and plugins to pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>3.2.3</spark.version>
    <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <layout>default</layout>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>ali-plugin</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </pluginRepository>
</pluginRepositories>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Create a scala directory
Select the scala directory, right-click it and mark it as a sources root, or click Maven's refresh button.
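After this step the module follows the standard Maven layout; the tree below is only an illustration, with the scala directory being the newly added one:

src
└── main
    ├── java
    └── scala    <- new directory, marked as a sources root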
Write the Spark program
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Create a SparkContext
 * 2. Create an RDD
 * 3. Call the RDD's Transformation(s)
 * 4. Call an Action
 * 5. Release resources
 */
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    // Create the SparkContext, which is used to create RDDs
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against the abstract "big collection" [RDD] through its highly encapsulated API
    // Use the SparkContext to create an RDD
    val lines: RDD[String] = sc.textFile(args(0))
    // Transformations start //
    // Split and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with a 1 in a tuple
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally first, then globally
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // Transformations end //
    // Call an Action to save the result to HDFS
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}
Package with Maven
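For example, with the pom above, the jar can typically be built from the project root with the standard Maven command (the exact jar name under target/ depends on your artifactId and version):

mvn clean package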
Submit the job
• Upload the jar to the server, then submit the job with the spark-submit command
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 4 \
--class cn._51doit.spark.day01.WordCount \
/root/spark-in-action-1.0.jar hdfs://node-1.51doit.cn:9000/words.txt hdfs://node-1.51doit.cn:9000/out

Parameter notes:
--master: the master address and port; the protocol is spark:// and the port is the RPC communication port
--executor-memory: the amount of memory each executor uses
--total-executor-cores: the total number of cores the whole application uses
--class: the fully qualified class name of the program's main class
Last line: the jar path followed by the program arguments args(0) and args(1)
Writing Spark WordCount in Java
Using anonymous inner classes
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // Use the JavaSparkContext to create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Call Transformation(s)
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return Tuple2.apply(word, 1);
                    }
                });
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        // To sort, first swap KV to VK
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Then sort
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap the order back
        JavaPairRDD<String, Integer> result = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Trigger an Action and save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
Using lambda expressions
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        // Create the SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap KV to VK
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        // Sort
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap the order back
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        // Save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
Running Spark locally and debugging
Packaging the Spark program and submitting it to the cluster for every change is cumbersome and makes debugging inconvenient. Spark can also run in local mode, which is convenient for testing and debugging.
Run locally
// Run the Spark program in local mode; local[*] runs locally with multiple threads
val conf: SparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") // execute in local mode
Then set the program arguments:
hdfs://linux01:9000/words.txt hdfs://linux01:9000/out/out01
Reading data from HDFS
Since writing data to HDFS can run into permission problems, set the user in the code to the owner of the HDFS directory.
// When writing data to HDFS, run the program as the same user as the HDFS directory owner
System.setProperty("HADOOP_USER_NAME", "root")
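A minimal sketch of how these pieces might fit together for a local run, based on the WordCount program above (the object name LocalWordCount and the "root" user are illustrative; use your own cluster's HDFS owner). The property is set at the top of main, before the SparkContext is created, so Hadoop picks it up when the HDFS client is initialized:

import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCount {
  def main(args: Array[String]): Unit = {
    // Impersonate the HDFS directory owner before any HDFS access happens
    System.setProperty("HADOOP_USER_NAME", "root")

    // local[*] runs locally with as many threads as cores
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))          // e.g. hdfs://linux01:9000/words.txt
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .saveAsTextFile(args(1))    // e.g. hdfs://linux01:9000/out/out01

    sc.stop()
  }
}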