一、实验目的

(1)通过实验掌握 Spark SQL 的基本编程方法;

(2)熟悉 RDD 到 DataFrame 的转化方法;

(3)熟悉利用 Spark SQL 管理来自不同数据源的数据。

二、实验平台 操作系统: Ubuntu16.04 Spark 版本:2.1.0 数据库:MySQL

三、实验内容和要求

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。 { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } 为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

(1) 查询所有数据;

 (2) 查询所有数据,并去除重复的数据;

(3) 查询所有数据,打印时去除 id 字段;

(4) 筛选出 age>30 的记录;

(5) 将数据按 age 分组;

(6) 将数据按 name 升序排列;

(7) 取出前 3 行数据;

(8) 查询所有记录的 name 列,并为其取别名为 username;

(9) 查询年龄 age 的平均值;

(10) 查询年龄 age 第 2 页 的最小值。

1.再MobaXter里

cd`/home/hadoop

创建

vim employee.json`

把数据粘上去

2.再IDEA上

创建

case object SparkSQLjibencaozuo

里面代码

package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
case object SparkSQLjibencaozuo{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
    import spark.implicits._

    // 加载 JSON 数据到 DataFrame
    //val employeeDF = spark.read.json("employee.json")

    val employeeDF = spark.read.json("file:///home/hadoop/employee.json")

    // (1) 查询所有数据
    employeeDF.show()

    // (2) 查询所有数据,并去除重复的数据
    val distinctEmployeeDF = employeeDF.distinct()
    distinctEmployeeDF.show()

    // (3) 查询所有数据,打印时去除 id 字段
    employeeDF.select("name", "age").show()

    // (4) 筛选出 age>30 的记录
    employeeDF.filter($"age" > 30).show()

    // (5) 将数据按 age 分组
    employeeDF.groupBy("age").count().show()

    // (6) 将数据按 name 升序排列
    employeeDF.orderBy($"name".asc).show()

    // (7) 取出前 3 行数据
    employeeDF.limit(3).show()

    // (8) 查询所有记录的 name 列,并为其取别名为 username
    employeeDF.select($"name".as("username")).show()

    // (9) 查询年龄 age 的平均值
    employeeDF.select(avg($"age")).show()

    // (10) 查询年龄 age 的最小值
    employeeDF.select(min($"age")).show()

    // 停止 Spark 会话
    spark.stop()

  }
}
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
case object SparkSQLjibencaozuo{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
    import spark.implicits._

    // 加载 JSON 数据到 DataFrame
    //val employeeDF = spark.read.json("employee.json")

    val employeeDF = spark.read.json("file:///home/hadoop/employee.json")

    // (1) 查询所有数据
    employeeDF.show()

    // (2) 查询所有数据,并去除重复的数据
    val distinctEmployeeDF = employeeDF.distinct()
    distinctEmployeeDF.show()

    // (3) 查询所有数据,打印时去除 id 字段
    employeeDF.select("name", "age").show()

    // (4) 筛选出 age>30 的记录
    employeeDF.filter($"age" > 30).show()

    // (5) 将数据按 age 分组
    employeeDF.groupBy("age").count().show()

    // (6) 将数据按 name 升序排列
    employeeDF.orderBy($"name".asc).show()

    // (7) 取出前 3 行数据
    employeeDF.limit(3).show()

    // (8) 查询所有记录的 name 列,并为其取别名为 username
    employeeDF.select($"name".as("username")).show()

    // (9) 查询年龄 age 的平均值
    employeeDF.select(avg($"age")).show()

    // (10) 查询年龄 age 的最小值
    employeeDF.select(min($"age")).show()

    // 停止 Spark 会话
    spark.stop()

  }
}

注意一下

```
    // 加载 JSON 数据到 DataFrame
    //val employeeDF = spark.read.json("employee.json")

    val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
```

记好employee.json文件在哪个目录下

1.运行代码

有这个结果就行了,不需要在意红色
Process finished with exit code 1

2.打包项目 build->package->git add jar->commit->push,在虚拟机中 git pull origin master 上传的 jar 包

出现这个结果就正确的

3.以 spark-local 模式提交 spark 任务
先  cd /opt/module/spark-local/
再运行
bin/spark-submit --master local[*] --jars /opt/module/spark-local/jars/mysql-connector-java-5.1.27-bin.jar --class IDEA自己文件的地址 ~/gitdata/target/scala_demo-1.0-SNAPSHOT.jar

4.10结果就出来了

2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):

1,Ella,36 2,Bob,29 3,Jack,29

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代 码。

employee.txt 如上面一样

再IDEA上建立RDDToDataFrameExample

package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
case object RDDToDataFrameExample{
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("RDD to DataFrame Example")
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
      .getOrCreate()

    import spark.implicits._

    // 指定employee.txt文件的位置
    val inputFilePath = "file:///home/hadoop/employee.txt"

    // 从文本文件读取数据创建RDD
    val rdd = spark.sparkContext.textFile(inputFilePath)

    // 定义DataFrame的schema
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))

    // 将RDD转换为DataFrame
    val dataFrame = spark.createDataFrame(rdd.map { line =>
      val parts = line.split(",")
      Row(parts(0).toInt, parts(1), parts(2).toInt)
    }, schema)

    // 显示DataFrame内容
    dataFrame.show(false)

    // 按照指定格式打印所有数据
    dataFrame.collect().foreach { row =>
      println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
    }

    // 停止SparkSession
    spark.stop()
  }

}
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
case object RDDToDataFrameExample{
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("RDD to DataFrame Example")
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
      .getOrCreate()

    import spark.implicits._

    // 指定employee.txt文件的位置
    val inputFilePath = "file:///home/hadoop/employee.txt"

    // 从文本文件读取数据创建RDD
    val rdd = spark.sparkContext.textFile(inputFilePath)

    // 定义DataFrame的schema
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))

    // 将RDD转换为DataFrame
    val dataFrame = spark.createDataFrame(rdd.map { line =>
      val parts = line.split(",")
      Row(parts(0).toInt, parts(1), parts(2).toInt)
    }, schema)

    // 显示DataFrame内容
    dataFrame.show(false)

    // 按照指定格式打印所有数据
    dataFrame.collect().foreach { row =>
      println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
    }

    // 停止SparkSession
    spark.stop()
  }



}

1.建表

CREATE DATABASE sparktest;  
USE sparktest;  
  
CREATE TABLE employee (  
  id INT PRIMARY KEY,  
  name VARCHAR(50),  
  gender CHAR(1),  
  age INT  
);  
  
INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);
————————————————

.

再IDEA上建立MySQLDataFrameExample

package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum

case object MySQLDataFrameExample{
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("MySQL DataFrame Example MySQL写入与读取")
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
      .getOrCreate()

    import spark.implicits._

    // 配置MySQL JDBC连接
    val jdbcProperties = new Properties()
    jdbcProperties.setProperty("user", "root")
    jdbcProperties.setProperty("password", "自己的密码")
    jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
    //jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    // 定义MySQL的JDBC连接URL
    val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"

    // 创建DataFrame以插入数据
    val newEmployeeData = Seq(
      (3, "Mary", "F", 26),
      (4, "Tom", "M", 23)
    ).toDF("id", "name", "gender", "age")

    // 将DataFrame数据插入到MySQL的employee表中
    newEmployeeData.write
      .mode("append") // 使用append模式来添加数据,而不是覆盖
      .jdbc(jdbcUrl, "employee", jdbcProperties)

    // 从MySQL读取employee表的数据
    val employeeDF = spark.read
      .jdbc(jdbcUrl, "employee", jdbcProperties)

    // 打印age的最大值
    val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
    println(s"Max age: $maxAge")

    // 打印age的总和
    val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
    println(s"Sum of ages: $sumAge")

    // 停止SparkSession
    spark.stop()
  }
}
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum

case object MySQLDataFrameExample{
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("MySQL DataFrame Example MySQL写入与读取")
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
      .getOrCreate()

    import spark.implicits._

    // 配置MySQL JDBC连接
    val jdbcProperties = new Properties()
    jdbcProperties.setProperty("user", "root")
    jdbcProperties.setProperty("password", "自己的")
    jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
    //jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    // 定义MySQL的JDBC连接URL
    val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"

    // 创建DataFrame以插入数据
    val newEmployeeData = Seq(
      (3, "Mary", "F", 26),
      (4, "Tom", "M", 23)
    ).toDF("id", "name", "gender", "age")

    // 将DataFrame数据插入到MySQL的employee表中
    newEmployeeData.write
      .mode("append") // 使用append模式来添加数据,而不是覆盖
      .jdbc(jdbcUrl, "employee", jdbcProperties)

    // 从MySQL读取employee表的数据
    val employeeDF = spark.read
      .jdbc(jdbcUrl, "employee", jdbcProperties)

    // 打印age的最大值
    val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
    println(s"Max age: $maxAge")

    // 打印age的总和
    val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
    println(s"Sum of ages: $sumAge")

    // 停止SparkSession
    spark.stop()
  }
}

再如实验一,上创到虚拟机上查看结果

Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