4. Getting Started with the Spark Shell
4.1 What Is the Spark Shell
The Spark shell is Spark's interactive command-line client. You can write Spark programs in Scala directly inside it. After startup, a SparkContext has already been created by default, available under the alias sc.
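For example, once the shell is up you can use sc immediately; a minimal sanity check:

```scala
// sc is already created by the spark-shell, no setup needed
sc.parallelize(1 to 10).sum()   // returns 55.0
```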
4.2 Starting the Spark Shell
```shell
/opt/apps/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g \
--total-executor-cores 3
```
If the Master is configured for HA (high availability), you must specify both Masters, because either one may be the Active Master:
```shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--master spark://node-1.51doit.cn:7077,node-2.51doit.cn:7077 \
--executor-memory 1g \
--total-executor-cores 3
```
Parameter description:
--master: specifies the Master's address and port; the protocol is spark:// and the port is the RPC communication port.
--executor-memory: specifies how much memory each executor uses.
--total-executor-cores: specifies the total number of cores used by the entire application.
```scala
sc.textFile("hdfs://node-1.51doit.cn:9000/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile("hdfs://node-1.51doit.cn:9000/out")
```
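To spot-check the result of the WordCount above without leaving the shell, the output directory it wrote can be read back (assuming the job completed successfully):

```scala
// print the first few lines of the saved result
sc.textFile("hdfs://node-1.51doit.cn:9000/out").take(5).foreach(println)
```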
5. Getting Started with Spark Programming
5.1 WordCount in Scala
5.1.1 Create a Maven Project
5.1.2 Add Dependencies and Plugins to pom.xml
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>3.2.3</spark.version>
    <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <layout>default</layout>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots>
        <releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>ali-plugin</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots>
        <releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases>
    </pluginRepository>
</pluginRepositories>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <!-- compile Scala sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals><goal>add-source</goal><goal>compile</goal></goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals><goal>testCompile</goal></goals>
                </execution>
            </executions>
        </plugin>
        <!-- compile Java sources -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals><goal>compile</goal></goals>
                </execution>
            </executions>
        </plugin>
        <!-- build an uber jar and strip signature files -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
5.1.3 Create a scala Directory
Select the scala directory, right-click it and mark it as a source root, or click Maven's refresh button.
5.1.4 Write the Spark Program
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Create a SparkContext
 * 2. Create an RDD
 * 3. Call Transformation(s) on the RDD
 * 4. Call an Action
 * 5. Release resources
 */
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    // Create the SparkContext; the SparkContext is used to create RDDs
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against the abstract "big collection",
    // the RDD, by calling its highly encapsulated API
    // Create an RDD with the SparkContext
    val lines: RDD[String] = sc.textFile(args(0))

    // Transformations start //
    // Split and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with 1 in a tuple
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally first, then globally
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count in descending order
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // Transformations end //

    // Call an Action to save the result to HDFS
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}
```
5.1.5 Package with Maven
Package the project from the IDEA GUI:
5.1.6 Submit the Job
Upload the jar to the server, then submit the job with the spark-submit command:
```shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 4 \
--class cn._51doit.spark.day01.WordCount \
/root/spark-in-action-1.0.jar hdfs://node-1.51doit.cn:9000/words.txt hdfs://node-1.51doit.cn:9000/out
```
Parameter description:
--master: specifies the Master's address and port; the protocol is spark:// and the port is the RPC communication port.
--executor-memory: specifies how much memory each executor uses.
--total-executor-cores: specifies the total number of cores used by the entire application.
--class: specifies the fully qualified name of the class whose main method should run.
The remaining arguments are the jar path, args[0] (the input path), and args[1] (the output path).
5.2 WordCount in Java
5.2.1 Using Anonymous Inner Classes
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // Create an RDD with the JavaSparkContext
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Call Transformation(s)
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return Tuple2.apply(word, 1);
                    }
                });
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        // To sort, first swap KV to VK
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Sort by key (the count) in descending order
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back to KV
        JavaPairRDD<String, Integer> result = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Trigger the Action and save the result to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
5.2.2 Using Lambda Expressions
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair each word with 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap KV to VK
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        // Sort by key (the count) in descending order
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back to KV
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        // Save the result to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
5.3 Running Spark Locally and Debugging
Packaging the program and submitting it to the cluster every time is cumbersome and makes debugging difficult. Spark can also run in local mode, which is convenient for testing and debugging.
5.3.1 Running Locally
```scala
// Run the Spark program in local mode; local[*] runs it locally with multiple threads
val conf: SparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") // run in local mode
```
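For reference, a minimal local-mode sketch that can be run straight from the IDE; the object name LocalWordCount and the input path data/words.txt are placeholders, and collect is used instead of writing to HDFS so the result can be inspected in the console or stepped through with a debugger:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("LocalWordCount")
      .setMaster("local[*]") // local mode, one thread per CPU core

    val sc: SparkContext = new SparkContext(conf)

    // Read a local file instead of HDFS (placeholder path)
    val lines: RDD[String] = sc.textFile("data/words.txt")
    val counts: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    // collect brings the result back to the driver so it can be printed or inspected
    counts.collect().foreach(println)
    sc.stop()
  }
}
```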
5.3.2 Reading Data from HDFS
Because writing data to HDFS is subject to permission checks, set the user in the code to the owner of the target HDFS directory.
```scala
// When writing to HDFS, run the program as the same user that owns the HDFS directory
System.setProperty("HADOOP_USER_NAME", "root")
```
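This property is typically set before the SparkContext is created so that Hadoop picks up the user the first time HDFS is accessed. A minimal sketch of where the call fits (the object name HdfsWriteExample and the output path /out2 are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object HdfsWriteExample {
  def main(args: Array[String]): Unit = {
    // Set the Hadoop user first, assuming "root" owns the target HDFS directory
    System.setProperty("HADOOP_USER_NAME", "root")

    val sc = new SparkContext(new SparkConf().setAppName("HdfsWriteExample").setMaster("local[*]"))
    // Reuse the WordCount logic from 5.1.4; paths are placeholders
    sc.textFile("hdfs://node-1.51doit.cn:9000/words.txt")
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      .saveAsTextFile("hdfs://node-1.51doit.cn:9000/out2")
    sc.stop()
  }
}
```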
5.4 Using PySpark (Optional)
5.4.1 Configure the Python Environment
① Install Python 3 on every node; the version must be Python 3.6 or above.
```shell
yum install -y python3
```
② Update the environment variables on every node.
```shell
export JAVA_HOME=/usr/local/jdk1.8.0_251
export PYSPARK_PYTHON=python3
export HADOOP_HOME=/bigdata/hadoop-3.2.1
export HADOOP_CONF_DIR=/bigdata/hadoop-3.2.1/etc/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
```
5.4.2 Using the pyspark Shell
```shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/pyspark \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 10
```
Write WordCount in Python inside the pyspark shell:
```python
sc.textFile('hdfs://node-1.51doit.cn:8020/data/wc') \
  .flatMap(lambda line: line.split(' ')) \
  .map(lambda word: (word, 1)) \
  .reduceByKey(lambda a, b: a + b) \
  .sortBy(lambda t: t[1], False) \
  .saveAsTextFile('hdfs://node-1.51doit.cn:8020/out01')
```
5.4.3 Configure the PyCharm Development Environment
① Configure the Python interpreter.
② Configure the pyspark dependencies.
Open Project Structure and add py4j-*-src.zip and pyspark.zip from the python/lib directory of the Spark installation.
③ Add environment variables.
Click Edit Configuration.
Write WordCount in Python in PyCharm:
```python
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    lines = sc.textFile('file:///Users/star/Desktop/data.txt')
    words = lines.flatMap(lambda line: line.split(' '))
    wordAndOne = words.map(lambda word: (word, 1))
    reduced = wordAndOne.reduceByKey(lambda x, y: x + y)
    result = reduced.sortBy(lambda t: t[1], False)
    print(result.collect())
```