kafka客戶端代碼解析


轉載:http://backend.blog.163.com/blog/static/202294126201431724652597/

 

可以使用服務器端下載的kafka二進制包及依賴,也可以通過mavne獲取(注意實測發現該方式拿到的包是用jdk7打的):
<dependency>
<groupId>com.sksamuel.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0-beta1</version>
</dependency>
 
生產者
下面是開發生產者代碼的例子:
Properties props = new Properties();
//指定kafka節點:注意這里無需指定集群中所有Boker,只要指定其中部分即可,它會自動取meta信息並連接到對應的Boker節點
props.put("metadata.broker.list", "172.17.1.163:9093");
//指定采用哪種序列化方式將消息傳輸給Boker,你也可以在發送消息的時候指定序列化類型,不指定則以此為默認序列化類型
props.put("serializer.class", "kafka.serializer.StringEncoder");
//指定消息發送對應分區方式,若不指定,則隨機發送到一個分區,也可以在發送消息的時候指定分區類型。
props.put("partitioner.class", "example.producer.SimplePartitioner");
//該屬性表示你需要在消息被接收到的時候發送ack給發送者。以保證數據不丟失
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
//申明生產者:泛型1為分區key類型,泛型2為消息類型
Producer<String, String> producer = new Producer<String, String>(config);
//創建KeyedMessage發送消息,參數1為topic名,參數2為分區名(若為null則隨機發到一個分區),參數3為消息
producer.send(new KeyedMessage<String,String>("topic","partitionKey1","msg1"));
producer.close();
 
//自定義分區:
public class SimplePartitioner implements Partitioner<String> {
    public SimplePartitioner (VerifiableProperties props) {
    }
 
    public int partition(String key, int a_numPartitions) {
       return key.length()%a_numPartitions;
  } 
}
 
消費者
 
消費者api分上層api和底層api,這里是采用上層api的消費者例子(無需關系消息的offset,只是希望獲得數據)
注意:
1.上層api將會內部實現持久化每個分區最后讀到的消息的offset,數據保存在zookeeper中的消費組名中(如/consumers/id1/offsets/test2/2。其中id1是消費組,test2是topic,最后一個2表示第3個分區),每間隔一個很短的時間更新一次offset,那么可能在重啟消費者時拿到重復的消息。此外,當分區leader發生變更時也可能拿到重復的消息。因此在關閉消費者時最好等待一定時間(10s)然后再shutdown()
2.消費組名是一個全局的信息,要注意在新的消費者啟動之前舊的消費者要關閉。如果新的進程啟動並且消費組名相同,kafka會添加這個進程到可用消費線程組中用來消費topic和觸發重新分配負載均衡,那么同一個分區的消息就有可能發送到不同的進程中。
3.如果消費的線程多於分區數,一些線程可能永遠無法看到一些消息。
4.如果分區數多於線程數,一些線程會收到多個分區的消息
5.如果一個線程對應了多個分區,那么接收到的消息是不能保證順序的。
備注:可用zk的命令查詢:get /consumers/id1/owners/test3/2其中id1為消費組,test3為topic,2為分區3.查看里面的內容如:id1_163-PC-1382409386474-1091aef2-1表示該分區被該標示的線程所執行。
下面舉例:
Properties props = new Properties();
// 指定zookeeper服務器地址
props.put("zookeeper.connect", "172.17.1.163:2181");
// 指定消費組(沒有它會自動添加)
props.put("group.id", "id1");
// 指定kafka等待多久zookeeper回復(ms)以便放棄並繼續消費。
props.put("zookeeper.session.timeout.ms", "4000");
// 指定zookeeper同步最長延遲多久再產生異常
props.put("zookeeper.sync.time.ms", "2000");
// 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的消息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息
props.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(props));
 
// 我們要告訴kafka該進程會有多少個線程來處理對應的topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
int a_numThreads = 3;
// 用3個線程來處理topic:test2
topicCountMap.put("test2", a_numThreads);
// 拿到每個stream對應的topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test2");
 
// 調用thread pool來處理topic
ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(Thread.currentThread() + ":"
+ new String(it.next().message()));
}
}
});
}
System.in.read();
// 關閉
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
 
 
下面是采用底層api實現的消費者(需要多次讀消息,或從部分分區里讀數據,或用事務保證消息只處理一次)
 
注意:
1.你必須自己實現當停止消費時如何持久化offset
2.你必須自己找到哪個broker是leader以便處理topic和分區
3.你必須自己處理leader變更
 
使用階段:
1.找到那些broker是leader以便讀取topic和partition
2.自己決定哪個副本作為你的topic和分區
3.建立自己需要請求並自定義獲取你感興趣的數據
4.獲取數據
5.當leader變更時自己識別和恢復。
例子:

 

String topic = "test2";
int partition = 1;
String brokers = "172.17.1.163:9093";
int maxReads = 100; // 讀多少條數據
// 1.找leader
PartitionMetadata metadata = null;
for (String ipPort : brokers.split(",")) {
//我們無需要把所有的brokers列表加進去,目的只是為了獲得metedata信息,故只要有broker可連接即可
SimpleConsumer consumer = null;
try {
String[] ipPortArray = ipPort.split(":");
consumer = new SimpleConsumer(ipPortArray[0],
Integer.parseInt(ipPortArray[1]), 100000, 64 * 1024,
"leaderLookup");
List<String> topics = new ArrayList<String>();
topics.add(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
// 取meta信息
TopicMetadataResponse resp = consumer.send(req);
 
//獲取topic的所有metedate信息(目測只有一個metedata信息,何來多個?)
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
//獲取每個meta信息的分區信息,這里我們只取我們關心的partition的metedata
System.out.println("----"+part.partitionId());
if (part.partitionId() == partition) {
metadata = part;
break;
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + ipPort
+ "] to find Leader for [" + topic + ", " + partition
+ "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (metadata == null || metadata.leader() == null) {
System.out.println("meta data or leader not found, exit.");
return;
}
// 拿到leader
Broker leadBroker = metadata.leader();
// 獲取所有副本
System.out.println(metadata.replicas());
 
// 2.獲取lastOffset(這里提供了兩種方式:從頭取或從最后拿到的開始取,下面這個是從頭取)
long whichTime = kafka.api.OffsetRequest.EarliestTime();
//這個是從最后拿到的開始取
// long whichTime = kafka.api.OffsetRequest.LatestTime();
System.out.println("lastTime:"+whichTime);
String clientName = "Client_" + topic + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker.host(),
leadBroker.port(), 100000, 64 * 1024, clientName);
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
// 獲取指定時間前有效的offset列表
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
return;
}
// 千萬不要認為offset一定是從0開始的
long[] offsets = response.offsets(topic, partition);
System.out.println("offset list:" + Arrays.toString(offsets));
long offset = offsets[0];
 
// 讀數據
while (maxReads > 0) {
// 注意不要調用里面的replicaId()方法,這是內部使用的。
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
// 出錯處理。這里只直接返回了。實際上可以根據出錯的類型進行判斷,如code == ErrorMapping.OffsetOutOfRangeCode()表示拿到的offset錯誤
// 一般出錯處理可以再次拿offset,或重新找leader,重新建立consumer。可以將上面的操作都封裝成方法。再在該循環來進行消費
// 當然,在取所有leader的同時可以用metadata.replicas()更新最新的節點信息。另外zookeeper可能不會立即檢測到有節點掛掉,故如果發現老的leader和新的leader一樣,可能是leader根本沒掛,也可能是zookeeper還沒檢測到,總之需要等等。
short code = fetchResponse.errorCode(topic, partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
return;
}
//取一批消息
boolean empty = true;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
topic, partition)) {
empty = false;
long curOffset = messageAndOffset.offset();
//下面這個檢測有必要,因為當消息是壓縮的時候,通過fetch獲取到的是一個整塊數據。塊中解壓后不一定第一個消息就是offset所指定的。就是說存在再次取到已讀過的消息。
if (curOffset < offset) {
System.out.println("Found an old offset: " + curOffset
+ " Expecting: " + offset);
continue;
}
// 可以通過當前消息知道下一條消息的offset是多少
offset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset())
+ ": " + new String(bytes, "UTF-8"));
maxReads++;
}
//進入循環中,等待一會后獲取下一批數據
if(empty){
Thread.sleep(1000);
}
}
// 退出(這里象征性的寫一下)
if (consumer != null)
consumer.close();

 

另外還有采用hadoop專用api實現消息保存到hadoop中(這里略)
 
消費者配置
 
消費者或consumer.properties配置:
zookeeper.connect:zookeeper連接服務器地址
zookeeper.session.timeout.ms對zookeeper的session過期時間,默認6000ms,用於檢測消費者是否掛掉,當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
group.id:指定消費組
auto.commit.enable:是否自動提交:這里提交意味着客戶端會自動定時更新offset到zookeeper.默認為true
auto.commit.interval.ms:自動更新時間。默認60 * 1000
auto.offset.reset:如果zookeeper沒有offset值或offset值超出范圍。那么就給個初始的offset。有smallest、largest、anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
consumer.timeout.ms:如果一段時間沒有收到消息,則拋異常。默認-1
fetch.message.max.bytes:每次取的塊的大小(默認1024*1024),多個消息通過塊來批量發送給消費者,指定塊大小可以指定有多少消息可以一次取出。注意若一個消息就超過了該塊指定的大小,它將拿不到
queued.max.message.chunks:最大取多少塊緩存到消費者(默認10)。
更多配置可參見ConsumerConfig類
 
生產者配置
 
生產者或producer.properties配置:
metadata.broker.list:指定kafka節點列表,用於獲取metadata,不必全部指定
request.required.acks:指定生產者發送請求如何確認完成:0(默認)表示生產者不用等待broker返回ack。1表示當有復本(該復本節點不一定是同步)收到了消息后發回ack給生產者(如果leader掛掉且剛好收到消息的復本也掛掉則消息丟失)。-1表示所有已同步的復本收到了消息后發回ack給生產者(可以保證只要有一個已同步的復本存活就不會有數據丟失)。
producer.type:同步還是異步,默認2表同步,1表異步。異步可以提高發送吞吐量,但是也可能導致丟失未發送過去的消息
queue.buffering.max.ms:如果是異步,指定每次發送最大間隔時間
queue.buffering.max.messages:如果是異步,指定每次發送緩存最大數據量
serializer.class:指定序列化處理類,默認為kafka.serializer.DefaultEncoder,即byte[]
key.serializer.class:單獨序列化key處理類,默認和serializer.class一致
partitioner.class:指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區
message.send.max.retries:消息發送重試次數,默認3次
retry.backoff.ms:消息發送重試間隔次數
compression.codec:是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
compressed.topics:如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示全壓縮。
更多配置可參見ProducerConfig類


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM