This article continues from the environment set up in 《kafka在windows上的安裝、運行》 (Installing and Running Kafka on Windows) and demonstrates calling Kafka from Java.
Example 1:
Producer code, ProducerDemo.java:
package com.bijian.study;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {

    public static void main(String[] args) {
        String topic = "test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // localhost can be replaced with a host name or IP
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 1);

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(topic, "bijian"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println(metadata.toString()); // e.g. test-0@236
                    System.out.println(metadata.offset());   // e.g. 236
                }
            }
        });
        producer.send(new ProducerRecord<String, String>(topic, "ni hao"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println(metadata.toString()); // e.g. test-0@237
                    System.out.println(metadata.offset());   // e.g. 237
                }
            }
        });
        producer.flush();
        producer.close();
    }
}
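send() is asynchronous and returns a Future<RecordMetadata>; the callbacks above are invoked on the producer's I/O thread once the broker responds. If you would rather block until a record is acknowledged, you can wait on the returned future instead. A minimal sketch (the class name is illustrative; the props are the same as in ProducerDemo):

package com.bijian.study;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // send() returns a Future<RecordMetadata>; get() blocks until the broker
        // acknowledges the record and throws if the send ultimately fails.
        RecordMetadata metadata =
                producer.send(new ProducerRecord<String, String>("test", "bijian")).get();
        System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
        producer.close();
    }
}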
Consumer code, ConsumerDemo.java:
package com.bijian.study;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

    public static void main(String[] args) {
        String topic = "test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "10.36.33.210:9092"); // use your broker's host name or IP
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "0");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.offset.reset", "earliest"); // read from the beginning when the group has no committed offset

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // System.out.println(records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                // consumer.seekToBeginning(Collections.singletonList(new TopicPartition(record.topic(), record.partition())));
            }
        }
    }
}
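One caveat about the loop above: poll(long) has been deprecated since kafka-clients 2.0 in favour of poll(Duration), which bounds the total time the call may block. If you are on 2.0 or newer, the same loop can be written as follows (a sketch with the same settings as ConsumerDemo; the class name is illustrative):

package com.bijian.study;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DurationPollDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.36.33.210:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "0");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll(Duration) replaces the deprecated poll(long); the timeout now
            // covers the whole call, including metadata fetches and rebalances.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}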
Running the producer prints:
test-0@236
236
test-0@237
237
Then running the consumer prints:
ConsumerRecord(topic = test, partition = 0, offset = 236, CreateTime = 1545754563805, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bijian)
ConsumerRecord(topic = test, partition = 0, offset = 237, CreateTime = 1545754563816, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = ni hao)
Note: auto.offset.reset is set to earliest here so that the consumer can read the topic from the beginning (i.e. from offset 0) when its group has no committed offset. org.apache.kafka.clients.consumer.ConsumerConfig documents the setting as: "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw exception to the consumer if no previous offset is found for the consumer's group; anything else: throw exception to the consumer." consumer.seekToBeginning() can also reset the offset, but tracing the source shows that it evaluates lazily: "This function evaluates lazily, seeking to the first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called." In other words, for seekToBeginning() to take effect it must be called after poll() or position() (under subscribe(), partitions are only assigned during the first poll()), and the actual rewind happens on the next poll(); see the sketch after the list below. In summary, the three values behave as follows:
earliest: if a partition has a committed offset, consumption resumes from that offset; if not, consumption starts from the beginning of the partition
latest: if a partition has a committed offset, consumption resumes from that offset; if not, only data newly produced to the partition is consumed
none: if every partition of the topic has a committed offset, consumption resumes from those offsets; if any partition lacks a committed offset, an exception is thrown
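To illustrate the lazy evaluation of seekToBeginning() mentioned above, here is a minimal sketch of rewinding a subscribed consumer (class name and group id are illustrative; assumes kafka-clients 2.0+ for poll(Duration) and the Collection overload of seekToBeginning()):

package com.bijian.study;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SeekToBeginningDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "seek-demo");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test"));

        // The first poll() joins the group and receives the partition assignment;
        // before it, assignment() is empty and a seek would have nothing to act on.
        consumer.poll(Duration.ofSeconds(5));

        // Lazy: this only records the intent to rewind; the actual seek is
        // performed by the next poll() (or position()) call.
        consumer.seekToBeginning(consumer.assignment());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}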
Example 2:
Producer code, KafkaProducerExample.java:
package com.bijian.test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {

    public static void main(String[] args) {
        String topic = "test02";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            producer.send(new ProducerRecord<String, String>(topic, "Hello"));
        }
        producer.close();
    }
}
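The commented-out send above attaches a key to each record. With the producer's default partitioner, records that share a key are hashed to the same partition, which preserves ordering per key; keyless records like the "Hello" messages are spread across partitions instead. A minimal keyed variant (class name and key scheme are illustrative):

package com.bijian.test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // Records with the same key always land on the same partition
            // (default partitioner), so per-key ordering is preserved.
            String key = "user-" + (i % 3);
            producer.send(new ProducerRecord<>("test02", key, Integer.toString(i)));
        }
        producer.close();
    }
}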
Consumer code, KafkaConsumerExample.java:
package com.bijian.test;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        String topic = "test02";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
Run the producer first, then the consumer. The consumer output is as follows (100 messages in total):
offset = 100, key = null, value = Hello
offset = 101, key = null, value = Hello
offset = 102, key = null, value = Hello
offset = 103, key = null, value = Hello
...
offset = 196, key = null, value = Hello
offset = 197, key = null, value = Hello
offset = 198, key = null, value = Hello
offset = 199, key = null, value = Hello
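Both consumers in this article rely on enable.auto.commit=true, so offsets are committed on a timer (auto.commit.interval.ms) whether or not the polled records were actually processed. If you need at-least-once semantics, a common variation is to disable auto-commit and call commitSync() after handling each batch; a sketch (class name and group id are illustrative, and it uses the poll(Duration) overload noted earlier):

package com.bijian.test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "manual-commit-demo");
        props.put("enable.auto.commit", "false"); // we commit explicitly below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test02"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Commit only after the batch has been processed: if the process dies
            // before this line, the uncommitted records are redelivered.
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }
    }
}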
Source: https://blog.csdn.net/m0_37739193/article/details/78396773