Scala調用Kafka的生產者和消費者Demo,以及一些配置參數整理


kafka簡介

Kafka是apache開源的一款用Scala編寫的消息隊列中間件,具有高吞吐量,低延時等特性。

Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)稱為broker。

無論是kafka集群,還是producer和consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。

kafka主要的組件介紹

Producer:消息生產者,就是向kafka broker發消息的客戶端。

Consumer:消息消費者,向kafka broker取消息的客戶端

Topic :我們可以理解為一個隊列,消息根據Topic進行歸類。

Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以對應多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個partion只會把消息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。

Broker :一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

kafka文件存儲機制

  1. topic中partition存儲分布

  2. partiton中文件存儲方式

  3. partiton中segment文件存儲結構

  4. 在partition中通過offset查找message

代碼模擬生產者消費者案例

pom.xml添加依賴

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

生產者:

package com.linys.scala.KAFKA_producer
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

/**
  * 實現producer
  */
object KafkaProducerDemo {
  def main(args: Array[String]): Unit = {
    val prop = new Properties
    // 指定請求的kafka集群列表
    prop.put("bootstrap.servers", "essum:9092")// 指定響應方式
    //prop.put("acks", "0")
    prop.put("acks", "all")
    // 請求失敗重試次數
    //prop.put("retries", "3")
    // 指定key的序列化方式, key是用於存放數據對應的offset
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 指定value的序列化方式
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 配置超時時間
    prop.put("request.timeout.ms", "60000")
    //prop.put("batch.size", "16384")
    //prop.put("linger.ms", "1")
    //prop.put("buffer.memory", "33554432")

    // 得到生產者的實例
    val producer = new KafkaProducer[String, String](prop)

    // 模擬一些數據並發送給kafka
    for (i <- 1 to 100) {
      val msg = s"${i}: this is a linys ${i} kafka data"
      println("send -->" + msg)
      // 得到返回值
      val rmd: RecordMetadata = producer.send(new ProducerRecord[String, String]("linys", msg)).get()
      println(rmd.toString)
      Thread.sleep(500)
    }

    producer.close()
  }
}

消費者:

package com.linys.scala.KAFKA_consumer
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
/**
  * 實現consumer
  */
object KafkaConsumerDemo {
  def main(args: Array[String]): Unit = {
    // 配置信息
    val prop = new Properties
    prop.put("bootstrap.servers", "essum:9092")
    // 指定消費者組
    prop.put("group.id", "group01")
    // 指定消費位置: earliest/latest/none
    prop.put("auto.offset.reset", "earliest")
    // 指定消費的key的反序列化方式
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // 指定消費的value的反序列化方式
    prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("enable.auto.commit", "true")
    prop.put("session.timeout.ms", "30000")
    // 得到Consumer實例
    val kafkaConsumer = new KafkaConsumer[String, String](prop)
    // 首先需要訂閱topic
    kafkaConsumer.subscribe(Collections.singletonList("linys"))
    // 開始消費數據
    while (true) {
      // 如果Kafak中沒有消息,會隔timeout這個值讀一次。比如上面代碼設置了2秒,也是就2秒后會查一次。
      // 如果Kafka中還有消息沒有消費的話,會馬上去讀,而不需要等待。
      val msgs: ConsumerRecords[String, String] = kafkaConsumer.poll(2000)
      // println(msgs.count())
      val it = msgs.iterator()
      while (it.hasNext) {
        val msg = it.next()
        println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, key: ${msg.key()}, value: ${msg.value()}")
      }
    }
  }
}

執行輸出:

 

 

 

 

 

生產者配置參數解釋:

bootstrap.servers: kafka集群broker的地址
key.serializer:關鍵字的序列化方式
value.serializer:消息值的序列化方式
acks:指定必須要有多少個分區的副本接收到該消息,服務端才會向生產者發送響應,可選值為:0,1,2,…,all,如果設置為0,producter就只管發出不管kafka server有沒有確認收到。設置all則表示kafka所有的分區副本全部確認接收到才返回。
buffer.memory:生產者的內存緩沖區大小。如果生產者發送消息的速度 > 消息發送到kafka的速度,那么消息就會在緩沖區堆積,導致緩沖區不足。這個時候,send()方法要么阻塞,要么拋出異常。
max.block.ms:表示send()方法在拋出異常之前可以阻塞多久的時間,默認是60s
compression.type:消息在發往kafka之前可以進行壓縮處理,以此來降低存儲開銷和網絡帶寬。默認值是null,可選的壓縮算法有snappy、gzip和lz4
retries:生產者向kafka發送消息可能會發生錯誤,有的是臨時性的錯誤,比如網絡突然阻塞了一會兒,有的不是臨時的錯誤,比如“消息太大了”,對於出現的臨時錯誤,可以通過重試機制來重新發送
retry.backoff.ms:每次重試之間間隔的時間,第一次失敗了,那么休息一會再重試,休息多久,可以通過這個參數來調節
batch.size:生產者在發送消息時,可以將即將發往同一個分區的消息放在一個批次里,然后將這個批次整體進行發送,這樣可以節約網絡帶寬,提升性能。該參數就是用來規約一個批次的大小的。但是生產者並不是說要等到一個批次裝滿之后,才會發送,不是這樣的,有時候半滿,甚至只有一個消息的時候,也可能會發送,具體怎么選擇,我們不知道,但是不是說非要等裝滿才發。因此,如果把該參數調的比較大的話,是不會造成消息發送延遲的,但是會占用比較大的內存。但是如果設置的太小,會造成消息發送次數增加,會有額外的IO開銷
linger.ms:生產者在發送一個批次之前,可以適當的等一小會,這樣可以讓更多的消息加入到該批次。這樣會造成延時增加,但是降低了IO開銷,增加了吞吐量
client.id:服務器用來標志消息的來源,是一個任意的字符串
max.in.flight.requests.per.connection:一個消息發送給kafka集群,在收到服務端的響應之前的這段時間里,生產者還可以發n-1個消息。這個參數配置retries,可以保證消息的順序,后面會介紹
request.timeout.ms:生產者在發送消息之后,到收到服務端響應時,等待的時間限制
max.request.size:生產者發送消息的大小。可以是一個消息的大小,也可以發送的一個批次的消息大小
receive.buffer.bytes和send.buffer.bytes:tcp socket接收和發送消息的緩沖區大小,其實指的就是ByteBuffer的大小

消費者配置參數解釋:

groupid:一個字符串用來指示一組consumer所在的組群。實現同一個topic可由不同的組群消費

auto.offset.reset:可選三個參數

earliest ---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none---topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常

socket.timeout.ms:默認值:3000,socket超時時間。

socket.buffersize: 默認值:64*1024,socket receive buffer。

fetch.size: 默認值:300 * 1024,控制在一個請求中獲取的消息的字節數。 這個參數在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代。

backoff.increment.ms:默認值:1000,這個參數避免在沒有新數據的情況下重復頻繁的拉數據。 如果拉到空數據,則多推后這個時間。

queued.max.message.chunks:默認值:2,consumer內部緩存拉回來的消息到一個隊列中。 這個值控制這個隊列的大小。

auto.commit.enable:默認值:true,如果true,consumer定期地往zookeeper寫入每個分區的offset。

auto.commit.interval.ms:默認值:10000,往zookeeper上寫offset的頻率。

auto.offset.reset:默認值:largest,如果offset出了返回,則 smallest: 自動設置reset到最小的offset. largest : 自動設置offset到最大的offset. 其它值不允許,會拋出異常。

consumer.timeout.ms:默認值:-1,默認-1,consumer在沒有新消息時無限期的block。如果設置一個正值, 一個超時異常會拋出。

rebalance.retries.max:默認值:4,rebalance時的最大嘗試次數。

max.poll.interval.ms:拉取的最大時間間隔,如果你一次拉取的比較多,建議加大這個值,長時間沒有調用poll,且間隔超過這個值時,就會認為這個consumer失敗了

max.poll.records:默認值:500,Consumer每次調用poll()時取到的records的最大數。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM