Listening for Kafka Messages


1. In a plain main method (Kafka client 1.0+)

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka message consumer
 *
 * @author 17090889
 */
public class ConsumerSample {
    public static void main(String[] args) {
        String topic = "test-topic";
        Properties props = new Properties();
        // Kafka cluster; separate multiple broker addresses with commas
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group ID
        props.put("group.id", "test_group1");
        // Whether the consumer's offsets are committed automatically
        props.put("enable.auto.commit", "true");
        // Interval in milliseconds between automatic offset commits (the new consumer commits to Kafka itself, not to ZooKeeper)
        props.put("auto.commit.interval.ms", "1000");
        // Deserializer types for message keys and values
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic(s)
        consumer.subscribe(Arrays.asList(topic));
        // poll() fetches a batch of records from the cluster, blocking for up to 100 ms when none are immediately available
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // partition() is an int and offset() a long, so adding them would print their sum; print them separately
                System.out.println("partition=" + record.partition() + ", offset=" + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }
    }
}
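
With enable.auto.commit set to "true", the client commits offsets in the background at the configured interval, so a record can be marked as consumed before your code has actually handled it. If offsets should only advance after processing succeeds, disable auto-commit and commit manually. A minimal sketch of that variant, reusing the props and topic from the example above (process() is a hypothetical business-logic method, not part of the original):

props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical handler for one record
    }
    // Commit the batch's offsets only after every record was processed
    consumer.commitSync();
}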

 

2. Under Spring, Kafka client 1.0+ (without depending on Spring-Kafka)
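
The original leaves this section empty. Without Spring-Kafka, a common pattern is to drive the 1.0+ KafkaConsumer from a Spring lifecycle bean, mirroring the InitializingBean/DisposableBean approach of section 3. The sketch below is a hedged illustration, not the original author's code; the class name, hard-coded properties, and onMessage() hook are all assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SpringKafkaConsumerBean implements InitializingBean, DisposableBean {

    private final AtomicBoolean running = new AtomicBoolean(true);
    private Thread pollThread;

    @Override
    public void afterPropertiesSet() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // replace with real brokers
        props.put("group.id", "test_group1");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // KafkaConsumer is not thread-safe, so poll on one dedicated thread
        pollThread = new Thread(new Runnable() {
            @Override
            public void run() {
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                try {
                    consumer.subscribe(Collections.singletonList("test-topic"));
                    while (running.get()) {
                        // Short timeout so the loop notices shutdown promptly
                        ConsumerRecords<String, String> records = consumer.poll(100);
                        for (ConsumerRecord<String, String> record : records) {
                            onMessage(record);
                        }
                    }
                } finally {
                    consumer.close();
                }
            }
        }, "kafka-poll-thread");
        pollThread.start();
    }

    // Business hook, analogous to onMessage() in section 3
    protected void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + "=" + record.value());
    }

    @Override
    public void destroy() throws InterruptedException {
        running.set(false);     // let the poll loop exit
        pollThread.join(5000);  // wait briefly for the thread to finish
    }
}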

 

3. Under Spring, Kafka 0.8

  1) Abstract Kafka consumer factory class

/**
 * Abstract Kafka consumer factory class: connects to ZooKeeper, creates the
 * message streams, and dispatches each stream to a worker thread.
 *
 * @author
 */

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class);

    /**
     * Map of topic name to the number of consumer threads for that topic
     */
    private Map<String, Integer> topicThreadMap;
    /**
     * Configuration for the consumer instance
     */
    private Properties properties;

    /**
     * Thread pool that runs the message-processing tasks
     */
    private ThreadPoolExecutor taskExecutor;

    private ConsumerConnector consumerConnector;

    /**
     * zkConnect
     */
    private String zkConnect;

    @Value("${kafka.groupId}")
    private String groupId;

    /**
     * sessionTimeOut
     */
    @Value("${kafka.sessionTimeOut}")
    private String sessionTimeOut;

    /**
     * syncTime
     */
    @Value("${kafka.syncTime}")
    private String syncTime;

    /**
     * commitInterval
     */
    @Value("${kafka.commitInterval}")
    private String commitInterval;

    /**
     * offsetReset
     */
    @Value("${kafka.offsetReset}")
    private String offsetReset;


    @Override
    public void afterPropertiesSet() {
        logger.info("afterPropertiesSet-start");
        // Initialize properties if they were not injected externally
        if (properties == null) {
            properties = new Properties();
            properties.put("zookeeper.connect", zkConnect);
            logger.info("zkConnect={}", zkConnect);
            // group.id identifies the consumer group
            properties.put("group.id", groupId);
            logger.info("groupId={}", groupId);
            // ZooKeeper session timeout
            properties.put("zookeeper.session.timeout.ms", sessionTimeOut);
            properties.put("zookeeper.sync.time.ms", syncTime);
            properties.put("auto.commit.interval.ms", commitInterval);
            properties.put("auto.offset.reset", offsetReset);
            // Serializer class (a producer-side setting; the high-level consumer ignores it)
            properties.put("serializer.class", "kafka.serializer.StringEncoder");

            properties.put("rebalance.max.retries", "10");
            // Backoff time between successive retries when a rebalance occurs
            properties.put("rebalance.backoff.ms", "3100");
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap);
        // Size the pool to match the actual number of streams: each stream's loop
        // permanently occupies one worker thread, so a smaller pool would leave some
        // streams unconsumed (tasks beyond corePoolSize just wait in the queue).
        int messageProcessThreadNum = 0;
        for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) {
            messageProcessThreadNum = messageProcessThreadNum + streamList.size();
        }
        // Create the thread pool that actually processes the messages
        int poolSize = Math.max(1, messageProcessThreadNum);
        taskExecutor = new ThreadPoolExecutor(poolSize, poolSize, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000));
        for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) {
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                taskExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            MessageAndMetadata<byte[], byte[]> data = it.next();
                            try {
                                String kafkaMsg = new String(data.message(), "UTF-8");
                                logger.info("Message from topics {}: {}", topicThreadMap.keySet(), kafkaMsg);
                                // Hand the message off to the concrete subclass
                                onMessage(data);
                            } catch (RuntimeException e) {
                                logger.error("Failed to process message.", e);
                            } catch (UnsupportedEncodingException e) {
                                logger.error("Failed to decode message bytes as UTF-8.", e);
                            }

                        }
                    }

                });
            }
        }

    }

    /**
     * Message handler, implemented by concrete subclasses.
     *
     * @param data the raw message and its metadata
     */
    protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data);

    @Override
    public void destroy() throws Exception {
        try {
            if (consumerConnector != null) {
                consumerConnector.shutdown();
            }
        } catch (Exception e) {
            logger.warn("shutdown consumer failed", e);
        }
        try {
            if (taskExecutor != null) {
                taskExecutor.shutdown();
            }
        } catch (Exception e) {
            logger.warn("shutdown messageProcessExecutor failed", e);
        }
        logger.info("shutdown consumer successfully");
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public Map<String, Integer> getTopicThreadMap() {
        return topicThreadMap;
    }

    public void setTopicThreadMap(Map<String, Integer> topicThreadMap) {
        this.topicThreadMap = topicThreadMap;
    }

    public String getZkConnect() {
        return zkConnect;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }
}

  2) Concrete Kafka consumer implementation class

import com.xxx.sfmms.common.util.JsonConvertUtil;
import com.xxx.sfmms.common.util.RedisUtil;
import com.xxx.sfmms.common.util.StringUtil;
import com.xxx.sfmms.service.intf.RecommendService;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

/**
 * Real-name verification Kafka consumer
 *
 * @author 17090889
 */
public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory {

    private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class);

    private static final String STR_INVOKENO = "invokeNo";

    @Autowired
    private RecommendService recommendService;


    /**
     * Handle one real-name verification message.
     * @param data the raw message and its metadata
     */
    @Override
    protected void onMessage(MessageAndMetadata<byte[], byte[]> data) {
        MDC.put(STR_INVOKENO, StringUtil.getUuid());
        String msg;
        try {
            msg = new String(data.message(), "UTF-8");
            LOGGER.info("RealNameKafkaConsumer-data={},topic={}", msg, data.topic());
        } catch (UnsupportedEncodingException e) {
            // There is nothing to parse if the message cannot be decoded, so bail out early
            LOGGER.error("Failed to decode message bytes as UTF-8", e);
            MDC.remove(STR_INVOKENO);
            return;
        }
        // Post-event real-name verification data pushed through Kafka
        Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class);
        LOGGER.info("RealNameKafkaConsumer-map={}", map);
        String userNo = map.get("eppAccountNO");
        LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo);
        String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS");
        // Skip users who were not invited through channel 6
        if (!"1".equals(flag)) {
            LOGGER.info("Not a channel-6 referral user");
            MDC.remove(STR_INVOKENO);
            return;
        }
        // authenStatus: 20 = basic verification, 30 = advanced real-name verification, 40 = downgraded after appeal, 50 = downgraded on advanced-verification expiry, 60 = real-name revoked (manual downgrade), 70 = downgraded after identity-recovery appeal
        String authenStatus=map.get("authenStatus");
        // Real name
        String realName=map.get("realName");
        // ID card number
        String idNo = map.get("idNO");
        // apptoken
        String appToken=map.get("appToken");
        // Build the parameter map for the rule and risk check (capacity 8 holds the five entries without resizing)
        Map<String, String> paramMap = new HashMap<String, String>(8);
        paramMap.put("userNo", userNo);
        paramMap.put("authenStatus",authenStatus);
        paramMap.put("realName",realName);
        paramMap.put("idNo", idNo);
        paramMap.put("appToken",appToken);
        Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap);
        LOGGER.info("resultMap={}", resultMap);
        MDC.remove(STR_INVOKENO);
    }
}

  3) Bean wiring for the implementation classes

<bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer">
   <property name="topicThreadMap">
      <map>
         <entry key="${realTopic}" value="5"/>
      </map>
   </property>
   <property name="zkConnect">
      <value>${realZkConnect}</value>
   </property>
</bean>


<bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer">
   <property name="topicThreadMap">
      <map>
         <entry key="${rxdTopic}" value="5"/>
      </map>
   </property>
   <property name="zkConnect">
      <value>${rxdZkConnect}</value>
   </property>
</bean>
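
The second bean definition references a PreCreditKafkaConsumer that the original never shows; presumably it is another subclass of BaseKafkaConsumerFactory with its own onMessage() implementation. A hypothetical minimal sketch:

import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;

public class PreCreditKafkaConsumer extends BaseKafkaConsumerFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(PreCreditKafkaConsumer.class);

    @Override
    protected void onMessage(MessageAndMetadata<byte[], byte[]> data) {
        try {
            String msg = new String(data.message(), "UTF-8");
            LOGGER.info("PreCreditKafkaConsumer-data={},topic={}", msg, data.topic());
            // Pre-credit business logic would go here
        } catch (UnsupportedEncodingException e) {
            LOGGER.error("Failed to decode message bytes as UTF-8", e);
        }
    }
}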

  4) Kafka consumer parameter configuration

These property keys feed the @Value fields of BaseKafkaConsumerFactory above; the topic names and ZooKeeper addresses are injected through the bean definitions in 3).

# Kafka listener configuration
# Real-name verification ZooKeeper connection
realZkConnect=xxx
# Real-name verification topic
realTopic=xxx
# 任性貸 ZooKeeper connection
rxdZkConnect=xxx
# 任性貸 topic
rxdTopic=xxx
kafka.sessionTimeOut=6000
kafka.syncTime=2000
kafka.commitInterval=30000
# smallest = start from the earliest available offset when no committed offset exists
kafka.offsetReset=smallest
kafka.groupId=xxx

  5) Dependency configuration

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.9.2</artifactId>
   <version>0.8.1.1</version>
   <exclusions>
      <exclusion>
         <artifactId>jmxtools</artifactId>
         <groupId>com.sun.jdmk</groupId>
      </exclusion>
      <exclusion>
         <artifactId>jmxri</artifactId>
         <groupId>com.sun.jmx</groupId>
      </exclusion>
   </exclusions>
</dependency>

 
