
運行環境:
-
JDK 8+
-
Maven 3.0+
-
Redis
技術棧:
-
SpringBoot 2.0+
-
Redis (Lettuce客戶端,RedisTemplate模板方法)
-
Netty 4.1+
-
MQTT 3.1.1
IDE:
-
IDEA或者Eclipse
-
Lombok插件
簡介
近年來,物聯網高歌猛進,美國有“工業互聯網”,德國有“工業4.0”,我國也有“中國制造2025”,這背后都是雲計算、大數據。據波士頓咨詢報告,單單中國制造業,雲計算、大數據、人工智能等新技術就能為其帶來高達6萬億的額外附加值。
國內外巨頭紛紛駐足工業互聯網,國外如亞馬遜AWS、微軟Azure,國內則是三大電信運營商、百度雲、華為、金山雲等,其中騰訊雲、阿里雲最甚,還拉來了傳統制造大佬,國內巨頭紛紛在物聯網上布局。在2018雲棲-深圳峰會上,阿里巴巴資深副總裁,阿里雲總裁胡曉明宣布阿里巴巴將正式進軍IoT。胡曉明表示,IoT是阿里巴巴集團繼電商、金融、物流、雲計算之后的一條新的主賽道。
IOT技術窺探
以上這些內容,作者作為一個開發人員,並不是一個投資人員和創業先鋒。並不太關系這些具體細節。我所關心的是如何用技術去實現或者模擬一個支持百萬鏈接的IOT服務器,並不嚴謹,僅做大家參考。
關於為什么選用下圖的中間件或者對MQTT不太了解的話,可以閱讀我之前的2篇文章:
技術輪廓圖


快速入門
運行測試
- git clone https://github.com/sanshengshui/netty-learning-example
- cd netty-iot
- 運行 NettyIotApplication
- 打開 http://localhost:8080/groza/v1/123456/auth,獲取密碼!


- 另起一個Eclipse Paho,訂閱隨意主題,例如test。另一個Eclipse Paho發布主題test。即可收到消息。
- 取消主題訂閱,再次發布消息。就收不到消息。

有了前面2篇文章的鋪墊並學習了MQTT V3.1.1 協議,說了那么多,手癢癢的很。
You build it, You run it!
項目結構介紹
netty-iot ├── auth -- 認證 ├── service -- 用戶名,密碼認證實現類 ├── util -- 認證工具類 ├── common -- 公共類 ├── auth -- 用戶名,密碼認證接口 ├── message -- 協議存儲實體及接口類 ├── session -- session存儲實體及接口類 ├── subscribe -- 訂閱存儲實體及接口類 ├── config -- Redis配置 ├── protocol -- MQTT協議實現 ├── server -- MQTT服務器 ├── store -- Redis數據存儲 ├── cache ├── message ├── session ├── subscribe ├── web -- web服務 ├── NettyIotApplication -- 服務啟動類
Redis
安裝
體驗 Redis 需要使用 Linux 或者 Mac 環境,如果是 Windows 可以考慮使用虛擬機。主要方式有四種:
-
使用 Docker 安裝。
-
通過 Github 源碼編譯。
-
直接安裝 apt-get install(Ubuntu)、yum install(RedHat) 或者 brew install(Mac)。
-
如果讀者懶於安裝操作,也可以使用網頁版的 Web Redis 直接體驗。
具體操作如下:
Docker 方式
# 拉取 redis 鏡像 > docker pull redis # 運行 redis 容器 > docker run --name myredis -d -p6379:6379 redis # 執行容器中的 redis-cli,可以直接使用命令行操作 redis > docker exec -it myredis redis-cli...
Github 源碼編譯方式
# 下載源碼 > git clone --branch 2.8 --depth 1 git@github.com:antirez/redis.git > cd redis # 編譯 > make > cd src # 運行服務器,daemonize表示在后台運行 > ./redis-server --daemonize yes # 運行命令行 > ./redis-cli...
直接安裝方式
# mac > brew install redis # ubuntu > apt-get install redis # redhat > yum install redis # 運行客戶端 > redis-cli
使用
Spring Boot除了支持常見的ORM框架外,更是對常用的中間件提供了非常好封裝,隨着Spring Boot2.x的到來,支持的組件越來越豐富,也越來越成熟,其中對Redis的支持不僅僅是豐富了它的API,更是替換掉底層Jedis的依賴,取而代之換成了Lettuce(生菜),大家可以參考這篇文章對工程進行配置。所以我使用Lettuce作為客戶端來對我的MQTT協議傳輸的消息進行緩存。
下列的是Redis所對應的操作方式
-
opsForValue: 對應 String(字符串)
-
opsForZSet: 對應 ZSet(有序集合)
-
opsForHash: 對應 Hash(哈希)
-
opsForList: 對應 List(列表)
-
opsForSet: 對應 Set(集合)
-
opsForGeo: 對應 GEO(地理位置)
我主要使用opsForValue,opsForHash和opsForZSet,對於字符串。我推薦使用StringRedisTemplate。
以下對於opsForValue和opsForHash的基礎操作,我在這里簡短的講解一下。
Redis的Hash數據機構
Redis的散列可以讓用戶將多個鍵值對存儲到一個Redis鍵里面。 public interface HashOperations<H,HK,HV> HashOperations提供一系列方法操作hash:
java > template.opsForHash().put("books","java","think in java");
redis-cli > hset books java "think in java" # 命令行的字符串如果包含空格,要用引號括起來
(integer) 1
------
java > template.opsForHash().put("books","golang","concurrency in go");
redis-cli > hset books golang "concurrency in go"
(integer) 1
------
java > template.opsForHash().put("books","python","python cookbook");
redis-cli > hset books python "python cookbook"
(integer) 1
------
java > template.opsForHash().entries("books")
redis-cli > hgetall books # entries(),key 和 value 間隔出現
1) "java"
2) "think in java"
3) "golang"
4) "concurrency in go"
5) "python"
6) "python cookbook"
------
java > template.opsForHash().size("books")
redis-cli > hlen books
(integer) 3
------
java > template.opsForHash().get("redisHash","age")
redi-cli > hget books java
"think in java"
------
java >
Map<String,Object> testMap = new HashMap();
testMap.put("java","effective java");
testMap.put("python","learning python");
testMap.put("golang","modern golang programming");
template.opsForHash().putAll("books",testMap);
redis-cli > hmset books java "effective java" python "learning python" golang "modern golang programming" # 批量 set
OK...
Redis的Set數據結構
Redis的Set是string類型的無序集合。集合成員是唯一的,這就意味着集合中不能出現重復的數據。 Redis 中 集合是通過哈希表實現的,所以添加,刪除,查找的復雜度都是O(1)。
java > template.opsForSet().add("python","java","golang")
redis-cli > sadd books python java golang
(integer) 3
------
java > template.opsForSet().members("books")
redis-cli > smembers books # 注意順序,和插入的並不一致,因為 set 是無序的
1) "java"
2) "python"
3) "golang"
------
java > template.opsForSet().isMember("books","java")
redis-cli > sismember books java # 查詢某個 value 是否存在,相當於 contains(o)
(integer) 1
------
java > template.opsForSet().size("books")
redis-cli > scard books # 獲取長度相當於 count()
(integer) 3
------
java > template.opsForSet().pop("books")
redis-cli > spop books # 彈出一個
"java"...
MQTT
MQTT是一種輕量級的發布/訂閱消息傳遞協議,最初由IBM和Arcom(后來成為Eurotech的一部分)於1998年左右創建。現在,MQTT 3.1.1規范已由OASIS聯盟標准化。
客戶端下載


對於MQTT客戶端,我選用Eclipse Paho,Eclipse Paho項目提供針對物聯網(IoT)的新的,現有的和新興的應用程序的MQTT和MQTT-SN消息傳遞協議的開源客戶端實現。具體下載地址,大家根據自己的操作系統自行下載。
MQTT控制報文
├── Connect -- 連接服務端 ├── DisConnect -- 斷開連接 ├── PingReq -- 心跳請求 ├── PubAck -- 發布確認 ├── PubComp -- 發布完成(QoS2,第散步) ├── Publish -- 發布消息 ├── PubRec -- 發布收到(QoS2,第一步) ├── PubRel -- 發布釋放(QoS2,第二步) ├── Subscribe -- 訂閱主題 ├── UnSubscribe -- 取消訂閱
Connect
讓我們對照着MQTT 3.1.1協議來實現客戶端Connect協議。
-
當我們對消息解碼時,如果協議名不正確服務端可以斷開客戶端的連接,按照本規范,服務端不能繼續處理CONNECT報。
-
服務端使用客戶端標識符 (ClientId) 識別客戶端。連接服務端的每個客戶端都有唯一的客戶端標識符(ClientId)。
// 消息解碼器出現異常 if (msg.decoderResult().isFailure()) { Throwable cause = msg.decoderResult().cause(); if (cause instanceof MqttUnacceptableProtocolVersionException) { // 不支持的協議版本 MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; } else if (cause instanceof MqttIdentifierRejectedException) { // 不合格的clientId MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; } channel.close(); return; }
-
clientId為空或null的情況, 這里要求客戶端必須提供clientId, 不管cleanSession是否為1, 此處沒有參考標准協議實現
if (StrUtil.isBlank(msg.payload().clientIdentifier())) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; }
-
用戶名和密碼驗證, 這里要求客戶端連接時必須提供用戶名和密碼, 不管是否設置用戶名標志和密碼標志為1, 此處沒有參考標准協議實現
String username = msg.payload().userName(); String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8); if (!grozaAuthService.checkValid(username,password)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; }
-
如果會話中已存儲這個新連接的clientId, 就關閉之前該clientId的連接
if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){ SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier()); Channel previous = sessionStore.getChannel(); Boolean cleanSession = sessionStore.isCleanSession(); if (cleanSession){ grozaSessionStoreService.remove(msg.payload().clientIdentifier()); grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier()); grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier()); grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier()); } previous.close(); }
-
處理遺囑信息
SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null); if (msg.variableHeader().isWillFlag()){ MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0), new MqttPublishVariableHeader(msg.payload().willTopic(),0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()) ); sessionStore.setWillMessage(willMessage); }
-
處理連接心跳包
if (msg.variableHeader().keepAliveTimeSeconds() > 0){ if (channel.pipeline().names().contains("idle")){ channel.pipeline().remove("idle"); } channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f))); }
至此存儲會話消息及返回接受客戶端連接 將clientId存儲到channel的map中
-
grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore); channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier()); Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession(); MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent), null ); channel.writeAndFlush(okResp);
-
如果cleanSession為0, 需要重發同一clientId存儲的未完成的QoS1和QoS2的DUP消息
if (!msg.variableHeader().isCleanSession()){ List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier()); List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier()); dupPublishMessageStoreList.forEach(dupPublishMessageStore -> { MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0), new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()) ); channel.writeAndFlush(publishMessage); }); dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> { MqttMessage pubRelMessage = MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0), MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null ); channel.writeAndFlush(pubRelMessage); }); }
其他MQTT報文大家對照着工程並對照着MQTT v3.1.1自行查看!
用戶名密碼認證
/** * 用戶名和密碼認證服務 * @author 穆書偉 */ @Service public class AuthServiceImpl implements GrozaAuthService { private RSAPrivateKey privateKey; @Override public boolean checkValid(String username, String password) { if (StringUtils.isEmpty(username)){ return false; } if (StringUtils.isEmpty(password)){ return false; } RSA rsa = new RSA(privateKey,null); String value = rsa.encryptBcd(username, KeyType.PrivateKey); return value.equals(password) ? true : false; } @PostConstruct public void init() { privateKey = IoUtil.readObj(AuthServiceImpl.class.getClassLoader().getResourceAsStream("keystore/auth-private.key")); } }
其他
關於Netty實現高性能IOT服務器(Groza)之精盡代碼篇中詳解到這里就結束了。
原創不易,如果感覺不錯,希望給個推薦!您的支持是我寫作的最大動力!
下文會帶大家推進Netty實現MQTT協議的IOT服務器。
版權聲明:
作者:穆書偉
博客園出處:https://www.cnblogs.com/sanshengshui
github出處:https://github.com/sanshengshui
個人博客出處:

