方法一:各个字段都是提前定好的

// JDBC credentials handed to Spark's built-in JDBC writer.
val prop = new java.util.Properties
Seq("user" -> "root", "password" -> "123456").foreach {
  case (key, value) => prop.setProperty(key, value)
}

// Append the DataFrame's rows to table `mytab`; columns are matched to the table by name.
df.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://localhost:3306/test", "mytab", prop)

方法二:字段可自由增减

// Writes every partition of `df` into table `app_id` through one pooled JDBC connection.
//
// Compared with building the INSERT text by string concatenation, this version:
//   * binds values through a PreparedStatement, so quotes in the data cannot break
//     or inject into the SQL, and a null string is stored as SQL NULL;
//   * reuses a single statement per partition and closes it when done;
//   * sends rows in batches instead of one round trip per row;
//   * always hands the connection back to the pool, even when a row fails.
df.foreachPartition(p => {
  // Empty partitions never need to borrow a connection.
  if (p.hasNext) {
    val conn = ConnectionPool.getConnection
    try {
      val stmt = conn.prepareStatement("insert into app_id(id,date,appid,num) values (?,?,?,?)")
      try {
        // Flush in fixed-size chunks so a large partition is not buffered entirely in memory.
        p.grouped(1000).foreach { rows =>
          rows.foreach { x =>
            stmt.setString(1, UUID.randomUUID.toString) // id: fresh random key per row
            stmt.setInt(2, x.getInt(0))                 // date
            stmt.setString(3, x.getString(1))           // appid
            stmt.setLong(4, x.getLong(2))               // num
            stmt.addBatch()
          }
          stmt.executeBatch()
        }
      } finally {
        stmt.close()
      }
    } finally {
      ConnectionPool.returnConnection(conn)
    }
  }
})

数据库连接池:

package com.prince.spark.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;

/**
 * Minimal JVM-wide pool of MySQL JDBC connections.
 *
 * <p>The pool is filled with {@code INITIAL_SIZE} connections on first use. Both public
 * methods synchronize on the class, so the pool may be shared by concurrent threads.
 * When every pooled connection is checked out, {@link #getConnection()} opens an
 * additional connection instead of returning {@code null}; once returned, that connection
 * stays in the pool, so the pool can grow up to the peak number of concurrent borrowers.
 */
public class ConnectionPool {
    private static final String JDBC_URL =
            "jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "123456";
    private static final int INITIAL_SIZE = 5;

    /** Idle connections; used as a stack (push/poll both operate on the head). */
    private static final LinkedList<Connection> connectionQueue = new LinkedList<Connection>();

    /** Becomes true only after the initial fill has completed without error. */
    private static boolean initialized = false;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // Best effort: report the problem here; if no driver is registered at all,
            // getConnection() will fail with a descriptive exception.
            e.printStackTrace();
        }
    }

    /**
     * Borrows a connection from the pool, opening a new one when none is idle.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if a connection to the database cannot be opened
     */
    public static synchronized Connection getConnection() {
        try {
            if (!initialized) {
                // A failure part-way leaves `initialized` false, so the next call
                // tops the pool up instead of running with a half-filled pool forever.
                while (connectionQueue.size() < INITIAL_SIZE) {
                    connectionQueue.push(openConnection());
                }
                initialized = true;
            }
            Connection conn = connectionQueue.poll();
            return conn != null ? conn : openConnection();
        } catch (SQLException e) {
            throw new IllegalStateException(
                    "Unable to obtain a MySQL connection from " + JDBC_URL, e);
        }
    }

    /**
     * Hands a borrowed connection back to the pool.
     *
     * <p>{@code null} and already-closed connections are ignored so that they are never
     * handed out again.
     *
     * @param conn the connection previously obtained from {@link #getConnection()}
     */
    public static synchronized void returnConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            if (conn.isClosed()) {
                return;
            }
        } catch (SQLException e) {
            // The connection cannot even report its state; do not reuse it.
            return;
        }
        connectionQueue.push(conn);
    }

    /** Opens one new physical connection using the pool's fixed settings. */
    private static Connection openConnection() throws SQLException {
        return DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
    }
}

方法三:有时需要把计算结果写入数据库,这时要先手动组装 DataFrame 再写入

// Wrap the computed (num, log_date) pair in a single-element RDD.
val arrayRDD = sc.parallelize(List((num, log_date)))

// Turn each pair into a Row: the count, the log day, and the time the Row is built.
val resultRowRDD = arrayRDD.map { case (count, day) =>
  Row(count.toInt, day.toString, new Timestamp(System.currentTimeMillis()))
}

// Explicit schema for the result rows.
val resultSchema = StructType(
  Seq(
    StructField("verify_num", IntegerType, nullable = true),
    StructField("log_date", StringType, nullable = true),      // day whose logs produced this result
    StructField("create_time", TimestampType, nullable = true) // when this result row was created
  )
)

// Build the DataFrame from the rows and the schema.
val DF = spark.createDataFrame(resultRowRDD, resultSchema)

// Append the result to MySQL; "dbtable" is the target table name.
val jdbcOptions = Map(
  "url" -> "jdbc:mysql://192.168.1.97:3306/xiang_log",
  "dbtable" -> "verify",
  "user" -> "root",
  "password" -> "123456"
)
DF.write
  .mode("append")
  .format("jdbc")
  .options(jdbcOptions)
  .save()
Logo

更多推荐