Learning Netty: Integrating a UDP Server with Spring (2)


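The previous article walked through an example of a UDP server implemented with Netty.

This article integrates that UDP server with Spring Boot, uses RedisTemplate to access Redis, and uses Spring Data JPA to connect to a MySQL database. Along the way it touches on multithreading and asynchronous execution.

Only a skeleton is published here. Readers who need more can extend it with their own modules, for example a Controller layer.

I use IntelliJ IDEA 2017.1 (link: http://pan.baidu.com/s/1pLODHm7 password: dlx7). STS or IDEA is recommended for learning Spring.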

 

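1) Project directory structure

The original post showed a screenshot of the project tree here. Judging from the package declarations in the listings below, the code lives under com.example.demo in the sub-packages init, handle, mod, repository.mysql, repository.redis, and thread.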

 

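2) Dependencies (jar packages)

The contents of pom.xml are shown below.

Only netty-all and commons-lang3 were added by hand; the rest were imported automatically when the components were selected while creating the Spring Boot project.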
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>udplearning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>udplearning</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <commons-lang3.version>3.4</commons-lang3.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <!-- netty  -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.49.Final</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web-services</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

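3) Configuration file application.properties

Contents of application.properties:

spring.profiles.active=test

spring.messages.encoding=utf-8

logging.config=classpath:logback.xml

"spring.profiles.active" selects one of several Spring Boot runtime environments. Here the test profile is active, so the settings in application-test.properties are loaded in addition to this file.
"spring.messages.encoding=utf-8" sets the encoding of message bundles to UTF-8.
"logging.config=classpath:logback.xml" gives the location of the logging configuration file.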

Contents of application-test.properties:
context.listener.classes=com.example.demo.init.StartupEvent

#mysql
spring.jpa.show-sql=true
spring.jpa.database=mysql
#spring.jpa.hibernate.ddl-auto=update
spring.datasource.url=jdbc:mysql://127.0.0.1/test
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)

spring.session.store-type=none

# (RedisProperties)
spring.redis.database=3
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=123456
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.timeout=0

# UDP message receive port
sysfig.udpReceivePort = 7686

# Thread pool (these keys use the sysfig prefix so that they bind to SysConfig)
sysfig.corePoolSize = 5
sysfig.maxPoolSize = 100
sysfig.keepAliveSeconds = 100
sysfig.queueCapacity = 100

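The entry context.listener.classes=com.example.demo.init.StartupEvent registers StartupEvent as an application listener, so it is executed once Spring Boot has started.

The file also configures MySQL, Redis, and some custom properties. Adjust them to match your project.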



4) Logging configuration logback.xml
Contents of logback.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback
               http://ch.qos.logback/xml/ns/logback/logback.xsd
               http://ch.qos.logback/xml/ns/logback ">
    <property name="APP_Name" value="udplearning" />
    <timestamp key="bySecond" datePattern="yyyyMMdd'T'HHmmss" />
    <contextName>${APP_Name}</contextName>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyyMMddHHmmss}|%-5level| %logger{0}.%M | %msg | %thread %n</pattern>
        </encoder>
    </appender>

    <appender name="FILELOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${catalina.home}/logs/app.%d{yyyyMMdd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="RUNLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${catalina.home}/logs/run.%d{yyyyMMdd}.log</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="com.example.demo" level="debug" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILELOG" />
    </logger>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

The root log level is info, while the com.example.demo logger is set to debug. Adjust the levels to suit your project.

 

5) StartupEvent.java

package com.example.demo.init;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

/**
 * Runs once the Spring context has been refreshed and starts the UDP server.
 * Created by wj on 2017/8/28.
 */
public class StartupEvent implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(StartupEvent.class);

    // Kept in a static field so that classes not managed by Spring can look up beans
    private static ApplicationContext context;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            context = contextRefreshedEvent.getApplicationContext();

            SysConfig sysConfig = context.getBean(SysConfig.class);

            // Receive UDP messages and save them to Redis
            UdpServer udpServer = StartupEvent.getBean(UdpServer.class);
            udpServer.run(sysConfig.getUdpReceivePort());

            // More threads can be started here to run other tasks.
            // That part is work-related and cannot be published.
        } catch (Exception e) {
            log.error("Exception", e);
        }
    }

    // Returns the bean of the given type, or null if the context is not ready yet
    public static <T> T getBean(Class<T> beanType) {
        return context != null ? context.getBean(beanType) : null;
    }
}

 

6) UdpServer.java

package com.example.demo.init;

import com.example.demo.handle.UdpServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * The UDP server
 * Created by wj on 2017/8/30.
 */
@Component
public class UdpServer {

    private static final Logger log = LoggerFactory.getLogger(UdpServer.class);

//    private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));

    @Async("myTaskAsyncPool")
    public void run(int udpReceivePort) {

        EventLoopGroup group = new NioEventLoopGroup();
        log.info("Server start!  Udp Receive msg Port:" + udpReceivePort);

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new UdpServerHandler());

            // Bind the port, then block this thread until the channel is closed
            b.bind(udpReceivePort).sync().channel().closeFuture().await();
        } catch (InterruptedException e) {
            log.error("UDP server interrupted", e);
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            group.shutdownGracefully();
        }
    }

}

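NioDatagramChannel.class receives UDP messages in non-blocking mode. If only a small number of UDP messages is expected, a blocking channel can be used instead.

UdpServer.run() is annotated with @Async so that it runs asynchronously; myTaskAsyncPool is the custom thread pool defined in section 9. Because run() blocks until the channel is closed, it occupies one thread of that pool for as long as the server is up.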

 

7) UdpServerHandler.java

package com.example.demo.handle;

import com.example.demo.init.StartupEvent;
import com.example.demo.mod.UdpRecord;
import com.example.demo.repository.mysql.UdpRepository;
import com.example.demo.repository.redis.RedisRepository;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.Date;

/**
 * Receives UDP messages and saves them to a Redis list
 * Created by wj on 2017/8/30.
 *
 */
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private static final Logger log = LoggerFactory.getLogger(UdpServerHandler.class);

    // Counts how many UDP messages the server has received
    private static int count = 0;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {

        String receiveMsg = packet.content().toString(CharsetUtil.UTF_8);

        log.info("Received UDP Msg:" + receiveMsg);

        UdpRecord udpRecord = new UdpRecord();

        // Check whether the received UDP message is valid (real validation not implemented)
        if (StringUtils.isNotEmpty(receiveMsg)) {

            // Count the received UDP messages
            count++;

            // Look up UdpRepository and save a log record of the UDP message to MySQL
            udpRecord.setUdpMsg(receiveMsg);
            udpRecord.setTime(getTime());
            UdpRepository udpRepository = (UdpRepository) StartupEvent.getBean(UdpRepository.class);
            udpRepository.save(udpRecord);

            // Look up RedisRepository
            RedisRepository redisRepository = (RedisRepository) StartupEvent.getBean(RedisRepository.class);
            // Push the received UDP message onto the Redis list
            redisRepository.lpush("udp:msg", receiveMsg);
            redisRepository.setKey("UDPMsgNumber", String.valueOf(count));

            // A UDP reply can be sent back here to acknowledge receipt.
            // Since this is UDP, the reply is optional and can be commented out.
            ctx.write(new DatagramPacket(
                    Unpooled.copiedBuffer("QOTM: " + "Got UDP Message!", CharsetUtil.UTF_8), packet.sender()));

        } else {
            log.error("Received Error UDP Message:" + receiveMsg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Exception in UDP handler", cause);
        // We don't close the channel because we can keep serving requests.
    }

    // Current time as a java.sql.Timestamp
    public Timestamp getTime() {
        return new Timestamp(new Date().getTime());
    }

}

 

 

 

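Without going through ApplicationContext.getBean, the RedisRepository object cannot be obtained here.

Note: @Autowired cannot be used here to obtain the redisTemplate object. UdpServerHandler is created with new in UdpServer, so it is not managed by Spring and its fields are never injected.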
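One possible alternative to the static lookup, offered only as a sketch and not as what the original project does, is to hand the collaborators to the handler through its constructor. The example below uses plain Java so that it runs without Spring or Netty on the classpath. MessageStore, InjectedHandler, and InjectionDemo are hypothetical stand-ins (for the repositories, for UdpServerHandler, and for the wiring code respectively), not classes from the project.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RedisRepository/UdpRepository: anything that can store a message. */
interface MessageStore {
    void save(String message);
}

/** Hypothetical stand-in for UdpServerHandler: the collaborator arrives through the constructor. */
class InjectedHandler {
    private final MessageStore store;
    private int count;

    InjectedHandler(MessageStore store) {
        this.store = store;
    }

    /** Mirrors the shape of channelRead0: store non-empty messages and count them. */
    boolean handle(String message) {
        if (message == null || message.isEmpty()) {
            return false;
        }
        count++;
        store.save(message);
        return true;
    }

    int count() {
        return count;
    }
}

class InjectionDemo {
    /** Feeds the messages through a handler backed by an in-memory store and returns what was saved. */
    static List<String> run(String... messages) {
        List<String> saved = new ArrayList<>();
        // In a Spring application the store would be a managed bean passed in by the caller.
        InjectedHandler handler = new InjectedHandler(saved::add);
        for (String message : messages) {
            handler.handle(message);
        }
        return saved;
    }

    public static void main(String[] args) {
        System.out.println(run("first", "", "second"));
    }
}
```

Under this assumption, the Spring-managed UdpServer would receive the repositories by injection and pass them on when it creates the handler, and the handler could be tested with an in-memory store as shown.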

8) repository

RedisRepository.java

package com.example.demo.repository.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * Connects to Redis
 * Implements lpush and rpop on a list
 * Created by wj on 2017/8/30.
 */
@Service
public class RedisRepository {
    private static final Logger log = LoggerFactory.getLogger(RedisRepository.class);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    //----------------String-----------------------
    public void setKey(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }

    //----------------list----------------------
    public Long lpush(String key, String val) throws Exception {
        log.info("Saving UDP msg to redis, key:" + key + ", val:" + val);
        return redisTemplate.opsForList().leftPush(key, val);
    }

    public String rpop(String key) throws Exception {
        return redisTemplate.opsForList().rightPop(key);
    }

}

Spring's RedisTemplate class is used to talk to Redis. Received UDP messages are pushed onto the left end of a list (lpush) and later popped from the right end (rpop).
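Pushing on the left and popping on the right makes the list behave as a first-in, first-out queue. The sketch below illustrates just that ordering with a java.util.ArrayDeque in place of Redis; ListQueueDemo and drainInArrivalOrder are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ListQueueDemo {

    /** Pushes every message on the left end, then pops from the right end until empty. */
    static List<String> drainInArrivalOrder(String... messages) {
        Deque<String> list = new ArrayDeque<>();
        for (String message : messages) {
            list.addFirst(message); // plays the role of lpush
        }
        List<String> popped = new ArrayList<>();
        String next;
        while ((next = list.pollLast()) != null) { // plays the role of rpop
            popped.add(next);
        }
        return popped;
    }

    public static void main(String[] args) {
        System.out.println(drainInArrivalOrder("m1", "m2", "m3"));
    }
}
```

The messages come back in the order they arrived, which is what a consumer reading the udp:msg list with rpop would see.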

 

UdpRepository.java
package com.example.demo.repository.mysql;

import com.example.demo.mod.UdpRecord;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Created by wj on 2017/8/31.
 */
public interface UdpRepository extends JpaRepository<UdpRecord,Long> {

}

This defines the Spring Data JPA interface used to access the database.

The entity class it works with is
UdpRecord.java
package com.example.demo.mod;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;
import java.sql.Timestamp;

/**
 * Created by wj on 2017/8/31.
 *
 * Log record of a received UDP message
 */
@Entity
@Table
public class UdpRecord {

    private long id;
    private String udpMsg;
    private Timestamp time;

    @Id
    @GeneratedValue
    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getUdpMsg() {
        return udpMsg;
    }

    public void setUdpMsg(String udpMsg) {
        this.udpMsg = udpMsg;
    }

    public Timestamp getTime() {
        return time;
    }

    public void setTime(Timestamp time) {
        this.time = time;
    }
}

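The @Entity and @Table annotations mark this class as an entity mapped to a database table. @Id and @GeneratedValue mark id as the primary key and have its value generated automatically.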



 

 

9) Thread pool

TaskExecutePool.java
package com.example.demo.thread;

import com.example.demo.init.SysConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by wangjian on 2017/8/29.
 */
@Configuration
@EnableAsync
public class TaskExecutePool {

    @Autowired
    private SysConfig config;

    @Bean
    public Executor myTaskAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCorePoolSize());
        executor.setMaxPoolSize(config.getMaxPoolSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix("MyExecutor-");

        // Rejection policy: how to handle a new task once the pool has reached max size and the queue is full
        // CALLER_RUNS: the task is not run on a pool thread but on the thread of the caller
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
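To make the CallerRunsPolicy comment concrete, here is a small sketch that uses java.util.concurrent.ThreadPoolExecutor directly, the JDK class whose rejection handler the configuration above sets. The pool sizes (one thread, one queue slot) are chosen only to force a rejection quickly and are not the values from application-test.properties; CallerRunsDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns true if the task rejected by a saturated pool ran on the submitting thread. */
    static boolean rejectedTaskRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until the latch is released.
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 finds both the worker and the queue full, so the policy runs it right here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on caller: " + rejectedTaskRunsOnCaller());
    }
}
```

The practical consequence is back-pressure: when the pool is saturated, the submitting thread is slowed down by doing the work itself instead of the task being dropped.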

 

 

10) Configuration class SysConfig.java

package com.example.demo.init;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Holds the custom properties that start with the "sysfig" prefix.
 * Created by wj on 2017/8/30.
 */
@Component
@ConfigurationProperties(prefix="sysfig")
public class SysConfig {
    private int udpReceivePort; // UDP message receive port

    // Thread pool settings
    private int corePoolSize;

    private int maxPoolSize;

    private int keepAliveSeconds;

    private int queueCapacity;

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public int getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public int getUdpReceivePort() {
        return udpReceivePort;
    }

    public void setUdpReceivePort(int udpReceivePort) {
        this.udpReceivePort = udpReceivePort;
    }
}

 

 

 

11) Summary

The core code for sending and receiving UDP messages is actually very simple; Netty just wraps it.

Sending a UDP message:

byte[] buffer = ...
InetAddress address = InetAddress.getByName("localhost");

DatagramPacket packet = new DatagramPacket(
    buffer, buffer.length, address, 9999);
DatagramSocket datagramSocket = new DatagramSocket();
datagramSocket.send(packet);

Receiving a UDP message:

DatagramSocket datagramSocket = new DatagramSocket(9999);

byte[] buffer = ...
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

datagramSocket.receive(packet);

Doesn't that look simple?
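The two fragments can be combined into one runnable program. The following is a minimal sketch using only java.net, assuming both sockets live in the same process and talk over the loopback interface. The receiver binds to port 0 so that the operating system picks a free port (rather than the fixed 9999 above), and a two-second timeout is an arbitrary choice to keep the sketch from hanging. UdpLoopbackDemo and roundTrip are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

class UdpLoopbackDemo {

    /** Sends text to a receiver socket on the loopback interface and returns what arrived. */
    static String roundTrip(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS choose a free port, so the sketch does not depend on 9999 being available.
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // fail after two seconds instead of blocking forever

            // Sending side: wrap the bytes in a packet addressed to the receiver.
            byte[] payload = text.getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

            // Receiving side: receive() fills the buffer and records the actual length.
            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            receiver.receive(packet);
            return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello udp"));
    }
}
```

Note that only packet.getLength() bytes of the buffer are decoded; the rest of the 1024-byte buffer is unused padding.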

 

 

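12) Source code download

https://github.com/wj302763621/learning_udp.git

Only a skeleton is published here; many other parts involve work-related content and cannot be released.

Anyone who needs it is welcome to download the code and modify it.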

 

