Kafka 概念、單機搭建與使用


Kafka 概念、單機搭建與使用

官方網址:Apache Kafka® is a distributed streaming platform

基本概念介紹

在Kafka中有一些基本的概念,

Topic

  • 簡介:Topic在Kafka中是一個抽象的概念,一個主題是已經發布的記錄的種類。主題在Kafka中是可以被多重訂閱的,這就意味着一個主題可能有0個、一個、或者許多個消費者去訂閱這個主題中的消息。

  • Partitions:在每一個topic在Kafka中可以有多個分區,增加一個主題的分區可以提高Kafka的吞吐率,但是不是越多越好,因為如果分區數量越多的話生產者插入的效率也會降低。所以真正到生產環境時,需要權衡生產與消費的一個平衡關系,消費稍微大於生產者,不會產生消息的堆積,也能夠充分提高Kafka的效率。

  • Replication Factor:復制因子,是對於當前的Topic是否需要副本。如果設置成1的話,代表當前Topic在整個Kafka中只有一份。這里有個限制Topic的數量不能夠多於當前Kafka的Broker數量。

  • 存儲方式:在Kafka的配置中(Server.properties)有logs.dir的配置,這個是Kafka存儲消息的位置。如果Topic復制因子是1分區是1的話,在對應的文件夾下會有一個名稱為topicname的文件夾;如果復制因子是2分區是2,假設存在兩個Broker,在每個Broker中將會存在兩個文件夾分別為topicname_0 topicname_1的文件夾

  • Leader與Follower:由於每個topic如果存在副本的話,是對於partition進行復制。這么多存在在不同的Broker上的副本,其中有一個partition是leader其他的是Followers,當一個broker宕機會在副本中選擇一個充當Leader。關於Kafka中的選舉機制以及Leader的確認可以查看這兩篇文章:Leader確認選舉機制

Producer

生產者,顧明思議是生產消息,允許應用發布一個流的消息到一個或者多個主題中,

Consumer

  • 簡介:消費者是訂閱某個topic消息。
  • Group:每個消費者都有個groupid 來標定當前消費者屬於哪個group。Group的作用是,當同一個group的兩個消費者訂閱一個topic的時候,如果當前topic沒有分區那么其中一個消費者是獲得不了任何消息的;如果有分區的話,將會按照數量進行負載均衡,每個消費者獲得不同的分區的消息。
  • 同一個Group下的消費者不會同時訂閱一個主題下的同一個分區,如果消費者數量杜宇分區數量,則多出的消費者是不會有任何消息獲得的。

Broker

Broker 是一個Kafka的Server,一台單物理機或者集群都可以擁有多個broker一個broker可以容納多個主題,這個與復制因子、主題的分區都有關系。

Kafka單機配置,一個Broker

環境:

  • win10物理機
  • Wmare Centos7虛擬機
  • XShell 訪問虛擬機

配置zookeeper

  • 下載
# zookeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
  • 解壓后進入目錄
cd zookeeper-3.4.13/conf
  • 復制zookeeper的配置文件
cp zoo_sample.cfg zoo.cfg	
  • 返回上級進入bin目錄下,鍵入如下命令
./zkServer.sh start 
  • 查看是否成功開啟zookeeper服務
#注:這里提示一下開啟后提示的成功不一定是真的成功,所以需要查看一下
netstat -tunlp|egrep 2181
# 如果沒有結果查看統計目錄下的 zookeeper.out文件 查看log信息
# 使用jps命令查看 QuorumPeerMain是zookeeper的守護進程
11089 QuorumPeerMain
11114 Jps

配置Kafka

  • 下載安裝包
# Kafka
wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • 解壓后進入文件夾下bin目錄下
# 第一個是start.sh位置第二個是server.rpoperties的位置,所以確認好路徑的正確性
./kafka-server-start.sh ./../config/server.properties &
# 我們可以在Kafka的目錄下直接執行,而不進入到bin下,命令看着更舒服些
./bin/kafka-server-start.sh ./config/server.properties &
  • 查看是否開啟成功:默認的Kafka端口是9092,zookeeper是2181
netstat -tunlp|egrep "(2181|9092)"
# 結果如下
[root@localhost ~]# netstat -tunlp|egrep "(2181|9092)"
tcp6      0     0 :::9092               :::*                  LISTEN      1877/java  tcp6      0     0 :::2181               :::*                  LISTEN      1820/java
# jps 查看
11089 QuorumPeerMain
11458 Kafka
11847 Jps
  • 至此Kafka配置成功

使用Kafka

創建topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 返回結果
Created topic "test"

在虛擬機用sh腳本上作為生產者生產消息

  • 我們重新開一個Xshell窗口,CD到Kafka目錄/bin下,我們先介紹這一節會使用到的 kafka-console-producer.sh
# 鍵入如下命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>today message
>
# 最近本的指定,broker-list與topic是必須的參數
# 成功命令行會進入一個>的情況,鍵入消息按回車鍵就是發送消息到Kafka了
# 發送一個【today message】
  • kafka-console-producer.sh參數說明,運行./kafka-console-producer.sh --help可查看

在虛擬機上用sh腳本作為消費者消費消息

  • 重新開另個一Xshell窗口CD到Kafka目錄/bin下,我們先介紹這一節會使用到的 kafka-console-consumer.sh
# 鍵入如下命令
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 最近本的指定,bootstrap-server與topic/whitelist是必須的參數
# 由於有 from-beginning 參數 會從頭load所有消息
# 消費后返回如下
today message
#在生產端鍵入消息后,消費端會同步消息出現
  • kafka-console-consumer.sh參數說明運行./kafka-console-consumer.sh --help可查看

使用Python作為生產者、消費者

  • 在物理機上寫一個Python生產者的腳本
from kafka.producer import KafkaProducer
import time
def send_data(data):
	producer = KafkaProducer(bootstrap_servers='192.168.233.138:9092')
	producer.send("test",b''+str(data)+'')
    producer.flush()
	print ("end")
	
if __name__=="__main__":
	send_data("physics python message");
  • 查看Xshell上消費的命令行
[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning
111
333

1
12
physics python message
  • 在物理機上寫一個消費者的腳本
from kafka import KafkaConsumer
import time
def get_data(data):
	consumer = KafkaConsumer('test',bootstrap_servers='192.168.233.138:9092', group_id='my_favorite_group')
	print ("end")
	for msg in consumer:
		print(msg)
	
if __name__=="__main__":
	get_data();
  • 物理機消費者的結果
# 我這邊是先運行的消費者的腳本,所以實時接收到了物理機產生的消息
ConsumerRecord(topic=u'test', partition=0, offset=5, timestamp=1551762485911L, timestamp_type=0, key=None, value='physics python message', checksum=1520092583, serialized_key_size=-1, serialized_value_size=22)
  • 測試使用虛擬機sh端的生產者發送123 消息,查看物理機消費者結果
ConsumerRecord(topic=u'test', partition=0, offset=6, timestamp=1551762784609L, timestamp_type=0, key=None, value='123', checksum=1760815061, serialized_key_size=-1, serialized_value_size=3)
  • 幾點注意
# 物理機連接時可能出現【kafka.errors.NoBrokersAvailable: NoBrokersAvailable】這個錯誤按照如下順序依次更改
1. 查看虛擬機防火牆是否關閉
	systemctl status firewalld
	systemctl stop firewalld
2. 更改kafka服務端的server.properties:
	增加 [ listeners=PLAINTEXT://192.168.233.138:9092 ]這一行
3. 修改物理機的hosts文件 C:\Windows\System32\drivers\etc\hosts
	增加 【虛擬機ip 虛擬機主機名】 Eg:[192.168.233.138 localhost]

使用Springboot 作為生產者、消費者

注:我直接在我的一個寄存的Spring Boot Demo項目上更改

  • 在pom.xml中添加kafka依賴
 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>
<!-- 提示一件事情此處別指定version了,直接用最新的就可以,老的版本一些包找不到 -->
  • 寫一個kafka 生產者配置類
package com.example.kane.config;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class kafka_config {
	 public Map<String, Object> producerConfigs() {
	        Map<String, Object> props = new HashMap<>();
	        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092");
	        props.put(ProducerConfig.RETRIES_CONFIG, 0);
	        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
	        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
	        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
	        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	        return props;
	    }
	 
	    public ProducerFactory<String, String> producerFactory() {
	        return new DefaultKafkaProducerFactory<>(producerConfigs());
	    }
	 
	    @Bean
	    public KafkaTemplate<String, String> kafkaTemplate() {
	        return new KafkaTemplate<String, String>(producerFactory());
	    }

}
  • 創建一個生產數據的Controller
package com.example.kane.Controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;


@RestController
@RequestMapping("/kafka")
public class CollectController {
	 protected final Logger logger = LoggerFactory.getLogger(this.getClass());
	    @Autowired
	    private KafkaTemplate kafkaTemplate;

	    @RequestMapping(value = "/send", method = RequestMethod.GET)
	    public void sendKafka(HttpServletRequest request, HttpServletResponse response) {
	        try {
	            String message = request.getParameter("message");
	            logger.info("kafka的消息={}", message);
	            kafkaTemplate.send("test", "key", message);
	            logger.info("發送kafka成功.");
	        } catch (Exception e) {
	            logger.error("發送kafka失敗", e);
	        }
	    }

}
  • 啟動項目后,在瀏覽器訪問http://localhost:8080/kafka/send?message=url_producer
# 查看結果
2019-03-05 13:57:16.438  INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController    : 發送kafka成功.
2019-03-05 13:57:45.871  INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController    : kafka的消息=url_producer
2019-03-05 13:57:45.872  INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController    : 發送kafka成功.
# 查看虛擬機 Consumer結果

[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning
physics python message
123
null
url_producer
  • 增加消費者的配置
package com.example.kane.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

import com.example.kane.service.kafka_listener;
@Configuration
@EnableKafka
public class kafka_consumer_config {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        return propsMap;
    }
    @Bean
    public kafka_listener listener() {
        return new kafka_listener();
    }
}
  • 增加listener類
package com.example.kane.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
public class kafka_listener {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());


    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
    	logger.info(record.toString());
        logger.info("kafka的key: " + record.key());
        logger.info("kafka的value: " + record.value().toString());
    }
}
  • 同樣我們用訪問http://localhost:8080/kafka/send?message=url_producer1重新發一個消息
# 結果
2019-03-05 14:31:04.787  INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController    : 發送kafka成功.
2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : ConsumerRecord(topic = test, partition = 0, offset = 10, CreateTime = 1551767464787, serialized key size = 3, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = url_producer1)
2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : kafka的key: key
2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : kafka的value: url_producer1
# 查看虛擬機 消費者信息
physics python message
123
null
url_producer
url_producer1
url_producer1

一些需要注意的問題

  1. 現在kafka官方提供自帶zookeeper版本,不建議使用自帶的,還是建議自己安裝zookeeper
  2. 物理機沒法訪問的時候,看文中的注意事項,依次更改一定能訪問


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM