1.建立生產者發送數據
(1)配置zookeeper屬性信息props
(2)通過 new KafkaProducer[KeyType,ValueType](props) 建立producer
(3)通過 new ProducerRecord[KeyType,ValueType](topic,key,value) 封裝消息message
(4)通過 producer.send(message) 發送消息
package SparkDemo
import java.util
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
object KafkaProducer {
def main(args:Array[String]): Unit ={
if(args.length<4){
//參數
//<metadataBrokerList> broker地址
//<topic> topic名稱
//<messagesPerSec> 每秒產生的消息
//<wordsPerMessage> 每條消息包括的單詞數
System.err.println("Usage:KafkaProducer <metadataBrokerList> <topic> <messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
val Array(brokers,topic,messagesPerSec,wordsPerMessage) = args
//zookeeper連接屬性
val props = new util.HashMap[String,Object]();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
//通過zookeeper建立kafka的producer
val producer = new KafkaProducer[String,String](props)
//通過producer發送一些消息
while(true){
(1 to messagesPerSec.toInt).foreach{//遍歷[1, messagesPerSec.toInt]
messageNum =>
val str = (1 to wordsPerMessage.toInt).map(
x => scala.util.Random.nextInt(10).toString
).mkString(" ")//連成字符串用空格隔開
println(str)
//注意,我們這里發送的消息都是以鍵值對的形式發送的
//需要把消息內容和topic封裝到ProducerRecord中再發送
//我們這里的topic為外部的傳參,消息的鍵值對為<null,str>
val message = new ProducerRecord[String,String](topic,null,str)
//發送消息
producer.send(message)
}
Thread.sleep(1000)//休眠一秒鍾
}
}
}
我們把程序打包好,提交到spark集群中執行

最后四個為我們要傳入的程序參數
我們定義在localhost:9092的名字為wordsender的topic會以每秒3條,每條5個單詞往外發送數據
2.建立消費者消費數據
(1)建立sparkStream ssc
(2)配置zookeeper地址 zkQuorum
(3)設置topic所在組名 group
(4)將topic配置成 Map<topicName,numThreads> 的 topicMap<topic名稱,所需線程數> 的形式
(5)通過 KafkaUtils.createStream(ssc,zkQuorum,group,topicMap) 建立sparkStream-kafka的流通道
(6)sparkStream處理
package SparkDemo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaConsumer {
def main(args:Array[String]): Unit ={
//設置日志等級
StreamingLoggingExample.setStreamingLogLevels()
//建立spark流
val conf = new SparkConf().setAppName("KafkaConsumerDemo").setMaster("local")
val ssc = new StreamingContext(conf,Seconds(10))
//設置檢查點
ssc.checkpoint("file:/// or hdfs:///")
//zookeeper
val zkQuorum = "localhost:2181" //zookeeper服務器地址
//topic所發放的組名
val group = "1" //topic 所在的組名,可以設置為任意名字
//topic配置
val topics = "wordsender" //topic 名稱,可以為多個topic,多個之間用逗號隔開 “topic1,topic2”
//建立topicMap<topicName,numThreads.toInt> key為topic名稱,value為所需要的線程數
val topicMap = topics.split(",").map((_,1)).toMap //numThreads.toInt為所需線程數
//建立spark流
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
//處理spark流
val lines = lineMap.map(_._2)//上面傳過來的數據為<null,string>,我們去后邊的value
val pair = lines.flatMap(_.split(" ")).map((_,1))
val wordCount = pair.reduceByKey(_+_)
wordCount.print
//啟動spark流
ssc.start()
ssc.awaitTermination()
}
}
然后我們將程序打包提交到集群上運行,就可以對上面我們建立的kafka生產的消息進行消費了。
