I. Kafka design principles (references):
http://blog.csdn.net/suifeng3051/article/details/48053965?locationNum=2
http://www.cnblogs.com/metoy/p/4452124.html
II. Common problems and their solutions:
http://www.tuicool.com/articles/FNbQbeq
III. Official Kafka documentation (Producer):
http://kafka.apache.org/documentation.html#producerconfigs
Main producer settings:
bootstrap.servers host1:port1,host2:port2,...
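(If a host is given as a hostname rather than an IP address and the client machine cannot resolve it, the connection fails with: Exception in thread "main" org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers
Any one of the following fixes it:
1. Change the host entry to an IP address.
2. Edit the hosts file on the client machine and add a mapping from the hostname to its IP.
3. Use DNS (recommended).
)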
key.serializer Serializer class for key that implements the Serializer interface. For example: "org.apache.kafka.common.serialization.StringSerializer"
value.serializer Serializer class for value that implements the Serializer interface. For example: "org.apache.kafka.common.serialization.StringSerializer"
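As a small illustration of the three settings above, the following sketch builds a java.util.Properties object and checks that none of them is missing before it would be handed to a producer. The class name ProducerConfigSketch and its helper methods are made up for this note and are not part of the Kafka API; only the property keys and the StringSerializer class name come from the documentation quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds and checks the minimal producer settings.
final class ProducerConfigSketch {
    // The three settings discussed above.
    static final String[] REQUIRED = {"bootstrap.servers", "key.serializer", "value.serializer"};

    // Returns a Properties object holding only the three required settings.
    static Properties minimalConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Lists every required key that is absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = minimalConfig("host1:9092,host2:9092");
        System.out.println("missing keys: " + missingKeys(props));
    }
}
```

Checking the keys up front is only a convenience; the real producer performs its own validation when it is constructed.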
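IV. KafkaProducer API documentation:
1. The producer does not need to connect to ZooKeeper.
2. The code for a simple producer that puts messages onto the brokers of a Kafka cluster is shown further down. If the messages are generated from a large file, take care with how the file is read: do not load the whole file into memory at once. (http://www.importnew.com/14512.html Reading large files in Java)
3. At a minimum, configure the three settings above. The official documentation shows that these three have no default value and must be set; all other settings have defaults.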
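The advice in item 2 about large files can be sketched as follows: read the file one line at a time and hand each line to a callback, so that only a single line is held in memory. The class LargeFileReaderSketch and the callback parameter are assumptions made for this note; in a real producer the callback would wrap the line in a ProducerRecord and call send().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helper: streams a file line by line instead of loading it whole.
final class LargeFileReaderSketch {
    // Calls handler once per line and returns the number of lines processed.
    static long forEachLine(Path file, Consumer<String> handler) throws IOException {
        long count = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                handler.accept(line); // placeholder for producer.send(...)
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("records", ".txt");
        Files.write(tmp, Arrays.asList("a", "b", "c"), StandardCharsets.UTF_8);
        long n = forEachLine(tmp, line -> System.out.println("would send: " + line));
        System.out.println(n + " lines processed");
        Files.delete(tmp);
    }
}
```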
A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. (For background on thread safety, see: http://polaris.blog.51cto.com/1146394/382161)
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.
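import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");               // wait for the full commit of each record
props.put("retries", 0);                // do not retry failed requests
props.put("batch.size", 16384);         // per-partition batch buffer, in bytes
props.put("linger.ms", 1);              // wait up to 1 ms for more records per batch
props.put("buffer.memory", 33554432);   // total buffering memory, in bytes
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();                       // release the buffer and the background I/O thread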
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
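Note: the leak happens when the producer is never closed, so always call close() once you are done with it; otherwise the buffer memory and the background I/O thread are not released.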
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
Because send() is asynchronous, it returns a Future object:
// Asynchronously send a record to a topic.
Future<RecordMetadata> send(ProducerRecord<K,V> record)
// Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.
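The Future behaviour described above can be tried out without a Kafka cluster. The sketch below uses only java.util.concurrent; fakeSend is a hypothetical stand-in for an asynchronous send that returns immediately and completes a little later with a made-up offset. It shows isDone(), the blocking get(), and the fact that a completed Future can no longer be cancelled.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for an asynchronous send; no Kafka classes are involved.
final class FutureSketch {
    // Returns at once; the Future completes on the executor thread after a short delay.
    static Future<Long> fakeSend(ExecutorService io, long offset) {
        return io.submit(() -> {
            Thread.sleep(50); // pretend the record is travelling to the broker
            return offset;
        });
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            Future<Long> pending = fakeSend(io, 42L);
            System.out.println("done immediately after the call? " + pending.isDone());
            long offset = pending.get(); // blocks until the result is ready
            System.out.println("offset: " + offset + ", done now? " + pending.isDone());
            System.out.println("cancel after completion succeeds? " + pending.cancel(true));
        } finally {
            io.shutdown();
        }
    }
}
```

With the real producer, calling get() on the Future returned by send() turns the asynchronous send into a synchronous one, at the cost of losing most of the batching benefit.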
The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates, that is, the same message being delivered more than once (see the documentation on message delivery semantics for details).
The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition). In other words, only records destined for the same partition (a directory on the broker) are placed in the same batch.
By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0, so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.
The buffer.memory setting controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms, after which it throws a TimeoutException.
buffer.memory: The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception. This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
Type: long; default: 33554432; valid values: [0,...]; importance: high
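To get a feel for how buffer.memory relates to batch.size, here is a back-of-the-envelope sketch. The class BufferSizingSketch and its methods are invented for this note; the arithmetic deliberately ignores the extra memory for compression and in-flight requests mentioned above, so treat the numbers as rough upper bounds rather than what the producer actually allocates.

```java
// Hypothetical sizing helper; a rough estimate only, not how Kafka accounts for memory.
final class BufferSizingSketch {
    // How many completely full batches fit into the buffering memory.
    static long maxFullBatches(long bufferMemory, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return bufferMemory / batchSize;
    }

    // Bytes needed if every active partition holds one batch buffer at the same time.
    static long bytesForOneBatchPerPartition(int activePartitions, int batchSize) {
        return (long) activePartitions * batchSize;
    }

    public static void main(String[] args) {
        long bufferMemory = 33554432L; // value used in the example code earlier in these notes
        int batchSize = 16384;         // value used in the example code earlier in these notes
        System.out.println(maxFullBatches(bufferMemory, batchSize));    // prints 2048
        System.out.println(bytesForOneBatchPerPartition(100, batchSize)); // prints 1638400
    }
}
```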
The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple string or byte types.
(The official Apache Kafka documentation seems to explain things in somewhat more detail than the API javadoc.)
batch.size: The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
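The per-partition batching described above can be pictured with a toy model. The sketch below is not Kafka's internal accumulator; BatchingSketch is a made-up class that groups record sizes by partition and starts a new batch whenever adding a record would push the current batch past batchSize. A record larger than batchSize ends up alone in its own batch, mirroring the statement that no attempt is made to batch such records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of per-partition batching; an illustration only, not Kafka's implementation.
final class BatchingSketch {
    // Each record is {partition, sizeInBytes}. Returns partition -> list of batches,
    // where every batch is the list of record sizes it contains.
    static Map<Integer, List<List<Integer>>> batch(int[][] records, int batchSize) {
        Map<Integer, List<List<Integer>>> batchesByPartition = new TreeMap<>();
        Map<Integer, Integer> bytesInOpenBatch = new TreeMap<>();
        for (int[] record : records) {
            int partition = record[0];
            int size = record[1];
            List<List<Integer>> batches =
                    batchesByPartition.computeIfAbsent(partition, k -> new ArrayList<>());
            int used = bytesInOpenBatch.getOrDefault(partition, 0);
            // Open a new batch if there is none yet or the record does not fit.
            if (batches.isEmpty() || used + size > batchSize) {
                batches.add(new ArrayList<>());
                used = 0;
            }
            batches.get(batches.size() - 1).add(size);
            bytesInOpenBatch.put(partition, used + size);
        }
        return batchesByPartition;
    }

    public static void main(String[] args) {
        int[][] records = {{0, 600}, {1, 600}, {0, 600}, {0, 300}};
        // prints {0=[[600], [600, 300]], 1=[[600]]}
        System.out.println(batch(records, 1000));
    }
}
```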
max.block.ms: The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. These methods can be blocked either because the buffer is full or metadata unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
min.insync.replicas: When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
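The interaction between acks and min.insync.replicas can be summarised as a small decision function. This is a simplified model written for this note, not broker code: MinIsrSketch, its Outcome enum and the write method are all hypothetical, and the model only covers the rule quoted above (the minimum is enforced when acks is "all" or "-1").

```java
// Simplified model of the acks / min.insync.replicas rule; not actual broker logic.
final class MinIsrSketch {
    enum Outcome { ACKED, NOT_ENOUGH_REPLICAS }

    // Decides whether a write would be acknowledged under the rule described above.
    static Outcome write(String acks, int inSyncReplicas, int minInsyncReplicas) {
        boolean acksAll = acks.equals("all") || acks.equals("-1");
        if (acksAll && inSyncReplicas < minInsyncReplicas) {
            return Outcome.NOT_ENOUGH_REPLICAS; // the producer would see an exception
        }
        return Outcome.ACKED;
    }

    public static void main(String[] args) {
        // Typical scenario from the text: replication factor 3, min.insync.replicas 2, acks=all.
        System.out.println(write("all", 3, 2)); // prints ACKED
        System.out.println(write("all", 1, 2)); // prints NOT_ENOUGH_REPLICAS
        System.out.println(write("1", 1, 2));   // prints ACKED
    }
}
```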
Official API documentation: http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
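V. Checking that messages reached the topic
After calling send(), you can log in to the Kafka cluster (an ssh connection is enough), change to the directory that contains the Kafka scripts, and inspect the topic:
List all topics: ./kafka-topics.sh --list --zookeeper host1:port,host2:port
Describe a specific topic: ./kafka-topics.sh --describe --zookeeper localhost:port --topic kafka_topic
Disk usage of the topic's messages: du -sh /data*/zdh/kafka/data/kafka_topic*
Use the console consumer script to consume all messages and see their contents:
./kafka-console-consumer.sh --zookeeper host:2181 --topic kafka_topic --from-beginning (--from-beginning consumes from the start of the topic; without it, only messages sent to the topic from now on are consumed)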