Java服務器端消息隊列實戰


服務端口監聽--報文接收--報文解碼--業務處理--報文編碼--寫回客戶端

從服務端與客戶端成功握手並產生一個socket后,為了提高吞吐能力,接下來的事情就可以交給多線程去處理。

為了對接入的請求做合理的限制、控制,引入消息隊列緩沖技術。

隊列,主動推送消息和被動拉取消息兩種方式實現,並且可以在兩種實現上增加自定義的策略,例如:流量控制等。

接下來將使用Java語言實現隊列與多線程整合技術的實現。

 

這里直接使用LinkedBlockingQueue隊列,自帶隊列阻塞功能,免去線程安全控制。

package hope.queue.blockdemo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * 消息隊列緩沖定義
 * @author hp
 *
 */
public class PushBlockQueue extends LinkedBlockingQueue<Object>{
    
    private static final long serialVersionUID = -8224792866430647454L;
    private static ExecutorService es = Executors.newFixedThreadPool(10);//線程池
    private static PushBlockQueue pbq = new PushBlockQueue();//單例
    private boolean flag = false;
    
    private PushBlockQueue(){}
    
    public static PushBlockQueue getInstance(){
        return pbq;
    }
    
    /**
     * 隊列監聽啟動
     */
    public void start(){
        if(!this.flag){
            this.flag = true;
        }else{
            throw new IllegalArgumentException("隊列已處於啟動狀態,不允許重復啟動.");
        }
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(flag){
                    try {
                        Object obj = take();//使用阻塞模式獲取隊列消息
                        //將獲取消息交由線程池處理
                        es.execute(new PushBlockQueueHandler(obj));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        
    }
    
    /**
     * 停止隊列監聽
     */
    public void stop(){
        this.flag = false;
    }
}

 

定義隊列處理器,這個處理器實現Runnable接口,是為了與線程池做銜接。

package hope.queue.blockdemo;
/**
 * 隊列消息處理實現
 * @author hp
 *
 */
public class PushBlockQueueHandler implements Runnable {

    private Object obj;
    public PushBlockQueueHandler(Object obj){
        this.obj = obj;
    }
    
    @Override
    public void run() {
        doBusiness();
    }
    
    /**
     * 業務處理時限
     */
    public void doBusiness(){
        System.out.println(" 處理請求 "+obj );
    }

}

 

測試實例

package hope.queue.blockdemo;

public class AppTest {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        PushBlockQueue.getInstance().start();
        for(;;){
            Thread.sleep(1000);
            PushBlockQueue.getInstance().put("0123456");
        }
    }

}

 

輸出結果

處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456
處理請求 0123456

 

這種模式可以應用到很多場景,希望對大家工作上有所幫助。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM