Cassandra: a complete example of using the spark-cassandra-connector in spark-shell


1. Cassandra preparation

Start cqlsh:

 

CQLSH_HOST=172.16.163.131 bin/cqlsh

 

cqlsh>CREATE KEYSPACE productlogs WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '2' };

cqlsh>CREATE TABLE productlogs.logs (
    ids uuid,
    app_name text,
    app_version text,
    city text,
    client_time timestamp,
    country text,
    created_at timestamp,
    cs_count int,
    device_id text,
    id int,
    modle_name text,
    province text,
    remote_ip text,
    updated_at timestamp,
    PRIMARY KEY (ids)
);

 

2. The spark-cassandra-connector jar

Create an empty project, pull in the connector with sbt, and package it as spark-cassandra-connector-full.jar. Add the following line to the project's *.sbt file:

libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.5.0"

The point of this step: the official connector jar does not bundle its dependencies, so using it directly means hunting down those dependencies yourself. The required packages and their versions also differ between connector releases, so the simplest route is to build a single full (assembly) jar.
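A minimal sketch of such a project is shown below, assuming the fat jar is built with the sbt-assembly plugin (the plugin coordinates, versions and file layout here are assumptions, not taken from the original):

// project/plugins.sbt -- assumed: sbt-assembly produces the fat jar
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

// build.sbt
name := "spark-cassandra-connector-full"

scalaVersion := "2.10.5"

libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.5.0"

// running `sbt assembly` then writes the assembled jar under target/scala-2.10/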

3. Start spark-shell

 

/opt/db/spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master spark://u1:7077  --jars ~/spark-cassandra-connector-full.jar

The commands below are run inside spark-shell.

4. Prepare the data source

//Most writeups stop() the current sc first and create a new one; that is not actually necessary: just add the Cassandra parameter on the existing context
scala>sc.getConf.set("spark.cassandra.connection.host", "172.16.163.131")
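//Note: in Spark 1.5.x sc.getConf returns a copy of the configuration, so if the connector
//cannot reach the cluster this way, the host can also be supplied when launching the shell,
//e.g. --conf spark.cassandra.connection.host=172.16.163.131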
//read the source data from HDFS
scala>val lines = sc.textFile("/data/logs")
//import the required namespaces
scala>import org.apache.spark.sql._
scala>import org.apache.spark.sql.types._
scala>import com.datastax.spark.connector._
scala>import java.util.UUID
//define the schema
scala>val schema = StructType(
  StructField("ids", StringType, true) ::
    StructField("id", IntegerType, true) ::
    StructField("app_name", StringType, true) ::
    StructField("app_version", StringType, true) ::
    StructField("client_time", TimestampType, true) ::
    StructField("device_id", StringType, true) ::
    StructField("modle_name", StringType, true) ::
    StructField("cs_count", IntegerType, true) ::
    StructField("created_at", TimestampType, true) ::
    StructField("updated_at", TimestampType, true) ::
    StructField("remote_ip", StringType, true) ::
    StructField("country", StringType, true) ::
    StructField("province", StringType, true) ::
   StructField("city", StringType, true) :: Nil)
//parse each line into a Row that matches the schema above
scala>val rowRDD = lines.map(_.split("\t")).map(p => Row(UUID.randomUUID().toString(), p(0).toInt, p(1), p(2), java.sql.Timestamp.valueOf(p(3)), p(4), p(5), p(6).toInt, java.sql.Timestamp.valueOf(p(7)), java.sql.Timestamp.valueOf(p(8)), p(9), p(10), p(11), p(12)))
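//each line in /data/logs is expected to hold 13 tab-separated fields, in this order
//(derived from the parsing above): id, app_name, app_version, client_time, device_id,
//modle_name, cs_count, created_at, updated_at, remote_ip, country, province, city;
//the three timestamp fields must be parseable by java.sql.Timestamp.valueOf, e.g. yyyy-MM-dd HH:mm:ss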
scala>val df = sqlContext.createDataFrame(rowRDD, schema)
scala>df.registerTempTable("logs")
//take a look at the result
scala>sqlContext.sql("select * from logs limit 1").show

If you look closely, you may have noticed that for the ids column, whose Cassandra type is uuid, I used the string UUID.randomUUID().toString(). Why? Because the spark-cassandra-connector converts it internally; see the type mapping in section 7.

5. Save the data to Cassandra

scala>import org.apache.spark.sql.cassandra._
scala>df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "logs", "keyspace" -> "productlogs")).save()
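The table created in step 1 is still empty at this point, so the default SaveMode is fine. If you rerun the job against a table that already contains rows, the connector may refuse to write under the default ErrorIfExists mode; appending is the usual choice (a sketch using the standard Spark SQL SaveMode, not shown in the original):

scala>import org.apache.spark.sql.SaveMode
scala>df.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "logs", "keyspace" -> "productlogs")).save()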

6. Read back the data just saved

scala>import org.apache.spark.sql.cassandra._
scala>val cdf = sqlContext.read.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "logs", "keyspace" -> "productlogs")).
  load()
scala>cdf.registerTempTable("logs_just_saved")
scala>sqlContext.sql("select * from logs_just_saved limit 1").show

7. Cassandra to Spark SQL data type mapping

The mapping is defined in spark-cassandra-connector/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DataTypeConverter.scala:

  private[cassandra] val primitiveTypeMap = Map[connector.types.ColumnType[_], catalystTypes.DataType](
    connector.types.TextType       -> catalystTypes.StringType,
    connector.types.AsciiType      -> catalystTypes.StringType,
    connector.types.VarCharType    -> catalystTypes.StringType,

    connector.types.BooleanType    -> catalystTypes.BooleanType,

    connector.types.IntType        -> catalystTypes.IntegerType,
    connector.types.BigIntType     -> catalystTypes.LongType,
    connector.types.CounterType    -> catalystTypes.LongType,
    connector.types.FloatType      -> catalystTypes.FloatType,
    connector.types.DoubleType     -> catalystTypes.DoubleType,
    connector.types.SmallIntType   -> catalystTypes.ShortType,
    connector.types.TinyIntType    -> catalystTypes.ByteType,

    connector.types.VarIntType     -> catalystTypes.DecimalType(38, 0), // no native arbitrary-size integer type
    connector.types.DecimalType    -> catalystTypes.DecimalType(38, 18),

    connector.types.TimestampType  -> catalystTypes.TimestampType,
    connector.types.InetType       -> catalystTypes.StringType,
    connector.types.UUIDType       -> catalystTypes.StringType,
    connector.types.TimeUUIDType   -> catalystTypes.StringType,
    connector.types.BlobType       -> catalystTypes.BinaryType,
    connector.types.DateType       -> catalystTypes.DateType,
    connector.types.TimeType       -> catalystTypes.LongType
  )
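A quick way to see this mapping from the shell is to print the schema the connector reports for the table (sketch):

scala>sqlContext.read.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "logs", "keyspace" -> "productlogs")).
  load().printSchema()
//ids shows up as string, client_time as timestamp, cs_count as integer, and so on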

Note: when working with the spark-cassandra-connector in spark-shell, the author relies on two main techniques:

1. Create an empty project, add spark-cassandra-connector as a dependency, and package it together with its dependencies into one jar.

2. In spark-shell, get the conf directly and add the Cassandra connection parameters to it, so the connector can be used with the default SparkContext and sqlContext (a HiveContext) without calling sc.stop first.

