Environment
JDK 1.8
Zookeeper 3.6.1
Kafka 2.6.0
Dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
</dependency>
Create a topic
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.25.132:9092"); // Kafka broker address; the only setting AdminClient requires
    AdminClient client = KafkaAdminClient.create(props); // create the admin client
    // create a topic named test1 with 5 partitions and a replication factor of 1
    NewTopic topic = new NewTopic("test1", 5, (short) 1);
    client.createTopics(Arrays.asList(topic));
    client.close(); // close the client
}
Check the result: under the ZooKeeper path /brokers/topics a new node test1 has appeared, which is the topic we just created.
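Besides checking ZooKeeper, the same AdminClient can verify the topic programmatically. A minimal sketch, assuming the same broker address and topic name as above (note that createTopics is asynchronous; calling client.createTopics(...).all().get() would block until the broker confirms creation):
import org.apache.kafka.clients.admin.AdminClient;
import java.util.Properties;
import java.util.Set;

public class ListTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.25.132:9092");
        try (AdminClient client = AdminClient.create(props)) {
            // listTopics().names() returns a KafkaFuture<Set<String>>; get() blocks until the result arrives
            Set<String> names = client.listTopics().names().get();
            System.out.println("all topics: " + names);
            System.out.println("test1 exists: " + names.contains("test1"));
        }
    }
}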
Producer
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // asynchronously send 20 messages
    for (int i = 1; i <= 20; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i);
        producer.send(record);
    }
    producer.close();
}
First start a console consumer to listen on topic test1:
kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1
Then run the producer program; the console consumer prints the messages:
[root@cluster01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1
message1
message2
message3
message4
message5
message6
message7
message8
message9
message10
message11
message12
message13
message14
message15
message16
message17
message18
message19
message20
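If the console consumer is started only after the producer has finished, nothing is printed, because by default the console consumer starts reading from the latest offset. Adding the --from-beginning flag replays the topic from the earliest offset:
kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1 --from-beginning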
Producer with a callback
The callback is invoked when the producer receives the ack. Just add a callback to the producer.send(record) call above:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println("success:" + recordMetadata.offset());
        } else {
            e.printStackTrace();
        }
    }
});
Synchronous send: after a message is sent, the current thread blocks until the ack is received before the next message is sent.
// just append get() to the send call
producer.send(record).get();
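A minimal sketch of handling the result of a synchronous send, assuming the same producer and record objects as above (RecordMetadata and ExecutionException need to be imported):
try {
    // send(...).get() blocks until the broker acknowledges the record or an error occurs
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}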
Consumer
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1"); // consumers with the same group.id belong to the same consumer group
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit; offsets are committed manually below
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // subscribe to the test1 topic
    consumer.subscribe(Arrays.asList("test1"));
    while (true) {
        System.out.println("consumer is polling");
        // poll with a 5-second timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format("partition=%d,offset=%d,key=%s,value=%s",
                    record.partition(), record.offset(), record.key(), record.value()));
        }
        // synchronous commit; retries on failure
        consumer.commitSync();
        // asynchronous commit; does not retry on failure
        //consumer.commitAsync();
    }
}
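Since commitAsync() does not retry, you may at least want to log failures. A minimal sketch of an asynchronous commit with a callback that could replace the commitSync() call above (OffsetCommitCallback, OffsetAndMetadata, TopicPartition and Map need to be imported):
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // asynchronous commits are not retried, so log the failure
            System.err.println("commit failed for offsets " + offsets);
            exception.printStackTrace();
        }
    }
});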
Run the consumer; once it prints "consumer is polling", run the producer. The result:
consumer is polling
consumer is polling
partition=1,offset=15,key=key2,value=message2
partition=1,offset=16,key=key6,value=message6
partition=1,offset=17,key=key12,value=message12
consumer is polling
partition=0,offset=25,key=key1,value=message1
partition=0,offset=26,key=key3,value=message3
partition=0,offset=27,key=key10,value=message10
partition=0,offset=28,key=key15,value=message15
partition=0,offset=29,key=key19,value=message19
partition=3,offset=20,key=key5,value=message5
partition=3,offset=21,key=key11,value=message11
partition=3,offset=22,key=key14,value=message14
partition=3,offset=23,key=key18,value=message18
partition=2,offset=15,key=key7,value=message7
partition=2,offset=16,key=key8,value=message8
partition=2,offset=17,key=key16,value=message16
partition=4,offset=25,key=key4,value=message4
partition=4,offset=26,key=key9,value=message9
partition=4,offset=27,key=key13,value=message13
partition=4,offset=28,key=key17,value=message17
partition=4,offset=29,key=key20,value=message20
consumer is polling
Note from the output above: the consumer only guarantees ordering of messages within a single partition. When one consumer reads from several partitions, message order across partitions is not guaranteed.
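If strict ordering across all messages matters, one common approach (a sketch, not part of the original example) is to give every record the same key, so the default partitioner routes them all to a single partition; the whole stream then stays in order, at the cost of parallelism:
for (int i = 1; i <= 20; i++) {
    // a fixed key always hashes to the same partition, so these messages are consumed in send order
    producer.send(new ProducerRecord<>("test1", "fixed-key", "message" + i));
}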