安裝zookeeper: https://www.cnblogs.com/guoyansi19900907/p/9954864.html
並啟動zookeeper
安裝kafka https://www.cnblogs.com/guoyansi19900907/p/9961143.html
並啟動kafka.
1.創建maven java項目
2.添加依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
3.創建生產者:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties kafkaProps=new Properties(); /** * acks指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入成功。 * acks=0,生產者在寫入消息之前不會等待任何來自服務器的響應;就算發送失敗了,生產者也不知道。 * acks=1,只要集群首領收到消息,生產者就會收到一個來自服務器的成功消息 * acks=all,所有參與復制的節點都收到消息,生產者才會收到一個來自服務器的成功響應。 */ kafkaProps.put("acks", "all"); /** * 發送失敗后重發的次數,最終還不成功表示發送徹底的失敗 */ kafkaProps.put("retries", 0); /** * 默認情況下,消息發送時不會被壓縮。 * snappy:壓縮算法由Google發明,它占用較少的CPU,卻能提供較好的性能和相當可觀的壓縮比 * gzip:占用較多的CPU,但是提供更高的壓縮比,帶寬比較有限,可以考慮這個壓縮算法。 * 使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往時向kafka發送消息的瓶頸 */ kafkaProps.put("compression.type", "snappy"); /** * 一個批次可以使用的內存大小;當批次被填滿,批次里的所有消息會被發送;不過生產者並不一定等批次被填滿才發送; * 所以批次大小設置得很大,也不會造成延遲,只是會占用更多得內存而已。但是設置得太小, * 因為生產者需要更頻繁的發送消息,會增加額外的開銷。 */ kafkaProps.put("batch.size", 100); /** * 指定了生產者在發送批次之前等待更多消息加入批次的時間。 * KafkaProducer會在批次填滿或liner.ms達到上限時把批次發送出去。 * 這樣做雖然會出現一些延時,但是會提高吞吐量。 */ kafkaProps.put("linger.ms", 1); /** * 生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。 * 如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足, * 這個時候send()方法要么被阻塞,要么拋出異常。 */ kafkaProps.put("buffer.memory", 33554432); /** * 生產者在收到服務器響應之前可以發送多少個消息。 * 值越高就會占用越多的內存,不過也會提升吞吐量。 * 設為1可以保證消息是按照發送順序填寫入服務器的,即使發生了重試。 */ kafkaProps.put("max.in.flight.requests.per.connection", 1); //kafkaProps.put("bootstrap.servers","192.168.123.128:9092,192.168.123.129:9092,192.168.123.130:9092"); //主機信息(broker) kafkaProps.put("bootstrap.servers","192.168.123.128:9092"); //鍵為字符串類型 kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //值為字符串類型 kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer=new KafkaProducer<String, String>(kafkaProps); String msg = "abc"; producer.send(new ProducerRecord<String, String>("guo", msg)); System.out.println("Sent:" + msg); producer.close(); } }
4.創建消費者
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerDemo { public static void main(String[] args) throws Exception{ Properties properties=new Properties(); //主機信息 properties.put("bootstrap.servers","192.168.123.128:9092"); //群組id properties.put("group.id", "group-1"); /** *消費者是否自動提交偏移量,默認是true * 為了經量避免重復數據和數據丟失,可以把它設為true, * 由自己控制核實提交偏移量。 * 如果設置為true,可以通過auto.commit.interval.ms屬性來設置提交頻率 */ properties.put("enable.auto.commit", "true"); /** * 自動提交偏移量的提交頻率 */ properties.put("auto.commit.interval.ms", "1000"); /** * 默認值latest. * latest:在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據 * erliest:偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。 */ properties.put("auto.offset.reset", "earliest"); /** * 消費者在指定的時間內沒有發送心跳給群組協調器,就被認為已經死亡, * 協調器就會觸發再均衡,把它的分區分配給其他消費者。 */ properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); /** * 訂閱主題,這個地方只傳了一個主題:gys. * 這個地方也可以有正則表達式。 */ kafkaConsumer.subscribe(Arrays.asList("guo")); //無限循環輪詢 while (true) { /** * 消費者必須持續對Kafka進行輪詢,否則會被認為已經死亡,他的分區會被移交給群組里的其他消費者。 * poll返回一個記錄列表,每個記錄包含了記錄所屬主題的信息, * 記錄所在分區的信息,記錄在分區里的偏移量,以及鍵值對。 * poll需要一個指定的超時參數,指定了方法在多久后可以返回。 * 發送心跳的頻率,告訴群組協調器自己還活着。 */ ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { //Thread.sleep(1000); System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } }
5.先運行消費者,然后會出現一個監聽的控制台,運行生產者。