python操作kafka


kafka消息隊列

  • 當數據量大到一定程度,我們用kafka做消息中間件為了是實現高可用,多副本(避免數據丟失),高並發(同時支持多個客戶端讀寫)。
  • kafka本身是用scala語言編寫,生產者比如我們nginx,Flume(日志),dataX,web程序等。我們消費者我們可以用python程序,SparkStreaming,Java程序,Flink等,而kafka數據消費需要記錄消費的偏移量。

1.kafka特點

1.解耦:
  允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2.冗余:
  消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3.擴展性:
  因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4.靈活性 & 峰值處理能力:
  在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5.可恢復性:
  系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6.順序保證:
  在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)
7.緩沖:
  有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8.異步通信:
  很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

2.kafka架構


3.kafka角色

1.producer:
  消息生產者,發布消息到 kafka 集群的終端或服務。
2.broker:
  kafka 集群中安裝Kafka的服務器。
3.topic:
  每條發布到 kafka 集群的消息屬於的類別,即 kafka 是面向 topic 的(相當於數據庫中的表)
4.partition:
  partition 是物理上的概念,每個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。partition分區越多並發越強,分區分為leader分區和follower分區,分區的leader和follower是zk選舉的。
5.consumer:
  從 kafka 集群中消費消息的終端或服務。從指定Topic的leader分區拉取數據,消費者會管理偏移量(記錄數據讀取位置,避免重復讀取)
6.Consumer group:
  high-level consumer API 中,每個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但可以被多個 consumer group 消費。
7.replica:
  partition 的副本,保障 partition 的高可用。
8.leader:
  replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9.follower:
  replica 中的一個角色,從 leader 中復制數據。
10.zookeeper:
  kafka 通過 zookeeper 來存儲集群的 meta 信息

注意:數據如果被消費了,Kafka中數據並沒有刪除,只是標記被哪個消費者組消費了。

4.Kafka集群安裝

  • 准備3台機器

    10.0.0.134 linux01
    10.0.0.131 linux02
    10.0.0.132 linux03
    # zk集群需要安裝奇數個3台或5台... kafka不需要安裝那么多
    
  • 安裝jdk 自行百度

  • 安裝zookeeper集群,並且啟動,安裝zookeeper集群

  • 准備號kafka安裝包,拖拽到虛擬機,解壓到/bigdata/目錄下

    tar -zxvf kafka_2.11-1.1.1.tgz -C /bigdata/
    
  • 配置server.properties

    vi /bigdata/kafka_2.11-1.1.1/config/server.properties
    
    每台機器配置唯一的broker.id
    broker.id = 1
    log.dirs kafka存放數據目錄
    num.partitions 分區數量
    log.retention.hours 保留數據時間
    zookeeper.connect 配置zookeeper
    delete.topic.enable=true # 生產環境不允許刪除topic數據,測試環境可以設置true
    # linux01配置如下
    broker.id = 1
    log.dirs=/data/kafka
    zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
    num.partitions=2
    log.retention.hours=168
    delete.topic.enable=true
    # scp到其他機器
    scp -r /bigdata/kafka_2.11-1.1.1/ linux02:$CMD
    scp -r /bigdata/kafka_2.11-1.1.1/ linux03:$CMD
    # 更改linux02,linxu03 的brker.id
    

5.kafka啟動

  • 指定server.properties 后台啟動
/bigdata/kafka_2.11-1.1.1/bin/kafka-server-start.sh -daemon /bigdata/kafka_2.11-1.1.1/config/server.properties 
  • 其他機器一樣
# 日志查看
less /bigdata/kafka_2.11-1.1.1/logs/server.log
  • 這樣kafka集群啟動成功

  • 進入zookeeper查看

ls / # 可以看到多了很多節點



[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]# ids linux01,linux02,linux03節點信息。 topics表
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1, 2, 3]

# 查看ids 為1 的信息 也就是linux01信息
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://linux01:9092"],"jmx_port":-1,"host":"linux01","timestamp":"1617293687406","port":9092,"version":4}
cZxid = 0x1900000035
ctime = Fri Apr 02 00:14:46 CST 2021
mZxid = 0x1900000035
mtime = Fri Apr 02 00:14:46 CST 2021
pZxid = 0x1900000035
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3788e3497850000
dataLength = 184
numChildren = 0

  • shell命令
# 1.查看有多少個topic
/bigdata/kafka_2.11-1.1.1/bin/kafka-topics.sh --list --zookeeper linux01:2181
	# linux01:2181 指定zk地址
# 2.創建一個topic
/bigdata/kafka_2.11-1.1.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic wordcount --replication-factor 3 --partitions 3
#  創建名為wordcount 的 topic
# --replication-factor 保存3份副本
# --partitions 指定分區3個
# 3.生產者
/bigdata/kafka_2.11-1.1.1/bin/kafka-console-producer.sh --broker-list linux01:9092,linux02:9092,linux03:9092 --topic wordcount
# 消費者
/bigdata/kafka_2.11-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic wordcount --from-beginning
# --from-beginning 從頭開始消費,如果不指定會從消費者啟動后開始消費

#/bigdata/kafka_2.11-1.1.1/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic wordcount
# Topic  topic名稱
# PartitionCount  分區數量
# ReplicationFactor  副本數量
# Partition 哪個分區
# Leader: broker上leader是誰
# Replicas 副本存放位置
# Isr 同步順序
  • 總結:
1.Kafka的生成者直接向Broker的Leader分區寫入數據,不需要連接Zookeeper
2.Kafka的消費者(老版本API需要連接Zookeeper,獲取Broker信息和偏移量),新的Api不需要連接Zookeeper。

6.客戶端連接

  • 客戶端連接之前需要更改server.properties配置
listeners=PLAINTEXT://<本機IP>:9092
advertised.listeners=PLAINTEXT://<本機IP>:9092
zookeeper.connect=localhost:2181改為zookeeper.connect=<集群中機器IP>:2181
e.g.:zookeeper.connect=10.0.0.70:2181,10.0.0.71:2181,10.0.0.72:2181

7.python客戶端操作kafka

  • 這里介紹pykafka,pykafka安裝
pip install pykafka
  • 消費者
class KafkaConsumer(object):
    def __init__(self, hosts, topic):
        self.client = KafkaClient(hosts=hosts)
        self.topic = self.client.topics[topic.encode()]
    def simple_consumer(self, offset=0):
        """
        指定消費
        :param offset:
        :return:
        """
        partitions = self.topic.partitions
        print("分區:", partitions)
        print("last_offset", self.topic.latest_available_offsets())
        # 自動提交消費
        consumer = self.topic.get_simple_consumer(b"simple_consumer",
                                                  partitions=[partitions[1]], auto_commit_enable=True,auto_commit_interval_ms=1)
        # for i in range(100):
        #
        #     message = consumer.consume()
        #     print(message.value.decode())
        #     # 當前消費分區offset情況
        #     print("offset_list = ",consumer.held_offsets)# {1: 5}  key為分區, 第幾個




    def balance_consumer(self, offset=0):
        """
        balance consumer 消費kafka,無重復消費
        :param offset:
        :return:
        """
        # managed=True 設置后,使用新式reblance分區方法,不需要使用zk,而False是通過zk來實現reblance的需要使用zk
        consumer = self.topic.get_balanced_consumer(b"balance_consumer", managed=True, auto_commit_enable=True,auto_commit_interval_ms=1)
        partitions = self.topic.partitions
        print("partitions", partitions)
        earliest_offsets = self.topic.earliest_available_offsets()
        print("earliest_offsets", earliest_offsets)
        last_offsets = self.topic.latest_available_offsets()
        print("最近可用offset {}".format(last_offsets))
        offset = consumer.held_offsets
        print("當前消費者分區offset情況{}".format(offset))
        while True:
            msg = consumer.consume()
            offset = consumer.held_offsets
            print("{}, 當前消費者分區offset情況{}".format(msg.value.decode(), offset))
    def balance_consumer_by_id(self):
        """
        根據consumer_id消費
        :return:
        """
        consumer = self.topic.get_simple_consumer(consumer_group=b'test_group', auto_commit_enable=True,
                                             auto_commit_interval_ms=1, consumer_id=b'test_id')

        for message in consumer:
            if message is not None:
                print(message.offset, message.value.decode('utf-8'))

  • 生產者
class KafkaProducter(object):
    def __init__(self, hosts, topic):
        self.client = KafkaClient(hosts=hosts)
        self.topic = self.client.topics[topic.encode()]
    def get_all_partitions(self,):
        """
        查看所有分區
        :return:
        """
        print("所有分區:", self.topic.partitions)
        return self.topic.partitions
    def get_earliest_available_offsets(self):
        """
        獲取最早可用的offset
        :return:
        """
        return self.topic.earliest_available_offsets()


    def get_partition_offset(self):
        """
        獲取所有分區最后一次offsets
        :return:
        """
        all_last_offset = self.topic.latest_available_offsets()
        return all_last_offset
    def get_producer(self, data):
        """
        同步生產數據
        :param data:
        :return:
        """
        p = self.topic.get_producer(sync=True)
        p.produce(data.encode())
    def producer_designated_partition(self, data):
        """
        往指定分區寫入消息
        :return:
        """
        def assign_patition(pid, key):
            """
            指定特定分區, 這里測試寫入第一個分區(id=0)

            需要在獲取生產者的時候指定選區函數,
            並且在生產消息的時候額外指定一個key

            :param pid: 為分區列表
            :param key:
            :return:
            """
            print("為消息分配partition {} {}".format(pid, key))
            return pid[0]
        p = self.topic.get_producer(sync=True, partitioner=assign_patition)
        p.produce(data.encode(), partition_key=b"partition_key_0")

        def async_produce_message(self, topic):
            """
            異步生產消息,消息會被推到一個隊列里面,
            另外一個線程會在隊列中消息大小滿足一個閾值(min_queued_messages)
            或到達一段時間(linger_ms)后統一發送,默認5s
            :return:
            """
            topic = self.client.topics[topic.encode()]
            last_offset = topic.latest_available_offsets()
            print("最近的偏移量 offset {}".format(last_offset))

            # 記錄最初的偏移量
            old_offset = last_offset[0].offset[0]
            p = topic.get_producer(sync=False, partitioner=lambda pid, key: pid[0])
            p.produce(str(time.time()).encode())
            s_time = time.time()
            while True:
                last_offset = topic.latest_available_offsets()
                print("最近可用offset {}".format(last_offset))
                if last_offset[0].offset[0] != old_offset:
                    e_time = time.time()
                    print('cost time {}'.format(e_time - s_time))
                    break
                time.sleep(1)

    def async_produce_message(self):
        """
        異步生產消費
        :return:
        """
        last_offset = self.get_partition_offset()
        print("最近的偏移量 offset {}".format(last_offset))
        # print(self.get_earliest_available_offsets())
        # 獲取某個分區的offset偏移量
        old_offset = last_offset[1].offset[0]
        print("0分區偏移量:", old_offset)
        p = self.topic.get_producer(sync=False, partitioner=lambda pid, key: pid[1])
        p.produce(str(time.time()).encode())
        s_time = time.time()
        while True:
            last_offset = self.get_partition_offset()
            print("最近可用offset", last_offset)
            print("old_offset,,,,,old_offset", old_offset)
            if last_offset[1].offset[0] != old_offset:
                e_time = time.time()
                print('cost time {}'.format(e_time - s_time))
                break
            time.sleep(1)
    def get_produce_message_report(self):
        """
        查看異步發送消報告,默認會等待5s后才能獲得報告
        """
        last_offset = self.topic.latest_available_offsets()
        print("最近的偏移量 offset {}".format(last_offset))
        p = self.topic.get_producer(sync=False, delivery_reports=True, partitioner=lambda pid, key: pid[0])
        p.produce(str(time.time()).encode())
        s_time = time.time()
        delivery_report = p.get_delivery_report()
        e_time = time.time()
        print('等待{}s, 遞交報告{}'.format(e_time - s_time, delivery_report))
        last_offset = self.topic.latest_available_offsets()
        print("最近的偏移量 offset {}".format(last_offset))
  • 參考

https://blog.csdn.net/zzq900503/article/details/91454185


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM