1、main方法中(1.0以上)
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * Kafka消息消費者 * 〈功能詳細描述〉 * * @author 17090889 * @see [相關類/方法](可選) * @since [產品/模塊版本] (可選) */ public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic"; Properties props = new Properties(); // Kafka集群,多台服務器地址之間用逗號隔開 props.put("bootstrap.servers", "localhost:9092"); // 消費組ID props.put("group.id", "test_group1"); // Consumer的offset是否自動提交 props.put("enable.auto.commit", "true"); // 自動提交offset到zk的時間間隔,時間單位是毫秒 props.put("auto.commit.interval.ms", "1000"); // 消息的反序列化類型 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 訂閱的話題 consumer.subscribe(Arrays.asList(topic)); // Consumer調用poll方法來輪詢Kafka集群的消息,一直等到Kafka集群中沒有消息或者達到超時時間100ms為止 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.partition() + record.offset()); System.out.println(record.key()); System.out.println(record.value()); } } } }
2、Spring下kafka1.0以上版本(不依賴Spring-Kafka)
3、Spring下kafka 0.8版本
1)kafka消費者抽象工廠類
/** * kafka消費者抽象工廠類 * 〈功能詳細描述〉 * * @author * @see [相關類/方法](可選) * @since [產品/模塊版本] (可選) */ import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class); /** * 消費的Topic與消費線程數組成的Map */ private Map<String, Integer> topicThreadMap; /** * Consumer實例所需的配置 */ private Properties properties; /** * 線程池 */ private ThreadPoolExecutor taskExecutor; private ConsumerConnector consumerConnector; /** * zkConnect */ private String zkConnect; @Value("${kafka.groupId}") private String groupId; /** * sessionTimeOut */ @Value("${kafka.sessionTimeOut}") private String sessionTimeOut; /** * syncTime */ @Value("${kafka.syncTime}") private String syncTime; /** * commitInterval */ @Value("${kafka.commitInterval}") private String commitInterval; /** * offsetReset */ @Value("${kafka.offsetReset}") private String offsetReset; @Override public void afterPropertiesSet() { logger.info("afterPropertiesSet-start"); // 初始化properties if(properties==null){ properties = new Properties(); properties.put("zookeeper.connect", zkConnect); logger.info("zkConnect={}", zkConnect); // group 代表一個消費組 properties.put("group.id", groupId); logger.info("groupId={}", groupId); // zk連接超時 properties.put("zookeeper.session.timeout.ms", sessionTimeOut); properties.put("zookeeper.sync.time.ms", syncTime); properties.put("auto.commit.interval.ms", commitInterval); properties.put("auto.offset.reset", offsetReset); // 序列化類 properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("rebalance.max.retries", "10"); // 當rebalance發生時,兩個相鄰retry操作之間需要間隔的時間。 properties.put("rebalance.backoff.ms", "3100"); } ConsumerConfig consumerConfig = new ConsumerConfig(properties); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap); // 實際有多少個stream,就設置多少個線程處理 // int messageProcessThreadNum = 0; // for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) { // messageProcessThreadNum = messageProcessThreadNum + streamList.size(); // } // 創建實際處理消息的線程池 taskExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000)); for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) { for (final KafkaStream<byte[], byte[]> stream : streams) { taskExecutor.submit(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> data = it.next(); try { String kafkaMsg = new String(data.message(),"UTF-8"); logger.info("來自topic:{}的消息:{}", topicThreadMap.keySet(), kafkaMsg); // 消息處理 onMessage(data); } catch (RuntimeException e) { logger.error("處理消息異常.", e); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } }); } } } /** * 消息處理類 * @param data */ protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data); @Override public void destroy() throws Exception { try { if (consumerConnector != null) { consumerConnector.shutdown(); } } catch (Exception e) { logger.warn("shutdown consumer failed", e); } try { if (taskExecutor != null) { taskExecutor.shutdown(); } } catch (Exception e) { logger.warn("shutdown messageProcessExecutor failed", e); } logger.info("shutdown consumer successfully"); } public Properties getProperties() { return properties; } public void setProperties(Properties properties) { this.properties = properties; } public Map<String, Integer> getTopicThreadMap() { return topicThreadMap; } public void setTopicThreadMap(Map<String, Integer> topicThreadMap) { this.topicThreadMap = topicThreadMap; } public String getZkConnect() { return zkConnect; } public void setZkConnect(String zkConnect) { this.zkConnect = zkConnect; } }
2)具體的kafka消費者實現類
import com.xxx.sfmms.common.util.JsonConvertUtil; import com.xxx.sfmms.common.util.RedisUtil; import com.xxx.sfmms.common.util.StringUtil; import com.xxx.sfmms.service.intf.RecommendService; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; /** * 實名kafka消費者 * 〈功能詳細描述〉 * * @author 17090889 * @see [相關類/方法](可選) * @since [產品/模塊版本] (可選) */ public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory { private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class); private static final String STR_INVOKENO = "invokeNo"; @Autowired private RecommendService recommendService; /** * 消息處理 * @param data */ @Override protected void onMessage(MessageAndMetadata<byte[], byte[]> data) { MDC.put(STR_INVOKENO, StringUtil.getUuid()); String msg=""; try { msg=new String(data.message(),"UTF-8"); LOGGER.info("RealNameKafkaConsumer-data={},topic={}",msg,data.topic()); } catch (UnsupportedEncodingException e) { LOGGER.info("字節數組轉字符串異常"); e.printStackTrace(); } // 實名的事后kafka數據 Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class); LOGGER.info("RealNameKafkaConsumer-map={}", map); String userNo = map.get("eppAccountNO"); LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo); String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS"); // 不是渠道6被邀請用戶 if(!"1".equals(flag)){ LOGGER.info("不是渠道6拉新用戶"); return; } // 20-初級認證 30-高級實名認證 40- 實名申訴降級、50-高級到期降級 60-實名撤銷(人工手動降級) 70-申訴找回身份降級 String authenStatus=map.get("authenStatus"); // 真實姓名 String realName=map.get("realName"); // 身份證號碼 String idNo = map.get("idNO"); // apptoken String appToken=map.get("appToken"); // 校驗任務 Map<String, String> paramMap = new HashMap<String, String>(4); paramMap.put("userNo", userNo); paramMap.put("authenStatus",authenStatus); paramMap.put("realName",realName); paramMap.put("idNo", idNo); paramMap.put("appToken",appToken); Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap); LOGGER.info("resultMap={}", resultMap); MDC.remove(STR_INVOKENO); } }
3)實現類的bean注入配置
<bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer"> <property name="topicThreadMap"> <map> <entry key="${realTopic}" value="5"/> </map> </property> <property name="zkConnect"> <value>${realZkConnect}</value> </property> </bean> <bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer"> <property name="topicThreadMap"> <map> <entry key="${rxdTopic}" value="5"/> </map> </property> <property name="zkConnect"> <value>${rxdZkConnect}</value> </property> </bean>
4)kafka consumer參數配置
#kafka監聽配置 #實zk realZkConnect=xxx #topic realTopic=xxx #任zk rxdZkConnect=xxx #任性貸topic rxdTopic=xxx kafka.sessionTimeOut=6000 kafka.syncTime=2000 kafka.commitInterval=30000 kafka.offsetReset=smallest kafka.groupId=xxx
5)依賴包配置
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> <exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> </exclusions> </dependency>
END