Importing Data into ClickHouse with Spark


This article describes how to write data into ClickHouse from a Spark program.

Procedure

  1. Prepare the directory structure of the Spark project.
     
     find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/com
    ./src/main/scala/com/spark
    ./src/main/scala/com/spark/test
    ./src/main/scala/com/spark/test/WriteToCk.scala
  2. Edit the build.sbt configuration file and add the dependencies.
     
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.12.10"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
    libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
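
    Before writing any Spark code, you can optionally confirm that sbt resolved the ClickHouse driver onto the classpath. This is a minimal sketch, not part of the original project; the object name DriverCheck is just for illustration.

    // Hypothetical sanity check: throws ClassNotFoundException if the
    // clickhouse-jdbc dependency declared in build.sbt was not resolved.
    object DriverCheck {
      def main(args: Array[String]): Unit = {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
        println("clickhouse-jdbc driver loaded")
      }
    }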
  3. Create the data-writing program file WriteToCk.scala.
     
    package com.spark.test
    
    import java.util.Properties

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.storage.StorageLevel

    object WriteToCk {
      // JDBC connection properties for the ClickHouse driver.
      val properties = new Properties()
      properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      properties.put("user", "<your-user-name>")
      properties.put("password", "<your-password>")
      properties.put("batchsize", "100000")
      properties.put("socket_timeout", "300000")
      properties.put("numPartitions", "8")
      properties.put("rewriteBatchedStatements", "true")

      val url = "jdbc:clickhouse://<you-url>:8123/default"
      val table = "<your-table-name>"

      def main(args: Array[String]): Unit = {
        val sc = new SparkConf()
        sc.set("spark.driver.memory", "1G")
        sc.set("spark.driver.cores", "4")
        sc.set("spark.executor.memory", "1G")
        sc.set("spark.executor.cores", "2")

        val session = SparkSession.builder()
          .master("local[*]")
          .config(sc)
          .appName("write-to-ck")
          .getOrCreate()

        // Read the CSV file, keep only the three columns to import, and cache
        // the serialized rows so the write phase does not re-read the file.
        val df = session.read.format("csv")
          .option("header", "true")
          .option("sep", ",")
          .option("inferSchema", "true")
          .load("</your/path/to/test/data/a.txt>")
          .selectExpr("Year", "Quarter", "Month")
          .persist(StorageLevel.MEMORY_ONLY_SER_2)
        println("read done")

        // Append the rows to ClickHouse over JDBC in batches of 100000.
        df.write.mode(SaveMode.Append)
          .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000)
          .jdbc(url, table, properties)
        println("write done")

        df.unpersist(true)
      }
    }

    Parameters

    • your-user-name: the database account created in the target ClickHouse cluster.
    • your-password: the password of that database account.
    • your-url: the address of the target ClickHouse cluster.
    • /your/path/to/test/data/a.txt: the path of the data file to import, including the directory and the file name.

      Note: the data and schema in the file must match the structure of the target table in ClickHouse; a minimal table-creation sketch follows this list.
    • your-table-name: the name of the target table in the ClickHouse cluster.
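
    The target table must exist before the Spark job runs. The sketch below creates one whose columns match the three CSV fields selected in WriteToCk; the Int32 types, the MergeTree engine, and the object name CreateCkTable are assumptions for illustration, so adjust them to your actual data.

    import java.sql.DriverManager

    // Hypothetical helper: create a ClickHouse table matching the CSV columns
    // (Year, Quarter, Month). Types and engine are assumed, not from the article.
    object CreateCkTable {
      def main(args: Array[String]): Unit = {
        val url = "jdbc:clickhouse://<you-url>:8123/default"
        val conn = DriverManager.getConnection(url, "<your-user-name>", "<your-password>")
        try {
          conn.createStatement().execute(
            """CREATE TABLE IF NOT EXISTS <your-table-name> (
              |  Year Int32,
              |  Quarter Int32,
              |  Month Int32
              |) ENGINE = MergeTree() ORDER BY Year""".stripMargin)
        } finally conn.close()
      }
    }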
  4. Compile and package.
     
    sbt package
  5. Run.
     
    ${SPARK_HOME}/bin/spark-submit \
      --class "com.spark.test.WriteToCk" \
      --master local[4] \
      --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
      --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
      target/scala-2.12/simple-project_2.12-1.0.jar
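
    After the job prints "write done", you can spot-check the import with a plain JDBC count query. This is a minimal sketch; VerifyCk is a hypothetical helper that reuses the placeholder connection values from WriteToCk.

    package com.spark.test

    import java.sql.DriverManager

    // Hypothetical spot check: count the rows that landed in the target table.
    object VerifyCk {
      def main(args: Array[String]): Unit = {
        val url = "jdbc:clickhouse://<you-url>:8123/default"
        val conn = DriverManager.getConnection(url, "<your-user-name>", "<your-password>")
        try {
          val rs = conn.createStatement().executeQuery("SELECT count() FROM <your-table-name>")
          if (rs.next()) println(s"rows in target table: ${rs.getLong(1)}")
        } finally conn.close()
      }
    }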
 
 
 

