JAVA封裝消息中間件調用一(kafka生產者篇)


  這段時間因為工作關系一直在忙於消息中間件的發開,現在趁着項目收尾階段分享下對kafka的一些使用心得。

  kafka的原理我這里就不做介紹了,可參考http://orchome.com/kafka/index 這里我重點給大家介紹下kafka生產者的使用

  kafka可分為新舊版本,舊版本(0.8Scala版本)我們不去研究,新版本(0.9和0.10)增加了異步發送的API

  示例代碼如下

  pom.xml增加依賴

       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.1.1</version>
        </dependency>

JAVA發送方法:

Properties props = new Properties();
 props.put("bootstrap.servers", bootstrap.servers);
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 0);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 Producer<String,String> producer = new KafkaProducer<String,String>(props);
ProducerRecord<String,String> record = new ProducerRecord<String,String>(message.getTopic().getName(), message.getMessageId(), JSONObject.toJSONString(message));
RecordMetadata recordMetadata = producer.send(record).get();

ack是判別請求是否為完整的條件(就是是判斷是不是成功發送了)。我們指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的。

retries,如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重復消息的可能性。

producer(生產者)緩存每個分區未發送消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。並需要更多的內存(因為每個“活躍”的分區都有1個緩沖區)。

默認緩沖可立即發送,即遍緩沖空間還沒有滿,但是,如果你想減少請求的數量,可以設置linger.ms大於0。這將指示生產者發送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似於TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,因為我們設置了linger(逗留)時間為1毫秒,然后,如果我們沒有填滿緩沖區,這個設置將增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處於高負載的情況下,如果設置比0大,以少量的延遲代價換取更少的,更有效的請求。

buffer.memory 控制生產者可用的緩存總量,如果消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用將被阻塞,阻塞時間的閾值通過max.block.ms設定,之后它將拋出一個TimeoutException。

key.serializervalue.serializer示例,將用戶提供的key和value對象ProducerRecord轉換成字節,你可以使用附帶的ByteArraySerializaerStringSerializer處理簡單的string或byte類型。



ProducerRecord介紹:topic【消息主題】 key【消息的key值,通常用於消息的分區】 value【消息體】
    /**
     * Create a record to be sent to Kafka
     * 
     * @param topic The topic the record will be appended to
        
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value);
    }    
 producer.send發送方式為異步發送,添加消息到緩沖區等待發送,並立即返回。生產者將單個的消息批量在一起發送來提高效率。 由於send調用是異步的,它將為分配消息的此消息的RecordMetadata返回一個Future。如果future調用get(),則將阻塞,直到相關請求完成並返回該消息的metadata,或拋出發送異常

  完全無阻塞的話,可以利用回調參數提供的請求完成時將調用的回調通知。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                         
                       System.out.println("獲取消息發送結果");
                   }
               });

 總體來說,新版API生產者發送方式比較簡單,這里我也不多做描述。重點在與消費者的實現,我將會在下一篇給大家詳細介紹

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM