1.概述
在 Kafka 中,官方對外提供了兩種消費 API,一種是高等級消費 API,另一種是低等級的消費 API。在 《高級消費 API》一文中,介紹了其高級消費的 API 實現。今天給大家介紹另一種消費 API。
2.內容
在使用過 Kafka 的高級消費 API 后,我們知道它是一種高度抽象的消費 API,使用起來簡單,方便,但是對於某些特殊的需求我們可能要用到第二種更加底層的 API。那么,我們首先需要知道低級消費 API 的作用。它能幫助我們去做那些事情:
- 一個消息進行多次讀取
- 在處理過程中只消費 Partition 其中的某一部分消息
- 添加事物管理機制以保證消息僅被處理一次
當然,在使用的過程當中也是有些弊端的,其內容如下:
- 必須在程序中跟蹤 Offset 的值
- 必須找出指定的 Topic Partition 中的 Lead Broker
- 必須處理 Broker 的變動
使用其 API 的思路步驟如下所示:
- 從所有處於 Active 狀態的 Broker 中找出哪個是指定 Topic Partition 中的 Lead Broker
- 找出指定 Topic Partition 中的所有備份 Broker
- 構造請求
- 發送請求並查詢數據
- 處理 Leader Broker 的變動
3.代碼實現
3.1 Java Project
若是使用 Java Project 工程去實現該部分代碼,需要添加相關以來 JAR 文件,其內容包含如下:
- scala-xml_${version}-${version}.jar
- scala-library-${version}.jar
- metrics-core-${version}.jar
- kafka-client-${version}.jar
- kafka_${version}-${version}.jar
針對 Java Project 工程,需要自己篩選 JAR 去添加。保證代碼的順利執行。
3.2 Maven Project
對 Maven 工程,在 pom.xml 文件中添加相應的依賴信息即可,簡單方便。讓 Maven 去管理相應的依賴 JAR 文件。內容如下所示:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.8.2.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
這樣在 Maven 工程中相應的依賴 JAR 文件就添加完成了。
3.3 代碼實現
在低級消費 API 中,實現代碼如下所示:
/**
* @Date Mar 2, 2016
*
* @Author dengjie
*
* @Note Simple consumer api
*/
public class SimpleKafkaConsumer {
private static Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
private List<String> m_replicaBrokers = new ArrayList<String>();
public SimpleKafkaConsumer() {
m_replicaBrokers = new ArrayList<String>();
}
public static void main(String[] args) {
SimpleKafkaConsumer example = new SimpleKafkaConsumer();
// Max read number
long maxReads = SystemConfig.getIntProperty("kafka.read.max");
// To subscribe to the topic
String topic = SystemConfig.getProperty("kafka.topic");
// Find partition
int partition = SystemConfig.getIntProperty("kafka.partition");
// Broker node's ip
List<String> seeds = new ArrayList<String>();
String[] hosts = SystemConfig.getPropertyArray("kafka.server.host", ",");
for (String host : hosts) {
seeds.add(host);
}
int port = SystemConfig.getIntProperty("kafka.server.port");
try {
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
log.error("Oops:" + e);
e.printStackTrace();
}
}
public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port)
throws Exception {
// Get point topic partition's meta
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
log.info("[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
log.info("[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
clientName);
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
}
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
log.info("[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:" + leadBroker
+ " Reason: " + code);
if (numErrors > 5)
break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for
// the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
log.info("[SimpleKafkaConsumer.run()] - Found an old offset: " + currentOffset + " Expecting: "
+ readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); // Message deal enter
numRead++;
a_maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
if (consumer != null)
consumer.close();
}
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
log.info("[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
/**
* @param a_oldLeader
* @param a_topic
* @param a_partition
* @param a_port
* @return String
* @throws Exception
* find next leader broker
*/
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
log.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", "
+ a_partition + "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
}
4.總結
在使用 Kafka 低級消費 API 時,要明確我們所使用的業務場景,一般建議還是使用高級消費 API,除非遇到特殊需要。另外,在使用過程中,注意 Leader Broker 的處理,和 Offset 的管理。
5.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
