Kafka開發環境搭建(五)


如果你要利用代碼來跑kafka的應用,那你最好先把官網給出的example先在單機環境和分布式環境下跑通,然后再逐步將原有的consumer、producer和broker替換成自己寫的代碼。所以在閱讀這篇文章前你需要具備以下前提:

1.  簡單了解kafka功能,理解kafka的分布式原理

2.  能在分布式環境下成功運行—topic test。

如果你還沒有完成上述兩個前提,請先看:

kafka分布式初步      kafka搭建分布式環境

接下來我們就簡單介紹下kafka開發環境的搭建。

1.    搭建scala環境,這里有兩種方案,第一直接去scala官網下載SDK,解壓配置環境變量;第二種辦法,你可以不用安裝SDK,直接在項目中引用kafka編譯后下載下來的scala編譯包和編碼包(scala-compiler.jar和scala-library.jar)。我推薦第二種,因為通過kafka編譯下載下來的scala版本和kafka版本都是匹配的(但是有時候可能會跟eclipse的插件需要的環境沖突,所以最好把第一種也安裝一下,以防萬一),而一般我們用的是java項目來寫,所以直接導入相關依賴包就可以了,第一種方案有助於我們看源碼和用scala開發。這些jar的路徑位於kafka-0.7.2-incubating-src\javatest\lib目錄下。

2.    為eclipse安裝scala開發環境。這只是一個插件,可以在:http://scala-ide.org/中下載安裝或者在線安裝。裝完之后,你就能在eclipse中創建scala的項目了。

我們可以先寫一個hello world試一下,這樣一來,是不是又多了一種語言寫hello world了。

有些人new的時候找不到scala相關的類,那是因為你eclipse的perspective不對,切換到scala的perspective下就可以了。注意new的是object,然后輸入:

?
1
2
3
4
5
6
7
8
package com.a 2 .kafka.scala.test
 
object Hello {
 
   def main(args : Array[String]) : Unit = {
     printf( "Hello Scala!!" );
   }
}

3.     找到編碼需要的依賴包。記住去你linux上經過update的kafka文件夾里找,不要從直接從官網上下載的文件里找。具體路徑是:kafka-0.7.2-incubating-src\javatest\lib 這是你用java開發kafka相關程序用到的最基礎的包,如果你用到了hadoop,只要去相關的文件夾找一下就可以了。然后把這些包加到項目里即可。

到了這里,基本的開發環境應該是搭建完了,然后我們要開始寫點兒簡單的代碼了。我們還是根據之前《分布式環境搭建》中給出的例子。稍微回憶下:

1.   啟動zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties  & (用&是為了能退出命令行)

2.   啟動kafka server:  bin/kafka-server-start.sh ../config/server.properties  &

3.    Kafka為我們提供了一個console來做連通性測試,下面我們先運行producer:bin/kafka-console-producer.sh --zookeeper 192.168.10.11:2181 --topic test 這是相當於開啟了一個producer的命令行。

4.   接下來運行consumer,新啟一個terminal:bin/kafka-console-consumer.sh --zookeeper 192.168.10.11:2181 --topic test --from-beginning

5.    執行完consumer的命令后,你可以在producer的terminal中輸入信息,馬上在consumer的terminal中就會出現你輸的信息。

這個例子就是在分布式的環境下producer生產數據,然后consumer從broker抓取數據顯示在console上。當然注意一點server.properties中的hostname需要換成你的對應地址,具體可以回去看《分布式環境搭建》。現在我們就用代碼來模擬producer發送數據的過程:

這里我們建一個java project就可以了,導入依賴包。kafka-0.7.2-incubating-src\javatest\lib目錄下的jar.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.a2.test.kafka;
 
import java.util.Properties;
 
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
 
import kafka.producer.ProducerConfig;
 
public class Producertest {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put( "zk.connect" , "192.168.10.11:2181" );
         props.put( "serializer.class" , "kafka.serializer.StringEncoder" );
         ProducerConfig config = new ProducerConfig(props);
         Producer<String, String> producer = new Producer<String, String>(config);
         ProducerData<String, String> data = new ProducerData<String, String>( "test" , "Hello" );
         producer.send(data);
     }
}

這樣我們就用代碼代替了console去發送信息了,這里我們用了utils中的properties對象直接構建了配置,而不是直接讀取,當然你也可以讀配置。Data里的兩個參數,第一個是指定topic,第二個是發送的內容。

接下來就是運行了,如果你是在windows下,可能會等待很久然后報Unable to connect to zookeeper server within timeout: 6000,這個可能是網卡的原因,你可以直接放到linux上,然后用命令行運行,注意引包。

?
1
Java –jar test.jar  Dclasspath=/lib

Consumer的代碼以及實現和producer差不多,如果你感興趣可以去官網找相關的代碼,都很簡單。當然還有一部分關於producer和consumer的配置,我們下篇再說。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM