一種異步消費kafka消息的實現機制


本文將從消息流轉過程以及各步驟實現方式來進行闡述,代碼基於springboot項目,配置文件yml格式:

  1. 項目啟動時啟動kafka消息消費線程
  2. 接收kafka消息
  3. 將kafka消息添加進對應的阻塞隊列,消費消息
  4. 程序出錯處理辦法
  5. 總結

1.項目啟動時啟動kafka消息消費線程

​ 消費kafka消息的類實現一個生命周期管理接口,這個接口自己定義,我這設為LifeCycle。

public interface LifeCycle {
	/**
	 * start
	 */
	void startup();

	/**
	 * 生命周期結束時調用
	 */
	void shutdown();
}

​ 該LIfeCycle類在組件生命周期管理類ComponentContainer(自定義)中進行管理,該管理類實現org.springframework.context中的ApplicationListener接口。

/**
 *  組件生命周期管理
 */
@Slf4j
@Component
public class ComponentContainer implements ApplicationListener<ContextRefreshedEvent> {

	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		//  get beans of LifeCycle
		Map<String, LifeCycle> components = event.getApplicationContext().getBeansOfType(LifeCycle.class);
		Collection<LifeCycle> instances = retrievalInstance(components);
		//  startup
		instances.forEach(LifeCycle::startup);
		//  shutdown
		Runtime.getRuntime().addShutdownHook(new Thread(() -> {
			instances.forEach(LifeCycle::shutdown);
		}));
	}
	/**
	 * retrieval instance of LifeCycle
	 */
	private Collection<LifeCycle> retrievalInstance(Map<String, LifeCycle> components) {
		Collection<LifeCycle> allInstances = components == null ? new ArrayList<>() : new ArrayList<>(components.values());
		return allInstances;
	}
}

這樣程序啟動時,就會執行LifeCycle接口的實現類的startup方法了。


2.接收kafka消息

​ 注解org.springframework.kafka.annotation.KafkaListener監聽kafka消息,在yml配置文件中配置好topics和containerFactory的值

@KafkaListener(topics = {
		"${kafka.XXX.topics}"
	}, containerFactory = "${kafka.XXX.properties.listener-names}")
	public void onMessage1(ConsumerRecord<String, String> record) {
		try {
			LOGGER.info("收到消息 record = {}",record.value());
			doDealMessage(record.value());
		} catch (Exception e) {
			LOGGER.info("處理消息出錯 record = {}",record.value());
		}
	}


3.將kafka消息添加進對應的阻塞隊列,消費消息

​ kafka消息消費類MessageConsumer:

​ 兩個具體的消息消費類:Message1Consumer ,Message2Consumer


@Slf4j
@Service
public class MessageConsumer implements LifeCycle {

	/**
	 *  數據中轉隊列
	 */
	private CommonQueue<String> queue1;
	private CommonQueue<String> queue2;
	/**
	 *  收到kafka消息
	 * @param record
	 */
	@KafkaListener(topics = {
		"${kafka.XXX.topics}"
	}, containerFactory = "${kafka.XXX.properties.listener-name}")
	public void onMessage1(ConsumerRecord<String, String> record) {
		try {
			LOGGER.info("收到消息 record = {}",record.value());
			doDealMessage1(record.value());
		} catch (Exception e) {
			LOGGER.info("處理消息出錯 record = {}",record.value());
		}
	}

	/**
	 *  收到kafka消息
	 * @param record
	 */
	@KafkaListener(topics = {
		"${kafka.XXX.topics}"
	}, containerFactory = "${kafka.XXX1.properties.listener-name}")
	public void onMessage(ConsumerRecord<String, String> record) {
		try {
			LOGGER.info("收到消息 record ={}",record.value());
			doDealMessage2(record.value());
		} catch (Exception e) {
			LOGGER.info("處理消息出錯 record = {}",record.value());
		}
	}

	public void doDealMessage1(String data) {
		queue1.add(data);
	}
	public void doDealMessage2(String data) {
		queue2.add(data);
	}

	@Override
	public void startup() {
		queue1 = new CommonQueue<>(new Message1Consumer());
		queue2 = new CommonQueue<>(new Message2Consumer());
	}

	@Override
	public void shutdown() {
		if (queue1 != null) {
			queue1.shutdown();
		}
		if (queue2 != null) {
			queue2.shutdown();
		}
	}

	/**
	 *  數據1消費隊列
	 */
	private class Message1Consumer implements QueueConsumer<String> {

		@Override
		public void accept(String messageVo) {
			// 處理
			try {
				//處理消息1
				}
			} catch (Exception e) {
				LOGGER.error("處理圖譜數據出現異常,data={}", messageVo, e);
			}
		}
	}
	/**
	 *  數據2消費隊列
	 */
	private class Message2Consumer implements QueueConsumer<String> {

		@Override
		public void accept(String messageVo) {
			try {
				//處理消息2
			} catch (Exception e) {
				LOGGER.error("處理消息出現異常,data={}", messageVo, e);
			}
		}
	}
}

CommonQueue類:隊列類,初始化阻塞隊列,並開啟線程

public class CommonQueue<T> {
	private final Queue<T> queue;
	private final Thread consumerThread;
	private volatile boolean actived = true;

	public CommonQueue(QueueConsumer<T> consumer) {
		this.queue = new ArrayBlockingQueue<>(2000);
		this.consumerThread = new Thread(new Consumer(queue, consumer), "common-queue-consumer-thread");
		this.consumerThread.start();
	}

	public boolean add(T e) {
		return queue.add(e);
	}

	public void shutdown() {
		this.actived = false;
	}

	private class Consumer implements Runnable {
		private Queue<T> queue;
		private QueueConsumer<T> consumer;

		public Consumer(Queue<T> queue, QueueConsumer<T> consumer) {
			this.queue = queue;
			this.consumer = consumer;
		}

		@Override
		public void run() {
			while (actived) {
				T e = queue.poll();
				if (e != null) {
					this.consumer.accept(e);
				} else {
					try {
						Thread.sleep(100);
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
				}
			}
		}
	}
}

QueueConsumer接口:具體的消息消費類實現該接口

public interface QueueConsumer<T> {

   void accept(T e);
}

4.程序異常處理機制

​ 當程序出錯時,停止線程處理阻塞隊列中的消息

public void shutdown() {
		this.actived = false;
	}

​ JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,可以在一下幾種場景中被調用:

  1. 程序正常退出
  2. 使用System.exit()
  3. 終端使用Ctrl+C觸發的中斷
  4. 系統關閉
  5. OutOfMemory宕機
  6. 使用Kill pid命令干掉進程(注:在使用kill -9 pid時,是不會被調用的)

5.總結

​ 該實現機制在獲取到kafka消息后,將消息存到本地阻塞隊列ArrayBlockingQueue中,一類消息擁有自己的隊列,讓對應的線程去取並處理該阻塞隊列中的消息;一方面可以盡快的消費kafka的消息,防止消費者無法跟上數據生成的速度;另一方面容易擴展,具體的消息消費類實現通用accept()方法,實現方法的具體邏輯即可在新線程中異步執行消費,不需要在具體的消費類中關注是否開啟新線程執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM