SpringBoot簡單整合RedisMQ消息隊列和發布訂閱
注:RedisMq消息隊列使用redis數組實現,leftpush存一,rightpop取一。
1.application.properties
server.port=8080 #thymeleaf配置 #是否啟用模板緩存。 spring.thymeleaf.cache=false #是否為Web框架啟用Thymeleaf視圖解析。 spring.thymeleaf.enabled=true #在SpringEL表達式中啟用SpringEL編譯器。 spring.thymeleaf.enable-spring-el-compiler=true #模板文件編碼。 spring.thymeleaf.encoding=UTF-8 #要應用於模板的模板模式。另請參見Thymeleaf的TemplateMode枚舉。 spring.thymeleaf.mode=HTML5 #在構建URL時添加前綴以查看名稱的前綴。 spring.thymeleaf.prefix=classpath:/templates/ #Content-Type寫入HTTP響應的值。 spring.thymeleaf.servlet.content-type=text/html #在構建URL時附加到視圖名稱的后綴。 spring.thymeleaf.suffix=.html ##單服務器 spring.redis.host=192.168.159.129 ##單端口 spring.redis.port=6379 ## 連接池最大連接數(使用負值表示沒有限制) spring.redis.pool.max-active=300 ## Redis數據庫索引(默認為0) spring.redis.database=0 ## 連接池最大阻塞等待時間(使用負值表示沒有限制) spring.redis.pool.max-wait=-1 ## 連接池中的最大空閑連接 spring.redis.pool.max-idle=100 ## 連接池中的最小空閑連接 spring.redis.pool.min-idle=20 ## 連接超時時間(毫秒) spring.redis.timeout=60000
2.pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.szw.learn</groupId> <artifactId>redis_mq_01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redis_mq_01</name> <description>redis mq example</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.16.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <maven.test.skip>true</maven.test.skip> <skipTests>true</skipTests> <start-class>com.szw.learn.redis.RedisMqApplication</start-class> </properties> <dependencies> <!-- 使用web啟動器 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 測試 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 模板引擎 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <!-- redis artifactId與1.5之前版本變了 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies> <repositories> <repository> <id>nexus-aliyun</id> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>nexus-aliyun</id> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> <build> <plugins> <!-- 要將源碼放上去,需要加入這個插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <configuration> <attach>true</attach> </configuration> <executions> <execution> <phase>compile</phase> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <!-- 打包 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>true</fork> </configuration> </plugin> </plugins> </build> </project>
3.RedisMqApplication.java啟動類
package com.szw.learn.redis; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RedisMqApplication { public static void main(String[] args) { System.setProperty("spring.devtools.restart.enabled", "false"); SpringApplication.run(RedisMqApplication.class, args); } }
4.消息隊列測試(生產者和消費者)
生產者:
package com.szw.learn.redis.mq; import javax.annotation.PostConstruct; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.ListOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import com.szw.learn.redis.RedisMqApplication; /** * @author 七脈 * 描述:生產者測試類 */ @SpringBootTest(classes = RedisMqApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @RunWith(SpringRunner.class) public class MQProducerTest { @Autowired private StringRedisTemplate redisTemplate; //redis的消息隊列直接使用redis數組實現 private ListOperations<String, String> listRedis; /** * <br>描 述: 初始化時賦值 * <br>作 者: shizhenwei * <br>歷 史: (版本) 作者 時間 注釋 */ @PostConstruct private void init(){ listRedis = redisTemplate.opsForList(); } @Test public void test() { for(int i=1; i<=10; i++){ //從左邊向堆棧順序存放1~10個消息 listRedis.leftPush("storage", i+""); } } }
消費者:
package com.szw.learn.redis.mq; import javax.annotation.PostConstruct; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.ListOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.StringUtils; import com.szw.learn.redis.RedisMqApplication; /** * @author 七脈 * 描述:消費者測試類 */ @SpringBootTest(classes = RedisMqApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @RunWith(SpringRunner.class) public class MQConsumerTest { @Autowired private StringRedisTemplate redisTemplate; //redis的消息隊列直接使用redis數組實現 private ListOperations<String, String> listRedis; /** * <br>描 述: 初始化時賦值 * <br>作 者: shizhenwei * <br>歷 史: (版本) 作者 時間 注釋 */ @PostConstruct private void init(){ listRedis = redisTemplate.opsForList(); } @Test public void test() { while(true){ //從右邊取堆棧順序取1~10個消息 String msg = listRedis.rightPop("storage"); if(StringUtils.isEmpty(msg)){ System.out.println("消息已經全部取出了。。。。"); break; } System.out.println(msg); } } }
測試結果:
5.發布訂閱
訂閱監聽類:
package com.szw.learn.redis.mq; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; /** * @author 七脈 描述:訂閱監聽類 */ public class SubscribeListener implements MessageListener { /** * 訂閱接收發布者的消息 */ @Override public void onMessage(Message message, byte[] pattern) { // 緩存消息是序列化的,需要反序列化。然而new String()可以反序列化,但靜態方法valueOf()不可以 System.out.println(new String(pattern) + "主題發布:" + new String(message.getBody())); } }
發布service:
package com.szw.learn.redis.mq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; /** * @author 七脈 描述:發布service */ @Component public class PublishService { @Autowired StringRedisTemplate redisTemplate; /** * @author 七脈 描述:發布方法 * @param channel 消息發布訂閱 主題 * @param message 消息信息 */ public void publish(String channel, Object message) { // 該方法封裝的 connection.publish(rawChannel, rawMessage); redisTemplate.convertAndSend(channel, message); } }
添加定義監聽主題:
package com.szw.learn.redis.mq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisConfig { @Autowired private JedisConnectionFactory jedisConnectionFactory; /** * @author 七脈 描述:需要手動定義RedisMessageListenerContainer加入IOC容器 * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory); /** * 添加訂閱者監聽類,數量不限.PatternTopic定義監聽主題,這里監聽dj主題 */ container.addMessageListener(new SubscribeListener(), new PatternTopic("dj")); return container; } }
發布訂閱測試:
package com.szw.learn.redis.mq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.szw.learn.redis.RedisMqApplication; /** * @author 七脈 * 描述:消息發布 */ @SpringBootTest(classes = RedisMqApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @RunWith(SpringRunner.class) public class PublishTest { @Autowired private PublishService publishService; @Test public void test() { for(int i=1; i<=10; i++){ //向dj主題里發布10個消息 publishService.publish("dj", "like "+i+" 次"); } } }
測試結果: