go 操作 kafka


import "github.com/Shopify/sarama"
  saram 使用純go語言編寫。用於處理Apache Kafka(0.8及更高版本)的純Go客戶端庫. 它包括一個易於生成和使用消息的高級API,以及一個在高級API不足時控制線路上字節的低級API. 內嵌提供了高級API的用法示例及其完整文檔.
 

生產者:

有幾種類型的生產者:
  sarama.NewSyncProducer() //同步發送者
  sarama.NewAsyncProducer() //異步發送者
 
同步模式:
func main() {
   config := sarama.NewConfig()  //實例化個sarama的Config
   config.Producer.Return.Successes = true  //是否開啟消息發送成功后通知 successes channel
   config.Producer.Partitioner = sarama.NewRandomPartitioner //隨機分區器
   client,err := sarama.NewClient([]string{"10.180.18.60:9092"}, config) //初始化客戶端
   defer client.Close()
   if err != nil {panic(err)}
   producer,err := sarama.NewSyncProducerFromClient(client)
   if err!=nil {panic(err)}
   partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: "liangtian_topic", Key: nil, Value: sarama.StringEncoder("hahaha")})
   if err != nil {
      log.Fatalf("unable to produce message: %q", err)
   }
   fmt.Println("partition",partition)
   fmt.Println("offset",offset)
}

  

異步模式:
  異步模式,顧名思義就是produce一個message之后不等待發送完成返回;這樣調用者可以繼續做其他的工作。
 config := sarama.NewConfig()
    // config.Producer.Return.Successes = true
    client, err := sarama.NewClient([]{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewAsyncProducerFromClient
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }
    defer producer.Close()

    text := fmt.Sprintf("message %08d", i)
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    // wait response
    select {
            //case msg := <-producer.Successes():
            //    log.Printf("Produced message successes: [%s]\n",msg.Value)
            case err := <-producer.Errors():
                log.Println("Produced message failure: ", err)
            default:
                log.Println("Produced message default",)
    }
    ...

  

關於異步producer有一個地方需要注意的。
  1. 異步模式produce一個消息后,缺省並不會報告成功狀態。

config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
}

  

則這段代碼會掛住,因為設置沒有要求返回成功config.Producer.Return.Successes = false,那么在select等待的時候producer.Successes()不會返回,producer.Errors()也不會返回(假設沒有錯誤發生),就掛在這兒。當然可以加一個default分支繞過去,就不會掛住了:

select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default")
}
  1. 如果打開了Return.Successes配置,則上述代碼段等同於同步方式

config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
}

  

從log可以看到,每發送一條消息,收到一條Return.Successes,類似於:

2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...
就像是同步produce一樣的行為了。
  1. 如果打開了Return.Successes配置,而又沒有producer.Successes()提取,那么Successes()這個chan消息會被寫滿。

config.Producer.Return.Successes = true
...
log.Printf("Reade to Produced message: [%s]\n",text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    //case msg := <-producer.Successes():
    //    log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default",)
}
寫滿的結果就是不能再寫入了,導致后面的Return.Successes消息丟失, 而且producer也會掛住,因為共享的buffer被占滿了,大量的Return.Successes沒有被消耗掉。
運行一段時間后:

2018/12/25 08:58:24 Reade to Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000608]

  

在produce第00000608個message的時候被掛住了,因為消息緩沖滿了;這個緩沖的大小是可配的(可能是這個MaxRequestSize?),但是不管大小是多少,如果沒有去提取Success消息最終都會被占滿的。
結論就是說配置config.Producer.Return.Successes = true和操作<-producer.Successes()必須配套使用;配置成true,那么就要去讀取Successes,如果配置成false,則不能去讀取Successes。

 
 

消費者:

使用消費組消費:
 每一個Topic的分區只能被一個消費組中的一個消費者所消費。一個消費者可以同時消費多個topic

 

