How to create a table in a MySQL database using Apache Spark

svk 041994 • 4 years ago • 122 views

I am trying to build a Spark application that creates, reads, writes, and updates MySQL data. Is there a way to create a MySQL table with Spark?

Below is my Scala JDBC code, which creates a table in the MySQL database. How can I do the same thing through Spark?

package SparkMysqlJdbcConnectivity

import org.apache.spark.sql.SparkSession
import java.util.Properties
import java.lang.Class
import java.sql.Connection
import java.sql.DriverManager

object MysqlSparkJdbcProgram {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("MysqlJDBC Connections")
      .master("local[*]")
      .getOrCreate()

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/world"
    val operationtype = "create table"
    val tablename = "country"
    val tablename2 = "state"

    val connectionProperties = new Properties()

    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDf = spark.read.jdbc(url, s"${tablename}", connectionProperties)

    operationtype.trim() match {
      case "create table" => {
       // Class.forName(driver)
        val con: Connection = DriverManager.getConnection(url, connectionProperties)
        try {
          // For DDL, execute() returns false (no ResultSet); failure raises SQLException.
          con.prepareStatement(s"create table ${tablename2} (name varchar(255), country varchar(255))").execute()
          println("table creation is successful")
        } catch {
          case e: java.sql.SQLException =>
            println(s"table creation is unsuccessful: ${e.getMessage}")
        } finally {
          con.close()
        }
      }

      case "read table" => {

        val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)
        jdbcDf.show()
      }

      case "write table" => {}

      case "drop table"  => {}

    }

  }

}
Article URL: http://www.python88.com/topic/38372
 
Ravikumar    4 years ago

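When you write the jdbcDf DataFrame to a table that does not exist yet, Spark creates the table automatically. With the default save mode the write fails if the table already exists, so set a mode such as append or overwrite when writing to an existing table.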

jdbcDf
 .write
 .jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)

If you want to specify the column types of the created table:

jdbcDf
 .write
 .option("createTableColumnTypes", "name VARCHAR(500), col1 VARCHAR(1024), col3 int")
 .jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)
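
To create only the table from the question (state, with the columns name and country) without copying any rows, one option is to write an empty DataFrame that carries the desired schema. The following is a minimal sketch, not a definitive implementation. It assumes the same URL and credentials as in the question, that MySQL Connector/J is on the classpath, and that Spark creates the table even when the DataFrame has no rows. The object name CreateStateTable and the helper columnTypes are hypothetical names made up for this example.

```scala
import java.util.Properties

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CreateStateTable {

  // Hypothetical helper: builds the value for the createTableColumnTypes option
  // from (column name, SQL type) pairs.
  def columnTypes(cols: Seq[(String, String)]): String =
    cols.map { case (name, sqlType) => s"$name $sqlType" }.mkString(", ")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CreateStateTable")
      .master("local[*]")
      .getOrCreate()

    // Assumed connection settings, copied from the question.
    val url = "jdbc:mysql://localhost:3306/world"
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")

    // Schema of the table to create; the column names must match the ones
    // passed to createTableColumnTypes.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("country", StringType, nullable = true)
    ))

    // An empty DataFrame with that schema: writing it should create the table
    // without inserting rows.
    val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    empty.write
      .mode(SaveMode.ErrorIfExists) // fail instead of touching an existing table
      .option("createTableColumnTypes",
        columnTypes(Seq("name" -> "VARCHAR(255)", "country" -> "VARCHAR(255)")))
      .jdbc(url, "state", props)

    spark.stop()
  }
}
```

SaveMode.ErrorIfExists is the default and is spelled out here only to make the behavior visible. Use SaveMode.Append or SaveMode.Overwrite if the table may already exist. Dropping a table still needs plain JDBC as in the question, since the DataFrame writer has no drop operation.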