簡介
sofa-bolt是螞蟻開源的一款基於Netty的網絡通信框架。在Netty的基礎上對網絡編程常見問題進行了一層簡單封裝,讓中間件開發者更關注於中間件產品本身。
Demo 關鍵代碼
Pom依賴:
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>hessian</artifactId>
<version>3.3.6</version>
</dependency>
BoltServer端:
- 測試入口類
public class MyServer {
public static boolean start() {
/**
* 創建 RpcServer 實例,指定監聽 port
*/
RpcServer server = new RpcServer(8888);
/**
* 注冊業務邏輯處理器 UserProcessor
*/
server.registerUserProcessor(new MyServerUserProcessor());
/**
* 啟動服務端:先做 netty 配置初始化操作,再做 bind 操作
* 配置 netty 參數兩種方式:[SOFABolt 源碼分析11 - Config 配置管理的設計](https://www.jianshu.com/p/76b0be893745)
*/
return server.start();
}
public static void main(String[] args) {
if (MyServer.start()) {
System.out.println("server start success!");
} else {
System.out.println("server start fail!");
}
}
}
- 用戶自定義處理器類
/**
* 自定義的業務邏輯用戶處理器
* 注意:
* 對於所有的請求數據的類型,都必須有 UserProcessor 可以處理(感興趣),
* 否則將拋出 RpcServerException 異常,類似於 "RpcServerException:No user processor found for request: java.lang.String"
*/
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println("recive request"+request);
response.setResp("from server -> " + request.getReq());
}
return response;
}
/**
* 指定感興趣的請求數據類型,該 UserProcessor 只對感興趣的請求類型的數據進行處理;
* 假設 除了需要處理 MyRequest 類型的數據,還要處理 java.lang.String 類型,有兩種方式:
* 1、再提供一個 UserProcessor 實現類,其 interest() 返回 java.lang.String.class.getName()
* 2、使用 MultiInterestUserProcessor 實現類,可以為一個 UserProcessor 指定 List<String> multiInterest()
*/
@Override
public String interest() {
return MyRequest.class.getName();
}
}
- 響應統一封裝類
/**
* 響應統一封裝類
* 注意:必須實現 Serializable 接口,因為默認的編碼器:ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable>,
* 只對 Serializable 實現類進行編碼
*/
public class MyResponse implements Serializable {
private static final long serialVersionUID = -6215194863976521002L;
private String resp = "default resp from server";
public String getResp() {
return resp;
}
public void setResp(String resp) {
this.resp = resp;
}
@Override
public String toString() {
return resp;
}
}
BoltClient端:
- 客戶端入口類
/**
* 客戶端
*/
public class MyClient {
private static RpcClient client;
public static void start() {
// 創建 RpcClient 實例
client = new RpcClient();
// 初始化 netty 客戶端:此時還沒有真正的與 netty 服務端進行連接
client.init();
}
public static void main(String[] args) throws RemotingException, InterruptedException {
MyClient.start();
// 構造請求體
MyRequest request = new MyRequest();
request.setReq("Hello, bolt-server");
/**
* 1、獲取或者創建連接(與netty服務端進行連接),Bolt連接的創建是延遲到第一次調用進行的
* 2、向服務端發起同步調用(四種調用方式中最常用的一種)
*/
MyResponse response = (MyResponse) client.invokeSync("127.0.0.1:8888", request, 30 * 1000);
System.out.println(response);
}
}
- 請求統一封裝類
/**
* 請求統一封裝類
* 注意:必須實現 Serializable 接口,因為默認的編碼器:ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable>,
* 只對 Serializable 實現類進行編碼
*/
public class MyRequest implements Serializable {
private static final long serialVersionUID = -7242884346498114969L;
private String req;
public String getReq() {
return req;
}
public void setReq(String req) {
this.req = req;
}
@Override
public String toString() {
return req;
}
}