First, open the official Spark website and pick the version you use; I chose 1.6.3. Then go to the Spark Streaming guide and search the page for Kafka.
Clicking through, you will find a snippet of Scala code:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
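To make the placeholders concrete, a hypothetical filled-in call might look like the sketch below (the ZooKeeper hosts, group id, and topic name are made up; note the quorum uses the ZooKeeper port, typically 2181, not the Kafka broker port):

// Hypothetical values for the placeholders above
val kafkaStream = KafkaUtils.createStream(streamingContext,
  "zk01:2181,zk02:2181,zk03:2181", "my-consumer-group", Map("my-topic" -> 1))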
If you want to look at the createStream method itself, you can get there directly through the "Where to go from here" section of the Spark Streaming guide, which links to the Java, Scala, and Python API documents; click into the one for the language you code in. I am using Scala here. After clicking into KafkaUtils you will see that the class has quite a few methods; the one we want is createStream, so take a look at its overloads. I have copied the method's documentation over:
def createStream(jssc: JavaStreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Integer]): JavaPairReceiverInputDStream[String, String]

Create an input stream that pulls messages from Kafka Brokers. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.

jssc: JavaStreamingContext object
zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..)
groupId: The group id for this consumer
topics: Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
returns: DStream of (Kafka message key, Kafka message value)
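Note that the overload above is the Java-facing one. The Scala overload that the code below calls has this shape in the 1.6.x API (the last parameter defaults to StorageLevel.MEMORY_AND_DISK_SER_2 and can be overridden):

def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)]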
Finally, we write the Scala code in IDEA that consumes data from Kafka:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
def main(args: Array[String]): Unit = {
  // Spark 1.6.x has no SparkSession (that arrived in 2.0); build the context from a SparkConf
  val conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_PRODUCT)
  val ssc = new StreamingContext(conf, Seconds(5))
  val topics = Map("topic" -> 1) // topic name -> number of receiver threads for that topic
  // createStream expects the ZooKeeper quorum (usually port 2181), not the Kafka broker port 9092
  val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
    ssc, "hadoop01:2181,hadoop02:2181,hadoop03:2181", "groupId", topics, StorageLevel.MEMORY_AND_DISK_SER)
  val values: DStream[String] = stream.map(_._2) // keep only the message value
  values.print() // at least one output operation is required before start()
  ssc.start()
  ssc.awaitTermination()
}
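One assumption the snippet makes: KafkaUtils lives in the separate spark-streaming-kafka artifact, not in spark-core, so it has to be added as a dependency. A minimal sbt sketch, assuming Scala 2.10 (the default build for Spark 1.6.3):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)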
That is the simple code walkthrough. There is still follow-up work to do, so this is just a small piece of code that pulls data from Kafka, together with the process of tracking it down on the official site. I am sharing it in a spirit of learning and exchange, and I hope the experts will offer plenty of pointers.
I want to take you traveling; that is my current mood.