我們先來看下簡單的kafka生產者和消費者模式代碼:
生產者KafkaProducer
/** * @author xiaofeng * @version V1.0 * @title: KafkaProducer.java * @package: com.yingda.xsignal.app.test * @description: kafka生產者demo * @date 2018/4/4 0004 上午 11:20 */ public class KafkaProducer extends Thread { private String topic; public KafkaProducer(String topic) { super(); this.topic = topic; } @Override public void run() { Producer producer = createProducer(); int i = 0; while (true) { String msg = "message"; producer.send(new KeyedMessage<Integer, String>(topic, msg + (i++))); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private Producer createProducer() { Properties properties = new Properties(); //聲明zk properties.put("zookeeper.connect", "10.0.2.22:2181"); properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "10.0.2.22:9092"); properties.put("batch.size", 4096); return new Producer<Integer, String>(new ProducerConfig(properties)); } public static void main(String[] args) { new KafkaProducer("TEST_TOPIC").start(); } }
消費者KafkaConsumer
/** * @author xiaofeng * @version V1.0 * @title: KafkaConsumer.java * @package: com.yingda.xsignal.app.test * @description: 單線程消費模式 * @date 2018/4/4 0004 上午 11:18 */ public class KafkaConsumer extends Thread { private String topic; public KafkaConsumer(String topic) { super(); this.topic = topic; } @Override public void run() { ConsumerConnector consumer = createConsumer(); Map<String, Integer> topicCountMap = Maps.newHashMap(); // 一次從主題中獲取一個數據 topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); // 獲取每次接收到的這個數據 KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) { String message = new String(iterator.next().message()); System.out.println("接收到: " + message); } } private ConsumerConnector createConsumer() { Properties properties = new Properties(); // //聲明zk properties.put("zookeeper.connect", "10.0.2.22:2181"); // // 消費組 properties.put("group.id", "test-group"); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public static void main(String[] args) { new KafkaConsumer("TEST_TOPIC").start(); }
分別啟動producer 和consumer
查看消費者控制台:

雖然已成功生產和消費,但是這種消費模式很明顯是單個topic和單線程的形式,那么如果我一次性要訂閱多個topic 而且需要多線程消費該怎樣做呢?接下來讓我們一探究竟吧!
構建多線程消費KafkaConsumer
/** * @author xiaofeng * @version V1.0 * @title: OrderBackConsumer.java * @package: com.yingda.xsignal.app.consumer * @description: 訂單備份消費者 * @date 2018/3/16 0016 下午 4:46 */ public class OrderBackConsumer extends BaseSpringApp { protected static final Logger logger = LoggerFactory.getLogger(OrderBackConsumer.class); private final ConsumerConnector consumer; private final String signalTopic = "SIGNAL_ORDERINFO"; private final String followTopic = "FOLLOW_ORDERINFO"; private final String signalHisTopic = "HIS_ORDERINFO"; private final String followHisTopic = "FOLLOW_HIS_ORDERINFO"; private ConsumerConfig consumerConfig; private static int threadNum = 6; /** * Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 6; /** * Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 200; /** * Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 1024; /** * thread prefix name */ private String ThreadNamePrefix = "kafka-consumer-pool-%d"; ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(ThreadNamePrefix).build(); /** * Common Thread Pool */ ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); public OrderBackConsumer(String[] args) { super(args, "classpath:app-KafkaConsumer.xml"); Properties properties = new Properties(); //開發環境:10.0.2.22:2181 properties.put("zookeeper.connect", "10.0.2.22:2181"); // 組名稱 properties.put("group.id", "back_consumer_group"); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); consumerConfig = new ConsumerConfig(properties); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); } @Override public void shutdown() { if (consumer != null) { consumer.shutdown(); } if (pool != null) { pool.shutdown(); } try { if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int numThreads) { Map<String, Integer> topicCountMap = Maps.newHashMap(); topicCountMap.put(signalTopic, new Integer(numThreads)); topicCountMap.put(followTopic, new Integer(numThreads)); topicCountMap.put(signalHisTopic, new Integer(numThreads)); topicCountMap.put(followHisTopic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); consumerMap.values().stream().forEach(value -> { List<KafkaStream<byte[], byte[]>> streams = value; int threadNumber = 0; /** * 可以為每隔topic創建一個線程池,因為每個topic我設置的partition=6 * (kafka consumer通過增加線程數來增加消費能力,但是需要足夠的分區,如目前我設置的partition=6,那么並發可以啟動6個線程同時消費) * ExecutorService pool = createThreadPool(); */ for (final KafkaStream stream : streams) { pool.submit(new KafkaOrderConsumer(stream, threadNumber)); threadNumber++; } }); } /** * 創建線程池 * * @return */ private ExecutorService createThreadPool() { ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); return pool; } public static void main(String[] args) { int threads = 1; if (args.length < 1) { threads = threadNum; } else { threads = Integer.parseInt(args[0]); } OrderBackConsumer example = new OrderBackConsumer(args); example.run(threads); try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ie) { } example.shutdown(); }
/** * @author xiaofeng * @version V1.0 * @title: KafkaOrderConsumer.java * @package: com.yingda.xsignal.app.service.impl * @description: kafka消費服務 * @date 2018/3/20 0020 下午 8:03 */ public class KafkaOrderConsumer implements Runnable { protected static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumer.class); private KafkaStream stream; private int threadNumber; public KafkaOrderConsumer(KafkaStream stream, int threadNumber) { this.stream = stream; this.threadNumber = threadNumber; } @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { //消費隊列內容 final MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); try { final byte[] messageBytes = (byte[]) messageAndMetadata.message(); if (messageBytes != null && messageBytes.length > 0) { String content = new String(messageBytes); logger.info("message:'" + content + "'"); /** * @// TODO: 2018/3/20 0020 消費入庫 */ } } catch (Exception e) { logger.error("kafka back order consumer error", e); } } logger.info("Shutting down Thread: " + threadNumber); } }
以上代碼中,我們創建了一個線程池,線程數為6,因為我設置的partition=6,而且一次性訂閱了4個topic(當然這些topic要真實存在哦),測試的時候隨便往哪個topic中寫數據都可以收到相應的消費數據哦。
