bootstrap.servers |
kafka集群地址,ip+端口,以逗號隔開。不管這邊配置的是什么服務器,客戶端會使用所有的服務器。配置的列表只會影響初始發現所有主機。配置的格式應該是:ip:port,ip:port,因為配置的內容只是用於服務集群的初始發現(集群地址可能會變化),配置可以不包含所有的服務器(你可能需要配置多於一個,防止某個服務掛掉) |
list |
|
key.serializer |
實現Serializer接口的序列化類鍵 |
class |
|
value.serializer |
實現Serializer接口的序列化類值 |
class |
|
acks |
生產者認為一個請求完成,所需要kafka集群主服務的應答次數。這個配置控制已發送消息的持久性。下面是這個配置可能的值。acks=0:如果設置為0,生產者不會等待kafka的響應。消息會被立刻加到發送緩沖通道中,並且認為已經發送成功。這種情況下,不能保證kafka接收到了這條消息,retries配置不會生效,每條消息的偏移量都是1;acks=1:這個配置意味着kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機器的成功響應。這種情況下,在寫入日志成功后,集群主機器掛掉,同時從機器還沒來得及寫的話,消息就會丟失掉。acks=all:這個配置意味着leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機器掛掉。這是最強的可用性保證。 |
string |
1 |
buffer.memory |
生產者等待發送到kafka的消息隊列占用內容的大小。如果消息發送的速度比傳輸給kafka快,生產者會在拋出異常后,阻塞max.block.ms的時間。這個配置應該大體與生產者用到的內存差不多,但不全是,因為生產者使用的內存不全部用於消息隊列。還有些內存會被用於壓縮和保持長連接。 |
long |
33554432 |
compression.type |
生產者的數據壓縮類型。默認是不壓縮(no compression)。有效的配置可以是none,gzip,snappy或lz4。壓縮是數據的批量壓縮,所以批量的效果也就是壓縮的比例(壓縮的比例越好,數據量越小)。 |
string |
none |
retries |
配置為大於0的值的話,客戶端會在消息發送失敗時重新發送。重試等同於在發送有異常時重新發送消息。如果不把max.in.flight.requests.per.connection設為1,重試可能會改變消息的順序。兩條消息同時發送到同一個分區,第一條失敗了,並在第二條發送成功后重新發送,那么第二條消息可能在第一條消息前到達。 |
int |
0 |
ssl.key.password |
存在文件中的私鑰密碼,對於生產者來說可選。 |
password |
null |
ssl.keystore.location |
存儲私鑰的文件地址,可以用於不同客戶端的認證。 |
string |
null |
ssl.keystore.password |
私鑰文件存儲密碼。只有當ssl.keystore.location配置了,才有用。 |
password |
null |
ssl.truststore.location |
信任存儲文件路徑。 |
string |
null |
ssl.truststore.password |
信任存儲文件密碼 |
password |
null |
batch.size |
當多條消息需要發送到同一個分區時,生產者會嘗試合並網絡請求。這會提高client和生產者的效率。如果消息體大於這個配置,生產者不會嘗試發送消息。發送給kafka的消息包含不同的批次,每批發送給一個分區。批次大小太小的話可能會降低吞吐量。如果設為0,會禁用批處理功能。如果批次設置很大,可能會有些浪費內存,因為我們會預留這部分內存用於額外的消息。 |
int |
16384 |
client.id |
發送請求給kafka時帶上的生產者標識。目的是為了在ip+端口之外,通過邏輯上的應用名稱跟蹤請求,以便記錄在kafka日志中。 |
string |
"" |
connections.max.idle.ms |
在配置項的時間之后,關閉空閑的鏈接 |
long |
540000 |
linger.ms |
消息延遲發送的毫秒數,目的是為了等待多個消息,在同一批次發送,減少網絡請求。 |
long |
0 |
max.block.ms |
這個配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞時間,當緩沖區空間不夠或者源數據丟失時阻塞 |
int |
60000 |
max.request.size |
生產者一次請求的最大字節數,這也是一次消息體的最大值。注意到kafka集群有自己的消息限制,可能與這個值不一樣。這個配置限制的是生產者一次發送消息的大小,為的是避免發送大的數據量。 |
int |
1048576 |
partitioner.class |
實現Partitioner接口的分區類 |
class |
class org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes |
socket接收緩存空間的大小,讀數據時用 |
int |
32768 |
request.timeout.ms |
生產者發送消息后等待響應的最大時間,如果在配置時間內沒有得到響應,生產者會重試。 |
int |
30000 |
timeout.ms |
kafka集群的leader等待follower響應的超時時間。 |
int |
30000 |