我們知道KeywordMessage就是被kafka發送和存儲的對象。所以只需要模擬出這個就可以發送自定義消息了。
比如我需要將用戶的id,user,age,address和訪問ip和訪問date記錄為一個消息。我就自定義一個消息格式(id-user-age-address-ip-date)。
我立馬想到自己定義個javaBean.寫一個UserInfo類偽代碼。
class UserInfo(){
id;
user;
age;
address;
ip;
date;
toString(){
return this.getId()+"-"+this.getUser()+"-"+"..."+this.getDate();
}
}
你以為這樣就可以了嗎?當然不行啊!
還要按照kafka的消息類型進行封裝,在這里我們只需要實現Encoder類即可:繼續看看代碼就好;
public class KeywordMessage implements kafka.serializer.Encoder<UserInfo>{
public static final Logger LOG=LoggerFactory.getLogger(UserInfo.class);
@Override
public Message toMessage(Keyword words) {
LOG.info("start in encoding...");
return new Message(words.toString().getBytes());
}
}
這樣KeywordMessage就是一個可以被kafka發送和存儲的對象了。
我們再看看producer,producer數據的推送到broker的,所以發起者還是業務系統,下面的代碼就能直接發送一次數據。
/**配置producer必要的參數*/
Properties props = new Properties();
必要的一些配置省略。。。
/**選擇用哪個類來進行序列化,就是我們自定義的消息類*/
props.put("serializer.class", "org.kafka.message.UserInfo");
ProducerConfig config=new ProducerConfig(props);
/**構造測試數據*/
UserInfo userInfo = new UserInfo();
userInfo
.setId(1);
userInfo.setUser("xiaoming");
...
List<UserInfo> msg=new ArrayList<UserInfo>();
msg.add(userInfo);
/**構造數據發送對象*/
Producer<String, UserInfo> producer=new Producer<String, UserInfo>(config);
ProducerData<String,UserInfo> data=new ProducerData<String, UserInfo>("test", msg);
producer.send(data);
以上就是自定義封裝消息內容。
