Java API方式調用Kafka各種協議


  眾所周知,Kafka自己實現了一套二進制協議(binary protocol)用於各種功能的實現,比如發送消息,獲取消息,提交位移以及創建topic等。具體協議規范參見:Kafka協議  這套協議的具體使用流程為:

  1. 客戶端創建對應協議的請求
  2. 客戶端發送請求給對應的broker
  3. broker處理請求,並發送response給客戶端

  雖然Kafka提供的大量的腳本工具用於各種功能的實現,但很多時候我們還是希望可以把某些功能以編程的方式嵌入到另一個系統中。這時使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個范例,同時也會針對“創建topic”和“查看位移”這兩個主要功能給出對應的例子。 需要提前說明的是,本文給出的范例並沒有考慮Kafka集群開啟安全的情況。另外Kafka的KIP4應該一直在優化命令行工具以及各種管理操作,有興趣的讀者可以關注這個KIP。

  本文中用到的API依賴於kafka-clients,所以如果你使用Maven構建的話,請加上:

 

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

 

如果是gradle,請加上:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底層框架

 1 /**
 2      * 發送請求主方法
 3      * @param host          目標broker的主機名
 4      * @param port          目標broker的端口
 5      * @param request       請求對象
 6      * @param apiKey        請求類型
 7      * @return              序列化后的response
 8      * @throws IOException
 9      */
10     public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
11         Socket socket = connect(host, port);
12         try {
13             return send(request, apiKey, socket);
14         } finally {
15             socket.close();
16         }
17     }
18 
19     /**
20      * 發送序列化請求並等待response返回
21      * @param socket            連向目標broker的socket
22      * @param request           序列化后的請求
23      * @return                  序列化后的response
24      * @throws IOException
25      */
26     private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
27         sendRequest(socket, request);
28         return getResponse(socket);
29     }
30 
31     /**
32      * 發送序列化請求給socket
33      * @param socket            連向目標broker的socket
34      * @param request           序列化后的請求
35      * @throws IOException
36      */
37     private void sendRequest(Socket socket, byte[] request) throws IOException {
38         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
39         dos.writeInt(request.length);
40         dos.write(request);
41         dos.flush();
42     }
43 
44     /**
45      * 從給定socket處獲取response
46      * @param socket            連向目標broker的socket
47      * @return                  獲取到的序列化后的response
48      * @throws IOException
49      */
50     private byte[] getResponse(Socket socket) throws IOException {
51         DataInputStream dis = null;
52         try {
53             dis = new DataInputStream(socket.getInputStream());
54             byte[] response = new byte[dis.readInt()];
55             dis.readFully(response);
56             return response;
57         } finally {
58             if (dis != null) {
59                 dis.close();
60             }
61         }
62     }
63 
64     /**
65      * 創建Socket連接
66      * @param hostName          目標broker主機名
67      * @param port              目標broker服務端口, 比如9092
68      * @return                  創建的Socket連接
69      * @throws IOException
70      */
71     private Socket connect(String hostName, int port) throws IOException {
72         return new Socket(hostName, port);
73     }
74 
75     /**
76      * 向給定socket發送請求
77      * @param request       請求對象
78      * @param apiKey        請求類型, 即屬於哪種請求
79      * @param socket        連向目標broker的socket
80      * @return              序列化后的response
81      * @throws IOException
82      */
83     private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
84         RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
85         ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
86         header.writeTo(buffer);
87         request.writeTo(buffer);
88         byte[] serializedRequest = buffer.array();
89         byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
90         ByteBuffer responseBuffer = ByteBuffer.wrap(response);
91         ResponseHeader.parse(responseBuffer);
92         return responseBuffer;
93     }

  有了這些方法的鋪墊,我們就可以創建具體的請求了。

創建topic

 1 /**
 2      * 創建topic
 3      * 由於只是樣例代碼,有些東西就硬編碼寫到程序里面了(比如主機名和端口),各位看官自行修改即可
 4      * @param topicName             topic名
 5      * @param partitions            分區數
 6      * @param replicationFactor     副本數
 7      * @throws IOException
 8      */
 9     public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
10         Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
11         // 插入多個元素便可同時創建多個topic
12         topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
13         int creationTimeoutMs = 60000;
14         CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
15         ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
16         CreateTopicsResponse.parse(response, request.version());
17     }

查看位移

 1 /**
 2      * 獲取某個consumer group下的某個topic分區的位移
 3      * @param groupID           group id
 4      * @param topic             topic名
 5      * @param parititon         分區號
 6      * @throws IOException
 7      */
 8     public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
 9         TopicPartition tp = new TopicPartition(topic, parititon);
10         OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
11                 .setVersion((short)2).build();
12         ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
13         OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
14         OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
15         System.out.println(partitionData.offset);
16     }
 1 /**
 2      * 獲取某個consumer group下所有topic分區的位移信息
 3      * @param groupID           group id
 4      * @return                  (topic分區 --> 分區信息)的map
 5      * @throws IOException
 6      */
 7     public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
 8         OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
 9         ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
10         OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
11         return resp.responseData();
12     }

okay, 上面就是“創建topic”和“查看位移”的樣例代碼,各位看官可以參考着這兩個例子構建其他類型的請求。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM