Spark读写MySQL数据库

文章目录

    • Spark读写MySQL数据库
      • 一、读取数据库
        • (一)通过RDD的方式读取MySQL数据库
        • (二)通过DataFrame的方式读取MySQL数据库
      • 二、添加数据到MySQL
        • (一)通过RDD的方式插入数据到MySQL
        • (二)通过RDD的方式插入数据到MySQL 2
        • (三)使用DataFrame插入数据到MySQL

一、读取数据库

(一)通过RDD的方式读取MySQL数据库

四要素:驱动、连接地址、账号密码

import org.apache.spark.rdd.JdbcRDDimport org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/** * 使用RDD读取MySQL数据库 */object spark_read_mysql {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_read_mysql") //程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext//驱动名称val driver = "com.mysql.cj.jdbc.Driver"//连接信息val url = "jdbc:mysql://192.168.80.145:3306/test"//用户名val username = "root"//密码val password = "123456"//具体的SQL查询语句val sql = "select * from t_user where id>=? and id<=?"//查询val rsRDD = new JdbcRDD(sc,()=>{//加载驱动Class.forName(driver)//创建和MySQL数据库的连接DriverManager.getConnection(url,username,password)},//需要执行的SQL语句sql,//查询的开始行1,//查询的结束行20,//运行几个分区执行2,//返回值的处理(将返回值变为RDD的元素),数字从1开始,表示字段的编号rs => (rs.getInt(1),rs.getString(2),rs.getInt(3)))//将RDD的元素打印在终端rsRDD.collect().foreach(println)sc.stop()}}
(二)通过DataFrame的方式读取MySQL数据库
import org.apache.spark.sql.SparkSession/** * 使用DataFrame读取MySQL数据库 */object spark_read_mysql2 {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]")//指定运行的方式.appName("spark_read_mysql2")//程序的名字.getOrCreate()//创建DataFrameval jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接.option("driver","com.mysql.cj.jdbc.Driver")//指定驱动.option("user","root")//指定连接的用户.option("password","123456")//指定连接的用户的密码.option("dbtable","t_user")//查询的表.load()//加载数据库表//在终端显示DataFrame的内容jdbcDF.show()}}

二、添加数据到MySQL

(一)通过RDD的方式插入数据到MySQL

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/** * 使用RDD插入数据到MySQL,RDD的每个元素都会执行一次创建连接和关闭连接 */object spark_write_mysql {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_write_mysql") //程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext//驱动名称val driver = "com.mysql.cj.jdbc.Driver"//连接信息//?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"//用户名val username = "root"//密码val password = "123456"//创建RDDval rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每条元素,将元素插入MySQL;一个元素执行一次创建连接和插入和关闭连接rdd.foreach {case (name,age) =>{//加载驱动Class.forName(driver)//创建和MySQL的链接val conn = DriverManager.getConnection(url,username,password)//添加的SQL语句val sql = "insert into t_user(name,age) values(?,?)"//给SQL语句配置参数val ps = conn.prepareStatement(sql)//根据参数的类型配置参数ps.setString(1,name)ps.setInt(2,age)//执行SQL语句ps.executeUpdate()//关闭连接ps.close()conn.close()}}sc.stop()}}
(二)通过RDD的方式插入数据到MySQL 2

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/** * 使用RDD插入数据到MySQL,RDD的每个分区执行一次创建连接和关闭连接;推荐 */object spark_write_mysql2 {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_write_mysql2") //程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext//驱动名称val driver = "com.mysql.cj.jdbc.Driver"//连接信息//?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"//用户名val username = "root"//密码val password = "123456"//创建RDDval rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每个分区,将元素插入MySQL;一个分区执行一次创建连接和关闭连接rdd.foreachPartition {datas =>{//加载驱动Class.forName(driver)//创建和MySQL的链接val conn = DriverManager.getConnection(url,username,password)//添加的SQL语句val sql = "insert into t_user(name,age) values(?,?)"//给SQL语句配置参数val ps = conn.prepareStatement(sql)//根据参数的类型配置参数datas.foreach{case (name,age)=>{ps.setString(1,name)ps.setInt(2,age)//执行SQL语句ps.executeUpdate()}}//关闭连接ps.close()conn.close()}}sc.stop()}}
(三)使用DataFrame插入数据到MySQL
import org.apache.spark.sql.{Row, SparkSession}import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}/** * 使用DataFrame插入数据到MySQL */object spark_write_mysql3 {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_write_mysql3") //程序的名字.getOrCreate()//1.创建DataFrame//1.1 schemaval schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true)))//1.2 行rows//1.2.1 创建RDDval dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20)))//1.2.2 创建rowsval rows = dataRDD.map(x=>Row(x(0),x(1)))//1.3 拼接表头(schema)和行内容(rows)val df = spark.createDataFrame(rows,schema)//2.通过DataFrame插入数据到MySQL//如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表,需要注意表不能存在//df.write.mode("append"),是以追加的方式将数据写入到已经存在的表中df.write.format("jdbc").option("url", "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8") //指定连接.option("driver", "com.mysql.cj.jdbc.Driver") //指定驱动.option("user", "root") //指定连接的用户.option("password", "123456") //指定连接的用户的密码.option("dbtable", "t_user2") //查询的表.save()//保存数据}}