Kafka service commands
# Start the Kafka server
bin/kafka-server-start.sh -daemon config/server.properties &
# Create a topic
bin/kafka-topics.sh --create --zookeeper bigdata-senior02.ibeifeng.com:2181 --replication-factor 1 --partitions 1 --topic orderTopic
# Start a Kafka console consumer
bin/kafka-console-consumer.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic --from-beginning
# Start a Kafka console producer
bin/kafka-console-producer.sh --broker-list bigdata-senior02.ibeifeng.com:9092 --topic orderTopic
# List topics
bin/kafka-topics.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --list
# Mark a Kafka topic for deletion
bin/kafka-topics.sh --delete --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic
Environment preparation (I am using single-node pseudo-distributed mode)
Start ZooKeeper first, then start the Kafka service:
$ZK_HOME/bin/zkServer.sh start
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &
# Create a topic
bin/kafka-topics.sh --create --zookeeper bigdata-senior02.ibeifeng.com:2181 --replication-factor 1 --partitions 1 --topic orderTopic
# List topics
bin/kafka-topics.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --list
# Start a Kafka console consumer
bin/kafka-console-consumer.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic --from-beginning
# Start a Kafka console producer
bin/kafka-console-producer.sh --broker-list bigdata-senior02.ibeifeng.com:9092 --topic orderTopic
With the tests above, confirm that Kafka is running properly.
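If you also want to verify the broker from code rather than only through the console tools, the kafka-clients AdminClient (available since Kafka 0.11) can list topics programmatically. This is only a minimal sketch, reusing the same single-broker address as the commands above; the object name is just for illustration.

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// Minimal sketch: connects to the broker used in the console commands above
// and prints the topic names it can see (should include orderTopic).
object KafkaHealthCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata-senior02.ibeifeng.com:9092")
    val admin = AdminClient.create(props)
    val topicNames = admin.listTopics().names().get() // blocks until the broker responds
    println(topicNames)
    admin.close()
  }
}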
Spark Streaming code (Scala 2.11.8, Spark 2.0.0, Kafka 1.1)
Maven dependencies
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId> <!-- pay close attention to your Scala version here, otherwise you will hit incompatibilities at runtime -->
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- Spark Core -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- Spark SQL -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- Spark Streaming -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
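Since version mismatches are the main pitfall here (see the note on kafka_2.11 above), a quick sanity check is to print the Scala and Spark versions the runtime actually sees before debugging anything else. A minimal sketch, assuming the dependencies above are on the classpath; the object name is just for illustration.

import org.apache.spark.SPARK_VERSION

// Prints the Scala and Spark versions actually on the classpath,
// which should match the versions declared in the pom (2.11.x / 2.0.0).
object VersionCheck {
  def main(args: Array[String]): Unit = {
    println("Scala: " + scala.util.Properties.versionString)
    println("Spark: " + SPARK_VERSION)
  }
}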
Producer-side data generation
/**
* This is a data producer.
*
* Start the Kafka server (broker) first; if it is not running you will get a "broker not found" error:
* bin/kafka-server-start.sh -daemon config/server.properties &
*
* Start a console consumer to watch the output:
* bin/kafka-console-consumer.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic --from-beginning
*/
//object OrderProductor {
// def main(args: Array[String]): Unit = {
//
// val topic = "orderTopic"
// val brokers = "bigdata-senior02.ibeifeng.com:9092"
//
// val props = new util.HashMap[String,Object]()
// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
//
// val producer = new KafkaProducer[String,String](props)
//
//
// // Generate 10 orders per second
// while(true){
// (1 to 10).foreach{messageNum =>
// // region id, order id, order amount, order timestamp
// val str = messageNum + "," + Random.nextInt(10)+","+Math.round(Random.nextDouble()*100)+","+ new Date().getTime
// val message = new ProducerRecord[String, String](topic,null,str)
// producer.send(message)
// }
//
// Thread.sleep(1000)
// }
//
// }
//
//}
import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Produces messages of random single-digit words: 10 messages per second, 5 words per message.
object KafkaWordCountProducer {
  def main(args: Array[String]) {
    val topic = "orderTopic"
    val brokers = "bigdata-senior02.ibeifeng.com:9092"
    val messagesPerSec = 10
    val wordsPerMessage = 5

    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    while (true) {
      (1 to messagesPerSec).foreach { messageNum =>
        val str = (1 to wordsPerMessage).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
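One small extension that is not in the original example: kafka-clients lets you attach a callback to send(), which makes broker problems (such as the "broker not found" case mentioned earlier) show up immediately instead of failing silently. A minimal self-contained sketch, reusing the same broker and topic; the object name is just for illustration.

import java.util

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Sketch: sends a single record and prints whether the broker acknowledged it.
object ProducerWithCallback {
  def main(args: Array[String]): Unit = {
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata-senior02.ibeifeng.com:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val record = new ProducerRecord[String, String]("orderTopic", null, "hello from callback producer")
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) exception.printStackTrace() // broker unreachable, topic missing, ...
        else println(s"acked: ${metadata.topic()}-${metadata.partition()} @ offset ${metadata.offset()}")
      }
    })

    producer.flush() // push out anything still buffered before closing
    producer.close()
  }
}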
Consuming the data
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage (from the original Spark example): KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
*   (in this adapted version the four values are hardcoded in main)
* <zkQuorum> is a list of one or more zookeeper servers that make quorum
* <group> is the name of kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
* my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
  def main(args: Array[String]) {
    val zkQuorum = "bigdata-senior02.ibeifeng.com:2181"
    val group = "g1"
    val topics = "orderTopic"
    val numThreads = 2

    // Give the local master at least 2 cores: the receiver occupies one,
    // and at least one more is needed to actually process the batches.
    val conf = new SparkConf().setAppName("StatelessWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2)) // one batch every two seconds

    val topicMap = topics.split(",").map((_, numThreads)).toMap
    KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreachRDD(x => x.foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }
}
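The job above is stateless: each 2-second batch is counted on its own (hence the app name StatelessWordCount). If you want running totals across batches, updateStateByKey can be layered on the same receiver stream. A minimal sketch under the same versions; the object name and checkpoint path are just placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: same Kafka receiver as above, but keeps a running total per word
// across batches. updateStateByKey requires a checkpoint directory.
object StatefulKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/kafka-wordcount-checkpoint") // placeholder path

    val topicMap = Map("orderTopic" -> 2)
    val lines = KafkaUtils.createStream(ssc, "bigdata-senior02.ibeifeng.com:2181", "g1", topicMap).map(_._2)

    lines.flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey[Int]((added: Seq[Int], total: Option[Int]) => Some(added.sum + total.getOrElse(0)))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}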
That is the hello world of consuming Kafka with Spark Streaming.
===============================================================================================================================
Notes:
1. Make sure you pick compatible versions, otherwise all kinds of strange problems will appear.
2. I was stuck here for almost a week, all because of the version incompatibilities above and mistakes made while importing packages.
3. If the code itself looks fine but you still hit runtime problems, start by checking version compatibility; that often makes the issue easier to track down.
The code is on my GitHub; if you have questions, please leave a comment.
https://github.com/nulijiushimeili/spark01