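Loading and Saving Data with SparkSQL

1 Reading and Saving Files

SparkSQL usually reads and writes three kinds of files: JSON files, CSV files, and columnar files such as Parquet. Options passed to the reader or writer select other storage and compression formats.

1.1 CSV Files

1) Code implementation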

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;

public class Test06_CSV {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Write the job
        DataFrameReader reader = spark.read();

        // Read the CSV file with options
        Dataset<Row> userDS = reader
                .option("header", "true") // default is false: the first line is not used as column names
                .option("sep", ",")       // column separator; the default is ","
                // no compression option is needed: the codec is detected automatically
                .csv("input/user.csv");
        userDS.show();

        // Convert to a Dataset of User.
        // A direct conversion fails because every column read from CSV is a string:
        // Dataset<User> userDS1 = userDS.as(Encoders.bean(User.class));
        userDS.printSchema();

        Dataset<User> userDS1 = userDS.map(new MapFunction<Row, User>() {
            @Override
            public User call(Row value) throws Exception {
                return new User(Long.valueOf(value.getString(0)), value.getString(1));
            }
        }, Encoders.bean(User.class));
        userDS1.show();

        // Write out as CSV files
        DataFrameWriter<User> writer = userDS1.write();
        writer.option("sep", ";")
                .option("header", "true")
                // .option("compression", "gzip") // compression format
                // Save modes:
                //   Append        - append to the existing data
                //   Ignore        - skip this write if the target exists
                //   Overwrite     - replace the existing data
                //   ErrorIfExists - fail if the target exists
                .mode(SaveMode.Append)
                .csv("output");

        // 4. Close the SparkSession
        spark.close();
    }
}
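The four save modes listed in the comments differ only in what happens when the target already exists. The following is a minimal plain-Java sketch of that decision, not Spark's implementation: `WriteModeSketch`, `Mode`, and `decide` are hypothetical names chosen for illustration, and the returned strings merely describe the documented behavior of each `SaveMode` constant.

```java
/** Hypothetical model of the SaveMode semantics; this is not Spark code. */
final class WriteModeSketch {

    /** Mirrors the four constants of org.apache.spark.sql.SaveMode. */
    enum Mode { APPEND, OVERWRITE, IGNORE, ERROR_IF_EXISTS }

    /** Describes what a write does, given whether the target already exists. */
    static String decide(Mode mode, boolean targetExists) {
        if (!targetExists) {
            return "write"; // every mode writes when the target is new
        }
        switch (mode) {
            case APPEND:
                return "add to existing data";
            case OVERWRITE:
                return "replace existing data";
            case IGNORE:
                return "skip"; // existing data is left untouched
            default:
                // ERROR_IF_EXISTS: refuse to touch an existing target
                throw new IllegalStateException("target already exists");
        }
    }
}
```

`ErrorIfExists` is the mode Spark applies when none is set, which is why rerunning a job against an existing output directory fails unless a mode such as `SaveMode.Append` is given.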

1.2 JSON Files

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class Test07_JSON {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Write the job
        Dataset<Row> json = spark.read().json("input/user.json");

        // JSON carries type information, so column types are inferred when reading
        Dataset<User> userDS = json.as(Encoders.bean(User.class));
        userDS.show();

        // Data read from any other format can also be written out as JSON
        DataFrameWriter<User> writer = userDS.write();
        writer.json("output1");

        // 4. Close the SparkSession
        spark.close();
    }
}
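Both the CSV and the JSON example import a `User` bean that the article does not show. A minimal sketch of what it might look like follows. The field names `age` and `name` and their order are assumptions inferred from the `new User(Long, String)` call in the CSV example, and the original project may generate the accessors with Lombok instead. `age` is declared as `Long` because Spark infers whole numbers in JSON as `bigint`.

```java
import java.io.Serializable;

// Hypothetical bean: the field names are assumed, not taken from the original project.
// Spark's bean encoder generally expects a public class in its own file (User.java);
// the modifier is left out here only so that the sketch compiles from any file name.
class User implements Serializable {
    private Long age;     // Long rather than Integer: JSON integers are read as bigint
    private String name;

    // Encoders.bean needs a no-argument constructor plus getters and setters
    public User() {
    }

    public User(Long age, String name) {
        this.age = age;
        this.name = name;
    }

    public Long getAge() {
        return age;
    }

    public void setAge(Long age) {
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```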

1.3 Parquet Files

Columnar files store their own column layout, so no separator has to be specified.

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Test08_Parquet {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Write the job
        Dataset<Row> json = spark.read().json("input/user.json");

        // Writing uses snappy compression by default.
        // Run this once so that "output" contains Parquet files:
        // json.write().parquet("output");

        // Parquet files are self-describing, so column names are recognized on read
        Dataset<Row> parquet = spark.read().parquet("output");
        parquet.printSchema();

        // 4. Close the SparkSession
        spark.close();
    }
}
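To make "columnar" concrete, the sketch below pivots a few rows into per-column lists, which is roughly the layout idea behind Parquet. It uses plain Java collections only. The `ColumnarSketch` class and its `toColumns` method are made up for illustration, and real Parquet files add row groups, encodings, and a stored schema on top of this idea.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a column-oriented layout; not the Parquet format. */
final class ColumnarSketch {

    /** Regroups row-oriented records so that each column's values sit together. */
    static Map<String, List<Object>> toColumns(List<String> columnNames, List<Object[]> rows) {
        // LinkedHashMap keeps the columns in their declared order
        Map<String, List<Object>> columns = new LinkedHashMap<>();
        for (String name : columnNames) {
            columns.put(name, new ArrayList<>());
        }
        for (Object[] row : rows) {
            for (int i = 0; i < columnNames.size(); i++) {
                columns.get(columnNames.get(i)).add(row[i]);
            }
        }
        return columns;
    }
}
```

Because the values of one column are stored next to each other under a known name and type, a reader can recover column names without a header line or separator, which is what `printSchema()` shows in the example above.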

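2 Interacting with MySQL

1) Add the dependency

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2) Read data from MySQL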

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class Test09_Table {
    public static void main(String[] args) {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // 3. Write the job
        Dataset<Row> json = spark.read().json("input/user.json");

        // Connection properties
        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "000000");

        // Writing to MySQL (also needs import org.apache.spark.sql.SaveMode).
        // For tables, the save mode chooses between appending and overwriting:
        // json.write()
        //         .mode(SaveMode.Append)
        //         .jdbc("jdbc:mysql://hadoop102:3306", "gmall.testInfo", properties);

        Dataset<Row> jdbc = spark.read()
                .jdbc("jdbc:mysql://hadoop102:3306", "gmall.testInfo", properties);
        jdbc.show();

        // 4. Close the SparkSession
        spark.close();
    }
}
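The JDBC URL in the example stops at the port, and the database is named through the `gmall.testInfo` table argument instead. As a hedged sketch, the helper below assembles the same connection settings with plain `java.util.Properties`. `JdbcSettings`, `url`, and `credentials` are hypothetical names, and the explicit `driver` entry is an assumption that fits the 5.1.x connector, whose driver class is `com.mysql.jdbc.Driver`.

```java
import java.util.Properties;

/** Hypothetical helper that collects the JDBC settings used in the example. */
final class JdbcSettings {

    /** Builds a MySQL JDBC URL without a database part, as in the example. */
    static String url(String host, int port) {
        return "jdbc:mysql://" + host + ":" + port;
    }

    /** Builds the connection properties handed to the jdbc(...) call. */
    static Properties credentials(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        // Assumption: naming the driver class explicitly; the original code omits it
        props.setProperty("driver", "com.mysql.jdbc.Driver");
        return props;
    }
}
```

With these helpers the read would become `spark.read().jdbc(JdbcSettings.url("hadoop102", 3306), "gmall.testInfo", JdbcSettings.credentials("root", "000000"))`.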

3 Interacting with Hive

SparkSQL can use either the embedded Hive that ships with Spark and works out of the box, or an external Hive. Enterprise development usually uses an external Hive.

3.1 Interaction on Linux

1) Copy the MySQL connector driver into the jars directory of spark-yarn

[atguigu@hadoop102 spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar /opt/module/spark-yarn/jars

2) Copy hive-site.xml into the conf directory of spark-yarn

[atguigu@hadoop102 spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml /opt/module/spark-yarn/conf

3) Start the spark-sql client

[atguigu@hadoop102 spark-yarn]$ bin/spark-sql --master yarn

spark-sql (default)> show tables;

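3.2 Interaction in IDEA

1) Add the dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.1</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
</dependency>

2) Copy hive-site.xml into the resources directory (to work with Hadoop as well, also copy hdfs-site.xml, core-site.xml, and yarn-site.xml)

3) Code implementation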

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class Test10_Hive {
    public static void main(String[] args) {
        // Access Hadoop as this user
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        // 2. Get the SparkSession
        SparkSession spark = SparkSession.builder()
                .enableHiveSupport() // enable Hive support
                .config(conf)
                .getOrCreate();

        // 3. Write the job
        spark.sql("show tables").show();
        spark.sql("create table user_info(name String, age bigint)");
        spark.sql("insert into table user_info values('zhangsan', 10)");
        spark.sql("select * from user_info").show();

        // 4. Close the SparkSession
        spark.close();
    }
}
