kafka學習筆記2:生產者


這次的筆記主要記錄一下kafka的生產者的使用和一些重要的參數。
文中主要截圖均來自kafka權威指南

主要涉及到兩個類KafkaProducerProducerRecord.


總覽

生產者的主要架構如下:

首先創建了一個ProducerRecord
進行序列化 kv變為ByteArray
進入Partitioner 如果之前指定了分區 那這一步什么都不會做
接着將Record放入要被發送到的同樣的topic和分區的batch中
另一個單獨的thread會進行發送操作
發送成功會返回RecordMetadata(包含topic 分區 偏移量)
失敗的話會返回錯誤 producer會重試幾次(retries配置)直到成功或者放棄返回錯誤

構造Kafka Producer

示例代碼如下:

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.56.101:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("metadata.fetch.timeout.ms", 5000);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry",
		"Precision Products", "France");
Future<RecordMetadata> f = producer.send(record);

返回的是一個Future,有以下處理方式:

  • 不在乎異常 可以不處理
  • 直接get拿到結果 同步的獲取方式
    另外send方法也支持傳入回調的方式進行異步處理

bootstrap.servers

指定了要連接到的broker,可以用,分隔寫入多個,連接broker cluster時不需要寫入全部的broker地址,broker本身會同步其他broker的信息,但最少放兩個以免寫一個對應的broker掛了。
在實際使用中,可能會出現org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.這個錯誤,請求本身是發送成功的,檢查zk topic也建立了,但就是獲取不到metadata。
這是因為填入的broker服務器和kafka在zk上注冊的host不一致導致的(在zk上可以通過get /brokers/ids/{id}查看),可以通過修改kafka的server.propertiesadvertised.host.nameadvertised.port改正。
但是要注意hosts文件會影響,比如配置了自己ip到一個域名上導致失敗,注意代碼在運行時log打出的實際的值。
例如電腦上裝了銀聯控件,結果把localhost映射到了一個奇怪的地址:

例子中已經改為了ip。
另外實驗環境也遷移至了虛擬機上,上一節用bash on windows搭建好的環境不知道為什么無論怎么改都獲取不到metadata

key.serialization

用來指定key的序列化器(下面會提到)


配置Producers

文檔:http://kafka.apache.org/documentation.html#producerconfigs
因為用的是0.9.0.1,一些參數在最新版已經被棄用了,自己使用多注意下。
這些的默認配置在ProducerConfig的static塊中定義:

static {
    CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                            .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                            .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                            .define(ACKS_CONFIG,
                                    Type.STRING,
                                    "1",
                                    in("all", "-1", "0", "1"),
                                    Importance.HIGH,
                                    ACKS_DOC)
    ... ...

這里列舉一下比較重要的參數:

ACKS

指定一次寫成功需要多少個分區分片必須收到record

  • 0
    不會等待broker的返回 不會知道是否成功 高吞吐

  • 1
    接受leader的 比0安全 但也會有還來不及復制leader就掛掉等導致失敗的情況

  • all
    所有的復本都接收到消息並回應 最安全的模式 但相對的延遲就比較慘了

默認值為1

BUFFER.MEMORY

緩存等待發送的消息
如果超過了 會在拋錯前等待max.block.ms
默認32M

COMPRESSION.TYPE

默認是none 不壓縮
可以是snappy gzip lz4
snappy用較少的cpu完成還行的壓縮 性能和網絡帶寬的權衡
gzip是使用更多的cpu和時間但是壓縮更好 網絡帶寬是主要限制時使用

RETRIES

重試次數 默認是0
默認間隔100ms (通過retry.backoff.ms控制)
不是所有的錯誤都會重試(RetriableException)
不要在業務里做重試

BATCH.SIZE

多個record被發送到同一個分區的時候 producer會進行批處理
內存 默認16k
滿了就發 但不意味着只有滿了才會發(見下一個參數)
設太大不會導致他的延遲 只不過會用更多的內存
設太小就慘了 會導致過於頻繁

LINGER.MS

在發送當前batch時等多久額外的信息
默認是0
producer會在batch滿了或者這個時間到了發送 默認是0就意味着即時發送
大於0會導致延遲 但是增加吞吐量

CLIENT.ID

任何字符串 自己定義 用來打log和metrics

MAX.IN.FLIGHT.REQUESTS.PER.CONNECTION

可以設置可以發送多少個請求不等回應 也就是允許同時有多少條消息在空中飛
默認是5
設大了會增加內存使用 但是提高吞吐
設過大了會導致降低吞吐 因為batch變得低效了
設置成1可以保證消息即使是在重試下也有序(下面順序保證中提到)

TIMEOUT.MS, REQUEST.TIMEOUT.MS, METADATA.FETCH.TIMEOUT.MS

第1和第3在0.11.0.1版本中被移除了。。可以參考舊文檔
第2 從服務端等待多久回應(默認30s)

MAX.BLOCK.MS

在send()和顯式通過partitionsFor()請求元數據中間等待的時間
在send buffer滿了 以及metadata沒返回阻塞

MAX.REQUEST.SIZE

一次請求的數據大小限制(限制最大的消息 以及批處理的大小)
默認1M 意味着可以同時發1000條1K的消息 或者發一條1M的消息
注意上一節broker里也有自己的配置 最好兩個配置相匹配

RECEIVE.BUFFER.BYTES SEND.BUFFER.BYTES

前者默認32K
后者默認128K
TCP發送和接受的buffer 用於socket寫入和讀取數據
設置為-1是用OS默認的設置
不同數據中心間可以適當提高點這個值因為網絡環境會更差

順序保證

kafka保證在一個分區里消息的順序
上述可以發現 如果設置了max.in.flights.request.pre.connection大於1(默認5)
並且retries大於0(默認0)那就完蛋了
可能寫給同一個分區第一條消息寫入失敗 進入retry
但第二條寫入成功
建議設置per.connection為1 但這會嚴重限制吞吐 只用於順序很重要的場合


序列化

自定義可以自己實現org.apache.kafka.common.serialization.Serializer接口。

使用Apache Avro

優點:

  • 語言中立的序列化格式
  • 兼容性

用於寫和讀的schema必須兼容
反序列化器必須能夠訪問用於寫數據的schema

保存每個record的schema開銷很大(通常是兩倍於record)
可以將schema保存在一個注冊器中管理 只需要在消息中寫入對應的id
Schema Registry - 有很多開源的選項 可以使用confluent platform
文檔: http://docs.confluent.io/current/schema-registry/docs/index.html

這個可以單獨記一個筆記,等實踐后再做整理。


分區

如果key為null並且默認的分區器被使用 會用round-robin來平衡
有key 默認的分區器會hash key(自帶的hash算法) 因為要保證同一個key總是寫入同樣的分區 他會使用所有的分區(不管是否可用) 寫到不可用的會報錯
如果分區數改變了 映射關系就會改變 同樣的key在之后可能會到其他分區中
所以預先確定分區的數量在這種i情況下比較重要


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM