[Kafka] - Kafka Java Consumer Implementation (Part 1)


Kafka provides two Consumer APIs: the High Level Consumer API and the Low Level Consumer API (also called the Simple Consumer API).

High Level Consumer API: a highly abstracted Kafka consumer API. It hides the low-level details of fetching data, committing offsets, and seeking, and hands the application a ready-made message stream. Pros: simple to use. Cons: very little control; you cannot tailor fetching or offset management to your own business scenario. (Entry class: ConsumerConnector)
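
For contrast, here is a minimal, illustrative sketch of what the High Level Consumer API looks like (the details are left to Part 2). The ZooKeeper address, group id, and class name below are placeholder values for illustration, not part of this article's environment:

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Illustrative sketch only: the high-level API hides partitions and offsets entirely.
 */
public class HighLevelConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("group.id", "demo-group");              // placeholder consumer group
        props.put("auto.offset.reset", "smallest");       // start from the earliest offset

        // The entry class mentioned above: ConsumerConnector
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for one stream of the topic; fetching and offset management happen internally
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test2", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("test2").get(0).iterator();
        while (it.hasNext()) { // blocks until messages arrive; call connector.shutdown() to stop
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(mam.offset() + ": " + new String(mam.message(), "UTF-8"));
        }
    }
}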

Low Level Consumer API: fetches data from Kafka by driving the underlying API directly; you must supply the partition, offset, and so on yourself. Pros: full control. Cons: the code is comparatively complex. (Entry class: SimpleConsumer)

This post implements and tests the Low Level Consumer API in Java:

The High Level Consumer API is covered in the follow-up post: [Kafka] - Kafka Java Consumer Implementation (Part 2)

===============================================================

1. KafkaBrokerInfo: a custom bean class that holds the metadata needed to connect to a Kafka broker, namely host & port. The code is as follows:

/**
 * Kafka broker connection parameters
 * Created by gerry on 12/21.
 */
public class KafkaBrokerInfo {
    // Host name
    public final String brokerHost;
    // Port number
    public final int brokerPort;

    /**
     * Constructor
     *
     * @param brokerHost Kafka broker host name or IP address
     * @param brokerPort port number
     */
    public KafkaBrokerInfo(String brokerHost, int brokerPort) {
        this.brokerHost = brokerHost;
        this.brokerPort = brokerPort;
    }

    /**
     * Constructor using the default port 9092
     *
     * @param brokerHost Kafka broker host name or IP address
     */
    public KafkaBrokerInfo(String brokerHost) {
        this(brokerHost, 9092);
    }
}

2. KafkaTopicPartitionInfo: a custom bean class that identifies the partition to read, i.e. the topic name and partition ID. The code is as follows:

/**
 * Created by gerry on 02/22.
 */
public class KafkaTopicPartitionInfo {
    // Topic name
    public final String topic;
    // Partition ID
    public final int partitionID;

    /**
     * Constructor
     *
     * @param topic       topic name
     * @param partitionID partition ID
     */
    public KafkaTopicPartitionInfo(String topic, int partitionID) {
        this.topic = topic;
        this.partitionID = partitionID;
    }

    // equals/hashCode are overridden so instances can be used as map keys
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        KafkaTopicPartitionInfo that = (KafkaTopicPartitionInfo) o;

        if (partitionID != that.partitionID) return false;
        return topic != null ? topic.equals(that.topic) : that.topic == null;
    }

    @Override
    public int hashCode() {
        int result = topic != null ? topic.hashCode() : 0;
        result = 31 * result + partitionID;
        return result;
    }
}
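
Because equals() and hashCode() are overridden, KafkaTopicPartitionInfo can safely serve as a HashMap key; the consumer class below relies on exactly this to cache replica brokers per partition. A small usage sketch (the second broker address is made up for illustration):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionCacheSketch {
    public static void main(String[] args) {
        Map<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>> cache =
                new HashMap<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>>();

        List<KafkaBrokerInfo> replicas = new ArrayList<KafkaBrokerInfo>();
        replicas.add(new KafkaBrokerInfo("192.168.187.146"));       // default port 9092
        replicas.add(new KafkaBrokerInfo("192.168.187.147", 9092)); // made-up second broker

        cache.put(new KafkaTopicPartitionInfo("test2", 0), replicas);

        // A *new* but equal key still finds the cached entry, thanks to equals()/hashCode()
        List<KafkaBrokerInfo> found = cache.get(new KafkaTopicPartitionInfo("test2", 0));
        System.out.println(found == replicas); // prints true
    }
}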

3. JavaKafkaSimpleConsumerAPI: the class that drives Kafka through the Low Level API, covering fetching messages and reading and committing offsets. The code is as follows:

import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;

/**
 * TODO: replace the System.out calls with proper logging
 * Kafka Low Level Consumer API ==> Kafka Simple Consumer API
 * Created by gerry on 12/21.
 */
public class JavaKafkaSimpleConsumerAPI {
    // Maximum number of retries
    private int maxRetryTimes = 5;
    // Interval between retries, in milliseconds
    private long retryIntervalMillis = 1000;
    // Cache of the replica brokers for each Topic/Partition
    private Map<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>> replicaBrokers = new HashMap<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>>();

    /**
     * Entry point
     *
     * @param maxReads           maximum number of records to read
     * @param topicPartitionInfo the topic partition to read from
     * @param seedBrokers        seed brokers used to bootstrap the connection
     * @throws Exception
     */
    public void run(long maxReads,
                    KafkaTopicPartitionInfo topicPartitionInfo,
                    List<KafkaBrokerInfo> seedBrokers) throws Exception {
        // By default, start consuming from the earliest offset available in the partition
        long whichTime = kafka.api.OffsetRequest.EarliestTime();

        // Build the client name and group id
        String topic = topicPartitionInfo.topic;
        int partitionID = topicPartitionInfo.partitionID;
        String clientName = this.createClientName(topic, partitionID);
        String groupId = clientName;

        // Fetch the partition metadata (mainly the leader's connection info)
        PartitionMetadata metadata = this.findLeader(seedBrokers, topic, partitionID);

        // Validate the metadata
        this.validatePartitionMetadata(metadata);

        // Connect to the leader node and build the SimpleConsumer
        SimpleConsumer consumer = this.createSimpleConsumer(metadata.leader().host(),
                metadata.leader().port(), clientName);

        try {
            // Determine the offset at which this consumer group should start reading
            int times = 0;
            long readOffSet = -1;
            while (true) {
                readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, whichTime, clientName);
                if (readOffSet == -1) {
                    // -1 signals an error
                    if (times > this.maxRetryTimes) {
                        throw new RuntimeException("Fetch the last offset of group:" + groupId + " occur exception");
                    }
                    // Sleep first, then rebuild the consumer connection and retry
                    times++;
                    this.sleep();
                    consumer = this.createNewSimpleConsumer(consumer, topic, partitionID);
                    continue;
                }

                // Normal case: leave the loop
                break;
            }
            System.out.println("The first read offset is:" + readOffSet);

            int numErrors = 0;
            boolean forever = maxReads <= 0;
            // Main read loop: when maxReads is non-positive, read forever;
            // when it is positive, read at most maxReads records
            while (forever || maxReads > 0) {
                // Build the fetch request: topic, partition, starting offset, and
                // the maximum number of bytes to fetch per request (100000 here)
                kafka.api.FetchRequest request = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partitionID, readOffSet, 100000)
                        .build();

                // Send the request to Kafka and collect the response
                FetchResponse response = consumer.fetch(request);

                // If the response carries an error, handle it and reconnect;
                // after more than 5 consecutive errors, give up
                if (response.hasError()) {
                    String leaderBrokerHost = consumer.host();
                    numErrors++;
                    short code = response.errorCode(topic, partitionID);
                    System.out.println("Error fetching data from the Broker:" + leaderBrokerHost + " Reason:" + code);
                    if (numErrors > 5) break;
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // The offset is out of range; simply re-resolve it
                        readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }

                    // Resolve the (possibly new) leader via the old consumer first,
                    // then close the stale connection
                    SimpleConsumer staleConsumer = consumer;
                    consumer = this.createNewSimpleConsumer(staleConsumer, topic, partitionID);
                    staleConsumer.close();
                    continue;
                }
                // Reset the error counter
                numErrors = 0;

                // No error: process the fetched messages, e.g. print them
                long numRead = 0;
                for (MessageAndOffset messageAndOffset : response.messageSet(topic, partitionID)) {
                    // Guard against message sets (e.g. compressed blocks) that start
                    // before the requested offset
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffSet) {
                        System.out.println("Found an old offset:" + currentOffset + " Expecting:" + readOffSet);
                        continue;
                    }

                    // Advance to the offset at which the next read should start
                    readOffSet = messageAndOffset.nextOffset();

                    // Read the message value
                    ByteBuffer payload = messageAndOffset.message().payload();

                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println(currentOffset + ": " + new String(bytes, "UTF-8"));
                    numRead++;
                    maxReads--;
                }

                // Commit the new offset
                consumer = this.updateOffset(consumer, topic, partitionID,
                        readOffSet, groupId, clientName, 0);

                // If nothing was read, sleep for a second before polling again
                if (numRead == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        // ignore
                    }
                }
            }

            System.out.println("執行完成....");
        } finally {
            // Release the consumer connection
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Exception e) {
                    // ignore
                }
            }
        }
    }

    /**
     * Validate the partition metadata; throws IllegalArgumentException if the
     * metadata or its leader is missing
     *
     * @param metadata partition metadata to validate
     */
    private void validatePartitionMetadata(PartitionMetadata metadata) {
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting!!");
            throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting!!");
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting!!");
            throw new IllegalArgumentException("Can't find Leader for Topic and Partition. Exiting!!");
        }
    }

    /**
     * Find the lead Broker for a Topic and Partition<br/>
     * (i.e. the metadata of the broker node that currently leads the given topic partition)
     *
     * @param brokers     Kafka cluster seed brokers, e.g. {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
     * @param topic       topic name
     * @param partitionID partition ID
     * @return the partition metadata, or null when no leader can be found
     */
    public PartitionMetadata findLeader(
            List<KafkaBrokerInfo> brokers,
            String topic,
            int partitionID) {
        PartitionMetadata returnMetadata = null;

        for (KafkaBrokerInfo broker : brokers) {
            SimpleConsumer consumer = null;

            try {
                // 1. Create a lightweight consumer connection for the metadata lookup
                consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, 100000, 64 * 1024, "leaderLookUp");

                // 2. Build the list of topic names to query
                List<String> topics = Collections.singletonList(topic);

                // 3. Build the request
                TopicMetadataRequest request = new TopicMetadataRequest(topics);

                // 4. Send the request and collect the response
                TopicMetadataResponse response = consumer.send(request);

                // 5. Extract the returned metadata
                List<TopicMetadata> metadatas = response.topicsMetadata();

                // 6. Walk the returned metadata
                for (TopicMetadata metadata : metadatas) {
                    // Only inspect the topic we asked about
                    String currentTopic = metadata.topic();
                    if (topic.equalsIgnoreCase(currentTopic)) {
                        // Walk all partitions of the topic to find the requested one
                        for (PartitionMetadata part : metadata.partitionsMetadata()) {
                            if (part.partitionId() == partitionID) {
                                // 1. Found the matching metadata
                                returnMetadata = part;

                                // 2. Refresh the cached replica brokers for this partition
                                KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
                                List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
                                if (brokerInfos == null) {
                                    brokerInfos = new ArrayList<KafkaBrokerInfo>();
                                } else {
                                    brokerInfos.clear();
                                }

                                for (Broker replica : returnMetadata.replicas()) {
                                    brokerInfos.add(new KafkaBrokerInfo(replica.host(), replica.port()));
                                }

                                this.replicaBrokers.put(topicPartitionInfo, brokerInfos);
                                return returnMetadata;
                            }
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + broker.brokerHost + "] to find Leader for [" + topic + ", " + partitionID + "] Reason:" + e);
            } finally {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Exception e) {
                        // ignore
                    }
                }
            }
        }

        // Not found: return null (in a healthy cluster this should not be reached)
        return null;
    }

    /**
     * Get the offset at which the given group's consumer should resume reading
     * the given topic partition
     *
     * @param consumer    the consumer
     * @param groupId     consumer group id
     * @param topic       topic name
     * @param partitionID partition ID
     * @param whichTime   which boundary offset (earliest/latest) to fall back to when
     *                    the group has never consumed this partition
     * @param clientName  client name
     * @return a non-negative offset on success, or -1 on error
     */
    public long getLastOffSet(SimpleConsumer consumer, String groupId,
                              String topic, int partitionID,
                              long whichTime, String clientName) {
        // 1. Fetch the committed offset (stored in ZooKeeper for version-0 requests);
        //    a value greater than 0 means this group has consumed the partition before
        long offset = this.getOffsetOfTopicAndPartition(consumer, groupId, clientName, topic, partitionID);
        if (offset > 0) {
            return offset;
        }

        // 2. Otherwise resolve the partition's boundary offset selected by whichTime
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

        OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the Broker. Reason: " + response.errorCode(topic, partitionID));
            return -1;
        }

        // Extract the offset
        long[] offsets = response.offsets(topic, partitionID);
        return offsets[0];
    }

    /**
     * Fetch the consumer group's committed offset for the given topic partition
     *
     * @param consumer    the consumer
     * @param groupId     consumer group id
     * @param clientName  client name
     * @param topic       topic name
     * @param partitionID partition ID
     * @return the committed offset, or 0 when none is available
     */
    public long getOffsetOfTopicAndPartition(SimpleConsumer consumer, String groupId, String clientName, String topic, int partitionID) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        List<TopicAndPartition> requestInfo = new ArrayList<TopicAndPartition>();
        requestInfo.add(topicAndPartition);
        // versionId 0 reads the offset that was committed to ZooKeeper
        OffsetFetchRequest request = new OffsetFetchRequest(groupId, requestInfo, 0, clientName);
        OffsetFetchResponse response = consumer.fetchOffsets(request);

        // Extract the returned offsets
        Map<TopicAndPartition, OffsetMetadataAndError> returnOffsetMetadata = response.offsets();
        // Process the result
        if (returnOffsetMetadata != null && !returnOffsetMetadata.isEmpty()) {
            // Pick out this partition's offset information
            OffsetMetadataAndError offset = returnOffsetMetadata.get(topicAndPartition);
            if (offset.error() == ErrorMapping.NoError()) {
                // No error: return the committed offset
                return offset.offset();
            } else {
                // The first time a consumer group connects (no data for this topic in ZK yet),
                // an UnknownTopicOrPartitionCode error is returned
                System.out.println("Error fetching the committed offset for the Topic and Partition. Reason: " + offset.error());
            }
        }

        // In every error case, fall back to 0
        return 0;
    }

    /**
     * Look up the partition's new leader metadata after the old leader failed
     *
     * @param oldLeader   host of the previous leader
     * @param topic       topic name
     * @param partitionID partition ID
     * @return the new leader's partition metadata
     */
    private PartitionMetadata findNewLeaderMetadata(String oldLeader,
                                                    String topic,
                                                    int partitionID) {
        KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
        List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
        for (int i = 0; i < 3; i++) {
            boolean gotoSleep = false;
            PartitionMetadata metadata = this.findLeader(brokerInfos, topic, partitionID);
            if (metadata == null) {
                gotoSleep = true;
            } else if (metadata.leader() == null) {
                gotoSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // On the first attempt, if the "new" leader is still the old host, the
                // leader election is probably still in progress, so wait and retry
                gotoSleep = true;
            } else {
                return metadata;
            }

            if (gotoSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }

        System.out.println("Unable to find new leader after Broker failure. Exiting!!");
        throw new RuntimeException("Unable to find new leader after Broker failure. Exiting!!");
    }

    /**
     * Commit the offset; if the SimpleConsumer has to be rebuilt along the way,
     * the replacement SimpleConsumer is returned
     *
     * @param consumer    the consumer
     * @param topic       topic name
     * @param partitionID partition ID
     * @param readOffSet  the offset to commit
     * @param groupId     consumer group id
     * @param clientName  client name
     * @param times       current retry count
     * @return the (possibly rebuilt) consumer
     * @throws RuntimeException when the commit ultimately fails
     */
    private SimpleConsumer updateOffset(SimpleConsumer consumer, String topic, int partitionID, long readOffSet, String groupId, String clientName, int times) {
        // Build the commit request; versionId 0 commits the offset to ZooKeeper
        Map<TopicAndPartition, OffsetAndMetadata> requestInfoMap = new HashMap<TopicAndPartition, OffsetAndMetadata>();
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        requestInfoMap.put(topicAndPartition, new OffsetAndMetadata(readOffSet, OffsetAndMetadata.NoMetadata(), -1));
        kafka.javaapi.OffsetCommitRequest ocRequest = new OffsetCommitRequest(groupId, requestInfoMap, 0, clientName);
        // Submit the commit request and collect the response
        kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(ocRequest);

        // Act on the response
        if (response.hasError()) {
            short code = response.errorCode(topicAndPartition);
            if (times > this.maxRetryTimes) {
                throw new RuntimeException("Update the Offset occur exception," +
                        " the current response code is:" + code);
            }

            if (code == ErrorMapping.LeaderNotAvailableCode()) {
                // The leader is switching over: sleep for a while, rebuild the consumer
                // against the new leader, then retry the commit
                try {
                    Thread.sleep(this.retryIntervalMillis);
                } catch (InterruptedException e) {
                    // ignore
                }
                PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(),
                        topic, partitionID);
                this.validatePartitionMetadata(metadata);
                consumer = this.createSimpleConsumer(metadata.leader().host(),
                        metadata.leader().port(), clientName);
                // Retry
                return updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
            }

            if (code == ErrorMapping.RequestTimedOutCode()) {
                // The request timed out: simply retry
                return updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
            }

            // Any other error code is fatal
            throw new RuntimeException("Update the Offset occur exception," +
                    " the current response code is:" + code);
        }

        // Return the (possibly rebuilt) consumer
        return consumer;
    }

    /**
     * Build the client name from the topic name and partition id
     *
     * @param topic       topic name
     * @param partitionID partition ID
     * @return the client name
     */
    private String createClientName(String topic, int partitionID) {
        return "client_" + topic + "_" + partitionID;
    }

    /**
     * Build a replacement consumer from an existing one, reconnecting to the
     * partition's current leader
     *
     * @param consumer    the old consumer
     * @param topic       topic name
     * @param partitionID partition ID
     * @return a fresh SimpleConsumer connected to the current leader
     */
    private SimpleConsumer createNewSimpleConsumer(SimpleConsumer consumer, String topic, int partitionID) {
        // Resolve the partition's current leader
        PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(),
                topic, partitionID);
        // Validate the metadata
        this.validatePartitionMetadata(metadata);
        // Reconnect against the new leader
        return this.createSimpleConsumer(metadata.leader().host(),
                metadata.leader().port(), consumer.clientId());
    }

    /**
     * Build and return a SimpleConsumer
     *
     * @param host       broker host
     * @param port       broker port
     * @param clientName client name
     * @return the new SimpleConsumer
     */
    private SimpleConsumer createSimpleConsumer(String host, int port, String clientName) {
        return new SimpleConsumer(host, port, 100000, 64 * 1024, clientName);
    }

    /**
     * Sleep for the configured retry interval
     */
    private void sleep() {
        try {
            Thread.sleep(this.retryIntervalMillis);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    /**
     * Close the given consumer, swallowing any exception
     *
     * @param consumer the consumer to close (may be null)
     */
    private static void closeSimpleConsumer(SimpleConsumer consumer) {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (Exception e) {
                // ignore
            }
        }
    }

    /**
     * Fetch the partition IDs of the given topic from the Kafka cluster<br/>
     * Returns an empty list when the topic does not exist in the cluster
     *
     * @param brokers    Kafka cluster seed brokers, e.g. {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
     * @param topic      the topic whose partition IDs are requested
     * @param soTimeout  socket timeout, in milliseconds
     * @param bufferSize receive buffer size, in bytes
     * @param clientId   client connection ID
     * @return the list of partition IDs
     */
    public static List<Integer> fetchTopicPartitionIDs(List<KafkaBrokerInfo> brokers, String topic, int soTimeout, int bufferSize, String clientId) {
        Set<Integer> partitionIDs = new HashSet<Integer>();

        List<String> topics = Collections.singletonList(topic);

        // Query each broker in turn until the topic's metadata is found
        for (KafkaBrokerInfo broker : brokers) {
            SimpleConsumer consumer = null;

            try {
                // Build a lightweight consumer connection for the lookup
                consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, soTimeout, bufferSize, clientId);

                // Build the request
                TopicMetadataRequest tmRequest = new TopicMetadataRequest(topics);

                // Send the request
                TopicMetadataResponse response = consumer.send(tmRequest);

                // Collect the result
                List<TopicMetadata> metadatas = response.topicsMetadata();

                // Walk the result, looking for the requested topic
                for (TopicMetadata metadata : metadatas) {
                    if (metadata.errorCode() == ErrorMapping.NoError()) {
                        // Only process error-free metadata
                        if (topic.equals(metadata.topic())) {
                            // Record this topic's partition IDs
                            for (PartitionMetadata part : metadata.partitionsMetadata()) {
                                partitionIDs.add(part.partitionId());
                            }
                            // Done, leave the loop
                            break;
                        }
                    }
                }
            } finally {
                // Close the connection
                closeSimpleConsumer(consumer);
            }
        }

        // Return the collected IDs
        return new ArrayList<Integer>(partitionIDs);
    }


}

4. JavaKafkaSimpleConsumerAPITest: the test class. The main code is as follows:

import java.util.ArrayList;
import java.util.List;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaSimpleConsumerAPITest {
    public static void main(String[] args) {
        JavaKafkaSimpleConsumerAPI example = new JavaKafkaSimpleConsumerAPI();
        long maxReads = 300;
        String topic = "test2";
        int partitionID = 0;

        KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
        List<KafkaBrokerInfo> seeds = new ArrayList<KafkaBrokerInfo>();
        seeds.add(new KafkaBrokerInfo("192.168.187.146", 9092));

        try {
            example.run(maxReads, topicPartitionInfo, seeds);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Fetch the IDs of all partitions belonging to this topic
        // (fetchTopicPartitionIDs is static, so call it via the class)
        System.out.println(JavaKafkaSimpleConsumerAPI.fetchTopicPartitionIDs(seeds, topic, 100000, 64 * 1024, "client-id"));
    }
}

5. Testing

For the relevant Kafka commands, see the post [Kafka] - Kafka Basic Commands. (A screenshot of the test run is omitted here.)

At this point, the implementation is essentially complete.

========================================================

6. Kafka POM Dependencies

All of the kafka.javaapi.* classes used above (SimpleConsumer and friends) ship in the core kafka artifact; the artifactId kafka_2.10 selects the build compiled against Scala 2.10.

<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
