- 下載zk
- 地址: http://mirrors.hust.edu.cn/apache/zookeeper/stable/
- 解壓 tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
- cd apache-zookeeper-3.5.8-bin/
-
mv conf/ zoo_sample.cfg conf/zoo.cfg
- vim zoo.cfg, 配置日志:
- tickTime: zookeeper中使用的基本時間單位, 毫秒
- dataDir: 內存數據快照的保存目錄;如果沒有自定義Log也使用該目錄
- clientPort: 監聽Cli連接的端口號
- 下載 kafa : http://kafka.apache.org/downloads.html
a) 解壓:tar -zxvf kafka_2.12-2.5.0.tgz
b) 修改 kafka_2.12-2.5.0/config/server.properties
c) 修改 zookeeper.properties
d) 啟動kafka: sh bin/kafka-server-start.sh ./config/server.properties
e) 啟動zk ,啟動server :
./zkServer.sh start
d)創建kafak topic : sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
檢驗是否創建好了topic:
./kafka-topics.sh --list --zookeeper localhost:2181
3. sping mvc 項目
a) 配置pom.xml文件
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.0.RELEASE</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>dubbo</artifactId> <version>2.5.3</version> <exclusions> <exclusion> <artifactId>spring</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
b) 創建kafka-producer.xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!--基本配置 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <!-- kafka服務地址,可能是集群--> <entry key="bootstrap.servers" value="localhost:9092,localhost:9093,localhost:9094" /> <!-- 有可能導致broker接收到重復的消息,默認值為3--> <entry key="retries" value="10" /> <!-- 每次批量發送消息的數量--> <entry key="batch.size" value="1638" /> <!-- 默認0ms,在異步IO線程被觸發后(任何一個topic,partition滿都可以觸發)--> <entry key="linger.ms" value="1" /> <!--producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常 --> <entry key="buffer.memory" value="33554432 " /> <!-- producer需要server接收到數據之后發出的確認接收的信號,此項配置就是指procuder需要多少個這樣的確認信號--> <entry key="acks" value="all" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 創建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 創建kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 --> <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <!--設置對應topic--> <property name="defaultTopic" value="test" /> </bean> </beans>
c) 創建 kafka-consumer.xml 文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <!--Kafka服務地址 --> <entry key="bootstrap.servers" value="localhost:9092" /> <!--Consumer的組ID,相同goup.id的consumer屬於同一個組。 --> <entry key="group.id" value="order-beta" /> <!--如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。 --> <entry key="enable.auto.commit" value="true" /> <!--網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 --> <entry key="session.timeout.ms" value="15000 " /> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <!--指定具體監聽類的bean --> <bean id="messageListernerConsumerService" class="test.ke.kafka.KafkaConsumerListener" /> <!-- 創建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test"/> <property name="messageListener" ref="messageListernerConsumerService"/> </bean> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean> </beans>
d) 配置spring-mvc文件
<!-- 引入kafka配置文件,根據個人文件位置--> <import resource="classpath:kafka-producer.xml"/> <import resource="classpath:kafka-consumer.xml"/>
<!-- 使用zookeeper注冊中心暴露服務地址 --> <dubbo:application name="test-provider"/> <dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181"/>
d) 寫接口
@Resource private KafkaTemplate<Integer, String> kafkaTemplate; @RequestMapping(value = "/hello.do") @ResponseBody public void hello(){ kafkaTemplate.sendDefault("test it"); }
消息消費
package test.ke.kafka; import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener; public class KafkaConsumerListener implements MessageListener<Integer, String> { @SneakyThrows @Override public void onMessage(ConsumerRecord<Integer, String> consumerRecord) { Object o = consumerRecord.value(); System.out.println(String.valueOf(o)); } }
此時,起服務在瀏覽器調用 http://localhost:18080/hello.do 則在控制台會打印出
---------------------
要簡單驗證kafka是否運行正常,則可以在kafka的bin目錄下執行:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
然后新起一個terminal, 執行下面的語句,producer發送文案,則上面的consumer就會拿到對應的結果
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
同時由於我們剛剛起的spring服務監聽的端口號也是9092, 因此服務也會獲取到該消息: