(08)java程序連接kafka示例


  1、導入kafka所需要的包

  在服務器上安裝kafka程序的時候,解壓后就有kafka需要的jar包,如下圖所示:

  2、新建生產者類

 1 package demo;
 2 
 3 import java.util.Properties;
 4 import java.util.concurrent.TimeUnit;
 5 
 6 import kafka.javaapi.producer.Producer;
 7 import kafka.producer.KeyedMessage;
 8 import kafka.producer.ProducerConfig;
 9 import kafka.serializer.StringEncoder;
10 
11 public class ProducerDemo extends Thread {
12     
13     //指定具體的topic
14     private String topic;
15     
16     public ProducerDemo(String topic){
17         this.topic = topic;
18     }
19     
20     //每隔5秒發送一條消息
21     public void run(){
22         //創建一個producer的對象
23         Producer producer = createProducer();
24         //發送消息
25         int i = 1;
26         while(true){
27             String data = "message " + i++;
28             //使用produer發送消息
29             producer.send(new KeyedMessage(this.topic, data));
30             //打印
31             System.out.println("發送數據:" + data);
32             try {
33                 TimeUnit.SECONDS.sleep(5);
34             } catch (Exception e) {
35                 e.printStackTrace();
36             }
37         }
38     }
39     
40     //創建Producer的實例
41     private Producer createProducer() {
42         Properties prop = new Properties();
43         //聲明zk
44         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
45         prop.put("serializer.class",StringEncoder.class.getName());
46         //聲明Broker的地址
47         prop.put("metadata.broker.list","192.168.7.151:9092,192.168.7.151:9093");
48         return new Producer(new ProducerConfig(prop));
49     }
50     
51     public static void main(String[] args) {
52         //啟動線程發送消息
53         new ProducerDemo("mydemo1").start();
54     }
55 }

  3、新建消費者類

 1 package demo;
 2 
 3 import java.util.HashMap;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.Properties;
 7 
 8 
 9 import kafka.consumer.Consumer;
10 import kafka.consumer.ConsumerConfig;
11 import kafka.consumer.ConsumerIterator;
12 import kafka.consumer.KafkaStream;
13 import kafka.javaapi.consumer.ConsumerConnector;
14 
15 public class ConsumerDemo extends Thread {
16 
17     //指定具體的topic
18     private String topic;
19     
20     public ConsumerDemo(String topic){
21         this.topic = topic;
22     }
23     
24     public void run(){
25         //構造一個consumer的對象
26         ConsumerConnector consumer = createConsumer();
27         //構造一個Map對象,代表topic
28         //String: topic的名稱  Integer: 從這個topic中獲取多少條記錄
29         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
30         //一次從這個topic中獲取一條記錄
31         topicCountMap.put(this.topic, 1);
32         //構造一個messageStream:輸入流
33         //String: topic的名稱 List: 獲取的數據
34         Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
35         //獲取每次接受到的具體的數據
36         KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
37         ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
38         while(iterator.hasNext()){
39             String message = new String(iterator.next().message());
40             System.out.println("接受數據:" + message);
41         }
42     }
43     
44     //創建具體的consumer
45     private ConsumerConnector createConsumer() {
46         Properties prop = new Properties();
47         //指明zk的地址
48         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
49         //指明這個consumer的消費組
50         prop.put("group.id", "group1");
51         //時間設置的過小可能會連接超時。。。
52         prop.put("zookeeper.connection.timeout.ms", "60000");
53         return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
54     }
55 
56     public static void main(String[] args) {
57         new ConsumerDemo("mydemo1").start();
58     }
59 
60 }

  運行程序如下:

   注意:

  1、在消費者的類中,時間要設置長一些,否則可能出現連接超時的錯誤(我就出現了。。。)

  2、直接關閉生產者和消費者窗口,重新打開消費者窗口,會有重復數據。。。目前還沒找到解決辦法。。。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM