A Simple Kafka Tutorial


1. Basic Concepts

Apache Kafka is a distributed publish-subscribe messaging system.

Topic
Kafka organizes message feeds into categories; each category of messages is called a topic.

Producer
An object that publishes messages is called a Kafka topic producer.

Consumer
An object that subscribes to topics and processes the published messages is called a consumer.

Broker
Published messages are stored on a group of servers called a Kafka cluster. Each server in the cluster is a broker. Consumers can subscribe to one or more topics and pull data from the brokers, thereby consuming the published messages.

Partition

A topic can have one or more partitions. Each partition is an ordered, immutable sequence of messages that can be continually appended to. Every message in a partition is assigned a sequential id called the offset, which is unique within that partition.

Each partition has one leader and zero or more followers. The leader handles all read and write requests for the partition, while the followers passively replicate its data. If the leader fails, one of the followers is elected as the new leader.

Through partitions, Kafka provides both ordering and load balancing across the consumers in a consumer group. Each partition is assigned to exactly one consumer within a group, so that consumer processes the partition's messages in order. Because a topic has multiple partitions, load can still be balanced across the consumers in the group. Note that the number of consumers in a group should not exceed the number of partitions: the partition count is the limit on parallel consumption.

Kafka only guarantees ordering of messages within a single partition, not across different partitions, and this is enough for most applications. If you need total ordering over all messages in a topic, the topic must have only one partition, which also means only one consumer process per consumer group can read it.
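To make the partitioning idea concrete, here is a tiny standalone sketch (not part of the original tutorial) that mimics keyed partitioning with the same djb_hash used in the code examples later on. Messages with the same key always map to the same partition and are therefore consumed in order:

#include <iostream>
#include <string>

// Same idea as the djb_hash partitioner used later in this tutorial.
static unsigned int djb_hash(const std::string &s) {
    unsigned int hash = 5381;
    for (size_t i = 0; i < s.size(); i++)
        hash = ((hash << 5) + hash) + s[i];
    return hash;
}

int main() {
    const int partition_cnt = 3;  // assumed partition count, for illustration only
    const char *keys[] = {"user-1", "user-2", "user-1"};
    for (int i = 0; i < 3; i++) {
        std::string key = keys[i];
        // Keyed partitioning: hash the key and take it modulo the partition count,
        // so the same key always lands in the same partition (and stays ordered).
        unsigned int partition = djb_hash(key) % partition_cnt;
        std::cout << key << " -> partition " << partition << std::endl;
    }
    return 0;
}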

 

2. Installation and Usage

1. Download Kafka

  • Download: wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz or
  • wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz (use whichever mirror is faster)
  • Extract: tar -xzf kafka_2.11-0.10.0.0.tgz
  • Enter the directory: cd kafka_2.11-0.10.0.0/

2. Start the services

  • Start ZooKeeper: bin/zookeeper-server-start.sh config/zookeeper.properties & (ZooKeeper listens on port 2181 by default; the trailing & runs the process in the background so you can keep using the shell)
  • Start Kafka: bin/kafka-server-start.sh config/server.properties & (the broker listens on port 9092 by default)

3. Create a topic named dawang with a single partition and a single replica

  • Create: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
  • List: bin/kafka-topics.sh --list --zookeeper localhost:2181
  • You can also configure the broker to create topics automatically when they are first used (see the sketch below).
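The broker setting that controls this is auto.create.topics.enable in config/server.properties; num.partitions sets the partition count used for auto-created topics. These are standard Kafka broker properties, shown here only as a small sketch (they are not part of the original steps):

auto.create.topics.enable=true
num.partitions=1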

4. Send messages. Kafka ships with a simple command-line producer that reads messages from a file or from standard input and sends them to the server. By default, each line is sent as a separate message.

  • Send messages: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang (type any content you like; press Enter to send a message and Ctrl+C to quit)

5. Start a consumer. It reads the messages and writes them to standard output:

  • Receive messages: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
  • Run the consumer in one terminal and the producer in another: messages typed in the producer terminal show up in the consumer terminal. Both commands accept optional arguments; run them without any arguments to see the help information.

6. Set up a multi-broker cluster. Start a cluster of 3 brokers, all running on the local machine (the broker started earlier plus two new ones).

First, copy the configuration file: cp config/server.properties config/server-1.properties and cp config/server.properties config/server-2.properties

The two new files need the following changes:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

Here we give each broker a different broker id, port, and log directory from the original one. Then start the two extra brokers:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

Then create a topic with a replication factor of 3:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic

Use the describe command to show the topic details. In the output, Leader is the broker responsible for all reads and writes for the partition, Replicas lists the brokers that replicate the partition, and Isr is the subset of replicas that are currently in sync:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic
Topic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

We can also look at the topic we created earlier:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawang
Topic:dawang PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Finally, we can produce and consume messages the same way as before, for example:

# produce
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic
# consume
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic

Open two terminals and you can produce messages in one while consuming them in the other.

 

Now let's test fault tolerance. Kill the leader, which in this example is Broker 1. First find its process id:

ps -ef | grep server-1.properties

Then kill that process (for example kill -9 <pid>). The leader fails over to one of the followers, and node 1 is no longer listed in the ISR because it is dead.

Run the describe command again to show the topic details:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic

But the messages are not lost. Try it yourself by consuming the topic again:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic

Completely deleting a topic in Kafka

1. Delete the topic's directories under the Kafka log directory (configured by log.dirs in server.properties; the default is "/tmp/kafka-logs").
2. Run the topic deletion command:

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic oh3topic

Note that if the server.properties loaded when Kafka started does not set delete.topic.enable=true, this command does not actually delete the topic; it only marks it as "marked for deletion".

3. Code Examples

You need to install the librdkafka library yourself:

https://github.com/edenhill/librdkafka
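librdkafka is usually built from source. As a rough sketch (assuming a standard Unix toolchain; adjust to your platform), clone the repository and run ./configure, make and sudo make install inside the checkout. This installs both the C library (librdkafka) and the C++ wrapper (librdkafka++) that the examples below compile and link against.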

producer

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
//#include "librdkafka/rdkafka.h"
using namespace std;

bool run = true;

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};


class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};

/* Partitioner callback: maps a message key to a partition with djb_hash. It is used here because produce() below is called with an explicit key. */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
    public:
        int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque)
        {
            std::cout<<"partition_cnt="<<partition_cnt<<std::endl;
            return djb_hash(key->c_str(), key->size()) % partition_cnt;
        }
    private:
        static inline unsigned int djb_hash (const char *str, size_t len)
        {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        std::cout<<"hash1="<<hash<<std::endl;

        return hash;
        }
};

void TestProducer()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="helloworld_kugou_test";//自行制定主題topic
    MyHashPartitionerCb hash_partitioner;
    int32_t partition = RdKafka::Topic::PARTITION_UA;

    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK)
     {
          std::cerr << errstr << std::endl;
          exit(1);
     }

    /* * Set configuration properties */
    conf->set("metadata.broker.list", brokers, errstr);
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    ExampleDeliveryReportCb ex_dr_cb;

    /* Set delivery report callback */
    conf->set("dr_cb", &ex_dr_cb, errstr);

    /* * Create producer using accumulated global configuration. */
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer)
    {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /* * Create topic handle. */
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    /* * Read messages from stdin and produce to broker. */
    for (std::string line; run && std::getline(std::cin, line);)
    {
        if (line.empty())
        {
            producer->poll(0);
            continue;
        }

      /*
       * Produce message:
       *   - topic handle
       *   - partition (PARTITION_UA: let the partitioner choose based on the key)
       *   - message flags (RK_MSG_COPY: copy the payload)
       *   - payload and payload length
       *   - key and key length (passed to the partitioner callback)
       *   - msg_opaque (per-message opaque pointer, NULL here)
       */
      std::string key=line.substr(0,5); // use (up to) the first 5 characters of the line as the message key
      // int a = MyHashPartitionerCb::djb_hash(key.c_str(),key.size());
      // std::cout<<"hash="<<a<<std::endl;
      RdKafka::ErrorCode resp = producer->produce(topic, partition,
          RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
          const_cast<char *>(line.c_str()), line.size(),
          key.c_str(), key.size(), NULL); // the key decides which partition the message is placed in
        if (resp != RdKafka::ERR_NO_ERROR)
            std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;
        else
            std::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;
        producer->poll(0); // poll() performs the actual socket I/O and serves queued callbacks; it returns the number of events served
    }
    run = true; // reset the flag so the loop below can drain any remaining delivery reports

    while (run && producer->outq_len() > 0) {
      std::cerr << "Waiting for " << producer->outq_len() << std::endl;
      producer->poll(1000);
    }

    delete topic;
    delete producer;
}
 
int main(int argc, char *argv[])
{
    TestProducer();
    return EXIT_SUCCESS;
}
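One possible way to build and run this producer (a sketch, assuming the code above is saved as producer.cpp and librdkafka is installed where the compiler can find it; adjust paths and flags for your environment):

g++ producer.cpp -o producer -lrdkafka++ -lrdkafka
./producer

Each line typed on stdin is sent as one message, with the first five characters of the line used as its key.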

consumer

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
using namespace std;

bool run = true;
bool exit_eof = true;
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};


class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};

/* Partitioner callback (same as in the producer example). It is registered on the topic configuration here but is not actually exercised on the consumer side. */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
    public:
        int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque)
        {
            std::cout<<"partition_cnt="<<partition_cnt<<std::endl;
            return djb_hash(key->c_str(), key->size()) % partition_cnt;
        }
    private:
        static inline unsigned int djb_hash (const char *str, size_t len)
        {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        std::cout<<"hash1="<<hash<<std::endl;

        return hash;
        }
};

void msg_consume(RdKafka::Message* message, void* opaque)
{
    switch (message->err())
    {
        case RdKafka::ERR__TIMED_OUT:
            break;

        case RdKafka::ERR_NO_ERROR:
          /* Real message */
            std::cout << "Read msg at offset " << message->offset() << std::endl;
            if (message->key())
            {
                std::cout << "Key: " << *message->key() << std::endl;
            }
            printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));
            break;
        case RdKafka::ERR__PARTITION_EOF:
              /* Last message */
              if (exit_eof)
              {
                  run = false;
                  cout << "ERR__PARTITION_EOF" << endl;
              }
              break;
        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
            std::cerr << "Consume failed: " << message->errstr() << std::endl;
            run = false;
            break;
    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
    }
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {
    public:
        void consume_cb (RdKafka::Message &msg, void *opaque)
        {
            msg_consume(&msg, opaque);
        }
};
void TestConsumer()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="helloworld_kugou_test";//helloworld_kugou
    MyHashPartitionerCb hash_partitioner;
    // The legacy (simple) Consumer API needs an explicit partition; PARTITION_UA cannot be used here.
    int32_t partition = 0;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;

    int use_ccb = 0; // set to non-zero to use the consume_callback() API instead of consume()

    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cerr << errstr << std::endl;
        exit(1);
    }

    /* * Set configuration properties */
    conf->set("metadata.broker.list", brokers, errstr);
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    ExampleDeliveryReportCb ex_dr_cb;

    /* Set delivery report callback */
    conf->set("dr_cb", &ex_dr_cb, errstr);
    /* * Create consumer using accumulated global configuration. */
    RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
    if (!consumer)
    {
      std::cerr << "Failed to create consumer: " << errstr << std::endl;
      exit(1);
    }

    std::cout << "% Created consumer " << consumer->name() << std::endl;

    /* * Create topic handle. */
    RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
    if (!topic)
    {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    /* * Start consumer for topic+partition at start offset */
    RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
    if (resp != RdKafka::ERR_NO_ERROR) {
      std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
      exit(1);
    }

    ExampleConsumeCb ex_consume_cb;

    /* * Consume messages */
    while (run)
    {
        if (use_ccb)
        {
            consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);
      }
      else
      {
          RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
          msg_consume(msg, NULL);
          delete msg;
      }
      consumer->poll(0);
    }

    /* * Stop consumer */
    consumer->stop(topic, partition);

    consumer->poll(1000);

    delete topic;
    delete consumer;
}
 
int main(int argc, char *argv[])
{
    TestConsumer();
    return EXIT_SUCCESS;
}
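The consumer can be built the same way (again just a sketch, assuming the code is saved as consumer.cpp): g++ consumer.cpp -o consumer -lrdkafka++ -lrdkafka. Run the producer in one terminal and this consumer in another: the consumer reads partition 0 of the topic from OFFSET_BEGINNING and, because exit_eof is true, stops once it reaches the end of the partition.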

 

