1. Import the jars required by Kafka
When Kafka is installed on the server, the jar files the client code needs are already included in the extracted distribution; just add them to the project's build path, as shown in the figure below:

2. Create the producer class
package demo;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class ProducerDemo extends Thread {

    // The topic to send messages to
    private String topic;

    public ProducerDemo(String topic) {
        this.topic = topic;
    }

    // Send one message every 5 seconds
    public void run() {
        // Create a producer instance
        Producer<String, String> producer = createProducer();
        // Send messages in a loop
        int i = 1;
        while (true) {
            String data = "message " + i++;
            // Send the message with the producer
            producer.send(new KeyedMessage<String, String>(this.topic, data));
            // Print what was sent
            System.out.println("Sending: " + data);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Create the Producer instance
    private Producer<String, String> createProducer() {
        Properties prop = new Properties();
        // ZooKeeper connection string
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        // Broker list
        prop.put("metadata.broker.list", "192.168.7.151:9092,192.168.7.151:9093");
        return new Producer<String, String>(new ProducerConfig(prop));
    }

    public static void main(String[] args) {
        // Start the thread that sends messages
        new ProducerDemo("mydemo1").start();
    }
}
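As a side note, the 0.8 producer API also lets you attach a key to each message via the KeyedMessage(topic, key, message) constructor; the default partitioner then routes messages with the same key to the same partition. Below is a minimal sketch reusing the configuration above (the class name, the key "user-1" and the request.required.acks setting are illustrative additions, not part of the original code):

package demo;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KeyedProducerSketch {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        prop.put("metadata.broker.list", "192.168.7.151:9092,192.168.7.151:9093");
        // Wait for the leader broker to acknowledge each write (illustrative tuning)
        prop.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
        // Messages that share the same key ("user-1" here) end up in the same partition
        producer.send(new KeyedMessage<String, String>("mydemo1", "user-1", "keyed message"));
        producer.close();
    }
}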
3. Create the consumer class
package demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerDemo extends Thread {

    // The topic to consume from
    private String topic;

    public ConsumerDemo(String topic) {
        this.topic = topic;
    }

    public void run() {
        // Build a consumer connector
        ConsumerConnector consumer = createConsumer();
        // Build a map describing what to consume
        // key: topic name, value: number of streams (consumer threads) for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Use a single stream for this topic
        topicCountMap.put(this.topic, 1);
        // Create the message streams
        // key: topic name, value: the list of streams for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // Take the single stream for our topic and iterate over incoming messages
        KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("Received: " + message);
        }
    }

    // Create the consumer connector
    private ConsumerConnector createConsumer() {
        Properties prop = new Properties();
        // ZooKeeper connection string
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        // Consumer group this consumer belongs to
        prop.put("group.id", "group1");
        // Too small a value may cause connection timeouts
        prop.put("zookeeper.connection.timeout.ms", "60000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    }

    public static void main(String[] args) {
        new ConsumerDemo("mydemo1").start();
    }
}
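Since the Integer in topicCountMap is the number of streams rather than a record count, a topic with several partitions is normally consumed by requesting several streams and handing each one to its own thread. Here is a rough sketch along those lines, assuming the same ZooKeeper and group settings as ConsumerDemo (the class name and the stream count of 3 are arbitrary choices for illustration):

package demo;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadConsumerSketch {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        prop.put("group.id", "group1");
        prop.put("zookeeper.connection.timeout.ms", "60000");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));

        // Ask for 3 streams; ideally this matches the topic's partition count
        int streamCount = 3;
        List<KafkaStream<byte[], byte[]>> streams =
                consumer.createMessageStreams(Collections.singletonMap("mydemo1", streamCount)).get("mydemo1");

        // One thread per stream
        ExecutorService pool = Executors.newFixedThreadPool(streamCount);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName()
                                + " received: " + new String(it.next().message()));
                    }
                }
            });
        }
    }
}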
Running the two programs looks like this:
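The original screenshot is not reproduced here; with the classes above, the two consoles print roughly the following (illustrative):

Producer console:
Sending: message 1
Sending: message 2
...

Consumer console:
Received: message 1
Received: message 2
...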

Notes:
1. In the consumer class, set zookeeper.connection.timeout.ms to a fairly large value; otherwise the connection may time out (it did for me).
2. If you simply close the producer and consumer windows and then reopen the consumer, duplicate messages show up. I have not found a proper fix yet; a hedged guess at the cause and a mitigation sketch follow below.
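A hedged guess at note 2: the old high-level consumer only commits its offsets to ZooKeeper periodically (controlled by auto.commit.interval.ms), so closing the window between two commits makes the next run of the same group re-read everything since the last committed offset. The sketch below (the class name and the 1000 ms interval are illustrative) commits offsets and shuts the connector down cleanly on exit; this reduces, but cannot fully eliminate, duplicates, because the high-level consumer only guarantees at-least-once delivery:

package demo;

import java.util.Collections;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GracefulConsumerSketch {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        prop.put("group.id", "group1");
        prop.put("zookeeper.connection.timeout.ms", "60000");
        // Commit consumed offsets to ZooKeeper more frequently (value in ms; illustrative)
        prop.put("auto.commit.interval.ms", "1000");

        final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));

        // On Ctrl+C / JVM shutdown, persist the current offsets and close the connector,
        // so the next run of the same group resumes roughly where this one stopped.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.commitOffsets();
                consumer.shutdown();
            }
        });

        KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(Collections.singletonMap("mydemo1", 1)).get("mydemo1").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Received: " + new String(it.next().message()));
        }
    }
}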
