Java 消息隊列-Java並發編程 阻塞隊列


自從Java 1.5之后,在java.util.concurrent包下提供了若干個阻塞隊列,主要有以下幾個:

  ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。

  LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。

  PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。

  DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

功能說明:由於部署阿里雲的SLS日志服務,寫了個日志接口,考慮到系統日志量提交有可能會阻塞,這里引入了線程池(生存者與消費者模式)來維護日志隊列的發送。一個發送消息模塊將消息發送到消息隊列中,無需等待返回結果,發送模塊繼續執行其他任務。消息隊列中的指令由線程池中的線程來處理。使用一個Queue來存放線程池溢出時的任務

 

public class ThreadPoolManager {
	
	private static ThreadPoolManager tpm = new ThreadPoolManager(); 
	
	// 線程池維護線程的最少數量 
	private final static int CORE_POOL_SIZE = 4; 
	
	// 線程池維護線程的最大數量 
	private final static int MAX_POOL_SIZE = 10; 
	
	// 線程池維護線程所允許的空閑時間 
	private final static int KEEP_ALIVE_TIME = 0; 
	
	// 線程池所使用的緩沖隊列大小
	private final static int WORK_QUEUE_SIZE = 10; 
	
	// 消息緩沖隊列 
	Queue<String> msgQueue = new LinkedList<String>(); 
	
	// 訪問消息緩存的調度線程 
	// 查看是否有待定請求,如果有,則創建一個新的AccessDBThread,並添加到線程池中
	final Runnable accessBufferThread = new Runnable() {
		
		@Override
		public void run() {
			if(hasMoreAcquire()){
				String msg = ( String ) msgQueue.poll(); 
				Runnable task = new AccessDBThread( msg ); 
				threadPool.execute( task ); 
			}
		}
	};
	
	
	final RejectedExecutionHandler handler = new RejectedExecutionHandler(){

		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			System.out.println(((AccessDBThread )r).getMsg()+"消息放入隊列中重新等待執行"); 
			msgQueue.offer((( AccessDBThread ) r ).getMsg() ); 
		}
	};
	
	// 管理數據庫訪問的線程池 
	
	@SuppressWarnings({ "rawtypes", "unchecked" })
	final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
			CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
			TimeUnit.SECONDS,new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler);
	
	// 調度線程池 
	final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 100 ); 
	
	
	
	@SuppressWarnings("rawtypes")
	final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBufferThread, 0, 1, TimeUnit.SECONDS);

	public static ThreadPoolManager newInstance() {
		
		return tpm; 
	}
	
	private ThreadPoolManager(){} 
	
	private boolean hasMoreAcquire(){
		return !msgQueue.isEmpty(); 
	}
	
	public void addLogMsg( String msg ) {
		Runnable task = new AccessDBThread( msg ); 
		threadPool.execute( task ); 
	}
}

  


//線程池中工作的線程
public class AccessDBThread implements Runnable {

	private String msg; 
	
	public AccessDBThread() {
		super();
	}

	public AccessDBThread(String msg) {
		this.msg = msg;
	}

	public String getMsg() {
		return msg;
	}

	public void setMsg(String msg) {
		this.msg = msg;
	}

	@Override
	public void run() {
		
		// 向數據庫中添加Msg變量值 
		System.out.println("Added the message: "+msg+" into the Database"); 
	}

}

//TestDriver.java是一個驅動測試,sendMsg方法不間斷的向ThreadPoolManager發送數據
public class TestDriver {

	ThreadPoolManager tpm = ThreadPoolManager.newInstance(); 
	
	public void sendMsg( String msg ) {
		
		tpm.addLogMsg( msg + "記錄一條日志 " ); 
	}
	
	public static void main(String[] args) {
		for(int i=0;i<1000;i++){
			new TestDriver().sendMsg( Integer.toString( i ) ); 
		}
		
		//new TestDriver().sendMsg("發起一條對象" ); 
	}
}

 

參考資料:

http://www.cnblogs.com/dolphin0520/p/3958019.html
http://www.cnblogs.com/dolphin0520/p/3932906.html
http://www.360doc.com/content/11/1126/19/1332348_167587893.shtml
http://www.360doc.com/content/06/0812/16/8473_179457.shtml
http://ifeve.com/java-blocking-queue/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM