Java編寫程序將數據存入Kafka中


Kafka是一個類似於RabbitMQ的消息系統,它的主要功能是消息的發布和訂閱、處理和存儲。

1.它類似於一個消息系統,讀寫流式的數據。

2.編寫可擴展的流應用處理程序,用於實時事件響應的場景。

3.安全的將流式的數據存儲在一個分布式,有副本備份,容錯的集群。

 

本篇博文主要介紹如何使用Java編寫程序將數據寫入到Kafka中,即Kafka生產者,並不涉及Kafka消費者。另外,像Spark,Storm等都有相應的程序從Kafka消費者中獲取數據的方法,直接調用即可。

 

Kafka的運行需要Zookeeper的幫助,所以,需要先安裝Zookeeper。

 

1.先啟動Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

 再啟動Kafka服務器:

bin/kafka-server-start.sh config/server.properties

2.創建一個Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 顯示topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

 也可以在程序中進行topic的創建。

3.發送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4.接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 

 

 

下面,是本次的程序:

 1 import org.apache.kafka.clients.producer.*;
 2 
 3 import java.util.Properties;
 4 import java.util.concurrent.ExecutionException;
 5 
 6 public class MyProducer extends Thread {
 7     private final KafkaProducer<Integer, String> producer;
 8     private final String topic;
 9     private final Boolean isAsync;
10 
11     public MyProducer(String topic, Boolean isAsync) {
12         Properties prop = new Properties();
13         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
14         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
15         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
16         producer = new KafkaProducer<Integer, String>(prop);
17         this.topic = topic;
18         this.isAsync = isAsync;
19     }
20 
21     public void run() {
22         int messageNo = 1;
23         while (true) {
24             String messageStr = "Message_" + messageNo;
25             long startTime = System.currentTimeMillis();
26             if (isAsync) {
27                 producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new DemoCallback(startTime, messageNo, messageStr));
28             } else {
29                 try {
30                     producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get();
31                     System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
32                 } catch (InterruptedException | ExecutionException e) {
33                     e.printStackTrace();
34                 }
35             }
36             ++messageNo;
37         }
38     }
39     public static void main(String[] args) {
40         boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
41 
42         MyProducer producerThread = new MyProducer("test", isAsync);
43         producerThread.start();
44 
45     }
46 }
47 
48 class DemoCallback implements Callback {
49     private final long startTime;
50     private final int key;
51     private final String message;
52 
53     public DemoCallback(long startTime, int key, String message) {
54         this.startTime = startTime;
55         this.key = key;
56         this.message = message;
57     }
58 
59     public void onCompletion(RecordMetadata metadata, Exception exception) {
60         long elapsedTime = System.currentTimeMillis() - startTime;
61         if (metadata != null) {
62             System.out.println(
63                     "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
64                             "), " +
65                             "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
66         } else {
67             exception.printStackTrace();
68         }
69     }
70 }

 

好了,完成!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM