Apache Spark技術實戰之7 -- CassandraRDD高並發數據讀取實現剖析


未經本人同意,嚴禁轉載,徽滬一郎。

概要

本文就 spark-cassandra-connector 的一些實現細節進行探討,主要集中於如何快速將大量的數據從cassandra 中讀取到本地內存或磁盤。

數據分區

存儲在 Cassandra 中數據的一般都會比較多,記錄數在千萬級別或上億級別是常見的事。如何將這些表中的內容快速加載到本地內存就是一個非常現實的問題。
解決這一挑戰的思路從大的方面來說是比較簡單的,那就是將整張表中的內容分成不同的區域,然后分區加載,不同的分區可以在不同的線程或進程中加載,利用並行化來減少整體加載時間。

順着這一思路出發,要問的問題就是 Cassandra 中的數據如何才能分成不同的區域。

不同於 MySQL ,在 Cassandra 中是不存在 Sequence Id 這樣的類型的,也就是說無法簡單的使用seqId 來指定查詢或加載的數據范圍。

既然沒有 SequenceID,在 Cassandra 中是否就沒有辦法了呢?答案顯然是否定的,如果只是僅僅支持串行讀取,Cassandra 早就會被扔進垃圾堆里了。
數據分區的辦法在 Cassandra 中至少有兩種辦法可以達到,一是通過 token range,另一個是 slice range。這里主要講解利用 token range 來實現目的。

token range

Cassandra將要存儲的記錄存儲在不同的區域中,判斷某一記錄具體存儲在哪個區域的依據是主鍵的Hash值。

Cassandra 1.2之前,組成Cassandra集群的所有節點(Node),都需要手工的指定該節點的Hash值范圍也就是Token Range

手工計算Token Range顯然是很繁瑣,同時也不怎么容易維護,在Cassandra 1.2之后,引進了虛擬節點(vnode)的概念,主要目的是減少不必要的人工指定,同時也將token range的划分變得更為細粒度。比如原先手工指定token range,只能達到10000這樣一個精度,而有了vnode之后,默認安裝是每一個物理節點上有256個虛擬節點,這樣子的話每一個range的范圍就是10000/256,這樣變的更為精細。

有關token range的信息存儲在cassandrasystem命名空間(keyspace)下的localpeers兩張表中。其中local表示本節點的token range情況,而peers表示集群中其它節點的token range情況。這兩張表中的tokens字段就存儲有詳細的信息。如果集群中只由一台機器組成,那么peers中的就會什么內容都沒有。

簡單實驗,列出本節點的token range>

use system;
desc table local;
select tokens from local;

Thrift接口

Token Range告訴我們Cassandra的記錄是分片存儲的,也就意味着可以分片讀取。現在的問題轉換成為如何知道每一個Token Range的起止范圍。

Cassandra支持的Thrift接口中describe_ring就是用來獲取token range的具體起止范圍的。我們常用的nodetool工具使用的就是thrift接口,nodetool 中有一個describering指令使用的就是describe_ring原語。

可以做一個簡單的實驗,利用nodetool來查看某個keyspacetoken range具體情況。

	nodetool -hcassandra_server_addr describering keyspacename

Spark-Cassandra-Connector

在第一節中講解了Cassandra中Token Range信息的存儲位置,以及可以使用哪些API來獲取token range信息。

接下來就分析spark-cassandra-connector是如何以cassandra為數據源將數據加載進內存的。

以簡單的查詢語句為例,假設用戶要從demo這個keyspace的tableX表中加載所有數據,用CQL來表述就是

select * from demo.tableX

上述的查詢使用spark-cassandra-connector來表述就是

sc.cassandraTable(“demo”,”tableX”)

盡管上述語句沒有觸發Spark Job的提交,也就是說並不會將數據直正的從Cassandra的tableX表中加載進來,但spark-cassandra-connector還是需要進行一些數據庫的操作。要解決的主要問題就是schema相關。

cassandraTable(“demo”,”tableX”)

只是說要從tableX中加載數據,並沒有告訴connector有哪些字段,每個字段的類型是什么。這些信息對后面使用諸如get[String](“fieldX”)來說卻是非常關鍵的。

為了獲取字段類型信息的元數據,需要讀取system.schema_columns表,利用如下語句可以得到schema_columns表結構的詳細信息

desc table system.schema_columns

如果在conf/log4j.properties中將日志級別設置為DEBUG, 然后再執行sc.cassandraTable語句就可以看到具體的CQL查詢語句是什么。

CassandraRDDPartitioner

Spark-cassandra-connector添加了一種新的RDD實現,即CassandraRDD。我們知道對於一個Spark RDD來說,非常關鍵的就是確定getPartitions和compute函數。

getPartitions函數會調用CassandraRDDPartitioner來獲取分區數目,

override def getPartitions: Array[Partition] = {
    verify // let's fail fast
    val tf = TokenFactory.forCassandraPartitioner(cassandraPartitionerClassName)
    val partitions = new CassandraRDDPartitioner(connector, tableDef, splitSize)(tf).partitions(where)
    logDebug(s"Created total ${partitions.size} partitions for $keyspaceName.$tableName.")
    logTrace("Partitions: \n" + partitions.mkString("\n"))
    partitions
  }

CassandraRDDPartitioner中的partitions的處理邏輯大致如下:

  1. 首先確定token range,使用describe_ring

  2. 然后根據Cassandra中使用的Partitioner來確定某一個token range中可能的記錄條數,這么做的原因就是為進一步控制加載的數據,提高並發度。否則並發度就永遠是256了,比如有一個物理節點,其中有256vnodes,也就是256token分區。如果每個分區中大致的記錄數是20,000,而每次加載最大只允許10,00的話,整個數據就可以分成256x2=512個分區。

  3. describeRing返回的token range進一步拆分的話,需要使用splittersplitter的構建需要根據keyspace中使用了何種Partitioner來決定,Cassandra中默認的PartitionerMurmur3PartitionerMurmur3Hash算法可以讓Hash值更為均勻的分布到不同節點。

  4. splitter中會利用到配置項spark.cassandra.input.split.sizespark.cassandra.page.row.size,分別表示一個線程最多讀取多少記錄,另一個表示每次讀取多少行。

partitions的源碼詳見CasssandraRDDParitioner.scala

compute函數就利用確定的token的起止范圍來加載內容,這里在理解的時候需要引起注意的就是flatMap是惰性執行的,也就是說只有在真正需要值的時候才會被執行,延遲觸發。

數據真正的加載是發生在fetchTokenRange函數,這時使用到的就是Cassandra Java Driver了,平淡無奇。

fetchTokenRange

fetcchTokenRange函數使用Cassandra Java Driver提供的API接口來讀取數據,利用Java API讀取數據一般遵循以下步驟

val cluster = Cluster.Builder.addContactPoint(“xx.xx.xx.xx”).build
val session = cluster.connect
val stmt = new SimpleStatement(queryCQL)
session.execute(session)
session.close
cluster.close

addContactPoint的參數是cassandra server的ip地址,在后面真正執行cql語句的時候,如果集群有多個節點構成,那么不同的cql就會在不同的節點上執行,自動實現了負載均衡。可以在addContactPoint的參數中設定多個節點的地址,這樣可以防止某一節點掛掉,無法獲取集群信息的情況發生。

session是線程安全的,在不同的線程使用同一個session是沒有問題的,建議針對一個keySpace只使用一個session.

RDD中使用Session

Spark RDD中是無法使用SparkContext的,否則會形成RDD嵌套的現象,因為利用SparkContext很容易構造出RDD,如果在RDD的函數中如map中調用SparkContext創建一個新的RDD,則形成深度嵌套進而導致Spark Job有嵌套。

但在實際的情況下,我們可以需要根據RDD中的值再去對數據庫進行操作,那么有什么辦法來打開數據庫連接呢?

解決的辦法就是直接使用Cassandra Java Driver而不再使用spark-cassandra-connector的高級封裝,因為不能像這樣子來使用cassandraRDD.

	sc.cassandraRDD(“ks”,”tableX”).map(x=>sc.cassandraRDD(“ks”,”tableX”).where(filter))

如果是直接使用Cassandra Java Driver,為了避免每個RDD中的iterator都需要打開一個session,那么可以使用foreachPartition函數來進行操作,減少打開的session數。

val  rdd1 = sc.cassandraTable(“keyspace”,”tableX”)
	rdd1.foreachPartition( lst => {
		val cluster = ClusterBuilder.addContactPoint(“xx.xx.xx.xx”).build
		val session = cluster.connect
		while ( iter.hasNext ) {
		 	val  elem = iter.next
			//do something by using session and elem
		}
		session.close
		cluster.close
	})

其實最好的辦法是在外面建立一個session,然后在不同的partition中使用同一個session,但這種方法不行的原因是在執行的時候會需要”Task not Serializable”的錯誤,於是只有在foreachPartition函數內部新建session.

數據備份

盡管Cassandra號稱可以做到宕機時間為零,但為了謹慎起見,還是需要對數據進行備份。

Cassandra提供了幾種備份的方法,

  1. 將數據導出成為json格式

  2. 利用copy將數據導出為csv格式

  3. 直接復制sstable文件

導出成為jsoncsv格式,當表中的記錄非常多的時候,這顯然不是一個好的選擇。於是就只剩下備份sstable文件了。

問題是將sstable存儲到哪里呢?放到HDFS當然沒有問題,哪有沒有可能對放到HDFS上的sstable直接進行讀取呢,在沒有經過任務修改的情況下,這是不行的。

試想一下,sstable的文件會被拆分為多個塊而存儲到HDFS中,這樣會破壞記錄的完整性,HDFS在存儲的時候並不知道某一block中包含有完成的記錄信息。

為了做到記錄信息不會被拆分到多個block中,需要根據sstable的格式自行提取信息,並將其存儲到HDFS上。這樣存儲之后的文件就可以被並行訪問。

Cassandra中提供了工具sstablesplit來將大的sstable分割成為小的文件。

DataStaxDSE企業版中提供了和HadoopSpark的緊密結合,其一個很大的基礎就是先將sstable的內容存儲到CFS中,大體的思路與剛才提及的應該差不多。

sstable存儲結構的分析是一個研究的熱門,可以參考如下的鏈接。

  1. https://www.fullcontact.com/blog/cassandra-sstables-offline/

只所以要研究備份策略是想將對數據的分析部分與業務部分相分離開,避免由於后台的數據分析導致Cassandra集群響應變得緩慢而致前台業務不可用,即將OLTPOLAP的數據源分離開。

復雜查詢

通過近乎實時的數據備份,后台OLAP就可以使用Spark來對數據進行分析和處理。

與傳統的RDBMS相比,Cassandra所能提供的查詢功能實在是弱的可以,如果想到實現非常復雜的查詢功能的,需要將CassandraSolr進行結合。

DSE企業版提供了該功能,如果想手工搭建的話,可以參考下面的鏈接

  1. http://www.slideshare.net/planetcassandra/an-introduction-to-distributed-search-with-cassandra-and-solr

  2. https://github.com/Stratio/stratio-cassandra 開源方面的嘗試 CassandraLucene的結合

共享SparkContext

SparkContext可以被多個線程使用,這意味着同個Spark Application中的Job可以同時提交到Spark Cluster中,減少了整體的等待時間。

在同一個線程中, Spark只能逐個提交Job,當Job在執行的時候,Driver Application中的提交線程是處於等待狀態的。如果Job A沒有執行完,Job B就無法提交到集群,就更不要提分配資源真正執行了。

那么如何來減少等待時間呢,比如在讀取Cassandra數據的過程中,需要從兩個不同的表中讀取數據,一種辦法就是先讀取完成表A與讀取表B,總的耗時是兩者之和。

如果利用共享SparkContext的技術,在不同的線程中去讀取,則耗時只是兩者之間的最大值。

Scala中有多種不同的方式來實現多線程,現僅以Future為例來說明問題

val ll  = (1 to 3 toList).map(x=>sc.makeRDD(1 to 100000 toList, 3))
val futures = ll.map ( x => Future {
		x.count()
	})
val fl = Future.sequencce(futures)
Await.result(fl,3600 seconds)

簡要說明一下代碼邏輯

  1. 創建三個不同的RDD

  2. 在不同的線程(Future)中通過count函數來提交Job

  3. 使用Await來等待Future執行結束


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM