搭建服務器處理系統(基於netty)-我們到底能走多遠系列(25)


----20140310更新start---------

注:demo源碼下載地址:http://pan.baidu.com/s/1qWnVsnU

----20140310更新end---------

推薦:

  google rest 一個不錯的http測試應用,google瀏覽器可以使用。做接口什么的,很有幫助。親,還不快了解一下。

扯淡:

  現在我算是進入一個能帶着你向前走的團隊,但是產品設計太扯淡,互聯網應用,開發周期異常的短,也高估了開發的能力,趕進度的開發bug很多啊。

  如果開發只是完成任務的行動,是不會感到痛苦的。所以說:想要做好產品的開發,痛苦才剛剛開始。原因就是開發無法左右產品的設計.....

 

主題:

  時刻關注排行的朋友注意啦,你們都得了排行強迫症啦,趕快找個護士就醫吧。

  一個排行.....(我需要護士)

關於排名的詳細:摸我

好吧,據說netty排在第一,那就學習一下吧!

更具公司很久以前的一個服務器框架代碼,不斷刪減,然后得到一個很簡單的服務器框架,分享一下。

自己畫的流程圖,流程比較簡單,這方面比較弱,不太會用圖表達:

 

 

1,啟用netty

我們需要監聽端口,這樣就可以處理連接上來的tcp消息了。這一步netty用java 的NIO和OIO都封裝了,我們自然選擇NIO啦。

一下是啟動服務器的代碼:對於這個啟動,你只要學習一下netty的手冊例子,就馬上明白了,它手冊的例子也很好,建議大家看看。

public class Start {

    public static void main(String[] args) {
        //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml");
        System.out.println("=============show time!=============");
        initNetty();
    }
    private static final int tcpSendBufferSize = 32768;
    private static final int tcpReceiveBufferSize = 32768;
    
    // 初始化端口的監聽
    public static void initNetty(){
        InetSocketAddress addr = new InetSocketAddress(8989);//需要監聽的端口,即tcp連接建立的端口
        //Executors.newCachedThreadPool()的解釋:
        //緩沖線程執行器,產生一個大小可變的線程池。
        //當線程池的線程多於執行任務所需要的線程的時候,對空閑線程(即60s沒有任務執行)進行回收;
        //當執行任務的線程數不足的時候,自動拓展線程數量。因此線程數量是JVM可創建線程的最大數目。
        ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. 
        //  Creates a new group with a generated name.
        DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup");
        
        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
        
        // PushServerPipelineFactory作為一個ChannelPipelineFactory產生的工廠類,我們可以把需要執行的Handler進行配置
        ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels);
        // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory.
        // 服務器新連接建立的時候,新的ChannelPipeline會通過我們定義的ChannelPipelineFactory產生,其實是調用了getPipeline()方法。
        bootstrap.setPipelineFactory(pipelineFactory);
        
        if (tcpReceiveBufferSize != -1) {
            bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize);
        }
        if (tcpSendBufferSize != -1) {
            bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize);
        }
        
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.reuseAddress", true);
        bootstrap.setOption("child.keepAlive", false);
        bootstrap.setOption("child.tcpNoDelay", true);
        
        System.out.println(" ===================netty started=====================");
        Channel serverChannel = bootstrap.bind(addr);
        allChannels.add(serverChannel);
    }

PushServerPipelineFactory 其實就是配置了一下Handler,他叫pushServerCommandHandler,他的作用就是把接受到的信息放進receivedQueen的隊列去就好了,其實就是調用了MessageManageraddSocketMessage方法。

我們看一下他的messageReceived方法就明白了,netty是事件機制的,messageReceived是重寫的方法,只要是受到一個連接的消息,就會觸發這個方法。

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent)
            throws Exception {
        CommandMessage command = (CommandMessage) messageEvent.getMessage();
        
        if (command.message.length() > 3) {
             Channel ch = channelHandlerContext.getChannel();
             ch.write("---------message received-------------");
             // 向消息隊列里插消息包,通過handleMessage這個方法,
             // 插入的MessagePack其實已經更具消息的不同被選擇成不同的子類
             // 我覺得這是很關鍵的設計,我們的業務邏輯就可以分成不同的MessagePack子類,然后實現它的onHandler方法
             messageManager.addSocketMessage(handleMessage(command.message, messageEvent));
           
        } else {
           // logger.warn("too short message.");
        }
    }
    //重點方法
    public MessagePack handleMessage(String msg, MessageEvent e) {
        MessagePack messagePack = null;

        int fid = SjsonUtil.getFIDFromMsg(msg);
 
        switch (fid) {
        case 25: // 調用TestCategoryMsg
            messagePack = new ShowTimeMessage(msg, e.getChannel());
            break;
        case 26: // 調用不同的業務邏輯
            messagePack = new TestCategoryMsg(msg, e.getChannel());
            break;
        default:
           // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress());
        }

        return messagePack;
    }

 

PushServerPipelineFactory 除了配置好Handler,還把MessageManager啟動起來了,MessageManager是spring的配置文件中配置的。注意他的init-method,就是實例化這個bean的時候會執行它的start方法,這個比較重要,因為MessageManager就是處理消息隊列的模塊,所以他需要在服務器啟動時啟動線程池去處理消息隊列。MessageManager提供的方法就是用來維護一個叫receivedQueen的隊列。

  <bean id="messageManager" class="netty.gate.message.MessageManager" init-method="start">
  </bean> 

PushServerPipelineFactory

public class PushServerPipelineFactory implements ChannelPipelineFactory {

    private DefaultChannelGroup channelGroup;

    private final PushServerCommandHandler pushServerCommandHandler;

    private final PushServerEncoder pushServerEncoder = new PushServerEncoder();

    public PushServerPipelineFactory(DefaultChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
        
        this.pushServerCommandHandler = new PushServerCommandHandler(this.channelGroup);
        
        pushServerCommandHandler.setMessageManager((MessageManager) TaskBeanFactory.getBean("messageManager"));

    }

    public final ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline(new PushServerCommandDecoder(), pushServerCommandHandler, pushServerEncoder);
    }

    protected ChannelPipelineFactory createPushServerPipelineFactory(DefaultChannelGroup allChannels) {
        return new PushServerPipelineFactory(allChannels);
    }

}

很關鍵的MessageManager: 維護的是receivedQueen隊列

public class MessageManager {

    // MessageManager的消息隊列,下面的addSocketMessage方法就是向這個隊列塞MessagePack的
    private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>(512);

    private ExecutorService pool;

    private int reStartThreadCount = 0;


    public void start() {
        this.pool = Executors.newCachedThreadPool();
        pool.submit(new PushRecvThread());
    }

    private class PushRecvThread implements Runnable {

        public void run() {
            while (true) {
                MessagePack message = waitForProcessMessage();
                if (message != null) {
                    // 利用多態執行繼承MessagePack的子類方法
                    message.onHandler(TaskBeanFactory.getContextInstance());
                }
            }
        }
    }
    
    public MessagePack waitForProcessMessage() {
        MessagePack message = null;
        while (message == null) {
            try {
                // 從隊列中取繼承MessagePack的實例
                message = receivedQueen.poll(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // TODO log
            }
        }
        return message;
    }
    
    public void addSocketMessage(MessagePack message) {
        if (message != null) {
            try {
                boolean success = receivedQueen.offer(message, 15, TimeUnit.SECONDS);
                if (false == success) {
                    // maybe PushRecvThread is break,restart the thread again
                    if (reStartThreadCount < 10) {
                        pool.submit(new PushRecvThread());
                        reStartThreadCount++;
                    }
                } else {
                }
            } catch (InterruptedException e) {
                // TODO log
            }
        }
        return;
    }

正真的處理邏輯的代碼是寫在這些繼承MessagePack抽象類里的,里面的一個onHandler方法是必須實現的,所以使用了抽象類

下面代碼的onHandler中,就可以寫那些調用service層的,處理數據庫,發郵件,調用接口啊等各種需求操作了。

public class TestCategoryMsg extends MessagePack {

    private static final String MESSAGE_NAME = "TEST_MESSAGE"; // 消息名稱

    public TestCategoryMsg(String msg, Channel channel) {
        super(msg, channel);
    }

    @Override
    public void onHandler(ApplicationContext ctx) {
        channel.write("---------------i dont know why--------------");
    }

    public String getName() {
        return MESSAGE_NAME;
    }

}


到此基本上一個服務器從接受數據,到回應數據的流程已經走完了。

我想上源碼,可是看不到附件... 我表示無能為力啦 !

 

 萬水千山總是情,也許您可以支持一下,謝謝!

 

 

讓我們繼續前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不會成功。
共勉。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM