在游戲服務器的框架設計中,最重要的就是管理業務邏輯處理。當收到客戶端的請求消息時,服務器如何辨認這個消息是什么請求,怎么樣保證同一個用戶請求處理的順序性?怎么樣提高並發性?這些都是在底層框架中要解決的問題。這里分享一種做法,有用者取之。
1,定義消息Id
給客戶端與服務器交互的消息定義一個唯一的消息id,通過消息的id,處理對應的業務請求,比如1001 代表登陸,1002 代表簽到,1003代表抽卡,等。
2,管理消息Id與處理方法的映射
當服務器解析出客戶端的請求拿到消息號之后,怎么做相應處理呢?有的是這樣做的:
public void dispatch(int id, JSONObject param, long userId) { if (id == 1001) { // 調用相關的處理方法
} else if (id == 1002) { // 調用相關的處理方法
} else if (id == 1003) { // 調用相關的處理方法
} // 或者
switch (id) { case 1001: // 調用相關的處理方法
break; case 1002: // 調用相關的處理方法
break; case 1003: // //調用相關的處理方法
break; default: break; } }
這兩種方法不是不可以,如果是請求不多還行,如果大一點的游戲,幾十上百個請求,一個類中幾百行if或switch,看起來就心煩,再加上團隊間協作開發,多人修改導致提交git沖突等問題。
一種解決方法是,設計一個統一的接口,每個請求對應一個接口,把id和接口映射起來,放到一個map中。例如:
public class LogicManager { Map<Integer, IMessageProcess> messageMap = new HashMap<>(); public void init() { messageMap.put(1001, new LoginProcess()); //以下省略很多
} public void dispatch(int id, JSONObject param, long userId) { IMessageProcess process = messageMap.get(id); if(process != null) { process.action(userId, param); } } }
這種方式只需要每增加一個消息,增加一個接口實現即可,但是有個問題是每次都需要手動添加消息號和接口的映射,很不爽,我們可以使用注解的反射解決這個問題。
我的項目是使用spring boot管理的,所以使用了spring的特性,從容器中獲取實例,因為spring已經在啟動的時候掃描了所有注解。
先看定義的注解:
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.stereotype.Service; import com.xinyue.interview.logiccontroller.RequestCode; @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Service public @interface LogicRequestMapping { public RequestCode value() default RequestCode.DEFAULT; }
這個注解具體有spring @Service的特性。RequestCode是定義的消息號枚舉,把它添加到接口的實現類上面。
@LogicRequestMapping public class LoginProcess implements IMessageProcess{ @Override public void action(long userId, JSONObject param) { } }
然后在服務器啟動的時候,使用程序自動映射消息號和接口
package com.netty; import java.util.HashMap; import java.util.Map; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; @Service public class LogicManager { Map<Integer, IMessageProcess> messageMap = new HashMap<>(); @Autowired private ApplicationContext context; @PostConstruct public void init() { Map<String, Object> beanMap = context.getBeansWithAnnotation(LogicRequestMapping.class); beanMap.forEach((k,v)->{ LogicRequestMapping logicRequestMapping = v.getClass().getAnnotation(LogicRequestMapping.class); if(logicRequestMapping != null && v instanceof IMessageProcess) { int id = logicRequestMapping.value().getCode(); messageMap.put(id, (IMessageProcess) v); } }); } public void dispatch(int id, JSONObject param, long userId) { IMessageProcess process = messageMap.get(id); if (process != null) { process.action(userId, param); } } }
這樣增加新請求的時候,只需要增加新接口實現即可,而舊的代碼不需要修改,也沒有長長的if和switch判斷。如果覺得RequestCode也需要修改,可以去掉,換成接口,每個人可以定義自己的消息枚舉。
如果覺得一個接口處理一個消息有點浪費的話,也可以像web的controller那樣,把處理定位到方法上面。即一個方法對應一個處理的消息id.服務器啟動的時候緩存方法:

package com.xinyue.interview.dispatch; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; import com.xinyue.interview.controller.protocol.LogicMessage; import com.xinyue.interview.exception.ServerError; import com.xinyue.interview.logiccontroller.RequestCode; import com.xinyue.interview.utils.StringUtil; @Service public class LogicRequestMappingManager { private Logger logger = LoggerFactory.getLogger(LogicRequestMappingManager.class); private Map<Integer, LogicRequestMappingInfo> requestMap = new HashMap<>(); @Autowired private ApplicationContext context; @PostConstruct public void init() { Map<String, Object> beanMap = context.getBeansWithAnnotation(LogicRequestMapping.class); beanMap.forEach((k, v) -> { Method[] methods = v.getClass().getMethods(); for (Method method : methods) { LogicRequestMapping logicRequestMapping = method.getAnnotation(LogicRequestMapping.class); if (logicRequestMapping != null) { RequestCode requestCode = logicRequestMapping.value(); if(requestCode == null) { throw new IllegalArgumentException("請求消息標記為空"); } int commandId = requestCode.getCode(); if (requestMap.containsKey(commandId)) { String msg = StringUtil.format("業務處理方法的id有重復的,重復id:{},所在類:{},方法名:{}", commandId, v.getClass().getName(), method.getName()); throw new IllegalArgumentException(msg); } LogicRequestMappingInfo logicRequestMappingInfo = new LogicRequestMappingInfo(commandId, v, method); requestMap.put(commandId, logicRequestMappingInfo); } } }); } public void callMethod(Integer commandId, LogicRequestContext ctx, JSONObject params) { LogicRequestMappingInfo requestMappingInfo = requestMap.get(commandId); if (requestMappingInfo == null) { LogicMessage logicMessage = new LogicMessage(ServerError.REQUEST_PARAM_ERROR); ctx.writeAndflush(logicMessage); logger.error("用戶{}請求的消息id:{}不存在", ctx.getUserId(), commandId); return; } try { requestMappingInfo.call(ctx, params); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { logger.error("系統異常", e); LogicMessage logicMessage = new LogicMessage(ServerError.SYSTEM_ERROR); ctx.writeAndflush(logicMessage); } } }
3,管理業務線程
要保證同一個用戶的消息處理都是順序性的,一般有幾種方式,1,加鎖,2,使用隊列,生產者消費者模式,3,在同一個線程中處理。使用鎖,在高並發的時候導致cpu上下文切換頻繁,性能下降,我們可以采用netty的方式,同一個連接的所有消息都在同一個線程中處理。一服務器處理的線程並不是越多越好,見:https://www.cnblogs.com/wgslucky/p/9749990.html 所以在處理業務的時候最好不要有io操作。那不可避免的io操作,比如查數據庫,更新數據,我們放在別的單獨線程處理。這里先說業務處理,它操作的都是內存中的數據,所以速度很快,不會卡玩家。
首先創建一個線程處理器的類,用於管業務線程數:
package com.xinyue.interview.utils.concurrent; import io.netty.util.concurrent.DefaultEventExecutor; import io.netty.util.concurrent.EventExecutor; public class LogicEventExecutorGroup { private EventExecutor[] executors; public LogicEventExecutorGroup(int nthreads) { executors = new EventExecutor[nthreads]; for (int i = 0; i < nthreads; i++) { executors[i] = new DefaultEventExecutor(); } } public EventExecutor select(long userId) { int index = (int) (userId % executors.length); return executors[index]; } }
這里根據用戶的userId,取余固定一個線程處理器,所以同一個用戶的請求都放到這個線程處理器中。
在收到消息后:

package com.xinyue.interview.dispatch; import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; import com.xinyue.interview.controller.protocol.HeadInfo; import com.xinyue.interview.controller.protocol.LogicMessage; import com.xinyue.interview.exception.ServerError; import com.xinyue.interview.exception.ServerErrorException; import com.xinyue.interview.logic.manager.AccountManager; import com.xinyue.interview.logic.manager.EntityManagerFactory; import com.xinyue.interview.logic.manager.IEntityManager; import com.xinyue.interview.utils.concurrent.LogicEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @Service public class LogicRequestDispatch { private Logger logger = LoggerFactory.getLogger(LogicRequestDispatch.class); @Autowired private LogicRequestMappingManager logicRequestMappingManager; private LogicEventExecutorGroup executorGroup; @PostConstruct public void init() { //初始化4個線程處理器處理業務邏輯
int nthreads = 4; this.executorGroup = new LogicEventExecutorGroup(nthreads); } public void fireRead(LogicRequestContext ctx, JSONObject param) { long userId = ctx.getUserId(); EventExecutor executor = executorGroup.select(userId); executor.execute(() -> { try { // 檢測是否登陸成功
this.checkLogin(ctx.getHeadInfo()); // 調用業務方法
Integer commandId = ctx.getCommandId(); logicRequestMappingManager.callMethod(commandId, ctx, param); } catch (Throwable e) { LogicMessage response = null; if (e instanceof ServerErrorException) { ServerErrorException errorException = (ServerErrorException) e; response = new LogicMessage(errorException); } else { response = new LogicMessage(ServerError.SYSTEM_ERROR); } ctx.writeAndflush(response); } }); } private void checkLogin(HeadInfo headInfo) { long userId = headInfo.getUserId(); AccountManager accountManager = EntityManagerFactory.getManager(userId, AccountManager.class); if (accountManager == null) { ServerError.throwError(ServerError.USER_NOT_LOGIN, "userId:{}", userId); return; } String token = headInfo.getToken(); if (!accountManager.checkToken(token)) { ServerError.throwError(ServerError.USER_NOT_LOGIN, "userId:{},token錯誤", userId); return; } } /** * * <p> * Description:初始化相應的數據管理類到對應的邏輯線程中 * </p> * * @param userId * @param managers * @author wgs * @date 2018年10月19日 下午4:39:31 * */
public void initEntityManager(long userId, List<? extends IEntityManager> managers) { EventExecutor executor = executorGroup.select(userId); executor.execute(() -> { try { EntityManagerFactory.init(userId, managers); } catch (Exception e) { logger.error("初始化EntityManager出錯", e); } }); executor.scheduleAtFixedRate(() -> { EntityManagerFactory.flushDB(); }, 5, 5, TimeUnit.MINUTES); } public Future<Object> fireEvent(long selectKey, Object param, Promise<Object> promise) { return promise; } }
這樣就實現了業務處理保證在同一個線程中。