Apache Spark技術實戰之4 -- 利用Spark將json文件導入Cassandra


歡迎轉載,轉載請注明出處。

概要

本文簡要介紹如何使用spark-cassandra-connector將json文件導入到cassandra數據庫,這是一個使用spark的綜合性示例。

前提條件

假設已經閱讀技術實戰之3,並安裝了如下軟件

  1. jdk
  2. scala
  3. sbt
  4. cassandra
  5. spark-cassandra-connector

實驗目的

將存在於json文件中的數據導入到cassandra數據庫,目前由cassandra提供的官方工具是json2sstable,由於對cassandra本身了解不多,這個我還沒有嘗試成功。

但想到spark sql中可以讀取json文件,而spark-cassadra-connector又提供了將RDD存入到數據庫的功能,我想是否可以將兩者結合一下。

創建KeySpace和Table

為了減少復雜性,繼續使用實戰3中的keyspace和table,

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

啟動spark-shell

與實戰3中描述一致。

bin/spark-shell --driver-class-path /root/working/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-thrift/jars/cassandra-thrift-2.0.9.jar:/root/.ivy2/cache/org.apache.thrift/libthrift/jars/libthrift-0.9.1.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-clientutil/jars/cassandra-clientutil-2.0.9.jar:/root/.ivy2/cache/com.datastax.cassandra/cassandra-driver-core/jars/cassandra-driver-core-2.0.4.jar:/root/.ivy2/cache/io.netty/netty/bundles/netty-3.9.0.Final.jar:/root/.ivy2/cache/com.codahale.metrics/metrics-core/bundles/metrics-core-3.0.2.jar:/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.7.jar:/root/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.3.2.jar:/root/.ivy2/cache/org.joda/joda-convert/jars/joda-convert-1.2.jar:/root/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.3.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-all/jars/cassandra-all-2.0.9.jar:/root/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.2.jar

准備json文件

以spark自帶的person.json文件為例,內容如下所示

{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

數據導入

假設person.json文件存儲在$SPARK_HOME目錄,在啟動spark-shell之后,執行如下語句

sc.stop
import com.datastax.spark.connector._
import org.apache.spark._
val conf = new SparkConf()
conf.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val path = "./people.json"
val people = sqlContext.jsonFile(path)
people.map(p=>(p.getString(10),p.getInt(0)))
      .saveToCassandra("test","kv",SomeColumns("key","value"))

注意:

  1. jsonFile返回的是jsonRDD,其中每一個成員是Row類型,並不行直接將saveToCassandra作用於jsonRDD,需要先作一步轉換即map過程
  2. map中使用到的getXXX函數是在事先已知數據類型的情況下取出其值
  3. 最后saveToCassandra觸發數據的存儲過程

另外一個地方值得記錄一下,如果在cassandra中創建的表使用了uuid作為primary key,在scala中使用如下函數來生成uuid

import java.util.UUID
UUID.randomUUID

驗證步驟

使用cqlsh來查看數據是否已經真正的寫入到test.kv表中。

小結

本次實驗結合了以下知識

  1. spark sql
  2. spark RDD的轉換函數
  3. spark-cassandra-connector


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM