2021-11-05
使用Spark SQL读写MySQL数据库1.在MySQL数据库中创建表在MySQL数据库中创建一个名称为spark的数据库,并创建一个名称为student的表,录入两条数据首先在linux中安装sql:sudo apt-get update #更新软件源sudo apt-get install mysql-server #安装mysqlservice mysql start #启动mysql确认
使用Spark SQL读写MySQL数据库
1.在MySQL数据库中创建表
在MySQL数据库中创建一个名称为spark的数据库,并创建一个名称为student的表,录入两条数据
首先在linux中安装sql:
sudo apt-get update #更新软件源
sudo apt-get install mysql-server #安装mysql
service mysql start #启动mysql
确认是否启动成功,mysql节点处于LISTEN状态表示启动成功:
sudo netstat -tap | grep mysql
mysql -u root -p #进入mysql shell 界面
create database spark; #创建spark库
use spark; #选择spark库
create table student (id int(4), name char(20), gender char(4), age int(4));#在spark库下创建student表
insert into student values(1,'Xueqian','F',23);
insert into student values(2,'Weiliang','M',24); #向student表中插入两条数据
select * from student;#显示student表中的内容
2.在spark-shell交互式执行环境中读写MySQL数据库
(1)在spark-shell交互式执行环境中,撰写Scala代码,读取MySQL数据库的内容;
(2)在spark-shell交互式执行环境中,撰写Scala代码,向MySQL数据库写入两条新的记录:
| 3 | Rongcheng | M | 26 | | 4 | Guanhua | M | 27 | |
读取MySQL数据库的内容:
下载MySQL的JDBC驱动程序,把该驱动程序拷贝到spark的安装目录
启动一个spark-shell(启动Spark Shell时,必须指定mysql连接驱动jar包)
cd /usr/local/spark
./bin/spark-shell \ --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \ --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
执行以下命令连接数据库,读取数据,并显示:
向MySQL数据库写入数据:
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//下面我们设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
//下面要设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties() prop.put("user", "root") //表示用户名是root prop.put("password", "***") //表示密码是***
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中 studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)
//写入数据后查看是否写入成功,也可以退出spark-shell进入mysql查看
val jdbcDFs = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark")
.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student")
.option("user", "root").option("password", "13110949258").option("useSSL", "false").load()
jdbcDFs.show()//再次显示student表里的内容
3.IDEA中开发程序MySQL数据库读写数据
(1)在IDEA中创建Maven工程,开发Spark SQL程序,从MySQL数据库读取数据。
(2)在IDEA中创建Maven工程,开发Spark SQL程序,向MySQL数据库写入两条新的记录:
| 5 | Chenglu | F | 22 | | 6 | Linzhe | M | 23 | |
下载idea2.2版本并创建maven项目
pom.xml文件参考:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
<scope>compile</scope>
</dependency>
</dependencies>
在项目结构中导入scala库
重构scala文件并创建mysql.scala
运行代码
成功读取sql中student表的内容并添加数据!
更多推荐
所有评论(0)