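Jetlinks Custom Protocol Development
Jetlinks community QQ group 2021514 www.jetlinks.cn
Custom Protocol Development
Environment and development tools
- JDK: 1.8+
- Maven: 3.1+ (do not use a global repository configuration, as it may prevent the dependencies from being downloaded)
- IDE: IntelliJ IDEA
Step 1: Create a Maven project named demo-protocol in IDEA
Step 2: Edit the pom file and add the dependencies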
<dependencies>
    <!-- Jetlinks core dependency -->
    <dependency>
        <groupId>org.jetlinks</groupId>
        <artifactId>jetlinks-core</artifactId>
        <version>1.0.2-BUILD-SNAPSHOT</version>
    </dependency>
    <!-- Jetlinks protocol parsing interfaces -->
    <dependency>
        <groupId>org.jetlinks</groupId>
        <artifactId>jetlinks-supports</artifactId>
        <version>1.0.2-BUILD-SNAPSHOT</version>
    </dependency>
    <!-- Lombok; requires the Lombok plugin in IDEA, otherwise remove this dependency -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
    </dependency>
    <!-- Vert.x core; can be used to simulate network traffic in tests -->
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>3.8.3</version>
        <scope>test</scope>
    </dependency>
    <!-- Unit testing -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.5.2</version>
        <scope>test</scope>
    </dependency>
    <!-- Logback logging -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>
<!-- Netty components: the BOM keeps all Netty module versions aligned -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-bom</artifactId>
            <version>${netty.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
- Set the properties in the Maven pom file
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.build.locales>zh_CN</project.build.locales>
    <java.version>1.8</java.version>
    <project.build.jdk>${java.version}</project.build.jdk>
    <netty.version>4.1.45.Final</netty.version>
</properties>
- Add the Maven compiler configuration
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${project.build.jdk}</source>
                <target>${project.build.jdk}</target>
                <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>
- Add the hsweb private repository and the Aliyun repository
<repositories>
    <repository>
        <id>hsweb-nexus</id>
        <name>Nexus Release Repository</name>
        <url>http://nexus.hsweb.me/content/groups/public/</url>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
        </snapshots>
    </repository>
    <repository>
        <id>aliyun-nexus</id>
        <name>aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
Step 3: Develop the protocol
- Create the package: org.jetlinks.demo.protocol
- Create the protocol codec class: DemoDeviceMessageCodec
package org.jetlinks.demo.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.server.session.DeviceSession;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@Slf4j
public class DemoDeviceMessageCodec implements DeviceMessageCodec {

    // Transport protocol handled by this codec
    @Override
    public Transport getSupportTransport() {
        return DefaultTransport.TCP;
    }

    // Decodes a TCP message into a platform message; mostly used when a device reports to the platform
    @Override
    public Mono<? extends Message> decode(MessageDecodeContext context) {
        return Mono.empty();
    }

    // Encodes a platform message into a transport message; mostly used when the platform sends a command to a device
    @Override
    public Publisher<? extends EncodedMessage> encode(MessageEncodeContext context) {
        return Mono.empty();
    }
}
- Create the protocol entry class: DemoProtocolSupportProvider
package org.jetlinks.demo.protocol;

import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import reactor.core.publisher.Mono;

public class DemoProtocolSupportProvider implements ProtocolSupportProvider {

    @Override
    public Mono<? extends ProtocolSupport> create(ServiceContext context) {
        CompositeProtocolSupport support = new CompositeProtocolSupport();
        // Protocol ID
        support.setId("demo-v1");
        // Protocol name
        support.setName("Demo Protocol v1");
        // Protocol description
        support.setDescription("Demo protocol");
        // Thing-model (metadata) codec; always JetLinksDeviceMetadataCodec
        support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
        // TCP message codec
        DemoDeviceMessageCodec codec = new DemoDeviceMessageCodec();
        // Two arguments: the transport, which must match the one returned by
        // DemoDeviceMessageCodec.getSupportTransport(), and a supplier of the codec to use
        support.addMessageCodecSupport(DefaultTransport.TCP, () -> Mono.just(codec));
        return Mono.just(support);
    }
}
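When filling in the codec, it often helps to log the raw payload in hexadecimal so that it can be compared with the device's protocol specification. The following is a minimal sketch using only the JDK; the class name `PayloadHex` and its methods are hypothetical helpers for illustration and are not part of jetlinks-core.

```java
// Hypothetical debugging helper (not part of jetlinks-core).
final class PayloadHex {
    private static final char[] DIGITS = "0123456789abcdef".toCharArray();

    private PayloadHex() {
    }

    // Renders every byte as two lowercase hex digits.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(DIGITS[(b >> 4) & 0x0F]).append(DIGITS[b & 0x0F]);
        }
        return sb.toString();
    }

    // Parses a hex string back into bytes; rejects odd lengths and non-hex characters.
    static byte[] fromHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Not a hex string: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }
}
```

Inside the codec itself, Netty's `ByteBufUtil.hexDump` can produce the same kind of output directly from a `ByteBuf`.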
Step 4: Decode messages reported by the device
All code in this section goes into the DemoDeviceMessageCodec.decode method.
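import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.jetlinks.core.message.property.ReportPropertyMessage;

@AllArgsConstructor
@Slf4j
public class DemoDeviceMessageCodec implements DeviceMessageCodec {
    ....
    // Decodes a TCP message into platform messages; mostly used when a device reports to the platform.
    // Declared as Publisher (rather than Mono) because two messages are returned when the device
    // first comes online; this assumes the decode method of your jetlinks-core version allows a
    // Publisher return type.
    @Override
    public Publisher<? extends Message> decode(MessageDecodeContext context) {
        return Flux.defer(() -> {
            // Message context
            FromDeviceMessageContext ctx = (FromDeviceMessageContext) context;
            // Read the message bytes from the context
            ByteBuf byteBuf = ctx.getMessage().getPayload();
            byte[] payload = ByteBufUtil.getBytes(byteBuf, 0, byteBuf.readableBytes(), false);
            // Convert the bytes to a string; a real device would be parsed according to its own protocol
            String text = new String(payload, StandardCharsets.UTF_8);
            ReportPropertyMessage message = new ReportPropertyMessage();
            // Use the received text as the device ID
            message.setDeviceId(text);
            // Use the current time as the message timestamp
            message.setTimestamp(System.currentTimeMillis());
            // Build the reported properties
            Map<String, Object> properties = new HashMap<>();
            properties.put("text", text);
            // Attach the reported properties
            message.setProperties(properties);
            // Get the device session
            DeviceSession session = ctx.getSession();
            // No device bound to the session yet: this is the device's first connection
            if (session.getOperator() == null) {
                DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
                onlineMessage.setDeviceId(text);
                onlineMessage.setTimestamp(System.currentTimeMillis());
                // Return the property report together with the online message
                return Flux.just(message, onlineMessage);
            }
            // Return the property report to the platform
            return Flux.just(message);
        });
    }
    .....
}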
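The decode example above stores the whole payload as a single `text` property. A real device would usually carry several values per message, parsed according to its own protocol. The sketch below assumes a purely hypothetical payload format, `key=value` pairs separated by semicolons (for example `temperature=23.5;status=ok`); the class `TextPayloadParser` is illustrative and not part of jetlinks-core.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for an assumed "key=value;key=value" text payload.
final class TextPayloadParser {

    private TextPayloadParser() {
    }

    // Splits the payload into pairs and returns them in the order they appeared.
    static Map<String, Object> parse(String text) {
        Map<String, Object> properties = new LinkedHashMap<>();
        for (String pair : text.trim().split(";")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate empty segments such as a trailing ';'
            }
            int eq = pair.indexOf('=');
            String key = eq > 0 ? pair.substring(0, eq).trim() : "";
            if (key.isEmpty()) {
                throw new IllegalArgumentException("Malformed pair: " + pair);
            }
            properties.put(key, toValue(pair.substring(eq + 1).trim()));
        }
        return properties;
    }

    // Values that parse as a floating-point number become Double; everything else stays a String.
    private static Object toValue(String raw) {
        try {
            return Double.valueOf(raw);
        } catch (NumberFormatException e) {
            return raw;
        }
    }
}
```

The resulting map has the same type as the `properties` map in the decode method, so it could be passed to `message.setProperties(...)` in place of the single-entry map.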
Step 5: Send messages from the platform to the device (message encoding)
All code in this section goes into the DemoDeviceMessageCodec.encode method.
~~~java
import java.nio.charset.StandardCharsets;
import org.jetlinks.core.message.property.ReadPropertyMessage;

@AllArgsConstructor
@Slf4j
public class DemoDeviceMessageCodec implements DeviceMessageCodec {
    ..........
    // Encodes a platform message into a transport message; mostly used when the platform sends a command to a device
    @Override
    public Publisher<? extends EncodedMessage> encode(MessageEncodeContext context) {
        // Get the message from the platform message context
        Message message = context.getMessage();
        EncodedMessage encodedMessage = null;
        // Build a different payload depending on the message type
        if (message instanceof ReadPropertyMessage) {
            ReadPropertyMessage readPropertyMessage = (ReadPropertyMessage) message;
            // Bytes to be transmitted
            byte[] bytes = readPropertyMessage.toString().getBytes(StandardCharsets.UTF_8);
            // Wrap the bytes as the message body sent from the platform to the device
            encodedMessage = EncodedMessage.simple(Unpooled.wrappedBuffer(bytes));
        }
        ...
        return encodedMessage != null ? Mono.just(encodedMessage) : Mono.empty();
    }
}
~~~
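The encode example sends the `toString()` form of the message, which is convenient for a demo but rarely what a device expects. As an illustration of building a device-specific payload, the sketch below assumes a hypothetical frame layout: one command-type byte, a two-byte big-endian body length, then a UTF-8 body. The class `CommandFrame`, the constant `READ_PROPERTY`, and the layout itself are assumptions for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical frame: [type: 1 byte][body length: 2 bytes, big-endian][body: UTF-8]
final class CommandFrame {
    static final byte READ_PROPERTY = 0x01; // assumed command code

    private CommandFrame() {
    }

    // Builds the frame bytes for the given command type and body.
    static byte[] encode(byte type, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        if (bodyBytes.length > 0xFFFF) {
            throw new IllegalArgumentException("Body does not fit in a 2-byte length field");
        }
        ByteBuffer buffer = ByteBuffer.allocate(3 + bodyBytes.length); // ByteBuffer is big-endian by default
        buffer.put(type);
        buffer.putShort((short) bodyBytes.length);
        buffer.put(bodyBytes);
        return buffer.array();
    }

    // Reads the body back out of a frame, checking the declared length against the actual one.
    static String decodeBody(byte[] frame) {
        if (frame.length < 3) {
            throw new IllegalArgumentException("Frame shorter than its header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        buffer.get(); // skip the command type
        int length = buffer.getShort() & 0xFFFF;
        if (length != buffer.remaining()) {
            throw new IllegalArgumentException("Declared length does not match frame size");
        }
        byte[] body = new byte[length];
        buffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

The byte array returned by `CommandFrame.encode` could then be wrapped with `Unpooled.wrappedBuffer(bytes)` and passed to `EncodedMessage.simple(...)`, as in the encode method above.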
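Step 6: Package the project as a jar, upload it to the platform, and debug it
Upload and configure the protocol, then debug it with the simulator.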
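For a quick check outside the platform's simulator, a plain TCP client can stand in for a device. The following is a minimal sketch using only the JDK; the class `DeviceSimulator` is hypothetical, and the host and port are assumptions that would have to match a TCP network component configured on the platform. It sends one text payload (which the decode example treats as the device ID) and returns whatever the server sends back within the timeout.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a device: sends one payload over TCP and reads one reply.
final class DeviceSimulator {

    private DeviceSimulator() {
    }

    // Returns the reply as text, or an empty string if nothing arrives before the timeout.
    static String sendAndReceive(String host, int port, String payload, int timeoutMillis)
            throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);

            OutputStream out = socket.getOutputStream();
            out.write(payload.getBytes(StandardCharsets.UTF_8));
            out.flush();

            byte[] buffer = new byte[1024];
            try {
                int read = socket.getInputStream().read(buffer);
                return read > 0 ? new String(buffer, 0, read, StandardCharsets.UTF_8) : "";
            } catch (SocketTimeoutException e) {
                return ""; // the server sent nothing back in time
            }
        }
    }
}
```

A call such as `DeviceSimulator.sendAndReceive("127.0.0.1", 8800, "device-001", 3000)` would report as device `device-001`; the address and port here are placeholders.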