Kafka入門之生產者消費者


一、Kafka安裝與使用 ( kafka介紹     )

1. 下載Kafka

官網 http://kafka.apache.org/    以及各個版本的下載地址 http://archive.apache.org/dist/kafka/

 

2. 安裝

Kafka是使用scala編寫的運行與jvm虛擬機上的程序,雖然也可以在windows上使用,但是kafka基本上是運行在linux服務器上,(也可以運行在windows上)因此我們這里也使用linux來開始今天的實戰。首先確保你的機器上安裝了jdk,kafka需要java運行環境,以前的kafka還需要zookeeper,新版的kafka已經內置了一個zookeeper環境,所以我們可以直接使用。說是安裝,如果只需要進行最簡單的嘗試的話我們只需要解壓到任意目錄即可,這里我們將kafka壓縮包解壓到/home目錄

直接解壓縮即可使用(不管在linux上,還是windows上)

Kafka目錄如下:

1 其中bin是執行文件目錄,包括linux下的執行文件,以及bin/window目錄下包含windows執行的批處理命令;
2 config中包含kafka的配置文件;
3 libs中是kafka的各種依賴包。

 

3. 配置  ( Linux下Kafka單機安裝配置方法(圖文)

配置Kafka文件(不配置也能在本地機上執行,不配置默認主機是localhost )

命令行輸入: vi server.properties #編輯修改相應的參數

1 broker.id=0
2 
3 port=9092 #端口號
4 
5 host.name=192.168.0.11 #服務器IP地址,修改為自己的服務器IP
6 
7 log.dirs=/usr/local/kafka/log/kafka #日志存放路徑,上面創建的目錄  (改成自己的目錄)
8 
9 zookeeper.connect=localhost:2181 #zookeeper地址和端口,單機配置部署,localhost:2181

 

4. 命令行運行

4.1  啟動zookeeper

cd進入kafka解壓目錄,輸入

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動zookeeper成功后會看到如下的輸出

4.2 啟動kafka

cd進入kafka解壓目錄,輸入

bin/kafka-server-start.sh config/server.properties

啟動kafka成功后會看到如下的輸出

 

 

5. 第一個消息(Linux)

5.1   創建一個topic

Kafka通過topic對同一類的數據進行管理,同一類的數據使用同一個topic可以在處理數據時更加的便捷

在kafka解壓目錄打開終端,輸入

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

創建一個名為test的topic

 

在創建topic后可以通過輸入

 bin/kafka-topics.sh --list --zookeeper localhost:2181

來查看已經創建的topic

5.2 創建一個消息消費者

在kafka解壓目錄打開終端,輸入

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以創建一個用於消費topic為test的消費者

 

消費者創建完成之后,因為還沒有發送任何數據,因此這里在執行后沒有打印出任何數據

不過別着急,不要關閉這個終端,打開一個新的終端,接下來我們創建第一個消息生產者

5.3    創建一個消息生產者

在kafka解壓目錄打開一個新的終端,輸入

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在執行完畢后會進入的編輯器頁面

在發送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經打印出了我們剛才發送的消息

 

第4步可以通過腳本文件進行實現:

1) 創建啟動腳本,假設我們的Kafka在/usr/local/目錄下

cd /usr/local/kafka  #創建啟動腳本

vi kafkastart.sh #編輯,添加以下代碼

1 #!/bin/sh
2 #創建啟動腳本
3 #啟動zookeeper
4 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
5 sleep 3 #等3秒后執行
6 
7 #啟動kafka
8 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

2) 創建關閉腳本

vi kafkastop.sh #編輯,添加以下代碼

1 #!/bin/sh
2 #創建關閉腳本
3 #關閉kafka
4 /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
5 sleep 3 #等3秒后執行
6 
7 #關閉zookeeper
8 /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &

3)命令行添加執行權限

1 #添加腳本執行權限
2 chmod +x kafkastart.sh
3 chmod +x kafkastop.sh

4)命令行執行腳本

1 sh /usr/local/kafka/kafkastart.sh #啟動kafka
2 
3 sh /usr/local/kafka/kafkastop.sh #關閉kafka

 

6.  第一個消息(windows)  https://www.cnblogs.com/flower1990/p/7466882.html

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

6.1 啟動zookeeper

.\bin\windows\zookeeper-server-start.bat .\config\server.properties

6.2 啟動Kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

image

 

注意:注意:不要關了這個窗口,啟用Kafka前請確保ZooKeeper實例已經准備好並開始運行

6.3 創建主題

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

clip_image005

注意:不要關了這個窗口

查看主題輸入:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

clip_image007

6.4 創建生產者

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

image

注意:不要關了這個窗口

6.5 創建消費者

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

image

 

其中6.1和6.2可以使用批處理文件

1) 創建啟動腳本,假設我們的Kafka在D:\Kafka\kafka_2.12-0.11.0.0目錄下

切換到 D:\Kafka\kafka_2.12-0.11.0.0目錄下  #創建啟動腳本

用文本編輯器編輯kafkastart.bat #編輯,添加以下代碼

#創建啟動腳本
# ...自己添加

vi kafkastop.sh #編輯,添加以下代碼

#創建關閉腳本
# 自己添加

 

雙擊即可運行。

 

7. 使用Java程序(模擬真實生產環境;生產者在Kafka服務器上,消費者在客戶端; 可以推廣到分布式環境中

如果是生產者以及消費者都在本機進行測試,則Kafka中配置文件不需要改變;且生產者和消費者都在同一台機器上。

否則:

7.1 創建Topic 

 

7.2 生產者 

eclipse中創建一個名為KafkaProduce的Java Project;接着右擊該項目new一個名為lib的Folder;然后將我們部署的kafka的libs中的所有Jar包拷貝到該項目的lib目錄下;接着右擊該項目,build Path,然后選擇configure build path中的Libraries,接着Add Jars;將本項目lib下的所有Jar包添加進來。

package com.zc.kafka.producer.main;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Kafka生產者
 * 先啟動生產者,發送消息到broker,這里簡單發送了10條從0-9的消息,再啟動消費者,控制台輸出如下:
 */
public class SimpleKafkaProducer {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "192.168.0.11:9092");  // "localhost:9092"

        //請求時候需要驗證
        props.put("acks", "all");

        //請求失敗時候需要重試
        props.put("retries", 0);

        //內存緩存區大小
        props.put("buffer.memory", 33554432);

        //指定消息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //指定消息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {  //i < 10
            // 生產一條消息的時間有點長
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            //System.out.println(i);
        }
        System.out.println("Message sent successfully");
        producer.close();
    }

}

 

7.3 消費者

eclipse中創建一個名為KafkaConsumer的Java Project;接着右擊該項目new一個名為lib的Folder;然后將我們部署的kafka的libs中的所有Jar包拷貝到該項目的lib目錄下;接着右擊該項目,build Path,然后選擇configure build path中的Libraries,接着Add Jars;將本項目lib下的所有Jar包添加進來。

package com.zc.kafka.consumer.main;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * kafka消費者
 */
public class SimpleKafkaConsumer {

    @SuppressWarnings({ "deprecation", "resource" })
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.0.11:9092");   // "localhost:9092"
        //每個消費者分配獨立的組號
        props.put("group.id", "test");

        //如果value合法,則自動提交偏移量
        props.put("enable.auto.commit", "true");

        //設置多久一次更新被消費消息的偏移量
        props.put("auto.commit.interval.ms", "1000");

        //設置會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息
        props.put("session.timeout.ms", "30000");
        
        //
        //props.put("auto.offset.reset", "earliest");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("test"));  //核心函數1:訂閱topic

        System.out.println("Subscribed to topic " + "test");
        //int i = 0;

        while (true) {
            //System.out.println(i++);
            //核心函數2:long poll,一次拉取回來多個消息
            /* 讀取數據,讀取超時時間為100ms */
            ConsumerRecords<String, String> records = consumer.poll(100);  
            //System.out.println(records.count());
            for (ConsumerRecord<String, String> record : records)
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
        }
    }

}

 

7.4 打Jar包執行

1)打Jar包

右擊該項目,選擇Export;之后選擇Runnable JAR file,接着next; 然后在Launch configuration中選擇主類(含main方法),如果沒有,則需要先運行該主類,接着Export destination選擇Jar包的存放位置和名稱,接着Library handling 選擇第二個,Finish;會生成相應Jar包。

通過 java -jar XXX.jar 運行該Jar包。

2)執行

將生產者與消費者都打成相應Jar包;都可以在服務器(有Kafka環境)和客戶機(沒有Kafka環境)上執行;並且生產者和消費者可以在不同客戶機上也可以在相同客戶機上執行。

就是我們編程以及運行的kafka項目,跟有沒有Kafka環境是無關的。

1. 服務器上先啟動Kafka

2. 服務器或者客戶機上啟動生產者 java -jar KafkaProducer.jar

3. 服務器或者客戶機上啟動消費者 java -jar KafkaConsumer.jar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM