上篇文章學習kafka的基本安裝和基礎概念,本文主要是學習kafka的常用API。其中包括生產者和消費者,
多線程生產者,多線程消費者,自定義分區等,當然還包括一些避坑指南。
首發於個人網站:鏈接地址
准備工作
kafka版本:2.11-1.1.1
操作系統:centos7
java:jdk1.8
有了以上這些條件就OK了,具體怎么安裝和啟動Kafka這里就不強調了,可以看上一篇文章。
新建一個maven工程,需要的依賴如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
主題管理
kafka的核心就是主題,學會使用kafka的腳本創建主題,也需要學習使用Java API來創建主題。
Kafka將zookeeper的操作封裝成一個ZkUtils類,通過AdminUtils類來調用ZkUtils,來實現Kafka中元數據的操作。
下面一個例子是使用AdminUtils來創建主題,並同時創建指定大小的分區數。
1 // 連接配置 2 private static final String ZK_CONNECT = "10.0.90.53:2181"; 3 4 // session過期時間 5 private static final int SEESSION_TIMEOUT = 30 * 1000; 6 7 // 連接超時時間 8 private static final int CONNECT_TIMEOUT = 30 * 1000; 9 10 /** 11 * 創建主題 12 * 13 * @param topic 主題名稱 14 * @param partition 分區數 15 * @param repilca 副本數 16 * @param properties 配置信息 17 */ 18 public static void createTopic(String topic, int partition, int repilca, Properties properties) { 19 ZkUtils zkUtils = null; 20 try { 21 // 創建zkutil 22 zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled()); 23 if (!AdminUtils.topicExists(zkUtils, topic)) { 24 //主題不存在,則創建主題 25 AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties, AdminUtils.createTopic$default$6()); 26 } 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } finally { 30 zkUtils.close(); 31 } 32 }
執行該方法,創建主題,
在centos7中查看之前創建的主題:
bin/kafka-topics.sh --list --zookeeper localhost:2181
刪除主題:
/** * 刪除主題 * * @param topic */ public static void deleteTopic(String topic){ ZkUtils zkUtils = null; try { zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled()); AdminUtils.deleteTopic(zkUtils,topic); } catch (Exception e) { e.printStackTrace(); } finally { zkUtils.close(); } }
生產者API
在掌握了創建和刪除主題之后,接下來,學習Kafka的生產者API。
Kafka中的生產者,通過KafkaProducer這個類來實現的,在介紹這個類的使用之前,首先介紹kafka的配置項,這也是實際生產中比較關心的。
消息發送流程
實例化生產者時,有三個配置是必須指定的:
- bootstrap.servers:配置連接代理列表,不必包含Kafka集群的所有代理地址,當連接上一個代理后,會從集群元數據信息中獲取其他存活的代理信息。但為了保證能夠成功連上Kafka集群,在多代理集群的情況下,建議至少配置兩個代理。
- (由於電腦配置有限,本文實驗的是單機情況)
key.serializer : 用於序列化消息Key的類 - value.serializer :用於序列化消息值(Value)的類
向Kafka發送一個消息,基本上要經過以下的流程:
1.配置Properties對象,這個是必須的
2.實例化KafkaProducer對象
3.實例化ProducerRecord對象,每條消息對應一個ProducerRecord對象
4.調用KafkaProducer的send方法,發送消息。發送消息有兩種,一種是帶回調函數的(如果發送消息有異常,會在回調函數中返回),另一種是不帶回調函數的。
KafkaProducer默認是異步發送消息,首先它會將消息緩存到消息緩沖區中,當緩存區累積到一定數量時,將消息封裝成一個
RecordBatch,統一發送消息。也就是說,發送消息實質上分為兩個階段,第一將消息發送到消息緩沖區,第二執行網絡I/O操作
5.關閉KafkaProducer,釋放連接的資源。
了解以上的流程,那么接下來就實現Java版本的API。
代碼實例
第一步:
新建一個消息實體類,模擬支付訂單消息,包含消息的ID,商家名稱,創建時間,備注。
public class OrderMessage { // 訂單ID private String id; // 商家名稱 private String sName; // 創建時間 private long createTime; // 備注 private String remake; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getsName() { return sName; } public void setsName(String sName) { this.sName = sName; } public long getCreateTime() { return createTime; } public void setCreateTime(long createTime) { this.createTime = createTime; } public String getRemake() { return remake; } public void setRemake(String remake) { this.remake = remake; } @Override public String toString() { return "OrderMessage{" + "id='" + id + '\'' + ", sName='" + sName + '\'' + ", createTime=" + createTime + ", remake='" + remake + '\'' + '}'; } }
第二步:
這里簡單的發送一個消息demo,按照上面的流程,生產者例子如下:
package kafka.producer; import kafka.OrderMessage; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.UUID; /** * kafka生產者 */ public class ProducerSimpleDemo { static Properties properties = new Properties(); //主題名稱 static String topic = "myTopic"; //生產者 static KafkaProducer<String, String> producer = null; //生產者配置 static { properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092"); properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); } public static void main(String args[]) throws Exception { sendMsg(); } /** * 發送消息 * * @throws Exception */ public static void sendMsg() throws Exception { ProducerRecord<String, String> record = null; try { // 循環發送一百條消息 for (int i = 0; i < 10; i++) { // 構造待發送的消息 OrderMessage orderMessage = new OrderMessage(); orderMessage.setId(UUID.randomUUID().toString()); long timestamp = System.nanoTime(); orderMessage.setCreateTime(timestamp); orderMessage.setRemake("remind"); orderMessage.setsName("test"); // 實例化ProducerRecord record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString()); producer.send(record, (metadata, e) -> { // 使用回調函數 if (null != e) { e.printStackTrace(); } if (null != metadata) { System.out.println(String.format("offset: %s, partition:%s, topic:%s timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp())); } }); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
運行,結果就出現了,異常。
異常記錄:
2018-07-30 18:05:10.755 DEBUG 10272 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Connection with localhost/127.0.0.1 disconnected java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_111] at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_111] at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) ~[kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) ~[kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) [kafka-clients-0.10.1.1.jar:na] at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]
在配置kafka中,首先需要修改kafka的配置server.properties中的
advertised.listeners=PLAINTEXT://:your.host.name:9092
advertised.listeners=PLAINTEXT://10.0.90.53:9092
需要注意的是,如果Kafka有多個節點,那么需要每個節點都按照這個節點的實際hostname和port情況進行設置。
修改完畢,重啟Kafka服務,開啟消費者,接受消息,在服務器中輸入:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
可以看到服務器中的消費者:

offset: 0, partition:0, topic:myTopic timestamp:1533199115840 offset: 1, partition:0, topic:myTopic timestamp:1533199115850 offset: 2, partition:0, topic:myTopic timestamp:1533199115850 offset: 3, partition:0, topic:myTopic timestamp:1533199115850 offset: 4, partition:0, topic:myTopic timestamp:1533199115850 offset: 5, partition:0, topic:myTopic timestamp:1533199115850 offset: 6, partition:0, topic:myTopic timestamp:1533199115850 offset: 7, partition:0, topic:myTopic timestamp:1533199115852 offset: 8, partition:0, topic:myTopic timestamp:1533199115852 offset: 9, partition:0, topic:myTopic timestamp:1533199115852
自定義分區器
Kafka在底層摒棄了Java堆緩存機制,采用了操作系統級別的頁緩存,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO性能。
這個在單機上的提高,對於集群,Kafka使用了分區,將topic的消息分散到多個分區上,並保存在不同的機器上。
但是是否分區越多,效率越高呢?也不盡然!
1.每個分區在底層文件系統都有屬於自己的一個目錄。該目錄下通常會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄(file handler)。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。
2.消費者和生產者都會為分區緩存消息,分區越多,緩存的消息就越多,占用的內存就越大。
3.降低高可用,Kafka是通過高可用來實現高可用性的。我們知道在集群中往往會有一個leader,假設集群中有10個Kafka進程,1個leader,9個follwer,如果一個leader掛了,那么就會重新選出一個leader,如果集群中有10000個分區,那么將要花費很長的時間,這對於高可用是有損耗的。
本身kafka有自己的分區策略的,如果未指定,就會使用默認的分區策略:
Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions。如果Key相同的話,那么就會分配到統一分區。
Kafka提供了自定義的分區器,只要實現Partitioner接口即可,下面是自定義分區的例子:
package kafka.partition;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定義分區器
*/
public class PartitionUtil implements Partitioner {
// 分區數
private static final Integer PARTITION_NUM = 6;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (null == key){
return 0;
}
String keyValue = String.valueOf(key);
// key取模
int partitionId = (int) (Long.valueOf(key.toString())%PARTITION_NUM);
return partitionId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());
offset: 3, partition:5, topic:MyOrder timestamp:1533205894785 offset: 5, partition:3, topic:MyOrder timestamp:1533205893202 offset: 6, partition:3, topic:MyOrder timestamp:1533205894784 offset: 2, partition:2, topic:MyOrder timestamp:1533205894785 offset: 4, partition:1, topic:MyOrder timestamp:1533205894785 offset: 5, partition:1, topic:MyOrder timestamp:1533205894785 offset: 5, partition:0, topic:MyOrder timestamp:1533205894784 offset: 6, partition:0, topic:MyOrder timestamp:1533205894784 offset: 7, partition:0, topic:MyOrder timestamp:1533205894785 offset: 8, partition:0, topic:MyOrder timestamp:1533205894786
線程池生產者
在實際生產過程中,通常消息數量是比較多的,就可以考慮使用線程池。
使用線程池發送消息時,要考慮兩點:1.需要結合實際情況,合理設計線程池的大小;2.使用線程池時,消息的發送是無序的,如果對消息的順序有要求,不建議使用。
如果使用線程池,建議是只實例化一個KafkaProducer對象,這樣性能最好。代碼如下:
首先寫一個線程類:
package kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; /** * 生產者線程 */ public class ProducerThread implements Runnable { private KafkaProducer<String, String> producer = null; private ProducerRecord<String, String> record = null; public ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) { this.producer = producer; this.record = record; } @Override public void run() { producer.send(record, (metadata, e) -> { if (null != e) { e.printStackTrace(); } if (null != metadata) { System.out.println("消息發送成功 : "+String.format("offset: %s, partition:%s, topic:%s timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp())); } }); } }
接着完成啟動類,啟動類中自定義了一個線程池,這里還是有一些遐思,就是沒有自定義,線程創建工廠,沒有指定創建的線程名稱,在實際生產中,最好是自定義線程工廠。
代碼如下:
package kafka.producer; import kafka.OrderMessage; import kafka.partition.PartitionUtil; import org.apache.kafka.clients.producer.*; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; import java.util.UUID; import java.util.concurrent.*; /** * 線程池生產者 * * @author tangj * @date 2018/7/29 20:15 */ public class ProducerDemo { static Properties properties = new Properties(); static String topic = "MyOrder"; static KafkaProducer<String, String> producer = null; // 核心池大小 static int corePoolSize = 5; // 最大值 static int maximumPoolSize = 20; // 無任務時存活時間 static long keepAliveTime = 60; // 時間單位 static TimeUnit timeUnit = TimeUnit.SECONDS; // 阻塞隊列 static BlockingQueue blockingQueue = new LinkedBlockingQueue(); // 線程池 static ExecutorService service = null; static { // 配置項 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092"); properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); // 初始化線程池 service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue); } public static void main(String args[]) throws Exception { for (int i = 0; i < 6; i++) { service.submit(createMsgTask()); } } /** * 生產消息 * * @return */ public static ProducerThread createMsgTask() { OrderMessage orderMessage = new OrderMessage(); orderMessage.setId(UUID.randomUUID().toString()); long timestamp = System.nanoTime(); orderMessage.setCreateTime(timestamp); orderMessage.setRemake("rem"); orderMessage.setsName("test"); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString()); ProducerThread task = new ProducerThread(producer, record); return task; } }
總結
對於Kafka的分區器和多線程生成者,切記一點,一定要根據實際業務進行設計。
