Kafka簡介及使用PHP處理Kafka消息


Kafka簡介及使用PHP處理Kafka消息

Kafka 是一種高吞吐的分布式消息系統,能夠替代傳統的消息隊列用於解耦合數據處理,緩存未處理消息等,同時具有更高的吞吐率,支持分區、多副本、冗余,因此被廣泛用於大規模消息數據處理應用。

 

Kafka的特點:

  • 以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間復雜度的訪問性能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。【據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)】
  • 支持Kafka Server間的消息分區,同時保證每個Partition內的消息順序傳輸。
  • 分布式系統,易於向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴展機器。
  • 消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
  • 同時支持離線數據處理和實時數據處理。

Kafka的架構:

Kafka簡介及使用PHP處理Kafka消息-kafka架構圖

 

Kafka的整體架構非常簡單,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實現Kafka注冊的接口,數據從producer發送到broker,broker承擔一個中間緩存和分發的作用。broker分發注冊到系統中的consumer。broker的作用類似於緩存,即活躍的數據和離線處理系統之間的緩存。客戶端和服務器端的通信,是基於簡單,高性能,且與編程語言無關的TCP協議。

 

Kafka基本概念:

  • Topic:特指Kafka處理的消息源(feeds of messages)的不同分類。
  • Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。
  • Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發布一些消息。
  • Producers:消息和數據生產者,向Kafka的一個topic發布消息的過程叫做producers。
  • Consumers:消息和數據消費者,訂閱topics並處理其發布的消息的過程叫做consumers。
  • Broker:緩存代理,Kafa集群中的一台或多台服務器統稱為broker。

 

Kafka消息發送的流程:

Kafka簡介及使用PHP處理Kafka消息-Kafka消息發送

 

下面是PHP生產、消費Kafka消息的例子(假設已經配置好Kafka):

1.從zookeeper源碼src/c/src安裝zookeeper c client

  1.  
    cd zookeeper -3.4.8/src/c
  2.  
    ./configure
  3.  
    make && make install


2.編譯php libzookper擴展

 

  1.  
    git clone https: //github.com/Timandes/libzookeeper.git
  2.  
    cd libzookeeper
  3.  
    phpize
  4.  
    ./configure-- with-libzookeeper=/usr/local/bin/cli_mt
  5.  
    make && makeinstall

 

3.編譯php zookeeper擴展

  1.  
    git clone https: //github.com/andreiz/php-zookeeper.git
  2.  
    cd php-zookeeper
  3.  
    phpize
  4.  
    ./configure
  5.  
    make && make install

 

4.修改php.ini配置,添加libzookeeper和php-zookeeper擴展

  1.  
    extension=libzookeeper.so
  2.  
    extension=zookeeper.so

PHP處理Kafka消息:

1.啟動zookeeper和kafka

  1.  
    ./bin/zookeeper- server-start.sh config/zookeeper.properties
  2.  
     
  3.  
    ./bin/kafka- server-start.sh config/server.properties

2.創建由2個partition組成的、名為testtopic的topic

kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor --partitions --topic testtopic


3.composer安裝nmred/kafka-php

1 composer require "nmred/kafka-php"

4.producer.php代碼

  1.  
    <php 
  2.  
    require_once('./vendor/autoload.php'); 
  3.  
    $produce=/Kafka/Produce::getInstance( 'localhost:2181',3000); 
  4.  
    $produce->setRequireAck( -1); $topicName='testtopic';
  5.  
    //獲取到topic下可用的partitions
  6.  
    $partitions=$produce->getAvailablePartitions($topicName);
  7.  
    $partitionCount=count($partitions); 
  8.  
    $count= 1;//可以處理的消費者數量(可以理解為server數量)
  9.  
    while(true){    $message=json_encode(array('uid'=>$count,'age'=>$count%100,'datetime'=>date('Y-m-d H:i:s')));     
  10.  
    //發送消息到不同的partition   
  11.  
     $partitionId=$count%$partitionCount;    
  12.  
    $produce->setMessages( 'testtopic',$partitionId,array($message));   
  13.  
     $result=$produce->send();    
  14.  
    var_dump($result);     
  15.  
    $count++;   
  16.  
      echo"producer sleeping/n";   
  17.  
     sleep( 1);
  18.  
    }

5、consumer.php代碼

  1.  
    <?php 
  2.  
    require_once('./vendor/autoload.php'); 
  3.  
    //獲取需要處理的partitionId
  4.  
    $partitionId =  isset($argv[1]) ? intval($argv[1]) :0; 
  5.  
    $consumer =/Kafka/Consumer::getInstance( 'localhost:2181'); 
  6.  
    $consumer->setGroup( 'test-consumer-group');
  7.  
    $consumer->setPartition( 'testtopic', $partitionId);
  8.  
    $consumer->setFromOffset( true);
  9.  
    $consumer->setMaxBytes( 102400); 
  10.  
    while(true){    
  11.  
    $topic = $consumer->fetch();     
  12.  
    foreach ($topic as $topicName => $partition{        
  13.  
    foreach ($partition as $partId => $messageSet{            
  14.  
    foreach ($messageSet as $message){                
  15.  
    var_dump($message);           
  16.  
    }        
  17.  
    }    
  18.  
    }    
  19.  
    echo"consumer sleeping/n";   
  20.  
    sleep( 1);
  21.  
    }

 

6、在3個終端界面分別運行

  1.  
    php producer.php
  2.  
    php consumer.php
  3.  
    php consumer.php


7、兩個consumer腳本依次收到producer發送的消息

php-kafka-consumer-output

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM