依賴庫
spark 操作redis的時候,依賴的庫是spark-redis
首先我們導入依賴
<!-- https://mvnrepository.com/artifact/com.redislabs/spark-redis -->
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.11</artifactId>
<version>2.4.2</version>
</dependency>
spark-redis 參數設置
首先初始化一個spark實例,spark-redis的參數在config中進行配置。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
def main(args: Array[String]): Unit = {
val conf: SparkConf =new SparkConf().setAppName("setRedis").setMaster("local[2]")
conf.set("redis.host", "localhost") //redis 主機節點
conf.set("redis.port", "6379") //端口號,不填默認為6379
val session: SparkSession =SparkSession.builder().config(conf).getOrCreate()
val sc: SparkContext =session.sparkContext
}
還可以設置一些額外的參數
conf.set("redis.auth","null") //用戶權限配置
conf.set("redis.db","0") //數據庫設置
conf.set("redis.timeout","2000") //設置連接超時時間
簡單使用
sc通過導入的隱式轉換可以調出的讀取Redis的方法,都是以fromRedis開頭的,都是redis可以存儲的數據結構,這里以常見的KV進行示例
import com.redislabs.provider.redis._
讀取Redis
通過fromRedisKV
方法,獲取一個鍵的值。
val rdd: RDD[(String, String)] =sc.fromRedisKV("a")
rdd.collect().foreach(println(_))
/**
* (a,1)
*/
fromRedisKV()
的源碼:
/**
* @param keysOrKeyPattern an array of keys or a key pattern
* @param partitionNum number of partitions
* @return RedisKVRDD of simple Key-Values stored in redis server
*/
def fromRedisKV[T](keysOrKeyPattern: T,
partitionNum: Int = 3)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)):
RDD[(String, String)] = {
keysOrKeyPattern match {
case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum).getKV()
case keys: Array[String] => fromRedisKeys(keys, partitionNum).getKV()
case _ => throw new scala.Exception(IncorrectKeysOrKeyPatternMsg)
}
}
fromRedisKV()
的參數:
-
泛類型
keysOrKeyPattern
從match case 模式匹配代碼中可以看出,這里的T可是是兩種類型,一個是String,另一個是Array[String],如果傳入其他類型則會拋出運行時異常,其中String類型的意思是匹配鍵,這里可以用通配符比如foo*,所以返回值是一個結果集RDD[(String, String)],當參數類型為Array[String]時是指傳入key的數組,返回的結果則為相應的的結果集,RDD的內容類型也是KV形式。 -
Ine類型
partitionNum
生成RDD的分區數,默認為3,可以根據實際情況進行更改防止數據過度傾斜。 -
柯里化形式隱式參數
redisConfig
由於我們之前在sparkConf里面set了相應的參數,這里不傳入這個參數即可。如要調整,則可以按照源碼中的方式傳入,其中RedisEndpoint是一個case class類,而且很多參數都有默認值(比如6379的端口號),所以自己建立一個RedisEndpoint也是非常方便的。
寫入Redis
通過toRedisKV
將數據寫入redis
val data: Seq[(String, String)] = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333"))
val redisData:RDD[(String,String)] = sc.parallelize(data)
sc.toRedisKV(redisData)
查看redis
127.0.0.1:6379> keys *
1) "high"
2) "together"
3) "abc"
127.0.0.1:6379>
toRedisKV()
的源碼:
/**
* @param kvs Pair RDD of K/V
* @param ttl time to live
*/
def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
kvs.foreachPartition(partition => setKVs(partition, ttl, redisConfig, readWriteConfig))
}
toRedisKV()
的參數
-
kv類型的RDD
kvs是一個鍵值對類型的RDD,鍵和值的類型都是String
類型 -
Int類型的ttl
ttl是存入數據的過期時間,單位是秒
以上就是spark讀寫redis的兩個常用的方法。