SpringBoot整合RabbitMQ-消息可靠性投遞


本系列是學習SpringBoot整合RabbitMQ的練手,包含服務安裝,RabbitMQ整合SpringBoot2.x,消息可靠性投遞實現等三篇博客。

  學習路徑:https://www.imooc.com/learn/1042 RabbitMQ消息中間件極速入門與實戰 

  項目源碼:https://github.com/ZbLeaning/Boot-RabbitMQ 


 

設計一個消息可靠性投遞方案,服務結構如下:

 

 組成:

  Sender+Confirm Listener :組成消息的生產者

  MQ Broker:消息的消費者,包含具體的MQ服務

  BIZ DB:業務數據數據庫

  MSG DB:消息日志記錄數據庫(0:發送中、1:發送成功、2:發送失敗)

思路:

  以最常見的創建訂單業務來舉例,假設訂單創建成功后需要去發短信通知用戶

  1、先完成訂單業務數據的存儲,並記錄這條操作日志(發送中)

  2、生產者發送一條消息到消費者(異步)

  3、消費者成功消費后給給Confirm listener發送應答

  4、監聽收到消息確認成功后,對消息日志表操作,修改之前的日志狀態(發送成功)

  5、在消費端返回應答的過程中,可能發生網絡異常導致生產者未收到應答消息,因此需要一個定時任務去撈取狀態是發送中並已經超時的消息集合

  6、將撈取到的日志對應的消息,進行重發

  7、定時任務判斷設置的消息最大重投次數,大於最大重投次數就判斷消息發送失敗,更新日志記錄狀態(發送失敗)


 項目搭建

  Durid數據源配置文件

//druid.properties
##下面為連接池的補充設置,應用到上面所有數據源中
#初始化大小,最小,最大
druid.initialSize=5
druid.minIdle=10
druid.maxActive=300
#配置獲取連接等待超時的時間
druid.maxWait=60000
#配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接,單位是毫秒
druid.timeBetweenEvictionRunsMillis=60000
#配置一個連接在池中最小生存的時間,單位是毫秒
druid.minEvictableIdleTimeMillis=300000
druid.validationQuery=SELECT 1 FROM DUAL
druid.testWhileIdle=true
druid.testOnBorrow=false
druid.testOnReturn=false
#打開PSCache,並且指定每個連接上PSCache的大小
druid.poolPreparedStatements=true
druid.maxPoolPreparedStatementPerConnectionSize=20
#配置監控統計攔截的filters,去掉后監控界面sql無法統計,'wall'用於防火牆
druid.filters=stat,wall,log4j
#通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
#合並多個DruidDataSource的監控數據
druid.useGlobalDataSourceStat=true

  添加相應的數據源配置類、定時任務配置類、常量類

package com.imooc.mq.config.database;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;

/**
 * @Title: DruidDataSourceSettings
 * @Description: Druid數據源讀取配置
 * @date 2019/1/2214:31
 */
@Component
@ConfigurationProperties(prefix = "spring.datasource")
@PropertySource("classpath:druid.properties")
public class DruidDataSourceSettings {
    private String driverClassName;
    private String url;
    private String username;
    private String password;

    @Value("${druid.initialSize}")
    private int initialSize;

    @Value("${druid.minIdle}")
    private int minIdle;

    @Value("${druid.maxActive}")
    private int maxActive;

    @Value("${druid.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis;

    @Value("${druid.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis;

    @Value("${druid.validationQuery}")
    private String validationQuery;

    @Value("${druid.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${druid.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${druid.testOnReturn}")
    private boolean testOnReturn;

    @Value("${druid.poolPreparedStatements}")
    private boolean poolPreparedStatements;

    @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
    private int maxPoolPreparedStatementPerConnectionSize;

    @Value("${druid.filters}")
    private String filters;

    @Value("${druid.connectionProperties}")
    private String connectionProperties;

    @Bean
    public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){
        return new PropertySourcesPlaceholderConfigurer();
    }

    public String getDriverClassName() {
        return driverClassName;
    }
    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public int getInitialSize() {
        return initialSize;
    }
    public void setInitialSize(int initialSize) {
        this.initialSize = initialSize;
    }
    public int getMinIdle() {
        return minIdle;
    }
    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }
    public int getMaxActive() {
        return maxActive;
    }
    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }
    public long getTimeBetweenEvictionRunsMillis() {
        return timeBetweenEvictionRunsMillis;
    }
    public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }
    public long getMinEvictableIdleTimeMillis() {
        return minEvictableIdleTimeMillis;
    }
    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }
    public String getValidationQuery() {
        return validationQuery;
    }
    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }
    public boolean isTestWhileIdle() {
        return testWhileIdle;
    }
    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }
    public boolean isTestOnBorrow() {
        return testOnBorrow;
    }
    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }
    public boolean isTestOnReturn() {
        return testOnReturn;
    }
    public void setTestOnReturn(boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }
    public boolean isPoolPreparedStatements() {
        return poolPreparedStatements;
    }
    public void setPoolPreparedStatements(boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }
    public int getMaxPoolPreparedStatementPerConnectionSize() {
        return maxPoolPreparedStatementPerConnectionSize;
    }
    public void setMaxPoolPreparedStatementPerConnectionSize(
            int maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }
    public String getFilters() {
        return filters;
    }
    public void setFilters(String filters) {
        this.filters = filters;
    }
    public String getConnectionProperties() {
        return connectionProperties;
    }
    public void setConnectionProperties(String connectionProperties) {
        this.connectionProperties = connectionProperties;
    }
}
package com.imooc.mq.config.database;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.sql.SQLException;

import com.alibaba.druid.pool.DruidDataSource;
/**
 * @Title: DruidDataSourceConfig
 * @Description: Druid數據源初始化
 *
 * EnableTransactionManagement 開啟事務
 * @date 2019/1/2214:35
 */

@Configuration
@EnableTransactionManagement
public class DruidDataSourceConfig {
    private static Logger logger = LoggerFactory.getLogger(com.imooc.mq.config.database.DruidDataSourceConfig.class);
    //注入數據源配置信息
    @Autowired
    private DruidDataSourceSettings druidSettings;

    public static String DRIVER_CLASSNAME;

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyConfigure() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    /**
     * 創建DataSource
     * @return
     * @throws SQLException
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        DruidDataSource ds = new DruidDataSource();
        ds.setDriverClassName(druidSettings.getDriverClassName());
        DRIVER_CLASSNAME = druidSettings.getDriverClassName();
        ds.setUrl(druidSettings.getUrl());
        ds.setUsername(druidSettings.getUsername());
        ds.setPassword(druidSettings.getPassword());
        ds.setInitialSize(druidSettings.getInitialSize());
        ds.setMinIdle(druidSettings.getMinIdle());
        ds.setMaxActive(druidSettings.getMaxActive());
        ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
        ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
        ds.setValidationQuery(druidSettings.getValidationQuery());
        ds.setTestWhileIdle(druidSettings.isTestWhileIdle());
        ds.setTestOnBorrow(druidSettings.isTestOnBorrow());
        ds.setTestOnReturn(druidSettings.isTestOnReturn());
        ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
        ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
        ds.setFilters(druidSettings.getFilters());
        ds.setConnectionProperties(druidSettings.getConnectionProperties());
        logger.info(" druid datasource config : {} ", ds);
        return ds;
    }

    /**
     * 開啟事務
     * @return
     * @throws Exception
     */
    @Bean
    public PlatformTransactionManager transactionManager() throws Exception {
        DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource());
        return txManager;
    }
}
package com.imooc.mq.config.database;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;

/**
 * @Title: MybatisDataSourceConfig
 * @Description: 整合mybatis和Druid
 * @date 2019/1/2214:39
 */
@Configuration
public class MybatisDataSourceConfig {
    @Autowired
    private DataSource dataSource;

    @Bean(name="sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean() {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // 添加XML目錄
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        try {
            bean.setMapperLocations(resolver.getResources("classpath:mapping/*.xml"));
            SqlSessionFactory sqlSessionFactory = bean.getObject();
            sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);

            return sqlSessionFactory;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
package com.imooc.mq.config.database;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Title: MybatisMapperScanerConfig
 * @Description: 掃碼Mybatis
 * @AutoConfigureAfter(MybatisDataSourceConfig.class) 先加載數據源類,再加載該類
 * @date 2019/1/2214:43
 */
@Configuration
@AutoConfigureAfter(MybatisDataSourceConfig.class)
public class MybatisMapperScanerConfig {
    @Bean
    public MapperScannerConfigurer mapperScannerConfigurer() {
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.imooc.mq.mapper");
        return mapperScannerConfigurer;
    }
}

 

package com.imooc.mq.config.task;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * @Title: TaskSchedulerConfig
 * @Description: 定時任務配置
 * @date 2019/1/2214:46
 */
@Configuration
@EnableScheduling //啟動定時任務
public class TaskSchedulerConfig  implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskScheduler());
    }

    /**
     * 定時任務線程池
     * @return
     */
    @Bean(destroyMethod = "shutdown")
    public Executor taskScheduler(){
        return Executors.newScheduledThreadPool(100);
    }
}
package com.imooc.mq.constant;

/**
 * @Title: Constans
 * @Description: 常量
 * @date 2019/1/2214:50
 */
public class Constans {
    /**
     * 發送中
     */
    public static final String ORDER_SENDING = "0";

    /**
     * 發送成功
     */
    public static final String ORDER_SEND_SUCCESS = "1";

    /**
     * 發送失敗
     */
    public static final String ORDER_SEND_FAILURE = "2";
    /**
     * 分鍾超時單位:min
     */
    public static final int ORDER_TIMEOUT = 1;
}

 相應的mapper接口和mapper.xml文件配置

package com.imooc.mq.mapper;

import com.imooc.mq.entity.BrokerMessageLog;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

import java.util.Date;
import java.util.List;

/**
 * @Title: BrokerMessageLogMapper
 * @Description: 消息記錄接口
 * @date 2019/1/2214:45
 */
@Repository
public interface BrokerMessageLogMapper {
    /**
     * 查詢消息狀態為0(發送中) 且已經超時的消息集合
     * @return
     */
    List<BrokerMessageLog> query4StatusAndTimeoutMessage();

    /**
     * 重新發送統計count發送次數 +1
     * @param messageId
     * @param updateTime
     */
    void update4ReSend(@Param("messageId")String messageId, @Param("updateTime") Date updateTime);
    /**
     * 更新最終消息發送結果 成功 or 失敗
     * @param messageId
     * @param status
     * @param updateTime
     */
    void changeBrokerMessageLogStatus(@Param("messageId")String messageId, @Param("status")String status, @Param("updateTime")Date updateTime);

    int insertSelective(BrokerMessageLog record);
}
------------------------------------------------------------------
package com.imooc.mq.mapper;

import com.imooc.mq.entity.Order;
import org.springframework.stereotype.Repository;

/**
 * @Title: OrderMapper
 * @Description: 訂單接口
 * @date 2019/1/2214:45
 */
@Repository
public interface OrderMapper {
    int insert(Order record);
    int deleteByPrimaryKey(Integer id);
    int insertSelective(Order record);
    Order selectByPrimaryKey(Integer id);
    int updateByPrimaryKeySelective(Order record);
    int updateByPrimaryKey(Order record);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.imooc.mq.mapper.BrokerMessageLogMapper" >
    <resultMap id="BaseResultMap" type="com.imooc.mq.entity.BrokerMessageLog" >
        <id column="message_id" property="messageId" jdbcType="VARCHAR" />
        <result column="message" property="message" jdbcType="VARCHAR" />
        <result column="try_count" property="tryCount" jdbcType="INTEGER" />
        <result column="status" property="status" jdbcType="VARCHAR" />
        <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
        <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
    </resultMap>
    <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>

    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
        select
        <include refid="Base_Column_List" />
        from broker_message_log
        where message_id = #{messageId,jdbcType=VARCHAR}
    </select>
    <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message_log
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>
    <insert id="insert" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
    insert into broker_message_log (message_id, message, try_count,
      status, next_retry, create_time,
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR}, #{tryCount,jdbcType=INTEGER},
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
    <insert id="insertSelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
        insert into broker_message_log
        <trim prefix="(" suffix=")" suffixOverrides="," >
            <if test="messageId != null" >
                message_id,
            </if>
            <if test="message != null" >
                message,
            </if>
            <if test="tryCount != null" >
                try_count,
            </if>
            <if test="status != null" >
                status,
            </if>
            <if test="nextRetry != null" >
                next_retry,
            </if>
            <if test="createTime != null" >
                create_time,
            </if>
            <if test="updateTime != null" >
                update_time,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
            <if test="messageId != null" >
                #{messageId,jdbcType=VARCHAR},
            </if>
            <if test="message != null" >
                #{message,jdbcType=VARCHAR},
            </if>
            <if test="tryCount != null" >
                #{tryCount,jdbcType=INTEGER},
            </if>
            <if test="status != null" >
                #{status,jdbcType=VARCHAR},
            </if>
            <if test="nextRetry != null" >
                #{nextRetry,jdbcType=TIMESTAMP},
            </if>
            <if test="createTime != null" >
                #{createTime,jdbcType=TIMESTAMP},
            </if>
            <if test="updateTime != null" >
                #{updateTime,jdbcType=TIMESTAMP},
            </if>
        </trim>
    </insert>
    <update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
        update broker_message_log
        <set >
            <if test="message != null" >
                message = #{message,jdbcType=VARCHAR},
            </if>
            <if test="tryCount != null" >
                try_count = #{tryCount,jdbcType=INTEGER},
            </if>
            <if test="status != null" >
                status = #{status,jdbcType=VARCHAR},
            </if>
            <if test="nextRetry != null" >
                next_retry = #{nextRetry,jdbcType=TIMESTAMP},
            </if>
            <if test="createTime != null" >
                create_time = #{createTime,jdbcType=TIMESTAMP},
            </if>
            <if test="updateTime != null" >
                update_time = #{updateTime,jdbcType=TIMESTAMP},
            </if>
        </set>
        where message_id = #{messageId,jdbcType=VARCHAR}
    </update>
    <update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
    update broker_message_log
    set message = #{message,jdbcType=VARCHAR},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>


    <select id="query4StatusAndTimeoutMessage" resultMap="BaseResultMap">
          <![CDATA[
          select message_id, message, try_count, status, next_retry, create_time, update_time
              from broker_message_log bml
              where status = '0'
              and next_retry <= sysdate()
          ]]>
    </select>

    <update id="update4ReSend" >
    update broker_message_log bml
    set bml.try_count = bml.try_count + 1,
      bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bml.message_id = #{messageId,jdbcType=VARCHAR}
  </update>

    <update id="changeBrokerMessageLogStatus" >
    update broker_message_log bml
    set bml.status = #{status,jdbcType=VARCHAR},
          bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bml.message_id = #{messageId,jdbcType=VARCHAR}
  </update>

</mapper>
-------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.imooc.mq.mapper.OrderMapper" >
    <resultMap id="BaseResultMap" type="com.imooc.mq.entity.Order" >
        <id column="id" property="id" jdbcType="INTEGER" />
        <result column="name" property="name" jdbcType="VARCHAR" />
        <result column="message_id" property="messageId" jdbcType="VARCHAR" />
    </resultMap>
    <sql id="Example_Where_Clause" >
        <where >
            <foreach collection="oredCriteria" item="criteria" separator="or" >
                <if test="criteria.valid" >
                    <trim prefix="(" suffix=")" prefixOverrides="and" >
                        <foreach collection="criteria.criteria" item="criterion" >
                            <choose >
                                <when test="criterion.noValue" >
                                    and ${criterion.condition}
                                </when>
                                <when test="criterion.singleValue" >
                                    and ${criterion.condition} #{criterion.value}
                                </when>
                                <when test="criterion.betweenValue" >
                                    and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
                                </when>
                                <when test="criterion.listValue" >
                                    and ${criterion.condition}
                                    <foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
                                        #{listItem}
                                    </foreach>
                                </when>
                            </choose>
                        </foreach>
                    </trim>
                </if>
            </foreach>
        </where>
    </sql>
    <sql id="Update_By_Example_Where_Clause" >
        <where >
            <foreach collection="example.oredCriteria" item="criteria" separator="or" >
                <if test="criteria.valid" >
                    <trim prefix="(" suffix=")" prefixOverrides="and" >
                        <foreach collection="criteria.criteria" item="criterion" >
                            <choose >
                                <when test="criterion.noValue" >
                                    and ${criterion.condition}
                                </when>
                                <when test="criterion.singleValue" >
                                    and ${criterion.condition} #{criterion.value}
                                </when>
                                <when test="criterion.betweenValue" >
                                    and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
                                </when>
                                <when test="criterion.listValue" >
                                    and ${criterion.condition}
                                    <foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
                                        #{listItem}
                                    </foreach>
                                </when>
                            </choose>
                        </foreach>
                    </trim>
                </if>
            </foreach>
        </where>
    </sql>
    <sql id="Base_Column_List" >
    id, name, message_id
  </sql>

    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
        select
        <include refid="Base_Column_List" />
        from t_order
        where id = #{id,jdbcType=INTEGER}
    </select>
    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
    delete from t_order
    where id = #{id,jdbcType=INTEGER}
  </delete>

    <insert id="insert" parameterType="com.imooc.mq.entity.Order" >
    insert into t_order (id, name, message_id
      )
    values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{messageId,jdbcType=VARCHAR}
      )
  </insert>
    <insert id="insertSelective" parameterType="com.imooc.mq.entity.Order" >
        insert into t_order
        <trim prefix="(" suffix=")" suffixOverrides="," >
            <if test="id != null" >
                id,
            </if>
            <if test="name != null" >
                name,
            </if>
            <if test="messageId != null" >
                message_id,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
            <if test="id != null" >
                #{id,jdbcType=INTEGER},
            </if>
            <if test="name != null" >
                #{name,jdbcType=VARCHAR},
            </if>
            <if test="messageId != null" >
                #{messageId,jdbcType=VARCHAR},
            </if>
        </trim>
    </insert>

    <update id="updateByExampleSelective" parameterType="map" >
        update t_order
        <set >
            <if test="record.id != null" >
                id = #{record.id,jdbcType=INTEGER},
            </if>
            <if test="record.name != null" >
                name = #{record.name,jdbcType=VARCHAR},
            </if>
            <if test="record.messageId != null" >
                message_id = #{record.messageId,jdbcType=VARCHAR},
            </if>
        </set>
        <if test="_parameter != null" >
            <include refid="Update_By_Example_Where_Clause" />
        </if>
    </update>
    <update id="updateByExample" parameterType="map" >
        update t_order
        set id = #{record.id,jdbcType=INTEGER},
        name = #{record.name,jdbcType=VARCHAR},
        message_id = #{record.messageId,jdbcType=VARCHAR}
        <if test="_parameter != null" >
            <include refid="Update_By_Example_Where_Clause" />
        </if>
    </update>
    <update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.Order" >
        update t_order
        <set >
            <if test="name != null" >
                name = #{name,jdbcType=VARCHAR},
            </if>
            <if test="messageId != null" >
                message_id = #{messageId,jdbcType=VARCHAR},
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
    <update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.Order" >
    update t_order
    set name = #{name,jdbcType=VARCHAR},
      message_id = #{messageId,jdbcType=VARCHAR}
    where id = #{id,jdbcType=INTEGER}
  </update>
</mapper>

 

package com.imooc.mq.entity;

import java.util.Date;

/**
 * @Title: BrokerMessageLog
 * @Description: 消息記錄
 * @date 2019/1/2214:29
 */
public class BrokerMessageLog {
    private String messageId;

    private String message;

    private Integer tryCount;

    private String status;

    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    public BrokerMessageLog() {
    }

    public BrokerMessageLog(String messageId, String message, Integer tryCount, String status, Date nextRetry, Date createTime, Date updateTime) {
        this.messageId = messageId;
        this.message = message;
        this.tryCount = tryCount;
        this.status = status;
        this.nextRetry = nextRetry;
        this.createTime = createTime;
        this.updateTime = updateTime;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getTryCount() {
        return tryCount;
    }

    public void setTryCount(Integer tryCount) {
        this.tryCount = tryCount;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Date getNextRetry() {
        return nextRetry;
    }

    public void setNextRetry(Date nextRetry) {
        this.nextRetry = nextRetry;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
--------------------------------------------------------------
package com.imooc.mq.entity;

import java.io.Serializable;

/**
 * @Title: Order
 * @Description: 訂單
 * @date 2019/1/2210:18
 */
public class Order implements Serializable {
    private String id;
    private String name;
    //存儲消息發送的唯一標識
    private String messageId;

    public Order() {
    }

    public Order(String id, String name, String messageId) {
        this.id = id;
        this.name = name;
        this.messageId = messageId;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

}

 


 

現在開始按照設計思路寫實現代碼:

  1、首先我們把最核心了生產者寫好,生產者組成有基本的消息投遞,和監聽

package com.imooc.mq.producer;


import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @Title: RabbitOrderSender
 * @Description: 消息發送
 * @date 2019/1/2214:52
 */
@Component
public class RabbitOrderSender {
    private static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
     * Broker應答后,會調用該方法區獲取應答結果
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            logger.info("correlationData:"+correlationData);
            String messageId = correlationData.getId();
            if (ack){
                //如果返回成功,則進行更新
                brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constans.ORDER_SEND_SUCCESS,new Date());
            }else {
                //失敗進行操作:根據具體失敗原因選擇重試或補償等手段
                logger.error("異常處理"+cause);
            }
        }
    };

    /**
     * 發送消息方法調用: 構建自定義對象消息
     * @param order
     * @throws Exception
     */
    public void sendOrder(Order order) throws Exception {
        // 通過實現 ConfirmCallback 接口,消息發送到 Broker 后觸發回調,確認消息是否到達 Broker 服務器,也就是只確認是否正確到達 Exchange 中
        rabbitTemplate.setConfirmCallback(confirmCallback);
        //消息唯一ID
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend("order-exchange1", "order.ABC", order, correlationData);
    }
}

  2、將定時任務邏輯寫好

package com.imooc.mq.task;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;

/**
 * @Title: RetryMessageTasker
 * @Description: 定時任務
 * @date 2019/1/2215:45
 */
@Component
public class RetryMessageTasker {
    private static Logger logger = LoggerFactory.getLogger(RetryMessageTasker.class);
    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
     * 定時任務
     */
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend(){
        logger.info("-----------定時任務開始-----------");
        //抽取消息狀態為0且已經超時的消息集合
        List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
        list.forEach(messageLog -> {
            //投遞三次以上的消息
            if(messageLog.getTryCount() >= 3){
                //更新失敗的消息
                brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constans.ORDER_SEND_FAILURE, new Date());
            } else {
                // 重試投遞消息,將重試次數遞增
                brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(),  new Date());
                Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);
                try {
                    rabbitOrderSender.sendOrder(reSendOrder);
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("-----------異常處理-----------");
                }
            }
        });
    }

}

  3、寫好消費者的邏輯,直接用上一篇中的消費者代碼,修改對應的exchange、queue、路由key就好

package com.imooc.mq.consumer;

import com.imooc.mq.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * @Title: OrderReceiver
 * @Description: 消費
 * @date 2019/1/2211:03
 */
@Component
public class OrderReceiver {
    /**
     * @RabbitListener 消息監聽,可配置交換機、隊列、路由key
     * 該注解會創建隊列和交互機 並建立綁定關系
     * @RabbitHandler 標識此方法如果有消息過來,消費者要調用這個方法
     * @Payload 消息體
     * @Headers 消息頭
     * @param order
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue1",declare = "true"),
            exchange = @Exchange(name = "order-exchange1",declare = "true",type = "topic"),
            key = "order.ABC"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers,
                               Channel channel) throws Exception{
        //消費者操作
        System.out.println("------收到消息,開始消費------");
        System.out.println("訂單ID:"+order.getId());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //現在是手動確認消息 ACK 
        channel.basicAck(deliveryTag,false);
    }
}

  4、業務邏輯

package com.imooc.mq.service;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.mapper.OrderMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.DateUtils;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @Title: OrderService
 * @Description: 業務實現
 * @date 2019/1/2215:41
 */
@Service
public class OrderService {
    private static Logger logger = LoggerFactory.getLogger(OrderService.class);
    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    public void createOrder(Order order)  {
        try {
            // 使用當前時間當做訂單創建時間(為了模擬一下簡化)
            Date orderTime = new Date();
            // 插入業務數據
            orderMapper.insert(order);
            // 插入消息記錄表數據
            BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
            // 消息唯一ID
            brokerMessageLog.setMessageId(order.getMessageId());
            // 保存消息整體 轉為JSON 格式存儲入庫
            brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));
            // 設置消息狀態為0 表示發送中
            brokerMessageLog.setStatus("0");
            // 設置消息未確認超時時間窗口為 一分鍾
            brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constans.ORDER_TIMEOUT));
            brokerMessageLog.setCreateTime(new Date());
            brokerMessageLog.setUpdateTime(new Date());
            brokerMessageLogMapper.insertSelective(brokerMessageLog);
            // 發送消息
            rabbitOrderSender.sendOrder(order);
        } catch (Exception e) {
            logger.error("訂單業務異常{}",e);
        }
    }
}

  5、測試

 /**
     * 測試訂單創建
     */
    @Test
    public void createOrder(){
        Order order = new Order();
        order.setId("201901228");
        order.setName("測試訂單");
        order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
        try {
            orderService.createOrder(order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

  先啟動消費者服務、再啟動生產者服務讓定時任務跑起來,最后啟動測試方法。消息被消費成功后,日志記錄狀態被修改為1。測試消息重投的話需要制造一些異常情況,比如修改消費者端跟exchange,生產者找不到該交互機,拿不到回調,就會重試投遞。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM