學習mina目的還是搭建通信架構,學完mina我們了解了如何實現客戶端和服務端,也就是一個正常channel我們是知道怎么建立的
但是問題是,我們應用環境通信分為兩種
1.前后端通信
其實這個比較好實現,提供一個mina server端,供前端語言通過socket建連接就行,這個通信就算是ok了,編解碼等通信解析的細節這里不講了
以前的游戲服務端架構業務多用短連接,聊天用長連接,聊天的部分其實就是上面表述的情況
現在是長連接的天下,聊天依舊是長連接,業務也做成長連接,實現了真正意義上的長連接游戲架構,這其實就表述了一種當下典型架構,
就是后端提供兩個開放的通信端口【即兩個mina server】,供前端的socket連接,一個負責聊天,登錄,注冊,另一個負責其他業務,這樣就實現了協議通信的負載均衡
2.后端的業務服通信【這是本文的重點】
那么后端的業務就不需要負載均衡嗎?比如job,異步更新db,活動副本等
當然也是需要的,怎么做那,先拿1中的做個解釋
mainserevr[聊天,登錄,注冊]---nodeserver[其他業務]
這兩個mina sever端已經建立起來了,但是兩個server之間還不能通信,我們有兩個選擇,要么在mainserevr上起個mina client去連nodeserver,要么在nodeserver
上起個mina client去連mainserevr,思路肯定是這樣的,一旦這個通道建立了,其實互為server和client的,會有一個iosession被通道持有,只要有這個iosession,
就可以主動write,當然對於通道的另一端可以response,也可以通過取得iosession來主動寫
實現方式,我們在nodeserevr上提供一個mainserverClient這樣一個spring的bean去連接mainserver,這樣在nodeserver上就可以向mainserevr發消息了
3.帶着這個思路設計一下
我把游戲中的業務分為
public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 異步DB public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活動 public static final String SERVER_TYPE_OTHER_STR = "other";// 其他 public static final String SERVER_TYPE_GM_STR = "GM";//管理端
每次啟動一種server時,首先啟動一次mina serevr,然后啟動多個mina client去連接其他的mina server,
比如啟動nodeserevr 服務端,然后啟動多個client分別連接mainserevr,jobserevr等的服務端,這樣我就可以
在nodeserver上給其他業務serevr發請求了,具體啟動哪些client看需要
搞一個啟動server類型的方法
public static ClassPathXmlApplicationContext start(String serverTypeStr) { try { //關閉連接池的鈎子線程 ProxoolFacade.disableShutdownHook(); //spring 的核心配置文件 String xmlFile = "applicationContext.xml"; .... log.info("啟動 {} server................", serverTypeName); // 設置到系統環境變量 System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + ""); System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY, serverTypeName); // final ClassPathXmlApplicationContext parent = new // ClassPathXmlApplicationContext( // xmlFile); String fileName = null; //這是把spring的住配置文件拆分了一部分內容出來,目前是只加載本server需要的bean if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) { fileName = "wolf/app_nodeserver.xml"; } else { fileName = "wolf/app_server.xml"; } //手動啟動spring final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( new String[] { xmlFile, fileName }); if (context != null) { ServiceLocator.getInstance().setApplicationContext(context); } // 啟動socket server final WolfServer server = (WolfServer) ServiceLocator .getSpringBean("wolf_server"); server.setServerType(serverType); //這個調用就是我們熟悉的啟動mina server端 server.start(); //這個動用做兩件事,選區需要的serevr類型建立mina client連接 startClient(server); //鈎子線程用來監聽應用停止,為了做停止時的后續處理 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { _shutdown(); } }, "shutdownHookThread")); //為了支持web,springMVC,內置一個web server if (NodeSessionMgr.SERVER_TYPE_MAIN_STR .equalsIgnoreCase(serverTypeStr)) { JettyServer jettyServer = (JettyServer) ServiceLocator .getSpringBean("jettyServer"); jettyServer.start(); } log.info("start {} end................", serverTypeName); return context; } catch (Exception e) { e.printStackTrace(); shutdown(); } finally { } return null; }
在看下startClient(server);
private static void startClient(WolfServer server) { // asyncdbServer只會被連接,不會主動連接其他server // 這部分目的是過濾那些不需要主動連比人的serevr,比武我這里的異步db,和活動服 if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB || server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) { return; } // 發送game Server ip port到mainserver Map<String, Object> params = new HashMap<String, Object>(); params.put("nodeServerIp", server.getIp()); params.put("nodeServerPort", server.getPort()); params.put("serverType", server.getServerType()); //我需要mainserevr的client,就弄個bean在本服 final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator .getSpringBean("mainServerClient"); //這個位置其實就是mina的client連server端 mainServerClient.init(); Object localAddress = mainServerClient.registerNode(params); //同上,需要jobserevr的client final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator .getSpringBean("jobServerClient"); if (jobServerClient != null) { jobServerClient.init(); Map<String, Object> params1 = new HashMap<String, Object>(); params1.putAll(params); jobServerClient.registerNode(params1); } // } ..... }
再看下WolfClientService.init()
public void init() { if (start) return; if (wolfClient == null) { log.error("wolf client is null"); return; } //mina 的client 連接 mina server wolfClient.start(); if (wolfClient.isConnected()) start = true; }
再看下wolfclient.start()
/** * 連接一個服務器,並指定處理接收到的消息的處理方法 * */ public void start() { // this.context.put("resultMgr", this.resultMgr); logger.info(com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_9"), processorNum); logger.info(com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_0"), corePoolSize); logger.info(com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_4"), maxPoolSize); if (this.serverIp == null || this.serverIp.equals("")) { logger.error(clientName + "沒有配置serverIp,不啟動........."); return; } String threadPrefix = clientName + "[" + this.serverIp + ":" + this.serverPort + "]"; // exector = Executors.newCachedThreadPool(new // NamingThreadFactory(threadPrefix)); processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, processorNum); // connector = new NioSocketConnector((Executor) exector, processor); connector = new NioSocketConnector(processor); // connector.getSessionConfig().setReuseAddress(true); DefaultIoFilterChainBuilder chain = connector.getFilterChain(); if (useLogFilter == 2) { chain.addLast("logging", new LoggingFilter()); } // codec filter要放在ExecutorFilter前,因為讀寫同一個socket connection的socket // buf不能並發(事實上主要是讀,寫操作mina已經封裝成一個write Queue) chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 設置編碼過濾器 // 添加心跳過濾器,客戶端只接受服務端的心跳請求,不發送心跳請求 // connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut); // 這里的KeepAliveFilter必須在codec之后,因為KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,則可以在codec之前 // KeepAliveFilter到底在ExecutorFilter之前好還是之后好,我也不確定 KeepAliveFilter filter = new KeepAliveFilter( new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0), IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(), keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval, 30); chain.addLast("ping", filter); // 添加執行線程池 executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory( threadPrefix)); // 這里是預先啟動corePoolSize個處理線程 executor.prestartAllCoreThreads(); chain.addLast("exec", new ExecutorFilter(executor, IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED, IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED)); if (useWriteThreadPool) { executorWrite = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(threadPrefix + "write")); executorWrite.prestartAllCoreThreads(); chain.addLast("execWrite", new ExecutorFilter(executorWrite, IoEventType.WRITE, IoEventType.MESSAGE_SENT)); } // ,logger.isDebugEnabled() ? new // LoggingIoEventQueueHandler("execWrite") : nulls // 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log // 可以配置在ExecutorFilter之后:是為了在工作線程中打印log,不是在NioProcessor中打印 if (useLogFilter == 1) { chain.addLast("logging", new LoggingFilter()); } connector.setHandler(handler); connector.getSessionConfig().setReuseAddress(true); connector.getSessionConfig().setTcpNoDelay(tcpNoDelay); logger.info(com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_1") + serverIp + ":" + serverPort); ConnectFuture cf = null; long start = System.currentTimeMillis(); while (true) { //這地很關鍵,是個無線循環,每10秒連接一次,直到可以和服務端建立連接,否則一支循環下去 cf = connector.connect(serverAddress);// 建立連接 cf.awaitUninterruptibly(10000L); if (!cf.isConnected()) { if ((System.currentTimeMillis() - start) > timeout) { throw new RuntimeException( com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_5") + serverIp + ":" + serverPort); } if (cf.getException() != null) { logger.error(com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_6"), serverIp + ":" + serverPort, cf.getException().getMessage()); } try { Thread.sleep(10000); } catch (Exception e) { } continue; } //這就是終極目標了,我們的目的就是在serevr的客戶端的bean里,可以拿到這個iosession this.setSession(cf.getSession()); logger.info(com.youxigu.dynasty2.i18n.MarkupMessages .getString("WolfClient_10") + serverIp + ":" + serverPort); shutDown = false; if (handler instanceof WolfMessageChain) { WolfMessageChain wmc = WolfMessageChain.class.cast(handler); wmc.init(context); } break; } }
這樣后端的業務通信網就可以輕松的建立起來,之后想怎么通信就看你的了