本人今天上午參考了不少博文,發現不少博文不是特別好,不是因為依賴沖突問題就是因為版本問題。
於是我結合相關的博文和案例,自己改寫了下並參考了下,於是就有了這篇文章。希望能夠給大家幫助,少走一些彎路。
一、KafKa的介紹
1.主要功能
根據官網的介紹,ApacheKafka®是一個分布式流媒體平台,它主要有3種功能:
a.發布和訂閱消息流,這個功能類似於消息隊列,這也是kafka歸類為消息隊列框架的原因。
b.以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流。
c.可以再消息發布的時候進行處理。
2.使用場景
a.在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能。
b.構建實時的流數據處理程序來變換或處理數據流,數據處理功能。
3.詳細介紹
Kafka目前主要作為一個分布式的發布訂閱式的消息系統使用,下面簡單介紹一下kafka的基本機制
消息傳輸過程:
Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。
Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息
Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。
二、安裝
安裝包下載地址:http://kafka.apache.org/downloads
找到0.11.0.1版本,如圖:
1.下載
wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
2.解壓
tar -xzvf kafka_2.11-0.11.0.1.tgz
配置說明:
consumer.properites 消費者配置,這個配置文件用於配置開啟的消費者,此處我們使用默認的即可。
producer.properties 生產者配置,這個配置文件用於配置開啟的生產者,此處我們使用默認的即可。
server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,目前僅介紹幾個最基礎的配置。
a.broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,並且集群中的每一個kafka服務器的id都應是唯一的,我們這里采用默認配置即可。
b.listeners 申明此kafka服務器需要監聽的端口號,如果是在本機上跑虛擬機運行可以不用配置本項,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,
例如:listeners=PLAINTEXT:// 192.168.126.143:9092。並確保服務器的9092端口能夠訪問。
c.zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由於本次使用的是kafka高版本中自帶zookeeper,
使用默認配置即可,zookeeper.connect=localhost:2181。
3.運行
首先運行zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
運行成功,顯示如圖:
然后運行kafka
bin/kafka-server-start.sh config/server.properties
運行成功,顯示如圖:
三、整合KafKa
1.新建Maven項目導入Maven依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.test</groupId> <artifactId>kafka_demo</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <!-- 指定編譯版本 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> <finalName>${project.artifactId}</finalName> </build> </project>
2.編寫消息實體
package com.springboot.kafka.bean;
import java.util.Date;
import lombok.Data;
@Data
public class Message {
private Long id; //id
private String msg; //消息
private Date sendTime; //時間戳
}
有了lombok,每次編寫實體不必要使用快捷鍵生成seter或geter方法了,代碼看起來更加簡潔了。
3.編寫消息發送者(可以理解為生產者,最好聯系詳細介紹中的圖)
package com.springboot.kafka.producer; import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.springboot.kafka.bean.Message; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class KafkaSender { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); //發送消息方法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("zhisheng", gson.toJson(message)); } }
4.編寫消息接收者(可以理解為消費者)
package com.springboot.kafka.producer; import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.springboot.kafka.bean.Message; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class KafkaSender { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); //發送消息方法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("zhisheng", gson.toJson(message)); } }
5.編寫啟動類
package com.springboot.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import com.springboot.kafka.producer.KafkaSender; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i < 3; i++) { //調用消息發送類中的消息發送方法 sender.send(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
6.編寫application.properties配置文件
#============== kafka =================== # \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A spring.kafka.bootstrap-servers=192.168.126.143:9092 #=============== provider ======================= spring.kafka.producer.retries=0 # \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 # \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= # \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 # \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
7.運行結果
示例代碼地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo
如果按照上述流程沒有達到預計的效果可以git clone到本地。