kafka簡介:
Kafka 是一種分布式的,基於發布 / 訂閱的消息系統。主要設計目標如下:
- 以時間復雜度為 O(1) 的方式提供消息持久化能力,即使對 TB 級以上數據也能保證常數時間復雜度的訪問性能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條以上消息的傳輸。
- 支持 Kafka Server 間的消息分區,及分布式消費,同時保證每個 Partition 內的消息順序傳輸。
- 同時支持離線數據處理和實時數據處理。
- Scale out:支持在線水平擴展。
一:配置文件
1.1:pom.xml中加入kafka依賴
<!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.4.RELEASE</version> </dependency>
1.2:導入生產者的xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <!--參數配置 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <!-- kafka服務地址,可能是集群 value="localhost:9092,localhost:9093,localhost:9094"--> <entry key="bootstrap.servers" value="ip地址:9092" /> <!--有可能導致broker接收到重復的消息 默認是0--> <entry key="retries" value="3" /> <!--acks=0, 表示生產者在成功寫入消息之前不會等待任何來自服務器的響應--> <!--acks=1, 表示只要集群的leader分區副本接收到了消息,就會向生產者發送一個成功響應的ack--> <!--acks=all, 表示只要所有參與復制的1節點(ISR列表的副本)全部收到消息時,生產者才會接收到來自服務器的響應--> <entry key="ack" value="1" /> <!--producer可以用來緩存數據的內存大小,如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常--> <entry key="buffer.memory" value="33554432" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 創建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 創建kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 --> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <!--設置對應topic test -是后台已經創建好的主題--> <property name="defaultTopic" value="test" /> </bean> </beans>
1.3:導入消費者xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <!--消息監聽器--> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory" /> <constructor-arg ref="containerProperties" /> </bean> <!-- 記得修改主題 --> <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties"> <!-- 構造函數 就是 主題的參數值 --> <constructor-arg value="test" /> <!-- 自定義個消息監聽器 --> <property name="messageListener" ref="myListnener" /> <!--手工確定--> <property name="ackMode" value="MANUAL"></property> </bean> <!-- -消息監聽器 --> <bean id="myListnener" class="com.zcb.kafka.MyMessageListener"></bean> <!-- 創建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties" /> </constructor-arg> </bean> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <!--Kafka服務地址 --> <entry key="bootstrap.servers" value="ip地址:9092" /> <!--Consumer的組ID,相同group.id的consumer屬於同一個組。 --> <entry key="group.id" value="test-consumer-group" /> <!--如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。 --> <entry key="enable.auto.commit" value="false" /> <!--當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費--> <entry key="auto-offset-reset" value="earliest"></entry> <!--網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 --> <entry key="session.timeout.ms" value="15000" /> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> </beans>
1.4:添加監聽器實體類,以及用於啟動消費者的實體類
//消費者監聽器 public class MyMessageListener implements AcknowledgingMessageListener<String,String> { @Override public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) { String key = data.key(); String value = data.value(); if(key.equals("request")){ String s = JSON.parseObject(value, String.class); System.out.println(s); //確定接收 acknowledgment.acknowledge(); } } }
//啟動消費者 public class KafKaMain { public static void main(String[] args) { ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("spring-kafka-consumer.xml"); } }
二:啟動生產者消費者
2.1:測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-kafka-producer.xml") public class MyTest { @Autowired KafkaTemplate kafkaTemplate; @Test public void testKafka(){ kafkaTemplate.sendDefault("request", JSON.toJSONString("hello,Word!")); } }
2.2:效果