
首先看一下流程圖,在根據其中一個接口(快件接口)作為例子,來對整個流程進行詳解;
1 消費者執行消息類
package com.aspire.ca.prnp.service.impl;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aspire.ca.prnp.db.service.impl.Constants;
import com.aspire.ca.prnp.db.service.impl.InsertDealRealIDCheckRecordThread;
import com.aspire.ca.prnp.db.service.impl.QueueFactory;
import com.aspire.ca.prnp.domain.express.status.RealIDCheckRecord;
import com.aspire.ca.prnp.kafka.AbstractPrnpConsumer;
import com.aspire.ca.prnp.service.PrnpKernelServer;
import com.aspire.prnp.util.JsonUtil;
import com.aspire.ca.prnp.db.service.impl.ThreadFactory;
public class PostalStatusConsumer extends AbstractPrnpConsumer {
private Logger logger = LoggerFactory.getLogger(PostalStatusConsumer.class);
private QueueFactory queueFactory;
@Override //record為消費數據,kafka到InsertDealRealIDCheckRecordThread
public Boolean execute(ConsumerRecord<String, String> record) throws Exception {
if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() == 0) {
Constants.LAST_START_COUNT_TIME = new AtomicLong(System.currentTimeMillis());
}
logger.info("消費者執行消息 offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
RealIDCheckRecord realIDCheckRecord = JsonUtil.jsonToBean(record.value(), RealIDCheckRecord.class); //ConsumerRecord<String, String>轉化為實體類
//獲取隊列工廠類
queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
//啟動InsertDealRealIDCheckRecordThread線程,將InsertDealRealIDCheckRecordThread放到消費隊列中,打開監控線程;
queueFactory.addKafkaRecord(InsertDealRealIDCheckRecordThread.class, realIDCheckRecord); //6、2位置
//增加InsertDealRealIDCheckRecordThread數量,放到線程池中執行 ; 位置4
ThreadFactory.getInstance().exec(InsertDealRealIDCheckRecordThread.class, 20);
if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() + 5 >= Integer.MAX_VALUE) {
long costTime = System.currentTimeMillis() - Constants.LAST_START_COUNT_TIME.get();
if(costTime!=0){
logger.info(
">>KafkaConsumSpeedMonitor:本次消費計數期間,共消費數據:" + Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() + ",耗時:" + (costTime / 1000) + "秒,消費速率:" + (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() / costTime) + "條/秒");
}
Constants.KAFKA_CONSUM_MESSAGE_COUNT = new AtomicInteger(0);
} else {
long costTime = (System.currentTimeMillis() - Constants.LAST_START_COUNT_TIME.get())/1000;
if(Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()%1000==0&&costTime!=0){
logger.info(">>KafkaConsumSpeedMonitor:本次啟動共消費數據:"+Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()+"條,消費速率:"+(Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()/costTime)+" 條/秒");
}
}
Constants.KAFKA_CONSUM_MESSAGE_COUNT.incrementAndGet();
return true;
}
}
2 隊列工廠類
package com.aspire.ca.prnp.db.service.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
/**
* 隊列工廠
*/
@Service
public class QueueFactory {
private Logger logger = LoggerFactory.getLogger(QueueFactory.class);
private Map<Class, Queue> QUEUE_CONTAINER = null;
private static Integer DEFAULT_THREAD_SIZE = null;
private static AtomicInteger DEFAULT_OBJECT_SIZE = null;
private Boolean IS_OBSERVER_THREAD_START = Boolean.FALSE; // 線程池觀察線程是否啟動標記
private List<String> ES_THREAD_CLASS_NAME=null;
private QueueFactory() {
init();
}
//初始化隊列工廠,默認消費線程為3個,消費的接口數(DEFAULT_OBJECT_SIZE)為2000
private void init() {
if (QUEUE_CONTAINER == null) {
QUEUE_CONTAINER = new ConcurrentHashMap<Class, Queue>();
}
if (DEFAULT_THREAD_SIZE == null) {
DEFAULT_THREAD_SIZE = 3;
}
if (DEFAULT_OBJECT_SIZE == null) {
DEFAULT_OBJECT_SIZE = new AtomicInteger(2000);
}
if(ES_THREAD_CLASS_NAME==null){
ES_THREAD_CLASS_NAME=new ArrayList<String>();
}
}
private static class QueueFactoryHolder {
public final static QueueFactory INSTANCE = new QueueFactory();
}
public static QueueFactory getInstance() {
return QueueFactoryHolder.INSTANCE;
}
/**
* 向隊列中增加一個對像
* @param key
* @param value
*/
synchronized public void add(Class key, Object value) {
if(ES_THREAD_CLASS_NAME.contains(key.getName())){
return;
}
Queue queue = getQueue(key);
if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
try {
Thread.sleep(3000L);
logger.debug(">>queue" + key.getName() + "休眠,對象數量為:"+ queue.size());
} catch (InterruptedException e) {
logger.error(">>queue" + key.getName() + "休眠異常", e);
}
}
queue.add(value);
if (queue.size() % 50 == 0) {
logger.debug(">>queue" + key.getName() + "對象數量為:" + queue.size());
}
this.startThreadFactory(key);
}
/**
* 添加kafka消費消息
* @param key 線程類
* @param value 消費的實體類
*/
public void addKafkaRecord(Class key, Object value) {
if(ES_THREAD_CLASS_NAME.contains(key.getName())){
return;
}
Queue queue = getQueue(key);
if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
try {
Thread.sleep(10000L);
logger.debug(">>queue" + key.getName() + "休眠,對象數量為:"+ queue.size());
} catch (InterruptedException e) {
logger.error(">>queue" + key.getName() + "休眠異常", e);
}
}
queue.add(value);
if (queue.size() % 50 == 0) {
logger.debug(">>queue" + key.getName() + "對象數量為:" + queue.size());
}
this.startThreadFactory(key);
}
/**
* 向隊列 中增加一批對像
*
* @param key
* @param values
*/
synchronized public void add(Class key, Collection values) {
Queue queue = getQueue(key);
if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
try {
Thread.sleep(3000L);
logger.debug(">>queue" + key.getName() + "休眠,對象數量為:"+ queue.size());
} catch (InterruptedException e) {
logger.error(">>queue" + key.getName() + "休眠異常", e);
}
}
queue.addAll(values);
if (queue.size() % 50 == 0) {
logger.debug(">>queue" + key.getName() + "對象數量為:" + queue.size());
}
this.startThreadFactory(key);
}
/**
* 獲取一條數據
*
* @param key
* @param size
* @return
*/
synchronized public Object get(Class key) {
Object obj = null;
Queue queue = this.getQueue(key);
if (queue.size() > 0) {
obj = queue.poll();
}
return obj;
}
/**
* 從隊列中獲取一批數據
*
* @param key
* @param size
* @return
*/
synchronized public List get(Class key, int size) {
List result = new ArrayList(0);
for (int i = 0; i < size; i++) {
Object obj = get(key);
if (obj != null) {
result.add(obj);
} else {
break;
}
}
return result;
}
synchronized public int getSize(Class key) {
int size = 0;
if (!isEmpty(key)) {
size = this.getQueue(key).size();
}
return size;
}
/**
* 是否為空
*/
synchronized public Boolean isEmpty(Class key) {
Boolean res = Boolean.TRUE;
if (this.getQueue(key) != null && this.getQueue(key).size() > 0) {
res = Boolean.FALSE;
}
return res;
}
/**
* 獲取隊列,如果存在,直接從容器中取得,如果不存在,創建隊例,並存儲入容器
* @param key
* @return
*/
synchronized private Queue getQueue(Class key) {
Queue queue = null;
if (QUEUE_CONTAINER.containsKey(key)) {
queue = QUEUE_CONTAINER.get(key);
logger.debug(">>已通過Key:" + key + ",獲取隊列,隊列 當前大小:" + queue.size());
} else {
logger.debug(">>通過Key:" + key + "未獲取隊列 ,新創建隊列,並放入容器中。");
queue = new ConcurrentLinkedQueue();
QUEUE_CONTAINER.put(key, queue);
}
return queue;
}
public Map<Class, Queue> getAllQueue() {
return QUEUE_CONTAINER;
}
/**
* 啟動線程池
*
* @param clazz
*/
private void startThreadFactory(Class clazz) {
if (!IS_OBSERVER_THREAD_START) {
Runnable runnable = new CheckQueueFactoryThread(); // BeanUtils.instantiate(CheckQueueFactoryThread.class);
ExecuteUserCountThreadPool.getInstance().submitTask(runnable); //線程池執行線程
IS_OBSERVER_THREAD_START = Boolean.TRUE; //標記改成true,表示該線程已經被監控;
}
ThreadFactory.getInstance().exec(clazz, DEFAULT_THREAD_SIZE);
}
}
3 CheckQueueFactoryThread 監控隊列工廠線程
package com.aspire.ca.prnp.db.service.impl;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aspire.ca.prnp.domain.express.status.Indiv;
import com.aspire.ca.prnp.interfaces.IIndivUser;
import com.aspire.ca.prnp.service.PrnpKernelServer;
/**
*監控隊列工廠線程
*/
public class CheckQueueFactoryThread implements Runnable {
private static Logger logger = LoggerFactory.getLogger(CheckQueueFactoryThread.class);
private QueueFactory queueFactory = null;
private static Long LAST_OPERATE_DB_TIME = null;
private static Integer LOOP_COUNT = 0;
public CheckQueueFactoryThread() {
init();
}
@Override
public void run() {
while (true) {
StringBuilder sb = new StringBuilder();
sb.append(">>>CheckQueueFactoryThread Trace:");
for (int i = 0; i < 30; i++) {
sb.append("-");
}
sb.append("線程工廠觀察器");
for (int i = 0; i < 30; i++) {
sb.append("-");
}
sb.append("\n");
Map<Class, Queue> allQueue = queueFactory.getAllQueue();
if (allQueue != null && !allQueue.isEmpty()) {
Set set = allQueue.keySet();
if (set != null) {
Iterator<Class> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
Class clazz = keyIterator.next();
Queue queue = allQueue.get(clazz);
sb.append(">>>CheckQueueFactoryThread Trace:");
sb.append("Class is:" + clazz.getName());
sb.append(",queue size is:" + queue.size());
for (int i = 0; i < 10; i++) {
sb.append(" ");
}
sb.append("\n");
}
}
}
sb.append(">>>CheckQueueFactoryThread Trace:");
for (int i = 0; i < 30; i++) {
sb.append("-");
}
sb.append("結束線程工廠觀察器");
for (int i = 0; i < 30; i++) {
sb.append("-");
}
sb.append("\n");
logger.debug(">>>Trace Queue:" + sb.toString());
try {
Thread.sleep(30000L);
} catch (InterruptedException e) {
logger.error(">>DO Multi Data Save:", e);
}
}
}
private void init() {
queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
LAST_OPERATE_DB_TIME = System.currentTimeMillis();
}
private void debug(String msg) {
String line = ">>batch save EcIndivAddr thread:" + msg+ " current loop count is:" + LOOP_COUNT;
logger.debug(line);
}
}
4 線程工廠類
package com.aspire.ca.prnp.db.service.impl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.BeanUtils;
/**
* 線程工廠
*/
public class ThreadFactory {
private static Map<String, Integer> THREAD_CONTAINER = null;
private static class ThreadFactoryHolder {
public final static ThreadFactory INSTANCE = new ThreadFactory();
}
private ThreadFactory() {
init();
}
private void init() {
if (THREAD_CONTAINER == null) {
THREAD_CONTAINER = new ConcurrentHashMap<String, Integer>();
}
}
//單列模式
public static ThreadFactory getInstance() {
return ThreadFactoryHolder.INSTANCE;
}
/**
* 啟動線程
* @param clazz
* @param size
*/
public void exec(Class clazz, int size) {
Integer runThreadSize = this.getThreadSize(clazz);
int need2CreateThreadSzie = size - runThreadSize;
if (need2CreateThreadSzie > 0) {
for (int i = 0; i < need2CreateThreadSzie; i++) {
Runnable runnable = BeanUtils.instantiate(clazz); //轉化成可執行線程,clazz目標線程
ExecuteUserCountThreadPool.getInstance().submitTask(runnable);//線程池執行線程
}
THREAD_CONTAINER.put(clazz.getName(), size);
}
}
private Integer getThreadSize(Class clazz) {
Integer size = 0;
String className = clazz.getName();
if (THREAD_CONTAINER.containsKey(className)) {
size = THREAD_CONTAINER.get(className);
}
return size;
}
}
5線程池
package com.aspire.ca.prnp.db.service.impl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aspire.prnp.util.ConstantUtil;
import com.aspire.webbas.configuration.config.ConfigurationHelper;
/**
* 發送下發消息報文線程池管理
*/
public class ExecuteUserCountThreadPool {
private static ExecuteUserCountThreadPool userCountThreadPool = new ExecuteUserCountThreadPool();
/**
* 線程池對象
*/
private static ExecutorService threadPool;
/**
* 單例
*/
private ExecuteUserCountThreadPool(){
int threadCount = ConfigurationHelper.getConfiguration(ConstantUtil.CONFIG_FILE_NAME_CONFIG).getInt("send_outmessage_thread_count", 500);
//初始化線程池,大小為threadCount
threadPool = Executors.newFixedThreadPool(threadCount);
}
/**
* 獲取線程池管理實例
* @return
*/
public static ExecuteUserCountThreadPool getInstance(){
return userCountThreadPool;
}
/**
* 提交任務到線程池中執行
* @param task
*/
public void submitTask(Runnable task){
threadPool.submit(task);
}
/**
* 關閉線程池,停止接收新任務
*/
public void shutdown(){
threadPool.shutdown();
}
}
6 快件接口業務線程
package com.aspire.ca.prnp.db.service.impl;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aspire.ca.prnp.domain.express.status.RealIDCheckRecord;
import com.aspire.ca.prnp.interfaces.IExpressDeliveryStatus;
import com.aspire.ca.prnp.service.PrnpKernelServer;
/**
* 快件接口業務線程
*/
public class InsertDealRealIDCheckRecordThread implements Runnable {
private static Logger logger = LoggerFactory.getLogger(InsertDealRealIDCheckRecordThread.class);
private QueueFactory queueFactory = null;
private IExpressDeliveryStatus<RealIDCheckRecord, Boolean> service = null; //快件接口類
private static Long LAST_OPERATE_DB_TIME = null;
private static Integer LOOP_COUNT = 0; //本線程計數器
private static Long TIME_STEP=Constants.REFRESH_DB_TIME_STEP;
public InsertDealRealIDCheckRecordThread() {
init();
}
@Override
public void run() {
while (true) {
long timeStep = System.currentTimeMillis() - LAST_OPERATE_DB_TIME;
if (queueFactory.getSize(InsertDealRealIDCheckRecordThread.class) > 1 || timeStep > TIME_STEP) {
//這個位置得到實體參數時,是同步的,從消費隊列中取一個數據進行消費
List<RealIDCheckRecord> realIDCheckRecordList = queueFactory.get(InsertDealRealIDCheckRecordThread.class,1);
if (CollectionUtils.isNotEmpty(realIDCheckRecordList)) {
try {
service.expressStatusInfo(realIDCheckRecordList.get(0)); //快件接口業務
logger.debug("InsertDealRealIDCheckRecordThread消費kafka取出的快件狀態消息");
} catch (Exception e) {
logger.error(">>DO Multi Data Save:", e);
queueFactory.add(InsertDealRealIDCheckRecordThread.class,realIDCheckRecordList);
} finally {
LAST_OPERATE_DB_TIME = System.currentTimeMillis();
}
}
}
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
logger.error(">>DO Multi Data Save:", e);
}
LOOP_COUNT++;
}
}
private void init() {
queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
service = (IExpressDeliveryStatus<RealIDCheckRecord, Boolean>) PrnpKernelServer.getCtx().getBean("expressDeliveryStatusImpl2");
LAST_OPERATE_DB_TIME = System.currentTimeMillis();
}
private void debug(String msg) {
String line = ">>batch save EcIndivAddr thread:" + msg+" current loop count is:"+LOOP_COUNT;
logger.debug(line);
}
}
7 快件接口 這里不貼了,可以參考流程圖中線程1、線程2、線程3、線程4相關隊列和線程操作;
