首先是啟動一個生產者
final String kafkazk="localhost:9092";
String topic="testAPI";
Properties properties = new Properties() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkazk);
put(ProducerConfig.ACKS_CONFIG, "all");
put(ProducerConfig.RETRIES_CONFIG, 0);
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
put(ProducerConfig.LINGER_MS_CONFIG, 1);
put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
}};
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for(int i=0;i<10000000;i++){
producer.send(new ProducerRecord<>(topic, UUID.randomUUID().toString(), String.valueOf(i)));
}
}
啟動配置參數如下所示:
2018-06-19 17:34:23,636 - ProducerConfig values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
acks參數:
在考慮請求完成之前,生產者要求leader收到的確認數量,這將控制發送的記錄的持久性。
acks=0如果設置為零,則生產者不會等待來自服務器的任何確認。該記錄將被立即添加到套接字緩沖區並被視為已發送。在這種情況下,retries不能保證服務器已經收到記錄,並且配置不會生效(因為客戶端通常不會知道任何故障)。為每個記錄返回的偏移量將始終設置為-1。
acks=1這意味着領導者會將記錄寫入其本地日志中,但會在未等待所有追隨者完全確認的情況下作出響應。在這種情況下,如果領導者在承認記錄后但在追隨者復制之前立即失敗,那么記錄將會丟失。
acks=all這意味着領導者將等待全套的同步副本確認記錄。這保證只要至少有一個同步副本保持活動狀態,記錄就不會丟失。這是最強有力的保證。這相當於acks = -1設置。
batch.size
只要有多個記錄被發送到同一個分區,生產者就會嘗試將記錄一起分成更少的請求。這有助於客戶端和服務器的性能。該配置以字節為單位控制默認的批量大小。不會嘗試批量大於此大小的記錄。發送給brokers的請求將包含多個批次,每個分區有一個可用於發送數據的分區。
小批量大小將使批次不太常見,並可能降低吞吐量(批量大小為零將完全禁用批次)。一個非常大的批量大小可能會更浪費一點使用內存,因為我們將始終為預期的額外記錄分配指定批量大小的緩沖區。
block.on.buffer.full
當我們的內存緩沖區耗盡時,我們必須停止接受新記錄(塊)或拋出錯誤。默認情況下,此設置為false,生產者不再拋出BufferExhaustException,而是使用該max.block.ms值來阻塞,之后將拋出TimeoutException。將此屬性設置為true會將其設置max.block.ms為Long.MAX_VALUE。此外,如果此屬性設置為true,則參數metadata.fetch.timeout.ms不再有效。
此參數已被棄用,並將在未來版本中刪除,應該使用參數max.block.ms。
bootstrap.servers
用於建立到Kafka集群的初始連接的主機/端口對列表。客戶端將使用所有服務器,而不管在此指定哪些服務器用於引導 - 此列表僅影響用於發現全套服務器的初始主機。該列表應該以表格的形式出現host1:port1,host2:port2,...。由於這些服務器僅用於初始連接以發現完整的群集成員資格(可能會動態更改),因此此列表不需要包含全套服務器(但您可能需要多個服務器) 。
buffer.memory
生產者可用於緩沖等待發送到服務器的記錄的總內存字節數。如果記錄的發送速度比發送到服務器的速度快,那么生產者將會阻止max.block.ms它,然后它會拋出異常。
這個設置應該大致對應於生產者將使用的總內存,但不是硬性限制,因為不是所有生產者使用的內存都用於緩沖。一些額外的內存將用於壓縮(如果啟用了壓縮功能)以及維護正在進行的請求。
client.id
發出請求時傳遞給服務器的id字符串。這樣做的目的是通過允許將邏輯應用程序名稱包含在服務器端請求日志中,從而能夠跟蹤ip / port之外的請求源,如果不手動指定,代碼中會自動生成一個id。
compression.type
指定給定主題的最終壓縮類型。該配置接受標准壓縮編解碼器('gzip','snappy','lz4')。它另外接受'未壓縮',這相當於沒有壓縮,這意味着保留制片人設置的原始壓縮編解碼器,也可以修改源碼,自定義壓縮類型。
connections.max.idle.ms
在此配置指定的毫秒數后關閉空閑連接。
interceptor.classes
用作攔截器的類的列表。通過實現ProducerInterceptor接口,您可以在生產者發布到Kafka集群之前攔截(並可能會改變)生產者收到的記錄。默認情況下,沒有攔截器,可自定義攔截器。
key.serializer
用於實現Serializer接口的密鑰的串行器類。
linger.ms
生產者將在請求傳輸之間到達的任何記錄歸入單個批處理請求。通常情況下,這只會在記錄到達速度快於發送時才發生。但是,在某些情況下,即使在中等負載下,客戶端也可能希望減少請求的數量。此設置通過添加少量人工延遲來實現此目的 - 即不是立即發送記錄,而是生產者將等待達到給定延遲以允許發送其他記錄,以便發送可以一起批量發送。這可以被認為與TCP中的Nagle算法類似。這個設置給出了批量延遲的上限:一旦我們得到batch.size值得記錄的分區,它將被立即發送而不管這個設置如何,但是如果我們為這個分區累積的字節數少於這個數字,我們將在指定的時間內“等待”,等待更多的記錄出現。該設置默認為0(即無延遲)。linger.ms=5例如,設置可以減少發送請求的數量,但會對在無效負載中發送的記錄添加高達5毫秒的延遲。
max.block.ms
配置控制了KafkaProducer.send()並將KafkaProducer.partitionsFor()被阻塞多長時間。由於緩沖區已滿或元數據不可用,這些方法可能會被阻塞止。用戶提供的序列化程序或分區程序中的阻塞將不計入此超時。
max.in.flight.requests.per.connection
在阻塞之前,客戶端將在單個連接上發送的最大數量的未確認請求。請注意,如果此設置設置為大於1並且發送失敗,則由於重試(即,如果重試已啟用),可能會重新排序消息。
max.request.size
請求的最大大小(以字節為單位)。這實際上也是最大記錄大小的上限。請注意,服務器在記錄大小上有自己的上限,這可能與此不同。此設置將限制生產者在單個請求中發送的記錄批次數,以避免發送大量請求。
metadata.fetch.timeout.ms
首次將數據發送到主題時,我們必須獲取有關該主題的元數據,以了解哪些服務器托管主題的分區。此配置指定在將異常拋回到客戶端之前,此獲取成功的最長時間(以毫秒為單位)。
metadata.max.age.ms
以毫秒為單位的時間段之后,即使我們沒有看到任何分區領導改變以主動發現任何新代理或分區,我們強制更新元數據。
metric*
以metric開頭的是指標相關的,后面會討論。
partitioner.class
實現Partitioner接口的分區器類。默認使用DefaultPartitioner來進行分區。
receive.buffer.bytes
讀取數據時使用的TCP接收緩沖區(SO_RCVBUF)的大小。如果該值為-1,則將使用操作系統默認值。
reconnect.backoff.ms
嘗試重新連接到給定主機之前等待的時間量。這避免了在緊密循環中重復連接到主機。該退避適用於消費者發送給代理的所有請求。
request.timeout.ms
該配置控制客戶端等待請求響應的最長時間。如果在超時過去之前未收到響應,客戶端將在必要時重新發送請求,或者如果重試耗盡,請求失敗。
retries
生產者發送失敗后的重試次數,默認0
retry.backoff.ms
嘗試重試對給定主題分區的失敗請求之前等待的時間量。這可以避免在某些故障情況下重復發送請求。
sasl與ssl
與kafka安全相關