Kafka原理與java simple producer示例


brokers和消費者使用zk來獲取狀態信息和追蹤消息坐標。
每一個partition是一個有序的,不可變的消息序列。
只有當partition里面的file置換到磁盤文件以后,才開放給消費者來消費。
每一個partition是跨服務器地被復制到其他地方,為了容錯的目的。
這個partition可以理解為hadoop中block的單位。
但是只有被選擇為leader的服務器partition來服務消費者的讀和生產者的寫,
followers只是把數據同步過去。同步狀態較好的被列入ISR,這些ISR和leader
信息都保存在zk中,當leader狀態異常,ISR中的某一個Follower變成新的leader.
在整個kafka集群中,每一個服務器扮演一個雙重角色,它可能是某個top的leader partition,
也同時可以是另一個topic的follower partition.這確保了集群的負載均衡。

每一個消費者代表一個進程,多個消費者組成一個消費者組。
一個topic中的一條消息只能被一個消費者組中的某一個消費者消費,如果需要被多個消費者消費,則這些消費者需要在不同的消費者組中。
原因可能是以消費者組的單位在zk中保持partition的offset.

kafka的設計中,broker是無狀態的,這意味着它並不負責管理哪些消費者消費了哪些partition中的消息到什么位置,甚至誰消費的都不理會。
對於消息保持策略,kafka采用了基於時間的SLA,一個消息將會被自動刪除當它達到了這個SLA.

kafka的復制策略有兩種,同步和異步,同步會在lead replica和follower都完成消息的存儲后才給producer發確認信息。
異步同步,只要lead replica收到了信息,就給producer發確認信息,如果這個時候lead replica的broker出問題,就會有風險。

生產者
kafka的message api for producer
從前面分析得知,數據被封裝成消息,如何發送給kafka呢?首先需要獲取這個topic的 lead partition。
消息可以一條一條發送,也可以批量壓縮異步發送。即攢到一定的數量或一定的時間再發送。
Producer:Kafka provides    the kafka.javaapi.producer.Producer class (classProducer<K,V>)。默認的分區策略是對key進行hash.

import    java.util.Date;
import    java.util.Properties;
import    kafka.javaapi.producer.Producer;
import    kafka.producer.KeyedMessage;
import    kafka.producer.ProducerConfig;
public    class    SimpleProducer    {
    private    static    Producer<String,    String>    producer;
    public    SimpleProducer()    {
        Properties    props    =    new    Properties();
//    Set    the    broker    list    for    requesting    metadata    to    find    the    lead    broker
        props.put("metadata.broker.list",
                "192.168.146.132:9092,    192.168.146.132:9093, 192.168.146.132:9094");
//This    specifies    the    serializer    class    for    keys
        props.put("serializer.class",    "kafka.serializer.StringEncoder");
//    1    means    the    producer    receives    an    acknowledgment    once    the    lead replica
//    has    received    the    data.    This    option    provides    better    durability    as    the
//    client    waits    until    the    server    acknowledges    the    request    as successful.
        props.put("request.required.acks",    "1");
        ProducerConfig    config    =    new    ProducerConfig(props);
        producer    =    new    Producer<String,    String>(config);
    }
    public    static    void    main(String[]    args)    {
        int    argsCount    =    args.length;
        if    (argsCount    ==    0    ||    argsCount    ==    1)
            throw    new    IllegalArgumentException(
                    "Please    provide    topic    name    and    Message    count    as    arguments"); 

        String    topic    =    (String)    args[0];
        String    count    =    (String)    args[1];
        int    messageCount    =    Integer.parseInt(count);
        System.out.println("Topic    Name    -    "    +    topic);
        System.out.println("Message    Count    -    "    +    messageCount);
        SimpleProducer    simpleProducer    =    new    SimpleProducer();
        simpleProducer.publishMessage(topic,    messageCount);
    }
    private    void    publishMessage(String    topic,    int    messageCount)    {
        for    (int    mCount    =    0;    mCount    <    messageCount;    mCount++)    {
            String    runtime    =    new    Date().toString();
            String    msg    =    "Message    Publishing    Time    -    "    +    runtime;
            System.out.println(msg);
//    Creates    a    KeyedMessage    instance
            KeyedMessage<String,    String>    data    =
                    new    KeyedMessage<String,    String>(topic,    msg);
//    Publish    the    message
            producer.send(data);
        }
//    Close    producer    connection    with    broker.
        producer.close();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM