1 Kafka
Kafka是一個開源分布式的流處理平台,一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。Kafka由Scala和Java編寫,2012年成為Apache基金會下頂級項目。
2 Kafka優點
- 低延遲:Kafka支持低延遲消息傳遞,速度極快,能達到200w寫/秒
- 高性能:Kafka對於消息的分布,訂閱都有高吞吐量。即使存儲了TB級的信息,依然能夠保證穩定的性能
- 可靠性:Kafka是分布式,分區,復制和容錯的,保證零停機和零數據丟失
- 可擴展:用戶可以從但個代理Broker開始作POC,然后慢慢擴展到由三個Broker組成的小型開發集群,接着擴展到數十個甚至數百個Broker集群進入生產階段,可以在集群聯機時進行擴展,而不會影響整個系統的可用性
- 多個生產者:無論這些客戶使用相同Topic還是多個Topic,Kafka都能無縫處理多個生產者,使得系統可以非常容易聚合來自許多前端系統的數據並使其保持一致
- 多個消費者:Kafka具有多個消費者設計,可以讀取任何但個消息流而不會相互干擾。多個Kafka消費者可以組成一個消費組進行操作並共享消息流,從而確保每一條消息只被整個消費組處理一次
- 基於磁盤的保留:Kafka使用分布式提交日志,消息能夠快速持久化到磁盤上。消息持久化意味着如果消費者落后,無論是由於處理速度緩慢還是突然的消息涌入都不會有丟失數據的危險,也意味着消費者可以被停止。消息將保留在Kafka中,允許消費者重新啟動並且從中斷處獲取處理信息而不會丟失數據
3 Kafka相關術語
- Broker:Kafka集群包含一個或多個服務器,這種服務器稱為Broker
- Topic:每條發布到Kafka的消息都有一個類別,這個類別稱為Topic。物理上不同Topic的消息分開存儲,邏輯上Topic的消息雖然保存在一個或多個Broker上,但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存放於何處
- Partition:每個Topic包含一個或多個Partition
- Producer:生產者,負責發布消息到Broker
- Consumer:消費者,向Broker讀取消息的客戶端
- Consumer Group:每個Consumer屬於一個特定的Consumer Group,可以為每個Consumer指定Group Name,否則屬於默認Group
4 動手干活
4.1 環境
- Spring Boot 2.3.1
- IDEA 2020.1.1
- OpenJDK 11.0.7
- Kafka 2.5.0
- Kotlin 1.3.72
4.2 下載Kafka
官網戳這里。
下載並解壓(注意需要Kafka與Spring Boot版本對應,可以參考這里):
tar -xvf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0
接着啟動ZooKeeper與Kafka:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Kafka需要用到ZooKeeper,需要在啟動Kafka之前啟動ZooKeeper(ZooKeeper是一個開源的分布式應用程序協調服務,是Hadoop的組件,主要用於解決分布式應用中的一些數據管理問題)。
Kafka默認使用9092端口,部署在服務器上需要注意防火牆以及安全組的處理。
4.3 新建工程
考慮到Spring Boot在2.3.0M1中(截至本文寫作日期2020.07.14Spring Boot已更新到2.4.0M1)首次采用Gradle而不是Maven來構建項目,換句話說日后Spring Boot的構建工具將從Maven遷移到Gradle,Spring Boot團隊給出的主要原因是可以減少項目構建所花費的時間,詳情可以戳這里瞧瞧。
另外由於另一個基於JVM的語言Kotlin的日漸崛起,后端開始逐漸有人采用Kotlin(盡管不多,不過語法糖真的香,JetBrains家的語言配合IDE,爽得飛起),因此本示例項目將采用兩種方式搭建:
- Java+Maven
- Kotlin+Gradle
選擇的依賴如下(當然您喜歡的話可以在pom.xml
或者build.gradle.kts
里面加,對於Kotlin不需要Lombok
):
4.4 項目結構
Java版:
Kotlin版:
serialize
:序列化/反序列化實體類Constant.java
/Constant.kt
:常量類Consumer.java
/Consumer.kt
:消費者類Entity.java
/Entity.kt
:實體類Producer.java
/Product.kt
:生產者類TestApplicationTets
:測試類
4.5 常量類
包含Topic與GroupId:
public class Constants {
public static final String TOPIC = "TestTopic";
public static final String GROUP_ID = "TestGroupId";
}
Kotlin版:
object Constants
{
const val TOPIC = "TestTopic"
const val GROUP_ID = "TestGroupId"
}
4.6 實體類
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Entity {
private long id;
private String name;
private int num;
}
說一下Lombok的幾個注解:
@AllArgsConstructor
/@NoArgsConstructor
:生成所有參數/無參數構造方法@Data
:@Setter+@Getter+@RequiredArgsConstrucotr+@ToString+@EqualAndHashCode
@Builder
:可以通過建造者模式創建對象
Kotlin版:
class Entity {
var id: Long = 0
var name: String = ""
var num: Int = 0
constructor()
constructor(id:Long,name:String,num:Int)
{
this.id = id
this.name = name
this.num = num
}
}
4.7 生產者
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class Producer {
private final KafkaTemplate<String, Entity> kafkaTemplate;
public void send(Entity entity) {
//發送消息
ListenableFuture<SendResult<String, Entity>> future =
kafkaTemplate.send(Constants.TOPIC, entity);
//回調函數
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
log.info("Send message failed");
}
@Override
public void onSuccess(SendResult<String, Entity> stringEntitySendResult) {
log.info("Send message success");
}
});
}
}
這里的send
有兩個參數,第一個為消息的Topic,第二個為消息體,一般使用String或者Json。
Kotlin版:
@Component
class Producer
{
@Autowired
private var kafkaTemplate:KafkaTemplate<String,Entity> ? = null
private val log = LoggerFactory.getLogger(this.javaClass)
fun send(entity: Entity)
{
val future = kafkaTemplate!!.send(Constants.TOPIC,entity);
future.addCallback(object : ListenableFutureCallback<SendResult<String?, Entity?>?>{
override fun onSuccess(result : SendResult<String?,Entity?>?)
{
log.info("Send success");
}
override fun onFailure(e:Throwable)
{
log.info("Send failed");
}
})
}
}
4.8 消費者
@Component
@Slf4j
public class Consumer {
@KafkaListener(topics = Constants.TOPIC,groupId = Constants.GROUP_ID)
public void consume(Entity entity)
{
log.info("Consume a entity, id is "+entity.getId());
}
}
使用@KafkaListener
注解,第一個參數表示需要消費的消息的Topic,可以是String []
,第二個是消費者組的id。生產者的消息Topic必須與消費者的Topic保持一致否則不能消費,這里簡單處理打印日志。
Kotlin版:
@Component
class Consumer {
private val log = LoggerFactory.getLogger(this.javaClass)
@KafkaListener(topics = [Constants.TOPIC],groupId = Constants.GROUP_ID)
fun consume(entity: Entity) {
log.info("Consume a entity, id is "+entity.id.toString())
}
}
4.9 序列化/反序列化
這里自定義了序列化/反序列化類,序列化/反序列化類需要實現org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口,其中T
是想要序列化的類型,這里是Entity
。序列化接口如下:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
byte[] serialize(String var1, T var2);
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}
反序列化接口如下:
public interface Deserializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
T deserialize(String var1, byte[] var2);
default T deserialize(String topic, Headers headers, byte[] data) {
return this.deserialize(topic, data);
}
default void close() {
}
}
也就是只需要實現其中的serialize/deserialize
方法即可。這里序列化/反序列化用到了自帶的Jackson:
@Slf4j
public class Serializer implements org.apache.kafka.common.serialization.Serializer<Entity> {
public byte [] serialize(String topic, Entity entity)
{
try {
return entity == null ? null : new ObjectMapper().writeValueAsBytes(entity);
} catch (JsonProcessingException e) {
e.printStackTrace();
log.error("Can not serialize entity in Serializer");
}
return null;
}
}
反序列化:
@Slf4j
public class Deserializer implements org.apache.kafka.common.serialization.Deserializer<Entity> {
public Entity deserialize(String topic,byte [] data)
{
try {
return data == null ? null : new ObjectMapper().readValue(data,Entity.class);
} catch (IOException e) {
e.printStackTrace();
log.error("Can not deserialize entity in Deserializer");
}
return null;
}
}
Kotlin版:
class Serializer : org.apache.kafka.common.serialization.Serializer<Entity?>
{
private val log = LoggerFactory.getLogger(this.javaClass)
override fun serialize(topic: String?, data: Entity?): ByteArray? {
try {
return if (data == null) null else ObjectMapper().writeValueAsBytes(data)
}
catch (e:JsonProcessingException)
{
e.printStackTrace()
log.error("Can not serialize entity in Serializer")
}
return null
}
}
class Deserializer : org.apache.kafka.common.serialization.Deserializer<Entity?>
{
private val log = LoggerFactory.getLogger(this.javaClass)
override fun deserialize(topic: String?, data: ByteArray?): Entity? {
try
{
return ObjectMapper().readValue(data, Entity::class.java)
}
catch (e:IOException)
{
e.printStackTrace()
log.error("Can not deserialize entity in Deserializer")
}
return null
}
}
4.10 配置文件
application.properties
:
# 地址
spring.kafka.bootstrap-servers=localhost:9092
# 消費者組id
spring.kafka.consumer.group-id=TestGroupId
spring.kafka.consumer.auto-offset-reset=earliest
# 消費者鍵反序列化類
spring.kafka.consumer.key-deserializer=org.ap飯ache.kafka.common.serialization.StringDeserializer
# 消費者值反序列化類
spring.kafka.consumer.value-deserializer=com.test.serialize.Deserializer
# 生產者鍵序列化類
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生產者值序列化類
spring.kafka.producer.value-serializer=com.test.serialize.Serializer
對於auto-offest-rest
,該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下怎么處理,有四個取值:
earliest
:當各分區有已提交的offest
時,從提交的offest
開始消費,無提交的offest
時,從頭開始消費latest
(默認):當各分區有已提交的offest
時,從提交的offest
開始消費,無提交的offest
時,消費新產生的該分區下的數據none
:各分區都存在已提交的offest
時,從offest
后消費,只要有一個分區不存在已提交的offest
,則拋出異常exception
:其他情況將拋出異常給消費者
對於序列化/反序列化,String可以使用自帶的序列化/反序列化類:
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringDeserializer
至於Json可以使用:
org.springframework.kafka.support.serializer.JsonSerializer
org.springframework.kafka.support.serializer.JsonDeserializer
其他自定義的請實現org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口。
yml版:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: TestGroupId
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.test.serialize.Deserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.test.serialize.Serializer
5 測試
5.1 測試類
@SpringBootTest
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
class TestApplicationTests {
private final Producer producer;
@Test
void contextLoads() {
Random random = new Random();
for (int i = 0; i < 1000; i++) {
long id = i+1;
String name = UUID.randomUUID().toString();
int num = random.nextInt();
producer.send(Entity.builder().id(id).name(name).num(num).build());
}
}
}
生產者發送1000條消息。
Kotlin版:
@SpringBootTest
class TestApplicationTests {
@Autowired
private val producer:Producer? = null
@Test
fun contextLoads() {
for(i in 0..1000)
{
val id = (i + 1).toLong()
val name = java.util.UUID.randomUUID().toString()
val num = (0..100000).random()
producer!!.send(Entity(id,name,num))
}
}
}
5.2 測試
控制台輸出如下:
所有消息被成功發送並且被成功消費。
最后可以去驗證一下Kafka的Topic列表,可以看到配置文件中的Topic的值(TestTopic
),進入Kafka目錄:
bin/kafka-topics.sh --list --zookepper localhost:2181
6 源碼
7 參考
1、CSDN-Kafka優點
2、簡書-Spring Boot 2.x 快速集成整合消息中間件 Kafka
3、簡書-springboot 之集成kafka
如果覺得文章好看,歡迎點贊。
同時歡迎關注微信公眾號:氷泠之路。