如果你要利用代碼來跑kafka的應用,那你最好先把官網給出的example先在單機環境和分布式環境下跑通,然后再逐步將原有的consumer、producer和broker替換成自己寫的代碼。所以在閱讀這篇文章前你需要具備以下前提:
1. 簡單了解kafka功能,理解kafka的分布式原理
2. 能在分布式環境下成功運行—topic test。
如果你還沒有完成上述兩個前提,請先看:
接下來我們就簡單介紹下kafka開發環境的搭建。
1. 搭建scala環境,這里有兩種方案,第一直接去scala官網下載SDK,解壓配置環境變量;第二種辦法,你可以不用安裝SDK,直接在項目中引用kafka編譯后下載下來的scala編譯包和編碼包(scala-compiler.jar和scala-library.jar)。我推薦第二種,因為通過kafka編譯下載下來的scala版本和kafka版本都是匹配的(但是有時候可能會跟eclipse的插件需要的環境沖突,所以最好把第一種也安裝一下,以防萬一),而一般我們用的是java項目來寫,所以直接導入相關依賴包就可以了,第一種方案有助於我們看源碼和用scala開發。這些jar的路徑位於kafka-0.7.2-incubating-src\javatest\lib目錄下。
2. 為eclipse安裝scala開發環境。這只是一個插件,可以在:http://scala-ide.org/中下載安裝或者在線安裝。裝完之后,你就能在eclipse中創建scala的項目了。
我們可以先寫一個hello world試一下,這樣一來,是不是又多了一種語言寫hello world了。
有些人new的時候找不到scala相關的類,那是因為你eclipse的perspective不對,切換到scala的perspective下就可以了。注意new的是object,然后輸入:
1
2
3
4
5
6
7
8
|
package
com.a
2
.kafka.scala.test
object
Hello {
def
main(args
:
Array[String])
:
Unit
=
{
printf(
"Hello Scala!!"
);
}
}
|
3. 找到編碼需要的依賴包。記住去你linux上經過update的kafka文件夾里找,不要從直接從官網上下載的文件里找。具體路徑是:kafka-0.7.2-incubating-src\javatest\lib 這是你用java開發kafka相關程序用到的最基礎的包,如果你用到了hadoop,只要去相關的文件夾找一下就可以了。然后把這些包加到項目里即可。
到了這里,基本的開發環境應該是搭建完了,然后我們要開始寫點兒簡單的代碼了。我們還是根據之前《分布式環境搭建》中給出的例子。稍微回憶下:
1. 啟動zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties & (用&是為了能退出命令行)
2. 啟動kafka server: bin/kafka-server-start.sh ../config/server.properties &
3. Kafka為我們提供了一個console來做連通性測試,下面我們先運行producer:bin/kafka-console-producer.sh --zookeeper 192.168.10.11:2181 --topic test 這是相當於開啟了一個producer的命令行。
4. 接下來運行consumer,新啟一個terminal:bin/kafka-console-consumer.sh --zookeeper 192.168.10.11:2181 --topic test --from-beginning
5. 執行完consumer的命令后,你可以在producer的terminal中輸入信息,馬上在consumer的terminal中就會出現你輸的信息。
這個例子就是在分布式的環境下producer生產數據,然后consumer從broker抓取數據顯示在console上。當然注意一點server.properties中的hostname需要換成你的對應地址,具體可以回去看《分布式環境搭建》。現在我們就用代碼來模擬producer發送數據的過程:
這里我們建一個java project就可以了,導入依賴包。kafka-0.7.2-incubating-src\javatest\lib目錄下的jar.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package
com.a2.test.kafka;
import
java.util.Properties;
import
kafka.javaapi.producer.Producer;
import
kafka.javaapi.producer.ProducerData;
import
kafka.producer.ProducerConfig;
public
class
Producertest {
public
static
void
main(String[] args) {
Properties props =
new
Properties();
props.put(
"zk.connect"
,
"192.168.10.11:2181"
);
props.put(
"serializer.class"
,
"kafka.serializer.StringEncoder"
);
ProducerConfig config =
new
ProducerConfig(props);
Producer<String, String> producer =
new
Producer<String, String>(config);
ProducerData<String, String> data =
new
ProducerData<String, String>(
"test"
,
"Hello"
);
producer.send(data);
}
}
|
這樣我們就用代碼代替了console去發送信息了,這里我們用了utils中的properties對象直接構建了配置,而不是直接讀取,當然你也可以讀配置。Data里的兩個參數,第一個是指定topic,第二個是發送的內容。
接下來就是運行了,如果你是在windows下,可能會等待很久然后報Unable to connect to zookeeper server within timeout: 6000,這個可能是網卡的原因,你可以直接放到linux上,然后用命令行運行,注意引包。
1
|
Java –jar test.jar Dclasspath=/lib
|
Consumer的代碼以及實現和producer差不多,如果你感興趣可以去官網找相關的代碼,都很簡單。當然還有一部分關於producer和consumer的配置,我們下篇再說。