使用Netty4實現基本的消息分發


示例工程代碼

 

可從附件下載

 

具體的說明和用法在后面介紹

 

需求與目的

 

一個游戲服務端需要處理各種業務邏輯,每一種業務邏輯都對應着一個請求消息和一個響應消息。那么服務端需要把這些不同的消息自動分發到對應的業務邏輯中處理。

 

最簡單的處理方式就是根據請求消息中的type字段,使用switch case來進行分別處理,但這種方式隨着消息的增多,顯現了一些壞味道:長長的一大坨不太好看;如果要添加新的消息、新的邏輯,或者去掉新的消息、新的邏輯,在代碼上不但要修改這些消息和邏輯,還不得不修改這長長的一坨swtich case,這樣的修改顯得很多余。

 

所以我們的目的就是把消息分發這塊的代碼自動化,在增加、修改、刪除消息和邏輯的時候不需要再對消息分發的代碼再做修改,從而使得修改的代碼最小化。

 

實現原理

 

在實現中,使用了注解(annotation)

Java代碼   收藏代碼
  1. package com.company.game.dispatcher.annotation;  
  2.   
  3. import java.lang.annotation.ElementType;  
  4. import java.lang.annotation.Retention;  
  5. import java.lang.annotation.RetentionPolicy;  
  6. import java.lang.annotation.Target;  
  7.   
  8. /** 
  9.  * 修飾消息類和業務邏輯執行類 
  10.  * msgType指定對應的類型,從1開始計數 
  11.  * @author xingchencheng 
  12.  * 
  13.  */  
  14.   
  15. @Target(ElementType.TYPE)  
  16. @Retention(RetentionPolicy.RUNTIME)  
  17. public @interface UserMsgAndExecAnnotation {  
  18.     short msgType();  
  19. }  

 

唯一的字段msgType代表了消息類型,這是客戶端與服務端的約定,這里我們從1開始計數。

 

當我們要增加一個加法消息時,就使用這個注解來修飾我們的請求消息類:

Java代碼   收藏代碼
  1. package com.company.game.dispatcher.msg;  
  2.   
  3. import com.company.game.dispatcher.annotation.UserMsgAndExecAnnotation;  
  4.   
  5. /** 
  6.  * 加法請求消息類 
  7.  *  
  8.  * @author xingchencheng 
  9.  * 
  10.  */  
  11.   
  12. @UserMsgAndExecAnnotation(msgType = MsgType.ADD)  
  13. public class UserAddRequest extends RequestMsgBase {  
  14.   
  15.     private double leftNumber;  
  16.     private double RightNumber;  
  17.       
  18.     public UserAddRequest() {  
  19.         super(MsgType.ADD);  
  20.     }  
  21.   
  22.     public double getLeftNumber() {  
  23.         return leftNumber;  
  24.     }  
  25.   
  26.     public void setLeftNumber(double leftNumber) {  
  27.         this.leftNumber = leftNumber;  
  28.     }  
  29.   
  30.     public double getRightNumber() {  
  31.         return RightNumber;  
  32.     }  
  33.   
  34.     public void setRightNumber(double rightNumber) {  
  35.         RightNumber = rightNumber;  
  36.     }  
  37.   
  38. }  

 

為什么要這樣修飾呢?先從服務端的解碼(decode)說起,實例代碼中,一個請求消息是這樣規定的:

 

 0-1字節表示整個消息的長度(單位:字節)

 2-3字節代表消息類型,對應annotation的msgType

 余下的是消息的json字符串(UTF-8編碼)

 

我們需要根據2-3字節表示的msgType得到對應請求消息類的class對象,用這個class對象來序列化json字符串,得到具體的請求對象。那么怎么根據msgType得到class對象呢?這就是為什么要使用annotation的原因。

 

在服務端程序啟動前,會執行下面的處理:

 

Java代碼   收藏代碼
  1. // msgType->請求、響應類的class對象  
  2. private static Map<Short, Class<?>> typeToMsgClassMap;  
  3.   
  4. // 根據類型得到對應的消息類的class對象  
  5. public static Class<?> getMsgClassByType(short type) {  
  6.     return typeToMsgClassMap.get(type);  
  7. }  
  8.   
  9. /** 
  10.  * 初始化typeToMsgClassMap 
  11.  * 遍歷包com.company.game.dispatcher.msg 
  12.  * 取得消息類的class文件 
  13.  *  
  14.  * @throws ClassNotFoundException 
  15.  * @throws IOException 
  16.  */  
  17. public static void initTypeToMsgClassMap()   
  18.         throws ClassNotFoundException, IOException {  
  19.       
  20.     Map<Short, Class<?>> tmpMap = new HashMap<Short, Class<?>>();  
  21.       
  22.     Set<Class<?>> classSet = getClasses("com.company.game.dispatcher.msg");  
  23.     if (classSet != null) {  
  24.         for (Class<?> clazz : classSet) {  
  25.             if (clazz.isAnnotationPresent(UserMsgAndExecAnnotation.class)) {  
  26.                 UserMsgAndExecAnnotation annotation = clazz.getAnnotation(UserMsgAndExecAnnotation.class);  
  27.                 tmpMap.put(annotation.msgType(), clazz);  
  28.             }  
  29.         }  
  30.     }  
  31.       
  32.     typeToMsgClassMap = Collections.unmodifiableMap(tmpMap);  
  33. }  

 

程序初始化了一個映射,在指定的包找到請求的消息類的class,讀取class上的annotation,保存到一個Map中,這樣在后續就可以根據這個Map來根據msgType得到class對象了。

 

再給出解碼器的實現:

 

Java代碼   收藏代碼
  1. package com.company.game.dispatcher.codec;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.company.game.dispatcher.util.ClassUtil;  
  6. import com.company.game.dispatcher.util.GsonUtil;  
  7. import com.google.gson.Gson;  
  8.   
  9. import io.netty.buffer.ByteBuf;  
  10. import io.netty.channel.ChannelHandlerContext;  
  11. import io.netty.handler.codec.ByteToMessageDecoder;  
  12.   
  13. /** 
  14.  * 解碼器 
  15.  * 客戶端和服務端均有使用 
  16.  * 0-1字節表示整個消息的長度(單位:字節) 
  17.  * 2-3字節代表消息類型,對應annotation 
  18.  * 余下的是消息的json字符串(UTF-8編碼) 
  19.  *  
  20.  * @author xingchencheng 
  21.  * 
  22.  */  
  23.   
  24. public class MsgDecoder extends ByteToMessageDecoder {  
  25.   
  26.     @Override  
  27.     protected void decode(ChannelHandlerContext ctx, ByteBuf buf,  
  28.             List<Object> list) throws Exception {  
  29.           
  30.         if (buf.readableBytes() < 2) {  
  31.             return;  
  32.         }  
  33.           
  34.         Gson gson = GsonUtil.getGson();  
  35.         short jsonBytesLength = (short) (buf.readShort() - 2);  
  36.         short type = buf.readShort();  
  37.           
  38.         byte[] tmp = new byte[jsonBytesLength];  
  39.         buf.readBytes(tmp);  
  40.         String json = new String(tmp, "UTF-8");  
  41.           
  42.         Class<?> clazz = ClassUtil.getMsgClassByType(type);  
  43.         Object msgObj = gson.fromJson(json, clazz);  
  44.         list.add(msgObj);  
  45.     }  
  46. }  

 

解碼完成后,程序進入到服務端的handler中:

Java代碼   收藏代碼
  1. @Override  
  2. protected void channelRead0(ChannelHandlerContext ctx, Object msgObject)  
  3.         throws Exception {  
  4.       
  5.     // 分發消息給對應的消息處理器  
  6.     Dispatcher.submit(ctx.channel(), msgObject);  
  7. }  

 

Dispatcher代碼如下:

Java代碼   收藏代碼
  1. package com.company.game.dispatcher;  
  2.   
  3. import io.netty.channel.Channel;  
  4.   
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7.   
  8. import com.company.game.dispatcher.exec.BusinessLogicExecutorBase;  
  9. import com.company.game.dispatcher.msg.RequestMsgBase;  
  10. import com.company.game.dispatcher.util.ClassUtil;  
  11.   
  12. /** 
  13.  * 抽象了分發器 
  14.  * 多線程執行 
  15.  * 某個消息對象msgObject指定某個業務邏輯對象executor 
  16.  * submit到線程池中 
  17.  * @author xingchencheng 
  18.  * 
  19.  */  
  20.   
  21. public class Dispatcher {  
  22.       
  23.     private static final int MAX_THREAD_NUM = 50;  
  24.       
  25.     private static ExecutorService executorService =  
  26.             Executors.newFixedThreadPool(MAX_THREAD_NUM);  
  27.       
  28.     public static void submit(Channel channel, Object msgObject)   
  29.             throws InstantiationException, IllegalAccessException {  
  30.           
  31.         RequestMsgBase msg = (RequestMsgBase) msgObject;  
  32.         Class<?> executorClass = ClassUtil.getExecutorClassByType(msg.getType());  
  33.         BusinessLogicExecutorBase executor =   
  34.                 (BusinessLogicExecutorBase) executorClass.newInstance();  
  35.         executor.setChannel(channel);  
  36.         executor.setMsgObject(msgObject);  
  37.           
  38.         executorService.submit(executor);  
  39.     }  
  40. }  

 

我們看到,在代碼中也是根據msgType取得了對應的一個class對象,並new了一個對象出來,交給了線程池進行並發執行,這個對象就是業務邏輯處理器對象,它實現了Runnable接口,進行一些業務邏輯上的處理。根據msgType取得class對象的映射過程跟前面提到的映射原理是相同的,可以參見代碼。貼出業務邏輯處理器對象的代碼:

Java代碼   收藏代碼
  1. package com.company.game.dispatcher.exec;  
  2.   
  3. import com.company.game.dispatcher.annotation.UserMsgAndExecAnnotation;  
  4. import com.company.game.dispatcher.msg.MsgType;  
  5. import com.company.game.dispatcher.msg.UserAddRequest;  
  6. import com.company.game.dispatcher.msg.UserAddResponse;  
  7.   
  8. /** 
  9.  * 具體的業務邏輯 
  10.  * 實現加法 
  11.  *  
  12.  * @author xingchencheng 
  13.  * 
  14.  */  
  15.   
  16. @UserMsgAndExecAnnotation(msgType = MsgType.ADD)  
  17. public class UserAddExecutor extends BusinessLogicExecutorBase {  
  18.   
  19.     public void run() {  
  20.         UserAddResponse response = new UserAddResponse();  
  21.           
  22.         if (this.msgObject instanceof UserAddRequest) {  
  23.             UserAddRequest request = (UserAddRequest) this.msgObject;  
  24.             double result = request.getLeftNumber() + request.getRightNumber();  
  25.             response.setResult(result);  
  26.             response.setSuccess(true);  
  27.         } else {  
  28.             response.setSuccess(false);  
  29.         }  
  30.           
  31.         System.out.println("服務端處理結果:" + response.getResult());  
  32.         channel.writeAndFlush(response);  
  33.     }  
  34.   
  35. }  

注意,它也得用annotation來修飾。

 

思路大致就是如此,如果要增加一個請求,在示例代碼中,需要做3件事情:

  • 在MsgType添加一個類型
  • 添加請求相應消息類
  • 添加業務邏輯處理器類

而不需要修改消息分發的代碼。

 

示例項目的說明和使用

 

  • 工程可在文章開頭的github中或附件得到
  • 項目使用Maven3構建,構建的結果是一個jar,可通過命令行分別運行服務端和客戶端
  • 僅僅是個示例,並沒有過多的考慮異常處理,性能等方面
  • 沒有單元測試和其他測試

提供了命令行工具,幫助信息如下:


 

服務端啟動命令:


 

客戶端啟動命令:


 

結語

 

本文的描述未必清晰,更好的方法是直接看代碼。

關於消息分發想必還有更好的方法,這里只是拋磚引玉,希望路過的各位能提供更好的方法一起參考。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM