消息中間件kafka學習記錄


1. 概述

Apache Kafka是一個分布式消息系統,憑借其優異的特性而被廣泛使用。

  • 高性能:O(1)復雜度消息快速持久化。
  • 高吞吐率: 單機每秒10w條消息傳輸。
  • 支持消息分區和分布式消費。
  • 支持在線水平擴展。
架構及核心組件

  • Producer: 消息生產者,即向kafka broker發送消息的客戶端。
  • Consumer:消息消費者,即從kafka broker獲取消息的客戶端。
  • Topic:消息根據topic進行歸類。
  • Partition:消息分片,每個topic中的消息被分為n個獨立的partition,以提高消息處理效率。
  • Broker:kafka集群中的kafka實例(服務器節點),一個broker可以容納多個topic。

Kafka依賴於zookeeper保存一些meta信息,來保證系統可用性。

2. 環境准備

2.1 安裝zookeeper

zk官網下載安裝包,如zookeeper-3.4.13.tar.gz,解壓即可。

  • 啟動服務,默認監聽端口2181
./bin/zkServer.sh start
  • 客戶端連接
./bin/zkCli.sh -server 10.183.222.203:2181

2.2 安裝kafka

kafka官網下載安裝包,如kafka_2.12-2.0.0.tgz,解壓即可。
/config/server.properties配置:
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://10.183.222.203:9092
zookeeper.connect=10.183.222.203:2181 #連接zookeeper

  • 啟動服務,默認監聽端口9092
./bin/kafka-server-start.sh  config/server.properties  &

啟動kafka后,zk增加了brokers、consumers等節點:
啟動kafka后,查看zk節點

3. 命令行常用命令

創建topic (-create)
# 創建topic:test01,並指定了replication-factor和partitions分別為1。
# replication-factor控制一個Message會被寫到多少台服務器上,因此這個值必須≤Broker數量。
./bin/kafka-topics.sh -create -zookeeper  10.183.222.203:2181 -replication-factor 1 -partitions 1 -topic test01

創建topic

查看topic詳情 (-describe)
./bin/kafka-topics.sh -describe -zookeeper 10.183.222.203:2181

查看topic詳情

發送消息到指定topic
./bin/kafka-console-producer.sh --broker-list 10.183.222.203:9092 -topic test01

發送消息

消費指定topic上的消息
./bin/kafka-console-consumer.sh -bootstrap-server 10.183.222.203:9092 -topic test01 -from-beginning

接收消息

4. java api實現

4.1 添加maven配置
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
4.2 消息生產者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 消息生產者
 */
public class ProducerDemo {
    private static final String helloTopic = "HelloWorld";

    public static void main(String[] args) {
        // 1. 構造Propertity,進行producer 相關配置。
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.183.222.203:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432);
        // 消息序列化方式
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            // 2. 構造Producer對象
            producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 10; i++) {
                String msgValue = "Message " + i;
                // 3. 發送消息
                producer.send(new ProducerRecord<>(helloTopic, msgValue));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
4.3 消息消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

/**
 *  consumer從kafka讀取消息
 */
public class ConsumerDemo {
    private static final String helloTopic = "HelloWorld";

    public static void main(String[] args) {
        // 1. 構造Propertity,進行consumer相關配置。
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.183.222.203:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // 消息反序列化方式
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       
        // 2. 生成消費實例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 3. 訂閱相應的 topic
        //    說明:可以消費多個topic: Arrays.asList(topic);
        //          topic支持正則表達式:如:subscribe(Pattern.compile("test.*")
        consumer.subscribe(Collections.singleton(helloTopic));
        
        // 4. 循環消費消息
        while (true) {
            try {
                // 4.1 poll方法拉取訂閱的消息, 消費者必須不斷的執行poll,獲取消息、維持連接。
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // 4.2 消費數據,必須在下次poll之前消費完這些數據, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG
                //     若不能在下次poll之前消費完,則會觸發一次負載均衡,產生卡頓。
                //     可以開一個單獨的線程池來消費消息,然后異步返回結果。
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }

            } finally {
                // 不再消費主動關閉
                consumer.close();
            }
        }
    }
}

消息消費者

kafka作為目前廣泛使用的消息中間件。本文對其核心組件和基本用法做了學習記錄。

參考:Kafka: The Definitive Guide


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM