Kafka中Producer端封裝自定義消息


我們知道KeywordMessage就是被kafka發送和存儲的對象。所以只需要模擬出這個就可以發送自定義消息了。

比如我需要將用戶的id,user,age,address和訪問ip和訪問date記錄為一個消息。我就自定義一個消息格式(id-user-age-address-ip-date)。

我立馬想到自己定義個javaBean.寫一個UserInfo類偽代碼。

class UserInfo(){

    id;

   user;

   age;

   address;

   ip;

   date;

   toString(){

    return this.getId()+"-"+this.getUser()+"-"+"..."+this.getDate(); 

}

}

你以為這樣就可以了嗎?當然不行啊!

還要按照kafka的消息類型進行封裝,在這里我們只需要實現Encoder類即可:繼續看看代碼就好;

public class KeywordMessage implements kafka.serializer.Encoder<UserInfo>{
     
    public static final Logger LOG=LoggerFactory.getLogger(UserInfo.class);
     
    @Override
    public Message toMessage(Keyword words) {
        LOG.info("start in encoding...");
        return new Message(words.toString().getBytes());
    }
}
這樣KeywordMessage就是一個可以被kafka發送和存儲的對象了。
 
我們再看看producer,producer數據的推送到broker的,所以發起者還是業務系統,下面的代碼就能直接發送一次數據。
/**配置producer必要的參數*/
Properties props = new Properties();
必要的一些配置省略。。。
/**選擇用哪個類來進行序列化,就是我們自定義的消息類*/
props.put("serializer.class", "org.kafka.message.UserInfo");
ProducerConfig config=new ProducerConfig(props);
/**構造測試數據*/
UserInfo userInfo = new UserInfo();
userInfo.setId(1);
userInfo.setUser("xiaoming");
 ...
List<UserInfo> msg=new ArrayList<UserInfo>();
msg.add(userInfo);
/**構造數據發送對象*/
Producer<String, UserInfo> producer=new Producer<String, UserInfo>(config);      
ProducerData<String,UserInfo> data=new ProducerData<String, UserInfo>("test", msg);
producer.send(data);
 
以上就是自定義封裝消息內容。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM