多線程消費隊列中的接口數據,接口數據來源是kafka


首先看一下流程圖,再以其中一個接口(快件接口)為例,對整個流程進行詳解。

1  消費者執行消息類

package com.aspire.ca.prnp.service.impl;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aspire.ca.prnp.db.service.impl.Constants;
import com.aspire.ca.prnp.db.service.impl.InsertDealRealIDCheckRecordThread;
import com.aspire.ca.prnp.db.service.impl.QueueFactory;
import com.aspire.ca.prnp.domain.express.status.RealIDCheckRecord;
import com.aspire.ca.prnp.kafka.AbstractPrnpConsumer;
import com.aspire.ca.prnp.service.PrnpKernelServer;
import com.aspire.prnp.util.JsonUtil;
import com.aspire.ca.prnp.db.service.impl.ThreadFactory;

public class PostalStatusConsumer extends AbstractPrnpConsumer {

private Logger logger = LoggerFactory.getLogger(PostalStatusConsumer.class);

private QueueFactory queueFactory;

@Override  //record為消費數據,kafka到InsertDealRealIDCheckRecordThread
public Boolean execute(ConsumerRecord<String, String> record) throws Exception {
if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() == 0) {
Constants.LAST_START_COUNT_TIME = new AtomicLong(System.currentTimeMillis());
}
logger.info("消費者執行消息 offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
RealIDCheckRecord realIDCheckRecord = JsonUtil.jsonToBean(record.value(), RealIDCheckRecord.class); //ConsumerRecord<String, String>轉化為實體類

//獲取隊列工廠類

queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");

//啟動InsertDealRealIDCheckRecordThread線程,將InsertDealRealIDCheckRecordThread放到消費隊列中,打開監控線程;

queueFactory.addKafkaRecord(InsertDealRealIDCheckRecordThread.class, realIDCheckRecord);  //6、2位置

//增加InsertDealRealIDCheckRecordThread數量,放到線程池中執行 ; 位置4
ThreadFactory.getInstance().exec(InsertDealRealIDCheckRecordThread.class, 20);  
if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() + 5 >= Integer.MAX_VALUE) {
long costTime = System.currentTimeMillis() - Constants.LAST_START_COUNT_TIME.get();
if(costTime!=0){
logger.info(
">>KafkaConsumSpeedMonitor:本次消費計數期間,共消費數據:" + Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() + ",耗時:" + (costTime / 1000) + "秒,消費速率:" + (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() / costTime) + "條/秒");
}
Constants.KAFKA_CONSUM_MESSAGE_COUNT = new AtomicInteger(0);
} else {
long costTime = (System.currentTimeMillis() - Constants.LAST_START_COUNT_TIME.get())/1000;
if(Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()%1000==0&&costTime!=0){
logger.info(">>KafkaConsumSpeedMonitor:本次啟動共消費數據:"+Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()+"條,消費速率:"+(Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()/costTime)+" 條/秒");
}
}
Constants.KAFKA_CONSUM_MESSAGE_COUNT.incrementAndGet();
return true;
}

}

2 隊列工廠類

package com.aspire.ca.prnp.db.service.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;

/**
* 隊列工廠
*/
@Service
public class QueueFactory {
private Logger logger = LoggerFactory.getLogger(QueueFactory.class);
private Map<Class, Queue> QUEUE_CONTAINER = null;

private static Integer DEFAULT_THREAD_SIZE = null;

private static AtomicInteger DEFAULT_OBJECT_SIZE = null;

private Boolean IS_OBSERVER_THREAD_START = Boolean.FALSE;   // 線程池觀察線程是否啟動標記 

private List<String> ES_THREAD_CLASS_NAME=null;

private QueueFactory() {
init();
}

//初始化隊列工廠,默認消費線程為3個,消費的接口數(DEFAULT_OBJECT_SIZE)為2000

private void init() {
if (QUEUE_CONTAINER == null) {
QUEUE_CONTAINER = new ConcurrentHashMap<Class, Queue>();
}
if (DEFAULT_THREAD_SIZE == null) {
DEFAULT_THREAD_SIZE = 3;
}
if (DEFAULT_OBJECT_SIZE == null) {
DEFAULT_OBJECT_SIZE = new AtomicInteger(2000);
}
if(ES_THREAD_CLASS_NAME==null){
ES_THREAD_CLASS_NAME=new ArrayList<String>();
}

}

private static class QueueFactoryHolder {
public final static QueueFactory INSTANCE = new QueueFactory();
}

public static QueueFactory getInstance() {
return QueueFactoryHolder.INSTANCE;
}

/**
* 向隊列中增加一個對像
* @param key
* @param value
*/

synchronized public void add(Class key, Object value) {
if(ES_THREAD_CLASS_NAME.contains(key.getName())){
return;
}
Queue queue = getQueue(key);
if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
try {
Thread.sleep(3000L);
logger.debug(">>queue" + key.getName() + "休眠,對象數量為:"+ queue.size());
} catch (InterruptedException e) {
logger.error(">>queue" + key.getName() + "休眠異常", e);
}
}
queue.add(value);
if (queue.size() % 50 == 0) {
logger.debug(">>queue" + key.getName() + "對象數量為:" + queue.size());
}
this.startThreadFactory(key);
}

/**
* 添加kafka消費消息
* @param key 線程類
* @param value 消費的實體類
*/
public void addKafkaRecord(Class key, Object value) {
if(ES_THREAD_CLASS_NAME.contains(key.getName())){
return;
}
Queue queue = getQueue(key);
if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
try {
Thread.sleep(10000L);
logger.debug(">>queue" + key.getName() + "休眠,對象數量為:"+ queue.size());
} catch (InterruptedException e) {
logger.error(">>queue" + key.getName() + "休眠異常", e);
}
}
queue.add(value);
if (queue.size() % 50 == 0) {
logger.debug(">>queue" + key.getName() + "對象數量為:" + queue.size());
}
this.startThreadFactory(key);
}
/**
* 向隊列 中增加一批對像
*
* @param key
* @param values
*/
synchronized public void add(Class key, Collection values) {
Queue queue = getQueue(key);
if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
try {
Thread.sleep(3000L);
logger.debug(">>queue" + key.getName() + "休眠,對象數量為:"+ queue.size());
} catch (InterruptedException e) {
logger.error(">>queue" + key.getName() + "休眠異常", e);
}
}
queue.addAll(values);
if (queue.size() % 50 == 0) {
logger.debug(">>queue" + key.getName() + "對象數量為:" + queue.size());
}
this.startThreadFactory(key);
}

/**
* 獲取一條數據
*
* @param key
* @param size
* @return
*/
synchronized public Object get(Class key) {
Object obj = null;
Queue queue = this.getQueue(key);
if (queue.size() > 0) {
obj = queue.poll();
}
return obj;
}

/**
* 從隊列中獲取一批數據
*
* @param key
* @param size
* @return
*/
synchronized public List get(Class key, int size) {
List result = new ArrayList(0);
for (int i = 0; i < size; i++) {
Object obj = get(key);
if (obj != null) {
result.add(obj);
} else {
break;
}
}
return result;
}

synchronized public int getSize(Class key) {
int size = 0;
if (!isEmpty(key)) {
size = this.getQueue(key).size();
}
return size;
}

/**
* 是否為空
*/
synchronized public Boolean isEmpty(Class key) {
Boolean res = Boolean.TRUE;
if (this.getQueue(key) != null && this.getQueue(key).size() > 0) {
res = Boolean.FALSE;
}
return res;
}

/**
* 獲取隊列,如果存在,直接從容器中取得,如果不存在,創建隊例,並存儲入容器
* @param key
* @return
*/
synchronized private Queue getQueue(Class key) {
Queue queue = null;
if (QUEUE_CONTAINER.containsKey(key)) {
queue = QUEUE_CONTAINER.get(key);
logger.debug(">>已通過Key:" + key + ",獲取隊列,隊列 當前大小:" + queue.size());
} else {
logger.debug(">>通過Key:" + key + "未獲取隊列 ,新創建隊列,並放入容器中。");
queue = new ConcurrentLinkedQueue();
QUEUE_CONTAINER.put(key, queue);
}
return queue;
}

public Map<Class, Queue> getAllQueue() {
return QUEUE_CONTAINER;
}

/**
* 啟動線程池
*
* @param clazz
*/
private void startThreadFactory(Class clazz) {
if (!IS_OBSERVER_THREAD_START) {
Runnable runnable = new CheckQueueFactoryThread();       // BeanUtils.instantiate(CheckQueueFactoryThread.class);
ExecuteUserCountThreadPool.getInstance().submitTask(runnable);  //線程池執行線程
IS_OBSERVER_THREAD_START = Boolean.TRUE;   //標記改成true,表示該線程已經被監控;
}
ThreadFactory.getInstance().exec(clazz, DEFAULT_THREAD_SIZE);
}

}

3 CheckQueueFactoryThread  監控隊列工廠線程

package com.aspire.ca.prnp.db.service.impl;

import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aspire.ca.prnp.domain.express.status.Indiv;
import com.aspire.ca.prnp.interfaces.IIndivUser;
import com.aspire.ca.prnp.service.PrnpKernelServer;

/**
 * Observer task for the queue factory: every 30 seconds it writes the size of every
 * queue registered in {@link QueueFactory} to the debug log. It runs until its thread
 * is interrupted.
 */
public class CheckQueueFactoryThread implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(CheckQueueFactoryThread.class);

    /** Pause between two reports. */
    private static final long REPORT_INTERVAL_MILLIS = 30000L;

    /** Prefix of every line in the report. */
    private static final String TRACE_PREFIX = ">>>CheckQueueFactoryThread Trace:";

    private QueueFactory queueFactory = null;

    public CheckQueueFactoryThread() {
        init();
    }

    /**
     * Reports queue sizes in a loop. An interrupt ends the loop with the thread's
     * interrupt flag set.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // The report is only ever logged at debug level, and computing queue sizes
            // walks each queue, so skip building it when debug is off.
            if (logger.isDebugEnabled()) {
                logger.debug(">>>Trace Queue:" + buildReport());
            }
            try {
                Thread.sleep(REPORT_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn(">>CheckQueueFactoryThread interrupted, queue observer stops", e);
                return;
            }
        }
    }

    /** Builds the multi-line report: header banner, one line per queue, footer banner. */
    private String buildReport() {
        StringBuilder sb = new StringBuilder();
        appendBanner(sb, "線程工廠觀察器");
        Map<Class, Queue> allQueue = queueFactory.getAllQueue();
        if (allQueue != null) {
            for (Map.Entry<Class, Queue> entry : allQueue.entrySet()) {
                sb.append(TRACE_PREFIX);
                sb.append("Class is:").append(entry.getKey().getName());
                sb.append(",queue size is:").append(entry.getValue().size());
                appendRepeated(sb, ' ', 10);
                sb.append("\n");
            }
        }
        appendBanner(sb, "結束線程工廠觀察器");
        return sb.toString();
    }

    /** Appends one banner line: prefix, 30 dashes, the title, 30 dashes, newline. */
    private static void appendBanner(StringBuilder sb, String title) {
        sb.append(TRACE_PREFIX);
        appendRepeated(sb, '-', 30);
        sb.append(title);
        appendRepeated(sb, '-', 30);
        sb.append("\n");
    }

    /** Appends {@code c} to {@code sb} {@code times} times. */
    private static void appendRepeated(StringBuilder sb, char c, int times) {
        for (int i = 0; i < times; i++) {
            sb.append(c);
        }
    }

    /** Looks up the {@code queueFactory} bean from the application context. */
    private void init() {
        queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
    }

}

4  線程工廠類

package com.aspire.ca.prnp.db.service.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.BeanUtils;

/**
* 線程工廠
*/
public class ThreadFactory {
private static Map<String, Integer> THREAD_CONTAINER = null;

private static class ThreadFactoryHolder {
public final static ThreadFactory INSTANCE = new ThreadFactory();
}

private ThreadFactory() {
init();
}

private void init() {
if (THREAD_CONTAINER == null) {
THREAD_CONTAINER = new ConcurrentHashMap<String, Integer>();
}
}

//單列模式

public static ThreadFactory getInstance() {
return ThreadFactoryHolder.INSTANCE;
}

/**
* 啟動線程
* @param clazz
* @param size
*/
public void exec(Class clazz, int size) {
Integer runThreadSize = this.getThreadSize(clazz);
int need2CreateThreadSzie = size - runThreadSize;
if (need2CreateThreadSzie > 0) {
for (int i = 0; i < need2CreateThreadSzie; i++) {
Runnable runnable = BeanUtils.instantiate(clazz);  //轉化成可執行線程,clazz目標線程
ExecuteUserCountThreadPool.getInstance().submitTask(runnable);//線程池執行線程
}
THREAD_CONTAINER.put(clazz.getName(), size);
}
}

private Integer getThreadSize(Class clazz) {
Integer size = 0;
String className = clazz.getName();
if (THREAD_CONTAINER.containsKey(className)) {
size = THREAD_CONTAINER.get(className);
}
return size;
}

}

5 線程池

package com.aspire.ca.prnp.db.service.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aspire.prnp.util.ConstantUtil;
import com.aspire.webbas.configuration.config.ConfigurationHelper;

/**
 * Singleton wrapper around a fixed-size thread pool (originally described as the pool
 * that manages threads for sending outbound message packets).
 *
 * <p>The pool size is read from configuration key {@code send_outmessage_thread_count}
 * and defaults to 500.
 */
public class ExecuteUserCountThreadPool {

    /** The single instance, created when the class is initialised. */
    private static final ExecuteUserCountThreadPool INSTANCE = new ExecuteUserCountThreadPool();

    /** The underlying executor. */
    private final ExecutorService executor;

    /**
     * Private: use {@link #getInstance()}. Reads the pool size from configuration and
     * creates the fixed-size pool.
     */
    private ExecuteUserCountThreadPool() {
        int poolSize = ConfigurationHelper
                .getConfiguration(ConstantUtil.CONFIG_FILE_NAME_CONFIG)
                .getInt("send_outmessage_thread_count", 500);
        this.executor = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Returns the singleton pool manager.
     *
     * @return the shared instance
     */
    public static ExecuteUserCountThreadPool getInstance() {
        return INSTANCE;
    }

    /**
     * Submits a task for execution on the pool.
     *
     * @param task the task to run
     */
    public void submitTask(Runnable task) {
        executor.submit(task);
    }

    /**
     * Shuts the pool down; no new tasks are accepted afterwards.
     */
    public void shutdown() {
        executor.shutdown();
    }
}

6 快件接口業務線程

package com.aspire.ca.prnp.db.service.impl;

import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aspire.ca.prnp.domain.express.status.RealIDCheckRecord;
import com.aspire.ca.prnp.interfaces.IExpressDeliveryStatus;
import com.aspire.ca.prnp.service.PrnpKernelServer;

/**
* 快件接口業務線程
*/
public class InsertDealRealIDCheckRecordThread implements Runnable {

private static Logger logger = LoggerFactory.getLogger(InsertDealRealIDCheckRecordThread.class);
private QueueFactory queueFactory = null;
private IExpressDeliveryStatus<RealIDCheckRecord, Boolean> service = null;  //快件接口類
private static Long LAST_OPERATE_DB_TIME = null;
private static Integer LOOP_COUNT = 0;  //本線程計數器
private static Long TIME_STEP=Constants.REFRESH_DB_TIME_STEP;

public InsertDealRealIDCheckRecordThread() {
init();
}
@Override
public void run() {
while (true) {
long timeStep = System.currentTimeMillis() - LAST_OPERATE_DB_TIME;
if (queueFactory.getSize(InsertDealRealIDCheckRecordThread.class) > 1 || timeStep > TIME_STEP) {

//這個位置得到實體參數時,是同步的,從消費隊列中取一個數據進行消費
List<RealIDCheckRecord> realIDCheckRecordList = queueFactory.get(InsertDealRealIDCheckRecordThread.class,1);
if (CollectionUtils.isNotEmpty(realIDCheckRecordList)) {
try {
service.expressStatusInfo(realIDCheckRecordList.get(0));   //快件接口業務
logger.debug("InsertDealRealIDCheckRecordThread消費kafka取出的快件狀態消息");
} catch (Exception e) {
logger.error(">>DO Multi Data Save:", e);
queueFactory.add(InsertDealRealIDCheckRecordThread.class,realIDCheckRecordList);
} finally {
LAST_OPERATE_DB_TIME = System.currentTimeMillis();

}
}
}
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
logger.error(">>DO Multi Data Save:", e);
}
LOOP_COUNT++;
}

}
private void init() {
queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
service = (IExpressDeliveryStatus<RealIDCheckRecord, Boolean>) PrnpKernelServer.getCtx().getBean("expressDeliveryStatusImpl2");
LAST_OPERATE_DB_TIME = System.currentTimeMillis();
}

private void debug(String msg) {
String line = ">>batch save EcIndivAddr thread:" + msg+" current loop count is:"+LOOP_COUNT;
logger.debug(line);
}

}

7 快件接口 這裡不貼了,可以參考流程圖中線程1、線程2、線程3、線程4相關的隊列和線程操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM