1 Reading and Saving Files
SparkSQL generally reads and saves three kinds of files: JSON files, CSV files, and columnar files (such as Parquet). By adding options, it can also handle different storage and compression formats.
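Besides the format-specific shortcuts used below (csv(), json(), parquet()), the same reads and writes can also be expressed through the generic format()/option() API. A minimal sketch, assuming an existing SparkSession named spark (the paths and option values are only illustrative):

// Generic reader: format() picks the data source, option() passes format-specific parameters
Dataset<Row> df = spark.read()
        .format("csv")                 // "csv", "json", "parquet", ...
        .option("header", "true")
        .load("input/user.csv");

// Generic writer: the compression option controls the codec of the output files
df.write()
        .format("json")
        .option("compression", "gzip")
        .save("output");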
1.1 CSV Files
1) Code implementation
package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;

public class Test06_CSV {
    public static void main(String[] args) throws ClassNotFoundException {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Business code
        DataFrameReader reader = spark.read();

        // Add options and read the CSV file
        Dataset<Row> userDS = reader
                .option("header", "true") // defaults to false: do not read the header row as column names
                .option("sep", ",")       // column separator, defaults to ","
                // no need to specify the compression format; it is detected automatically
                .csv("input/user.csv");
        userDS.show();

        // Convert to a Dataset of User
        // Converting directly with as() throws an error: every column read from CSV is a string
        // Dataset<User> userDS1 = userDS.as(Encoders.bean(User.class));
        userDS.printSchema();

        Dataset<User> userDS1 = userDS.map(new MapFunction<Row, User>() {
            @Override
            public User call(Row value) throws Exception {
                return new User(Long.valueOf(value.getString(0)), value.getString(1));
            }
        }, Encoders.bean(User.class));
        userDS1.show();

        // Write out as a CSV file
        DataFrameWriter<User> writer = userDS1.write();
        writer.option("sep", ";")
                .option("header", "true")
                // .option("compression", "gzip") // compression format
                // Save modes:
                // Append        append to existing data
                // Ignore        skip this write if data already exists
                // Overwrite     overwrite existing data
                // ErrorIfExists fail if data already exists
                .mode(SaveMode.Append)
                .csv("output");

        // 4. Close the SparkSession
        spark.close();
    }
}
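The User bean imported above is not listed in this section. Judging from the constructor call new User(Long, String), the use of Encoders.bean(User.class), and the Lombok dependency added in section 3.2, a minimal sketch could look like this (the field names age and name are an assumption):

package com.atguigu.sparksql.Bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical reconstruction: a plain JavaBean so that Encoders.bean(User.class) can map it
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private Long age;
    private String name;
}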
1.2 JSON Files
package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class Test07_JSON {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Business code
        Dataset<Row> json = spark.read().json("input/user.json");

        // JSON data carries the column types, so it can be converted to a typed Dataset directly
        Dataset<User> userDS = json.as(Encoders.bean(User.class));
        userDS.show();

        // Data read from other formats can also be written out as JSON
        DataFrameWriter<User> writer = userDS.write();
        writer.json("output1");

        // 4. Close the SparkSession
        spark.close();
    }
}
1.3 Parquet Files
Columnar storage formats carry their own column structure (schema).
package com.atguigu.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Test08_Parquet {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Business code
        Dataset<Row> json = spark.read().json("input/user.json");

        // Writing uses snappy compression by default
        // json.write().parquet("output");

        // Parquet is self-describing, so the column names are recognized when reading
        Dataset<Row> parquet = spark.read().parquet("output");
        parquet.printSchema();

        // 4. Close the SparkSession
        spark.close();
    }
}
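The default snappy codec mentioned in the comment can be overridden with the same compression option used for the text formats. A short sketch (the codec choice and output path are only illustrative):

// Write Parquet with gzip instead of the default snappy codec
json.write().option("compression", "gzip").parquet("output_gzip");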
2 Interacting with MySQL
1) Add the dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
2) Read data from MySQL
package com.atguigu.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class Test09_Table {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Business code
        Dataset<Row> json = spark.read().json("input/user.json");

        // Connection properties
        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "000000");

        // json.write()
        //         // for tables, the save mode decides between appending and overwriting rows
        //         .mode(SaveMode.Append)
        //         .jdbc("jdbc:mysql://hadoop102:3306", "gmall.testInfo", properties);

        Dataset<Row> jdbc = spark.read().jdbc("jdbc:mysql://hadoop102:3306", "gmall.testInfo", properties);
        jdbc.show();

        // 4. Close the SparkSession
        spark.close();
    }
}
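The commented-out lines above show the write path. Spelled out, writing the same Dataset back to MySQL could look like the sketch below; it reuses the connection properties defined above and additionally needs the org.apache.spark.sql.SaveMode import:

// Write the Dataset to MySQL; for tables the save mode decides whether rows are appended or the table is overwritten
json.write()
        .mode(SaveMode.Append)
        .jdbc("jdbc:mysql://hadoop102:3306", "gmall.testInfo", properties);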
3 Interacting with Hive
SparkSQL can use either the embedded Hive (the Hive that ships with Spark, usable out of the box) or an external Hive. In enterprise development, an external Hive is normally used.
3.1 Interacting from Linux
1) Copy the MySQL JDBC driver to the jars directory of spark-yarn
[atguigu@hadoop102 spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar /opt/module/spark-yarn/jars
2) Copy the hive-site.xml file to the conf directory of spark-yarn
[atguigu@hadoop102 spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml /opt/module/spark-yarn/conf
3) Start the spark-sql client
[atguigu@hadoop102 spark-yarn]$ bin/spark-sql --master yarn
spark-sql (default)> show tables;
3.2 Interacting from IDEA
1) Add the dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.1</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
</dependency>
2) Copy hive-site.xml to the resources directory (if you also need to access Hadoop, copy hdfs-site.xml, core-site.xml and yarn-site.xml as well)
3) Code implementation
package com.atguigu.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class Test10_Hive {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder()
                .enableHiveSupport() // enable Hive support
                .config(conf).getOrCreate();

        // 3. Business code
        spark.sql("show tables").show();
        spark.sql("create table user_info(name String, age bigint)");
        spark.sql("insert into table user_info values('zhangsan', 10)");
        spark.sql("select * from user_info").show();

        // 4. Close the SparkSession
        spark.close();
    }
}
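Besides running SQL statements through spark.sql(), a DataFrame can also be written into a Hive table with the DataFrameWriter. A minimal sketch, assuming the SparkSession spark from the code above (the target table name is only an example, and org.apache.spark.sql.SaveMode must be imported):

// Save a query result as a Hive managed table (hypothetical table name)
spark.sql("select * from user_info")
        .write()
        .mode(SaveMode.Overwrite)
        .saveAsTable("user_info_backup");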