type consumerGroupHandler struct{
   name string
}
func main1() {
   var wg sync.WaitGroup
   config := sarama.NewConfig()
   config.Consumer.Return.Errors = false
   config.Version = sarama.V0_10_2_0
   config.Consumer.Offsets.Initial = sarama.OffsetOldest
   client,err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
   defer client.Close()
   if err != nil {
      panic(err)
   }
   group1, err := sarama.NewConsumerGroupFromClient("c1", client)
   if err != nil {
      panic(err)
   }
   group2, err := sarama.NewConsumerGroupFromClient("c2", client)
   if err != nil {
      panic(err)
   }
   group3, err := sarama.NewConsumerGroupFromClient("c3", client)
   if err != nil {
      panic(err)
   }
   defer group1.Close()
   defer group2.Close()
   defer group3.Close()
   wg.Add(3)
   go consume(&group1,&wg,"c1")
   go consume(&group2,&wg,"c2")
   go consume(&group3,&wg,"c3")
   wg.Wait()
   signals := make(chan os.Signal, 1)
   signal.Notify(signals, os.Interrupt)
   select {
   case <-signals:
   }

}
func consume(group *sarama.ConsumerGroup,wg  *sync.WaitGroup, name string) {
   fmt.Println(name + "start")
   wg.Done()
   ctx := context.Background()
   for {
      //topic := []string{"tiantian_topic1","tiantian_topic2"} 可以消費多個topic
      topics := []string{"liangtian_topic"}
      handler := consumerGroupHandler{name: name}
      err := (*group).Consume(ctx, topics, handler)
      if err != nil {
         panic(err)
      }
   }
}
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
   claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n",h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
      // 手動確認消息
      sess.MarkMessage(msg, "")
   }
   return nil
}

func handleErrors(group *sarama.ConsumerGroup,wg  *sync.WaitGroup ){
   wg.Done()
   for err := range (*group).Errors() {
      fmt.Println("ERROR", err)
   }
}

  

 

 

  普通消費者(我姑且這么說)。有些情況下我們有些消費者是沒有消費組的,正常的消費者可自動分配分區到消費者並且組中消費者新增或刪除會自動觸發負載均衡的消費組。
  但在某些情況下,卻想要更簡單的東西。有時你知道你有一個單一的消費者總是需要從主題中的所有分區讀取數據,或者從一個主題特定分區讀取數據。在這種情況下沒有理由需要組或負載均衡,只是訂閱特定的主題或分區,偶爾使用消息和提交偏移量。
  但是有個注意的點。除了沒有負載均衡以及需要手動查找分區,一切看起來都很正常。請記住,如果有人向主題添加新分區,則不會通知消費者。所以無論是處理通過定期檢查consumer.partitionsFor()或者記住是否是管理員添加分區,應用程序將需要跳躍。還要注意的是消費者可以訂閱的主題(成為一個消費組的一部分),或分配自己的分區,但不能同時實現。下面可以看看代碼。一般不這么用。一般都用消費組+消費者
func main() {
   var wg sync.WaitGroup
   //創建消費者
   config := sarama.NewConfig()
   config.Consumer.Return.Errors = true
   client,err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
   defer client.Close()
   if err != nil {
      panic(err)
   }
   consumer, err := sarama.NewConsumerFromClient(client)
   defer consumer.Close()
   if err != nil {panic(err)}
   //設置分區
   partitionList, err :=  consumer.Partitions("liangtian_topic")
   if err != nil {
      fmt.Println("faild to get the list of partitions",err)
   }
   //[0 1 2]
   fmt.Println(partitionList)
   //循環讀取分區
   for partition := range partitionList {
      pc, err := consumer.ConsumePartition("liangtian_topic", int32(partition), sarama.OffsetOldest)
      if err != nil {
         fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
         return
      }
      defer pc.AsyncClose()
      wg.Add(1)
      go func(pc sarama.PartitionConsumer) {
         defer wg.Done()
         for msg := range pc.Messages() {
            fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            fmt.Println()
         }
      }(pc)
   }
   //time.Sleep(time.Hour)
   wg.Wait()
   consumer.Close()
}

  

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM