SpringBoot RedisMQ消息隊列與發布訂閱


SpringBoot簡單整合RedisMQ消息隊列和發布訂閱

注:RedisMq消息隊列使用redis數組實現,leftpush存一,rightpop取一。

1.application.properties

server.port=8080

#thymeleaf配置
#是否啟用模板緩存。
spring.thymeleaf.cache=false
#是否為Web框架啟用Thymeleaf視圖解析。
spring.thymeleaf.enabled=true
#在SpringEL表達式中啟用SpringEL編譯器。
spring.thymeleaf.enable-spring-el-compiler=true
#模板文件編碼。
spring.thymeleaf.encoding=UTF-8
#要應用於模板的模板模式。另請參見Thymeleaf的TemplateMode枚舉。
spring.thymeleaf.mode=HTML5
#在構建URL時添加前綴以查看名稱的前綴。
spring.thymeleaf.prefix=classpath:/templates/
#Content-Type寫入HTTP響應的值。
spring.thymeleaf.servlet.content-type=text/html
#在構建URL時附加到視圖名稱的后綴。
spring.thymeleaf.suffix=.html

##單服務器
spring.redis.host=192.168.159.129
##單端口
spring.redis.port=6379
## 連接池最大連接數(使用負值表示沒有限制) 
spring.redis.pool.max-active=300
## Redis數據庫索引(默認為0) 
spring.redis.database=0
## 連接池最大阻塞等待時間(使用負值表示沒有限制) 
spring.redis.pool.max-wait=-1
## 連接池中的最大空閑連接 
spring.redis.pool.max-idle=100
## 連接池中的最小空閑連接 
spring.redis.pool.min-idle=20
## 連接超時時間(毫秒) 
spring.redis.timeout=60000

 

2.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.szw.learn</groupId>
    <artifactId>redis_mq_01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis_mq_01</name>
    <description>redis mq example</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.16.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.test.skip>true</maven.test.skip>
        <skipTests>true</skipTests>
        <start-class>com.szw.learn.redis.RedisMqApplication</start-class>
    </properties>

    <dependencies>
        <!-- 使用web啟動器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 測試 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 模板引擎 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        
        <!-- redis artifactId與1.5之前版本變了 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <!-- 要將源碼放上去,需要加入這個插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打包 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

 

3.RedisMqApplication.java啟動類

package com.szw.learn.redis;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RedisMqApplication {
    public static void main(String[] args) {
        System.setProperty("spring.devtools.restart.enabled", "false");
        SpringApplication.run(RedisMqApplication.class, args);
    }
}

 

4.消息隊列測試(生產者和消費者)

生產者:

package com.szw.learn.redis.mq;

import javax.annotation.PostConstruct;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import com.szw.learn.redis.RedisMqApplication;

/**
 * @author 七脈
 * 描述:生產者測試類
 */
@SpringBootTest(classes = RedisMqApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class MQProducerTest {
    @Autowired
    private StringRedisTemplate redisTemplate;
    //redis的消息隊列直接使用redis數組實現
    private ListOperations<String, String> listRedis;
    
    /**
     * <br>描 述: 初始化時賦值
     * <br>作 者: shizhenwei 
     * <br>歷 史: (版本) 作者 時間 注釋
     */
    @PostConstruct
    private void init(){
        listRedis = redisTemplate.opsForList();
    }
    
    @Test
    public void test() {
        for(int i=1; i<=10; i++){
            //從左邊向堆棧順序存放1~10個消息
            listRedis.leftPush("storage", i+"");
        }
    }
}

消費者:

package com.szw.learn.redis.mq;

import javax.annotation.PostConstruct;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;

import com.szw.learn.redis.RedisMqApplication;
/**
 * @author 七脈
 * 描述:消費者測試類
 */
@SpringBootTest(classes = RedisMqApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class MQConsumerTest {
    @Autowired
    private StringRedisTemplate redisTemplate;
    //redis的消息隊列直接使用redis數組實現
    private ListOperations<String, String> listRedis;
    
    /**
     * <br>描 述: 初始化時賦值
     * <br>作 者: shizhenwei 
     * <br>歷 史: (版本) 作者 時間 注釋
     */
    @PostConstruct
    private void init(){
        listRedis = redisTemplate.opsForList();
    }
    
    @Test
    public void test() {
        while(true){
            //從右邊取堆棧順序取1~10個消息
            String msg = listRedis.rightPop("storage");
            if(StringUtils.isEmpty(msg)){
                System.out.println("消息已經全部取出了。。。。");
                break;
            }
            System.out.println(msg);
        }
    }
}

測試結果:

 

5.發布訂閱

訂閱監聽類:

package com.szw.learn.redis.mq;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

/**
 * @author 七脈 描述:訂閱監聽類
 */
public class SubscribeListener implements MessageListener {
    /**
     * 訂閱接收發布者的消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 緩存消息是序列化的,需要反序列化。然而new String()可以反序列化,但靜態方法valueOf()不可以
        System.out.println(new String(pattern) + "主題發布:" + new String(message.getBody()));
    }
}

 

發布service:

package com.szw.learn.redis.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * @author 七脈 描述:發布service
 */
@Component
public class PublishService {
    @Autowired
    StringRedisTemplate redisTemplate;

    /**
     * @author 七脈 描述:發布方法
     * @param channel 消息發布訂閱 主題
     * @param message 消息信息
     */
    public void publish(String channel, Object message) {
        // 該方法封裝的 connection.publish(rawChannel, rawMessage);
        redisTemplate.convertAndSend(channel, message);
    }
}

 

添加定義監聽主題:

package com.szw.learn.redis.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisConfig {
    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    /**
     * @author 七脈 描述:需要手動定義RedisMessageListenerContainer加入IOC容器
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(jedisConnectionFactory);

        /**
         * 添加訂閱者監聽類,數量不限.PatternTopic定義監聽主題,這里監聽dj主題
         */
        container.addMessageListener(new SubscribeListener(), new PatternTopic("dj"));
        return container;

    }
}

 

發布訂閱測試:

package com.szw.learn.redis.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.szw.learn.redis.RedisMqApplication;

/**
 * @author 七脈
 * 描述:消息發布
 */
@SpringBootTest(classes = RedisMqApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class PublishTest {

    @Autowired
    private PublishService publishService;

    @Test
    public void test() {
        for(int i=1; i<=10; i++){
            //向dj主題里發布10個消息
            publishService.publish("dj", "like "+i+" 次");
        }
    }
}

測試結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM