【MINA】用mina做業務服之間的通信,實現業務負載均衡思路


學習mina目的還是搭建通信架構,學完mina我們了解了如何實現客戶端和服務端,也就是一個正常channel我們是知道怎么建立的

但是問題是,我們應用環境通信分為兩種

1.前后端通信


 

其實這個比較好實現,提供一個mina server端,供前端語言通過socket建連接就行,這個通信就算是ok了,編解碼等通信解析的細節這里不講了

以前的游戲服務端架構業務多用短連接,聊天用長連接,聊天的部分其實就是上面表述的情況

現在是長連接的天下,聊天依舊是長連接,業務也做成長連接,實現了真正意義上的長連接游戲架構,這其實就表述了一種當下典型架構,

就是后端提供兩個開放的通信端口【即兩個mina server】,供前端的socket連接,一個負責聊天,登錄,注冊,另一個負責其他業務,這樣就實現了協議通信的負載均衡

2.后端的業務服通信【這是本文的重點】


 

那么后端的業務就不需要負載均衡嗎?比如job,異步更新db,活動副本等

當然也是需要的,怎么做那,先拿1中的做個解釋

                         mainserevr[聊天,登錄,注冊]---nodeserver[其他業務]

這兩個mina sever端已經建立起來了,但是兩個server之間還不能通信,我們有兩個選擇,要么在mainserevr上起個mina client去連nodeserver,要么在nodeserver

上起個mina client去連mainserevr,思路肯定是這樣的,一旦這個通道建立了,其實互為server和client的,會有一個iosession被通道持有,只要有這個iosession,

就可以主動write,當然對於通道的另一端可以response,也可以通過取得iosession來主動寫

實現方式,我們在nodeserevr上提供一個mainserverClient這樣一個spring的bean去連接mainserver,這樣在nodeserver上就可以向mainserevr發消息了

 

3.帶着這個思路設計一下


 

我把游戲中的業務分為

     public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node
	public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server
	public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server
	public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 異步DB
	public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活動
	public static final String SERVER_TYPE_OTHER_STR = "other";// 其他
	public static final String SERVER_TYPE_GM_STR = "GM";//管理端

 

每次啟動一種server時,首先啟動一次mina serevr,然后啟動多個mina client去連接其他的mina server,

比如啟動nodeserevr 服務端,然后啟動多個client分別連接mainserevr,jobserevr等的服務端,這樣我就可以

在nodeserver上給其他業務serevr發請求了,具體啟動哪些client看需要

 

搞一個啟動server類型的方法

public static ClassPathXmlApplicationContext start(String serverTypeStr) {
        try {
                        //關閉連接池的鈎子線程
            ProxoolFacade.disableShutdownHook();
                        //spring 的核心配置文件
            String xmlFile = "applicationContext.xml";

            ....
            log.info("啟動 {} server................", serverTypeName);

            // 設置到系統環境變量
            System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");
            System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,
                    serverTypeName);

            // final ClassPathXmlApplicationContext parent = new
            // ClassPathXmlApplicationContext(
            // xmlFile);
            String fileName = null;

              //這是把spring的住配置文件拆分了一部分內容出來,目前是只加載本server需要的bean
            if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {
                fileName = "wolf/app_nodeserver.xml";
            } else {
                fileName = "wolf/app_server.xml";
            }

            //手動啟動spring
            final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    new String[] { xmlFile, fileName });

            if (context != null) {
                ServiceLocator.getInstance().setApplicationContext(context);
            }

            // 啟動socket server
            final WolfServer server = (WolfServer) ServiceLocator
                    .getSpringBean("wolf_server");
            server.setServerType(serverType);
                        //這個調用就是我們熟悉的啟動mina server端
            server.start();

            //這個動用做兩件事,選區需要的serevr類型建立mina client連接
            startClient(server);

                        //鈎子線程用來監聽應用停止,為了做停止時的后續處理
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    _shutdown();
                }
            }, "shutdownHookThread"));

              //為了支持web,springMVC,內置一個web server
            if (NodeSessionMgr.SERVER_TYPE_MAIN_STR
                    .equalsIgnoreCase(serverTypeStr)) {
                JettyServer jettyServer = (JettyServer) ServiceLocator
                        .getSpringBean("jettyServer");
                jettyServer.start();
            }

            log.info("start {} end................", serverTypeName);
            return context;

        } catch (Exception e) {
            e.printStackTrace();
            shutdown();
        } finally {

        }
        return null;
    }

 

在看下startClient(server);

private static void startClient(WolfServer server) {
        // asyncdbServer只會被連接,不會主動連接其他server
                // 這部分目的是過濾那些不需要主動連比人的serevr,比武我這里的異步db,和活動服
        if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB
                || server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {
            return;
        }

        // 發送game Server ip port到mainserver
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("nodeServerIp", server.getIp());
        params.put("nodeServerPort", server.getPort());
        params.put("serverType", server.getServerType());

        //我需要mainserevr的client,就弄個bean在本服
        final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator
                .getSpringBean("mainServerClient");

        //這個位置其實就是mina的client連server端
        mainServerClient.init();
        Object localAddress = mainServerClient.registerNode(params);

                
         //同上,需要jobserevr的client
        final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator
                .getSpringBean("jobServerClient");
        if (jobServerClient != null) {
            jobServerClient.init();
            Map<String, Object> params1 = new HashMap<String, Object>();
            params1.putAll(params);
            jobServerClient.registerNode(params1);
        }
        // }

        .....

    }

 

再看下WolfClientService.init()

public void init() {
        if (start)
            return;
        if (wolfClient == null) {
            log.error("wolf client is null");
            return;
        }
         //mina 的client 連接 mina server
        wolfClient.start();
        if (wolfClient.isConnected())
            start = true;
    }

 

再看下wolfclient.start()

/**
     * 連接一個服務器,並指定處理接收到的消息的處理方法
     * 
     */
    public void start() {
        // this.context.put("resultMgr", this.resultMgr);

        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_9"), processorNum);
        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_0"), corePoolSize);
        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_4"), maxPoolSize);

        if (this.serverIp == null || this.serverIp.equals("")) {
            logger.error(clientName + "沒有配置serverIp,不啟動.........");
            return;
        }
        String threadPrefix = clientName + "[" + this.serverIp + ":"
                + this.serverPort + "]";
        // exector = Executors.newCachedThreadPool(new
        // NamingThreadFactory(threadPrefix));
        processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,
                processorNum);

        // connector = new NioSocketConnector((Executor) exector, processor);
        connector = new NioSocketConnector(processor);

        // connector.getSessionConfig().setReuseAddress(true);
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();

        if (useLogFilter == 2) {
            chain.addLast("logging", new LoggingFilter());
        }
        // codec filter要放在ExecutorFilter前,因為讀寫同一個socket connection的socket
        // buf不能並發(事實上主要是讀,寫操作mina已經封裝成一個write Queue)
        chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 設置編碼過濾器

        // 添加心跳過濾器,客戶端只接受服務端的心跳請求,不發送心跳請求
        // connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);
        // 這里的KeepAliveFilter必須在codec之后,因為KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,則可以在codec之前
        // KeepAliveFilter到底在ExecutorFilter之前好還是之后好,我也不確定
        KeepAliveFilter filter = new KeepAliveFilter(
                new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),
                IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),
                keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,
                30);
        chain.addLast("ping", filter);

        // 添加執行線程池
        executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(
                        threadPrefix));

        // 這里是預先啟動corePoolSize個處理線程
        executor.prestartAllCoreThreads();

        chain.addLast("exec", new ExecutorFilter(executor,
                IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
                IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
                IoEventType.SESSION_OPENED));

        if (useWriteThreadPool) {
            executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,
                    maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                    new NamingThreadFactory(threadPrefix + "write"));
            executorWrite.prestartAllCoreThreads();
            chain.addLast("execWrite", new ExecutorFilter(executorWrite,
                    IoEventType.WRITE, IoEventType.MESSAGE_SENT));

        }
        // ,logger.isDebugEnabled() ? new
        // LoggingIoEventQueueHandler("execWrite") : nulls

        // 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log
        // 可以配置在ExecutorFilter之后:是為了在工作線程中打印log,不是在NioProcessor中打印
        if (useLogFilter == 1) {
            chain.addLast("logging", new LoggingFilter());
        }

        connector.setHandler(handler);

        connector.getSessionConfig().setReuseAddress(true);
        connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);
        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_1")
                + serverIp + ":" + serverPort);
        ConnectFuture cf = null;

        long start = System.currentTimeMillis();
        while (true) {
                        //這地很關鍵,是個無線循環,每10秒連接一次,直到可以和服務端建立連接,否則一支循環下去
            cf = connector.connect(serverAddress);// 建立連接
            cf.awaitUninterruptibly(10000L);
            if (!cf.isConnected()) {
                if ((System.currentTimeMillis() - start) > timeout) {
                    throw new RuntimeException(
                            com.youxigu.dynasty2.i18n.MarkupMessages
                                    .getString("WolfClient_5")
                                    + serverIp + ":" + serverPort);
                }
                if (cf.getException() != null) {
                    logger.error(com.youxigu.dynasty2.i18n.MarkupMessages
                            .getString("WolfClient_6"), serverIp + ":"
                            + serverPort, cf.getException().getMessage());
                }
                try {
                    Thread.sleep(10000);
                } catch (Exception e) {
                }

                continue;
            }

                        //這就是終極目標了,我們的目的就是在serevr的客戶端的bean里,可以拿到這個iosession
            this.setSession(cf.getSession());

            logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                    .getString("WolfClient_10")
                    + serverIp + ":" + serverPort);
            shutDown = false;
            if (handler instanceof WolfMessageChain) {
                WolfMessageChain wmc = WolfMessageChain.class.cast(handler);
                wmc.init(context);
            }

            break;
        }

    }

 

這樣后端的業務通信網就可以輕松的建立起來,之后想怎么通信就看你的了  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM