[comment]: # Spark集群 + Akka + Kafka + Scala 開發(4) : 開發一個Kafka + Spark的應用
前言
在Spark集群 + Akka + Kafka + Scala 開發(1) : 配置開發環境中,我們已經部署好了一個Spark的開發環境。
在Spark集群 + Akka + Kafka + Scala 開發(2) : 開發一個Spark應用中,我們已經寫好了一個Spark的應用。
本文的目標是寫一個基於kafka的scala工程,在一個spark standalone的集群環境中運行。
項目結構和文件說明
說明
這個工程包含了兩個應用。
一個Consumer應用:CusomerApp - 實現了通過Spark的Stream+Kafka的技術來實現處理消息的功能。
一個Producer應用:ProducerApp - 實現了向Kafka集群發消息的功能。
文件結構
KafkaSampleApp # 項目目錄
|-- build.bat # build文件
|-- src
|-- main
|-- scala
|-- ConsumerApp.scala # Consumer應用
|-- ProducerApp.scala # Producer應用
構建工程目錄
可以運行:
mkdir KafkaSampleApp
mkdir -p /KafkaSampleApp/src/main/scala
代碼
build.sbt
name := "kafka-sample-app"
version := "1.0"
scalaVersion := "2.11.8"
scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.0",
"org.apache.spark" %% "spark-streaming" % "2.0.0",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
"org.apache.kafka" %% "kafka" % "0.8.2.1"
)
CusomerApp.scala
這個例子中使用了Spark自帶的Stream+Kafka結合的技術,有個限制的綁定了kafka的8.x版本。
我個人建議只用Kafka的技術,寫一個Consomer,或者使用其自帶的Consumer,來接受消息。
然后再使用Spark的技術。
這樣可以跳過對kafak版本的限制。
import java.util.Properties
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object ConsumerApp {
def main(args: Array[String]) {
val brokers = "localhost:9092"
val topics = "test-topic"
// Create context with 10 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
println("============== Start ==============")
wordCounts.print
println("============== End ==============")
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
ProducerApp.scala
import java.util.Arrays
import java.util.List
import java.util.Properties
import org.apache.kafka.clients.producer._
object ProducerApp {
def main(args: Array[String]): Unit = {
val props = new Properties()
// Must-have properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Optional properties
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none")
props.put(ProducerConfig.SEND_BUFFER_CONFIG, (1024*100).toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, (100).toString)
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, (5*60*1000L).toString)
//props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, (60*1000l).toString)
props.put(ProducerConfig.ACKS_CONFIG, (0).toString)
//props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, (1500).toString)
props.put(ProducerConfig.RETRIES_CONFIG, (3).toString)
props.put(ProducerConfig.LINGER_MS_CONFIG, (1000).toString)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (32 * 1024 * 1024L).toString)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, (200).toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-app-producer")
val producer = new KafkaProducer[String, String](props)
// Thread hook to close produer
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
// send 10 messages
var i = 0
for( i <- (1 to 10)) {
val data = new ProducerRecord[String, String]("test-topic", "test-key", s"test-message $i")
producer.send(data)
}
// Reduce package lost
Thread.sleep(1000)
producer.close()
}
}
構建工程
進入目錄KafkaSampleApp。運行:
sbt package
第一次運行時間會比較長。
測試應用
啟動Kafka服務
# Start zookeeper server
gnome-terminal -x sh -c '$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties; bash'
# Wait zookeeper server is started.
sleep 8s
# Start kafka server
gnome-terminal -x sh -c '$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties; bash'
# Wait kafka server is started.
sleep 5s
注:使用Ctrl+C可以中斷服務。
- 創建一個topic
# Create a topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
# List topics
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動Spark服務
- 啟動spark集群master server
$SPARK_HOME/sbin/start-master.sh
master服務,默認會使用
7077
這個端口。可以通過其日志文件查看實際的端口號。
- 啟動spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
啟動Consumer應用
新起一個終端,來運行:
$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --master spark://$(hostname):7077 --class ConsumerApp target/scala-2.11/kafka-sample-app_2.11-1.0.jar
注:如果定義的topic沒有create,第一次運行會失敗,再運行一次就好了。
如果出現java.lang.NoClassDefFoundError錯誤,
請參照Spark集群 + Akka + Kafka + Scala 開發(1) : 配置開發環境,
確保kafka的包在Spark中設置好了。
啟動Producer應用
java -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp
# or
# $KAFKA_HOME/bin/kafka-run-class.sh -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp
然后:看看Consumer應用是否收到了消息。
總結
建議寫一個Kafka的Consumer,然后調用Spark功能,而不是使用Spark的Stream+Kafka的編程方式。
好處是可以使用最新版本的Kafka。
Kafka的包中帶有一個Sample代碼,可以從中學習一些編寫程序的方法。