In the previous post I gave a quick introduction to the Kafka producer wrapper, which is fairly simple to call. Today I want to share how we wrap the Kafka consumer. As a middleware team, our goal is maximum decoupling, so that the dependencies and effort required for business teams to integrate with us stay as low as possible.
Step one: the core consumer class.
package com.meiren.message.kafka.consumer;

import com.meiren.message.kafka.beans.ConsumerProperty;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by zhangwentao on 2017/5/18.
 */
public class ConsumerHandler {

    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(ConsumerProperty consumerProperty, List<String> topics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", consumerProperty.getBrokerList());
        props.put("group.id", consumerProperty.getGroupId());
        props.put("enable.auto.commit", consumerProperty.getEnableAutoCommit());
        props.put("auto.commit.interval.ms", consumerProperty.getAutoCommitInterval());
        props.put("session.timeout.ms", consumerProperty.getSessionTimeout());
        props.put("key.deserializer", consumerProperty.getKeySerializer());
        props.put("value.deserializer", consumerProperty.getValueSerializer());
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        // start a separate thread to poll Kafka for messages
        Thread t = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(200);
                    for (final ConsumerRecord<String, String> record : records) {
                        System.out.println("Received a Kafka message...");
                        executors.submit(new ConsumerWorker(record));
                    }
                }
            }
        });
        t.start();
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}
This core class has three parts: 1. the constructor (builds the consumer configuration and subscribes to the topics); 2. starting a thread pool that listens for and consumes messages; 3. shutting down the thread pool and the consumer.
public ConsumerHandler(ConsumerProperty consumerProperty, List<String> topics)
consumerProperty is the consumer configuration class; it carries the consumer settings and the topics.
public class ConsumerProperty {

    private String brokerList;
    private String groupId;
    private String enableAutoCommit = "true";
    private String autoCommitInterval = "1000";
    private String sessionTimeout = "30000";
    private String keySerializer = "org.apache.kafka.common.serialization.StringDeserializer";
    private String valueSerializer = "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Topics and the handler implementations that consume them
     */
    private List<MessageContainer> messageContainers;

    // getters and setters omitted
}
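For completeness, this is roughly how a ConsumerProperty could be populated in plain Java (in the post it is always injected via Spring XML); the setter names are assumptions implied by the property names used in the XML further down:

// Illustrative only: building a ConsumerProperty by hand, e.g. for a quick local test.
// Setter names are assumed from the Spring property injection shown later in the post.
ConsumerProperty property = new ConsumerProperty();
property.setBrokerList("localhost:9092");   // placeholder broker address
property.setGroupId("message-center");      // placeholder consumer group
// the remaining fields keep their defaults (auto commit, timeouts, String deserializers)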
2. Listening for messages
public void execute(int workerNum) takes the number of worker threads as its parameter. It creates a ThreadPoolExecutor, keeps a long-lived connection to the Kafka broker, and polls for messages every 200 ms; every record that comes back is handed to a ConsumerWorker task for processing.
Note in particular that the Kafka polling loop must run in its own thread; otherwise the calling thread would block forever inside the while (true) loop.
3. Shutting down the thread pool and the consumer (under normal circumstances the consumer simply stays in its listening state).
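One caveat about the shutdown path, beyond what the original post covers: KafkaConsumer is not thread-safe, and shutdown() above calls consumer.close() from a different thread than the one calling poll(). A common alternative is to call wakeup() from the shutting-down thread and let the polling thread close the consumer itself. A rough sketch of how ConsumerHandler could be adjusted (requires org.apache.kafka.common.errors.WakeupException):

// Optional refinement, not part of the original code.
public void shutdown() {
    consumer.wakeup();          // makes the pending/next poll() throw WakeupException
    if (executors != null) {
        executors.shutdown();
    }
}

// ...and in the listener thread started by execute():
public void run() {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (final ConsumerRecord<String, String> record : records) {
                executors.submit(new ConsumerWorker(record));
            }
        }
    } catch (WakeupException e) {
        // expected during shutdown, nothing to do
    } finally {
        consumer.close();       // close on the same thread that polls
    }
}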
Step two: start listening when the service starts.
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

public class PropertyFactory {

    public static ProducerProperty producerProperty;
    public static ConsumerProperty consumerProperty;

    public ProducerProperty getProducerProperty() {
        return producerProperty;
    }

    public void setProducerProperty(ProducerProperty producerProperty) {
        PropertyFactory.producerProperty = producerProperty;
    }

    public ConsumerProperty getConsumerProperty() {
        return consumerProperty;
    }

    public void setConsumerProperty(ConsumerProperty consumerProperty) {
        PropertyFactory.consumerProperty = consumerProperty;
    }

    ConsumerHandler consumer = null;

    @PostConstruct
    public void startListerConsumer() {
        consumer = new ConsumerListener(consumerProperty).startListen();
    }

    @PreDestroy
    public void shutDown() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}
This is a property-factory bean. Once the bean has been created, startListerConsumer() runs (@PostConstruct means the method executes right after the bean is constructed); its job is to start the listener:
ConsumerHandler consumers = new ConsumerHandler(consumerProperty, topics);
consumers.execute(workerNum);
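The ConsumerListener used inside PropertyFactory is not shown in the original post. A minimal sketch of what it presumably does, assuming it merely collects the topics from the MessageContainer list and wires up a ConsumerHandler (the worker count of 10 is an arbitrary placeholder):

import java.util.ArrayList;
import java.util.List;

public class ConsumerListener {

    private final ConsumerProperty consumerProperty;

    public ConsumerListener(ConsumerProperty consumerProperty) {
        this.consumerProperty = consumerProperty;
    }

    public ConsumerHandler startListen() {
        // collect the topics declared in the MessageContainer list
        List<String> topics = new ArrayList<String>();
        for (MessageContainer container : consumerProperty.getMessageContainers()) {
            topics.add(container.getTopic());
        }
        ConsumerHandler consumers = new ConsumerHandler(consumerProperty, topics);
        consumers.execute(10); // worker count is a placeholder, not taken from the post
        return consumers;
    }
}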
Notice also that this factory bean has two properties, ProducerProperty and ConsumerProperty, holding the producer and consumer configuration respectively; both are injected when the bean is initialized.
The messageContainers property of ConsumerProperty deserves special mention: it is a list whose entries pair a topic to subscribe to with the MessageListener implementation that handles that topic.
public class MessageContainer {

    private String topic;
    private MessageListener messageHandle;

    // the Spring config below injects the topic through this constructor
    public MessageContainer(String topic) {
        this.topic = topic;
    }

    // getters and setters omitted
}
public interface MessageListener {
    public void onMessage(ConsumerMessageBO message);
}
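ConsumerMessageBO itself is not listed in the post. Judging from how ConsumerWorker and the handlers use it, a minimal version could look like the following; the body field is an assumption about what the producer side serializes:

public class ConsumerMessageBO {

    private String body;       // assumed: the business payload carried in the Kafka record
    private long offset;
    private int partition;

    public String getBody() { return body; }
    public void setBody(String body) { this.body = body; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
}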
As mentioned above, every message we receive is handed to a ConsumerWorker; let's look at what ConsumerWorker actually does.
// JSONObject here is assumed to be fastjson's (its parseObject(String, Class) signature matches)
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerWorker implements Runnable {

    private ConsumerRecord<String, String> consumerRecord;

    public ConsumerWorker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    public void run() {
        // deserialize the record value into the message business object
        ConsumerMessageBO consumerMessageBO =
                JSONObject.parseObject(consumerRecord.value(), ConsumerMessageBO.class);
        consumerMessageBO.setOffset(consumerRecord.offset());
        consumerMessageBO.setPartition(consumerRecord.partition());
        // dispatch to the handler registered for this record's topic
        for (MessageContainer messageContainer : PropertyFactory.consumerProperty.getMessageContainers()) {
            if (consumerRecord.topic().equals(messageContainer.getTopic())) {
                messageContainer.getMessageHandle().onMessage(consumerMessageBO);
            }
        }
    }
}
It compares the topic of the received record with the topic of each entry in ConsumerProperty's messageContainers, finds the matching entry, and calls onMessage on its handler implementation.
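A small design note that is not in the original code: since the container list is fixed after startup, the per-message loop could be replaced by a topic-to-handler map (java.util.HashMap) built once, for example:

// Built once at startup (e.g. in PropertyFactory or ConsumerListener).
Map<String, MessageListener> handlers = new HashMap<String, MessageListener>();
for (MessageContainer container : PropertyFactory.consumerProperty.getMessageContainers()) {
    handlers.put(container.getTopic(), container.getMessageHandle());
}

// In ConsumerWorker.run(), instead of looping over the list:
MessageListener handler = handlers.get(consumerRecord.topic());
if (handler != null) {
    handler.onMessage(consumerMessageBO);
}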
That is essentially all of the core Java code.
Step three: how a business service integrates the wrapper.
Create a spring-kafka.xml file:
<bean id="consumerProperty" class="com.meiren.message.kafka.beans.ConsumerProperty"> <property name="brokerList" value="${kafka.bootstrap.servers}"/> <property name="groupId" value="${kafka.group.id}"/> <property name="messageContainers" > <list> <ref bean="smsMessageContainer"></ref> <ref bean="emailMessageContainer"></ref> </list> </property> </bean> <bean id="producerProperty" class="com.meiren.message.kafka.beans.ProducerProperty"> <property name="brokerList" value="${kafka.bootstrap.servers}"/> </bean> <bean id ="emailMessageHandler" class="com.meiren.message.kafka.handle.EmailMessageHandler"/> <bean id ="smsMessageHandler" class="com.meiren.message.kafka.handle.SmsMessageHandler"/> <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer"> <constructor-arg value="sms_async_send"/> <property name="messageHandle" ref="smsMessageHandler"></property> </bean>
<bean id ="emailMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer"> <constructor-arg value="email_async_send"/> <property name="messageHandle" ref="emailMessageHandler"></property> </bean>
<!--配置工廠類 -->
<bean class="com.meiren.message.kafka.beans.PropertyFactory">
<property name="consumerProperty" ref="consumerProperty"/>
<property name="producerProperty" ref="producerProperty"/>
</bean>
</beans>
This configuration file maps one-to-one onto the properties of PropertyFactory; it is simply the consumer and producer configuration.
With the file in place, we still need a message handler implementation:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class SmsMessageHandler implements MessageListener {

    public static final Logger log = LoggerFactory.getLogger(SmsMessageHandler.class);

    @Autowired
    private SmsSendLogDao smsSendLogDao;

    public void onMessage(ConsumerMessageBO consumerMessageBO) {
        try {
            // business logic for the SMS message goes here (omitted in the original post)
        } catch (Exception e) {
            System.out.println("Failed to convert message: " + e.getMessage());
        }
    }
}
Any class that implements MessageListener and is registered against its topic in spring-kafka.xml will work:
<bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
<constructor-arg value="sms_async_send"/>
<property name="messageHandle" ref="smsMessageHandler"></property>
</bean>
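For illustration, integrating an additional topic looks the same from the business side. The class name OrderMessageHandler and topic order_async_send below are made up, but any handler follows the pattern of SmsMessageHandler above:

// Hypothetical example only: a handler for an imaginary "order_async_send" topic.
// It would be registered in spring-kafka.xml with its own MessageContainer bean,
// exactly like smsMessageContainer above.
public class OrderMessageHandler implements MessageListener {

    public void onMessage(ConsumerMessageBO message) {
        // business-specific processing goes here; partition/offset getters are assumed
        System.out.println("order message received, partition=" + message.getPartition()
                + ", offset=" + message.getOffset());
    }
}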
That completes the integration. Since this is the first version, the wrapping is not yet as polished as it could be, but it already covers the basic use case (one configuration file plus one handler class); the rough edges will be ironed out in later iterations.
At this point we have a simple integration of Kafka with Spring. In the next post I will cover some practical applications of the message queue (Kafka).