開發簡單的Kafka應用


  之前基於集群和單機安裝過kafka,現在利用kafka提供的API構建一個簡單的生產者消費者的項目示例,來跑通kafka的流程,具體過程如下:

  首先使用eclipse for javaee建立一個maven項目,然后在pom.xml添加如下依賴配置:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.2</version>
    </dependency>

  這里kafka版本是kafka_2.9.2-0.8.2.2,保存之后maven會自動下載依賴,注意要關閉windows防火牆,盡量專用網絡和外網都要關閉,否則下載的很慢,下載好之后就可以編寫項目代碼了,這里的pom.xml所有配置如下:

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4 
 5   <groupId>kafkatest</groupId>
 6   <artifactId>kafkatest</artifactId>
 7   <version>0.0.1-SNAPSHOT</version>
 8   <packaging>jar</packaging>
 9 
10   <name>kafkatest</name>
11   <url>http://maven.apache.org</url>
12 
13   <properties>
14     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15   </properties>
16 
17   <dependencies>
18     <dependency>
19       <groupId>junit</groupId>
20       <artifactId>junit</artifactId>
21       <version>3.8.1</version>
22       <scope>test</scope>
23     </dependency>
24     <dependency>
25         <groupId>org.apache.kafka</groupId>
26         <artifactId>kafka_2.9.2</artifactId>
27         <version>0.8.2.2</version>
28     </dependency>
29   </dependencies>
30 </project>

  然后,我們建立一個簡單生產者類SimpleProducer,代碼如下:

 1 package test;
 2 
 3 import java.util.Properties;
 4 
 5 import kafka.javaapi.producer.Producer;
 6 import kafka.producer.KeyedMessage;
 7 import kafka.producer.ProducerConfig;
 8 
 9 public class SimpleProducer {
10         private static Producer<Integer,String> producer;
11         private final Properties props=new Properties();
12         public SimpleProducer(){
13                 //定義連接的broker list
14                 props.put("metadata.broker.list", "192.168.1.216:9092");
15                 //定義序列化類 Java中對象傳輸之前要序列化
16                 props.put("serializer.class", "kafka.serializer.StringEncoder");
17                 producer = new Producer<Integer, String>(new ProducerConfig(props));
18         }
19         public static void main(String[] args) {
20                 SimpleProducer sp=new SimpleProducer();
21                 //定義topic
22                 String topic="mytopic";
23                 
24                 //定義要發送給topic的消息
25                 String messageStr = "This is a message";
26                 
27                 //構建消息對象
28                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
29          
30                 //推送消息到broker
31                 producer.send(data);
32                 producer.close();
33         }
34 }

  類的代碼很簡單,我這里是kafka單機環境端口就是kafka broker端口9092,這里定義topic為mytopic當然可以自己隨便定義不用考慮服務器是否創建,對於發送消息的話上面代碼是簡單的單條發送,如果發送數據量很大的話send方法多次推送會耗費時間,所以建議把data數據按一定量分組放到List中,最后send一下AarrayList即可,這樣速度會大幅度提高

  接下來寫一個簡單的消費者類SimpleHLConsumer,代碼如下:

 1 package test;
 2 
 3 import java.util.HashMap;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.Properties;
 7 
 8 import kafka.consumer.Consumer;
 9 import kafka.consumer.ConsumerConfig;
10 import kafka.consumer.ConsumerIterator;
11 import kafka.consumer.KafkaStream;
12 import kafka.javaapi.consumer.ConsumerConnector;
13 
14 public class SimpleHLConsumer {
15         private final ConsumerConnector consumer;
16         private final String topic;
17 
18         public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
19                 Properties props = new Properties();
20                 //定義連接zookeeper信息
21                 props.put("zookeeper.connect", zookeeper);
22                 //定義Consumer所有的groupID
23                 props.put("group.id", groupId);
24                 props.put("zookeeper.session.timeout.ms", "500");
25                 props.put("zookeeper.sync.time.ms", "250");
26                 props.put("auto.commit.interval.ms", "1000");
27                 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
28                 this.topic = topic;
29         }
30 
31         public void testConsumer() {
32                 Map<String, Integer> topicCount = new HashMap<String, Integer>();
33                 //定義訂閱topic數量
34                 topicCount.put(topic, new Integer(1));
35                 //返回的是所有topic的Map
36                 Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
37                 //取出我們要需要的topic中的消息流
38                 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
39                 for (final KafkaStream stream : streams) {
40                         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
41                         while (consumerIte.hasNext())
42                                 System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
43                 }
44                 if (consumer != null)
45                         consumer.shutdown();
46         }
47 
48         public static void main(String[] args) {
49                 String topic = "mytopic";
50                 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.1.216:2181/kafka", "testgroup", topic);
51                 simpleHLConsumer.testConsumer();
52         }
53 
54 }

  消費者代碼主要邏輯就是對生產者發送過來的數據做簡單處理和輸出,注意這里的地址是zookeeper的地址並且包括節點/kafka,topic名稱要一致

  上面兩個類已經可以實現消息的生產和消費了,但是現在服務器需要做一定的配置才可以,否則會拋出異常,就是在之前配置的server.properties基礎之上進行修改,進入kafka安裝目錄下,使用命令 vim config/server.properties 打開配置文件,找到host.name這個配置,首先去掉前面的#注釋,然后把默認的localhost改成IP地址192.168.1.216,因為eclipse遠程運行代碼時讀取到localhost再執行時就是提交到本地了,所以會拋出異常,當然把代碼打成jar包在服務器運行就不會出現這樣的問題了,這里要注意:

  

  修改之后保存並退出,然后確保zookeeper的正常運行

  如果之前kafka正在運行,那么就執行 bin/kafka-server-stop.sh  停止kafka服務,然后再執行

   nohup bin/kafka-server-start.sh config/server.properties >> /dev/null & 啟動服務,如果原來就是停止的,那么直接啟動即可

  啟動之后先運行啟動消費者,消費者處於運行等待

  

  然后啟動生產者發送消息,生產者發送完成立即關閉,消費者消費輸出如下:

  

  到這里,就完成了kafka從生產到消費簡單示例的開發,消息隊列可以跑通了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM