Kafka Java 客戶端開發


依賴包導入

	<dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.2.1</version>
        </dependency>
        <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.8.2.1</version>
	 </dependency>

producer開發

producer參數說明

  • metadata.broker.list:指定kafka節點列表,用於獲取metadata,不必全部指定.如metadata.broker.list=192.168.1.10:9092,192.168.1.11:9092
  • partitioner.class:指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區。partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
  • compression.codec:是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。compression.codec=none
  • serializer.class:指定序列化處理類,默認為kafka.serializer.DefaultEncoder,即byte[]
  • compressed.topics:如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮
  • request.required.acks:producer接收消息ack的時機,默認為0。0: producer不會等待broker發送ack。1: 當leader接收到消息之后發送ack 。 2: 當所有的follower都同步消息成功后發送ack。
  • request.timeout.ms:在向producer發送ack之前,broker允許等待的最大時間 。如果超時,broker將會向producer發送一個error ACK.意味着上一次消息因為某種原因未能成功(比如follower未能同步成功) 。
  • producer.type:同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量,也意味着消息將會在本地buffer中,並適時批量發送,但是也可能導致丟失未發送過去的消息。
  • queue.buffering.max.ms :在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker,默認為5000ms,此值和batch.num.messages協同工作。
  • queue.buffering.max.messages:在async模式下,producer端允許buffer的最大消息量,無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積,此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000。
  • batch.num.messages:如果是異步,指定每次批量發送數據量,默認為200
  • queue.enqueue.timeout.ms:當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后 ,阻塞一定時間后,隊列仍然沒有enqueue(producer仍然沒有發送出任何消息),此時producer可以繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間。-1: 無阻塞超時限制,消息不會被拋棄 ,0:立即清空隊列,消息被拋棄
  • message.send.max.retries:當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數。因為broker並沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失) 有可能導致broker接收到重復的消息,默認值為3。
  • topic.metadata.refresh.interval.ms:producer刷新topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況 ,因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即刷新 ,(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置額外的刷新機制,默認值600000

一個簡單的生產者例子如下:

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import java.util.Properties;

public class SimpleProducer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for(int i=0;i<10;i++){
            String ip = "192.168.1." + (i+1);
            String msg = "receive msg from " + ip;
            KeyedMessage<String,String> data = new KeyedMessage<String, String>("simple_test",ip,msg);
            producer.send(data);
        }
        producer.close();
    }
}

指定關鍵字key,發送消息到指定partitions

如果需要實現自定義partitions消息發送,需要實現Partitioner接口

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class MyPartitioner implements Partitioner {

    public MyPartitioner(VerifiableProperties props){

    }

    /**
     * 返回分區索引編號
     * @param key
     * @param numPartitions topic中的分區總數
     * @return
     */
    public int partition(Object key, int numPartitions) {
        String partKey = (String) key;
        if("test1".equals(partKey)){
            return 1;
        }
        return 0;
    }

}

consumer開發

consumer參數說明

  • zookeeper.connect:zookeeper連接服務器地址
  • zookeeper.session.timeout.ms:zookeeper的session過期時間,默認5000ms,用於檢測消費者是否掛掉
  • zookeeper.sync.time.ms:當consumer reblance時,重試失敗時時間間隔
  • group.id:指定消費組
  • auto.commit.enable:當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息 ,注意offset信息並不是每消費一次消息就向zk提交一次,而是現在本地保存(內存),並定期提交,默認為true
  • auto.commit.interval.ms:自動更新時間。默認60 * 1000
  • conusmer.id:當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤消息消費情況,便於觀察
  • client.id:消費者客戶端編號,用於區分不同客戶端,默認客戶端程序自動產生
  • queued.max.message.chunks:最大取多少塊緩存到消費者(默認10)
  • rebalance.max.retries:當有新的consumer加入到group時,將會reblance,此后將會有partitions的消費端遷移到新 的consumer上,如果一個consumer獲得了某個partition的消費權限,那么它將會向zk注冊 "Partition Owner registry"節點信息,但是有可能此時舊的consumer尚沒有釋放此節點, 此值用於控制,注冊節點的重試次數。
  • fetch.min.bytes:獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk每次feth將得到多條消息,此值為總大小,提升此值,將會消耗更多的consumer端內存
  • fetch.wait.max.ms: 當消息的尺寸不足時,server阻塞的時間,如果超時,消息將立即發送給consumer
  • auto.offset.reset:如果zookeeper沒有offset值或offset值超出范圍。那么就給個初始的offset。有smallest、largest、 anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
  • derializer.class:指定序列化處理類,默認為kafka.serializer.DefaultDecoder,即byte[]

多線程並行消費topic

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class SimpleConsumer implements Runnable{

    private KafkaStream stream;
    private int threadNum;

    public SimpleConsumer(KafkaStream stream, int threadNum) {
        this.stream = stream;
        this.threadNum = threadNum;
    }

    public void run() {
        ConsumerIterator<byte[],byte[]> it =  this.stream.iterator();
        while (it.hasNext()){
            System.out.println(new String(it.next().message()));
        }
    }

}
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleConsumerTest {

    private ConsumerConnector connector;
    private String topic;
    private ExecutorService executorService;

    public SimpleConsumerTest(String a_zookeeper, String a_groupId, String a_topic) {
        this.connector = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
        this.topic = a_topic;
    }

    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect",a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void shutdown(){
        if(connector != null){
            connector.shutdown();
        }
        if(executorService != null){
            executorService.shutdown();
        }
    }

    public void start(int threadNum){
        Map<String,Integer> topicCount = new HashMap<String,Integer>();
        topicCount.put(topic,new Integer(threadNum));

        Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = connector.createMessageStreams(topicCount);
        List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);

        executorService = Executors.newFixedThreadPool(threadNum);
        int thread = 1;
        for(KafkaStream<byte[],byte[]> stream : streams){
            executorService.submit(new SimpleConsumer(stream,thread++));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String zks = "127.0.0.1:2181";
        String gruopId = "fdf";
        String topic = "tets";
        SimpleConsumerTest simpleConsumerTest = new SimpleConsumerTest(zks,gruopId,topic);
        simpleConsumerTest.start(10);
        Thread.sleep(10000);
        simpleConsumerTest.shutdown();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM