使用Spark SQL读写MySQL数据库

1.在MySQL数据库中创建表

在MySQL数据库中创建一个名称为spark的数据库,并创建一个名称为student的表,录入两条数据

首先在linux中安装sql:

sudo apt-get update  #更新软件源
sudo apt-get install mysql-server  #安装mysql

service mysql start #启动mysql

确认是否启动成功,mysql节点处于LISTEN状态表示启动成功:

sudo netstat -tap | grep mysql

mysql -u root -p #进入mysql shell 界面

create database spark; #创建spark库

use spark; #选择spark库

create table student (id int(4), name char(20), gender char(4), age int(4));#在spark库下创建student表

insert into student values(1,'Xueqian','F',23);

insert into student values(2,'Weiliang','M',24); #向student表中插入两条数据

select * from student;#显示student表中的内容

2.在spark-shell交互式执行环境中读写MySQL数据库

(1)在spark-shell交互式执行环境中,撰写Scala代码,读取MySQL数据库的内容;

(2)在spark-shell交互式执行环境中,撰写Scala代码,向MySQL数据库写入两条新的记录:

| 3 | Rongcheng | M | 26 |

| 4 | Guanhua | M | 27 |

读取MySQL数据库的内容:

下载MySQL的JDBC驱动程序,把该驱动程序拷贝到spark的安装目录

启动一个spark-shell(启动Spark Shell时,必须指定mysql连接驱动jar包)

cd /usr/local/spark  

./bin/spark-shell  \ --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \ --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar

执行以下命令连接数据库,读取数据,并显示:

向MySQL数据库写入数据:

import java.util.Properties

import org.apache.spark.sql.types._

import org.apache.spark.sql.Row

//下面我们设置两条数据表示两个学生信息

val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))

//下面要设置模式信息

val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))

//下面创建Row对象,每个Row对象都是rowRDD中的一行

val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))

//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来

val studentDF = spark.createDataFrame(rowRDD, schema)

//下面创建一个prop变量用来保存JDBC连接参数

val prop = new Properties() prop.put("user", "root") //表示用户名是root prop.put("password", "***") //表示密码是***

prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver

//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中 studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)

//写入数据后查看是否写入成功,也可以退出spark-shell进入mysql查看

val jdbcDFs = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark")

      .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student")

      .option("user", "root").option("password", "13110949258").option("useSSL", "false").load()

    jdbcDFs.show()//再次显示student表里的内容

3.IDEA中开发程序MySQL数据库读写数据

(1)在IDEA中创建Maven工程,开发Spark SQL程序,从MySQL数据库读取数据。

(2)在IDEA中创建Maven工程,开发Spark SQL程序,向MySQL数据库写入两条新的记录:

| 5 | Chenglu | F | 22 |

| 6 | Linzhe | M | 23 |

下载idea2.2版本并创建maven项目

pom.xml文件参考:

<dependencies>

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-core_2.12</artifactId>

        <version>3.1.2</version>

        <scope>compile</scope>

    </dependency>

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-sql_2.12</artifactId>

        <version>3.1.2</version>

        <scope>compile</scope>

    </dependency>

    <dependency>

        <groupId>mysql</groupId>

        <artifactId>mysql-connector-java</artifactId>

        <version>5.1.40</version>

        <scope>compile</scope>

    </dependency>

</dependencies>

 在项目结构中导入scala库

重构scala文件并创建mysql.scala

运行代码

 成功读取sql中student表的内容并添加数据!

Logo

更多推荐