JAVA封裝消息中間件調用二(kafka消費者篇)


  上一遍我簡單介紹了kafka的生成者使用,調用方式比較簡單,今天我給大家分享下封裝kafka消費者,作為中間件,我們做的就是最大程度的解耦,使業務方接入我們依賴程度降到最低。

  第一步,我們先配置一個消費者核心類

  

package com.meiren.message.kafka.consumer;

import com.meiren.message.kafka.beans.ConsumerProperty;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by zhangwentao on 2017/5/18.
 */
public class ConsumerHandler {
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(ConsumerProperty  consumerProperty, List<String> topics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", consumerProperty.getBrokerList());
        props.put("group.id", consumerProperty.getGroupId());
        props.put("enable.auto.commit", consumerProperty.getEnableAutoCommit());
        props.put("auto.commit.interval.ms", consumerProperty.getAutoCommitInterval());
        props.put("session.timeout.ms", consumerProperty.getSessionTimeout());
        props.put("key.deserializer", consumerProperty.getKeySerializer());
        props.put("value.deserializer", consumerProperty.getValueSerializer());
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread t = new Thread(new Runnable(){//啟動一個子線程來監聽kafka消息
            public void run(){
       while (true) {
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (final ConsumerRecord record : records) {
            System.out.println("監聽到kafka消息。。。。。。");
            executors.submit(new ConsumerWorker(record));
        }
      }
            }});
        t.start();

    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
        }
        try {
            if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("Timeout.... Ignore for this case");
            }
        } catch (InterruptedException ignored) {
            System.out.println("Other thread interrupted this shutdown, ignore for this case.");
            Thread.currentThread().interrupt();
        }
    }
}

這個核心類有3個部分組成:1.構造方法(生成一個消費者配置,訂閱topic),2.開啟多線程監聽消費者  3.關閉線程是和消費者

 public ConsumerHandler(ConsumerProperty  consumerProperty, List<String> topics) 
consumerProperty是消費者配置信息類,包含了消費者的配置屬性和topic
public class ConsumerProperty {

    private String brokerList;

    private String groupId;

    private  String enableAutoCommit="true";

    private String autoCommitInterval="1000";

    private String sessionTimeout="30000";

    private String keySerializer="org.apache.kafka.common.serialization.StringDeserializer";

    private String valueSerializer="org.apache.kafka.common.serialization.StringDeserializer";
    /**
     * topic以及消費的實現類
     */
    private List<MessageContainer> messageContainers;

  2.監聽消費者信息

 public void execute(int workerNum) { } 這段代碼的入參是線程數,開啟一個線程池ThreadPoolExecutor,建立一個長連接,每200毫秒去kafka服務器拉取消息,每拉到一個消息,就分配給一個線程類ConsumerWorker去處理這個消息
這里要特別注意是,監聽kafka的過程需要另起一個線程去監聽,不然主線程會一直在while(true)里面阻塞掉。

3.關閉線程池和消費者(一般情況下會一直處於監聽狀態)
第二步,我們設置服務啟動的時候去監聽
public class PropertyFactory {

    public  static ProducerProperty producerProperty;

    public  static ConsumerProperty consumerProperty;


    public ProducerProperty getProducerProperty() {
        return producerProperty;
    }

    public void setProducerProperty(ProducerProperty producerProperty) {
        this.producerProperty = producerProperty;
    }

    public ConsumerProperty getConsumerProperty() {
        return consumerProperty;
    }

    public void setConsumerProperty(ConsumerProperty consumerProperty) {
        this.consumerProperty = consumerProperty;
    }


    ConsumerHandler consumer=null;

    @PostConstruct
    public  void startListerConsumer(){
        consumer= new ConsumerListener(consumerProperty).startListen();
    }

    @PreDestroy
    public void shutDown(){
    if(consumer!=null){
        consumer.shutdown();
    }
    }
}

這是一個屬性工程的bean,當這個bean被創建完成后,會執行startListerConsumer()方法(@PostConstruct的含義就是在bean被創建之后執行)  ,startListerConsumer的作用開啟監聽

ConsumerHandler  consumers = new ConsumerHandler( consumerProperty, topics);
        consumers.execute(workerNum);

另外,我們看到這個beanFactory有2個屬性ProducerProperty 和ConsumerProperty ,這個2個分別是消費者個和生產者的配置,是bean在初始化的時候注入進去的

這里重點介紹一下說ConsumerProperty 的messageContainers屬性,它是一個集合對象,包含需要訂閱的topic和處理該Topic的實現了MessageListener接口的實現類

public class MessageContainer {
    private String topic;

    private MessageListener messageHandle;

 
}
public interface MessageListener {

    public void onMessage(ConsumerMessageBO message);
}

上面說到監聽到每個消息都會分配一個
ConsumerWorker去處理消息,我們看看具ConsumerWorker的
public class ConsumerWorker implements Runnable {

         private ConsumerRecord<String, String> consumerRecord;

            public ConsumerWorker(ConsumerRecord record) {
               this.consumerRecord = record;
           }

    public void run() {
        ConsumerMessageBO  consumerMessageBO= JSONObject.parseObject(consumerRecord.value(),ConsumerMessageBO.class);
        consumerMessageBO.setOffset(consumerRecord.offset());
        consumerMessageBO.setPartition(consumerRecord.partition());
        for(MessageContainer messageContainer: PropertyFactory.consumerProperty.getMessageContainers()){
            if(consumerRecord.topic().equals(messageContainer.getTopic())){
                messageContainer.getMessageHandle().onMessage(consumerMessageBO);
            }
        }

    }

根據監聽到topic,然后和ConsumerProperty 的messageContainers屬性的topic進行比對,找到對應topic處理的實現類調用其onMessage方法

我們JAVA的核心代碼基本已經寫完了

第三步、業務方接入我們封裝的部分

新建一個spring-kafka.xml文件


    <bean id="consumerProperty" class="com.meiren.message.kafka.beans.ConsumerProperty">
        <property name="brokerList" value="${kafka.bootstrap.servers}"/>
        <property name="groupId" value="${kafka.group.id}"/>
        <property name="messageContainers" >
            <list>
                <ref bean="smsMessageContainer"></ref>
                <ref bean="emailMessageContainer"></ref>
            </list>
        </property>
    </bean>
    <bean id="producerProperty" class="com.meiren.message.kafka.beans.ProducerProperty">
        <property name="brokerList" value="${kafka.bootstrap.servers}"/>
    </bean>

    <bean id ="emailMessageHandler" class="com.meiren.message.kafka.handle.EmailMessageHandler"/>
    <bean id ="smsMessageHandler" class="com.meiren.message.kafka.handle.SmsMessageHandler"/>

    <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
        <constructor-arg value="sms_async_send"/>
        <property name="messageHandle" ref="smsMessageHandler"></property>
    </bean>
<bean id ="emailMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer"> <constructor-arg value="email_async_send"/> <property name="messageHandle" ref="emailMessageHandler"></property> </bean>   
  <!--配置工廠類 -->
 
<bean class="com.meiren.message.kafka.beans.PropertyFactory">
    <property name="consumerProperty" ref="consumerProperty"/>
     <property name="producerProperty" ref="producerProperty"/>
  </bean>
</beans>

這個配置文件對應的就是PropertyFactory的屬性,其實就是消費者個和生產者的配置。

我們配置好這個文件后,我們需要一個消息實現類

public class SmsMessageHandler  implements MessageListener{
    public  static final Logger log= LoggerFactory.getLogger(SmsMessageHandler.class);
    @Autowired
    private SmsSendLogDao  smsSendLogDao;
    public void onMessage(ConsumerMessageBO consumerMessageBO) {


            }

        }catch (Exception e){
            System.out.println("轉換消息異常:"+e.getMessage());
        }

    }

只要實現了MessageListener接口,並且在spring-kafka.xm配置好對應的topic就可以了

 <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
        <constructor-arg value="sms_async_send"/>
        <property name="messageHandle" ref="smsMessageHandler"></property>
    </bean>

整個接入就完成了,由於這是第一版本,所以封裝的程度還不算很好,但是也基本符合應用(一個配置文件,一個實現類),有不足之處將會在后面版本進行完善迭代。

至此我們已經將kafka集成spring的功能簡單實現了,下一篇我將介紹消息隊列(kafka)的一些實際應用。

 

 

 
        
 
 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM