Golang 連接Kafka


Kafka介紹

Kafka是Apache軟件基金會開發的一個開源流處理平台,由Java和Scala編寫;Kafka是一種高吞吐、分布式、基於訂閱發布的消息系統。

 

Kafka名稱解釋

  • Producer:生產者
  • Consumer:消費者
  • Topic:消息主題,每一類的消息稱之為一個主題
  • Broker:Kafka以集群的方式運行,可以由一個或多個服務器組成,每個服務器叫做一個broker
  • Partition:物理概念上的分區,為了提供系統吞吐量,在物理上每個Topic會分為一個或多個Partition

 

Kafka架構圖

一個典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。

Kafka通過Zookeeper管理集群配置及服務協同,Producer使用push模式將消息發布到broker,Consumer通過監聽使用pull模式從broker訂閱並消費消息。

圖上有個細節需要注意,producer給broker的過程是push,也就是有數據就推送給broker,而consumer給broker的過程是pull,是通過consumer主動去拉數據的,而不是broker把數據主動發送給consumer端的。

 

Kafka與RabbitMQ比較

  • Kafka比RabbitMQ性能要高
  • RabbitMQ比Kafka可靠性要高
  • 因此在金融支付領域使用RabbitMQ居多,而在日志處理、大數據等方面Kafka使用居多。

 

Kafka安裝

第一步 下載Kafka:

  地址 http://kafka.apache.org/downloads

第二步 解壓Kafka:

  tar -zxvf kafka.tgz -C  /usr/local/kafka

第三步 運行Zookeeper:

   以后台方式運行 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &  zookeeper端口 2181

第四步 運行Kafka:

     以后台方式運行 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  kafka端口 9092

 

Kafka圖形管理工具

http://www.kafkatool.com/download.html 

 

Go語言中使用Kafka

Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

安裝sarama

  go get github.com/Shopify/sarama

Producer

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    // 新建一個arama配置實例
    config := sarama.NewConfig()

    // WaitForAll waits for all in-sync replicas to commit before responding.
    config.Producer.RequiredAcks = sarama.WaitForAll

    // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    config.Producer.Return.Successes = true

    // 新建一個同步生產者
    client, err := sarama.NewSyncProducer([]string{"172.16.65.210:9092"}, config)
    if err != nil {
        fmt.Println("producer close, err:", err)
        return
    }
    defer client.Close()

    // 定義一個生產消息,包括Topic、消息內容、
    msg := &sarama.ProducerMessage{}
    msg.Topic = "revolution"
    msg.Key = sarama.StringEncoder("miles")
    msg.Value = sarama.StringEncoder("hello world...")

    // 發送消息
    pid, offset, err := client.SendMessage(msg)


    msg2 := &sarama.ProducerMessage{}
    msg2.Topic = "revolution"
    msg2.Key = sarama.StringEncoder("monroe")
    msg2.Value = sarama.StringEncoder("hello world2...")
    pid2, offset2, err := client.SendMessage(msg2)


    if err != nil {
        fmt.Println("send message failed,", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
    fmt.Printf("pid2:%v offset2:%v\n", pid2, offset2)
}

Consumer

package main

import (
    "sync"
    "github.com/Shopify/sarama"
    "fmt"
)

var wg sync.WaitGroup

func main() {
    consumer, err := sarama.NewConsumer([]string{"172.16.65.210:9092"}, nil)
    if err != nil {
        fmt.Println("consumer connect error:", err)
        return
    }
    fmt.Println("connnect success...")
    defer consumer.Close()
    partitions, err := consumer.Partitions("revolution")
    if err != nil {
        fmt.Println("geet partitions failed, err:", err)
        return
    }

    for _, p := range partitions {
        partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest)
        if err != nil {
            fmt.Println("partitionConsumer err:", err)
            continue
        }
        wg.Add(1)
        go func(){
            for m := range partitionConsumer.Messages() {
                fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM