本系列是學習SpringBoot整合RabbitMQ的練手,包含服務安裝,RabbitMQ整合SpringBoot2.x,消息可靠性投遞實現等三篇博客。
學習路徑:https://www.imooc.com/learn/1042 RabbitMQ消息中間件極速入門與實戰
項目源碼:https://github.com/ZbLeaning/Boot-RabbitMQ
設計一個消息可靠性投遞方案,服務結構如下:
組成:
Sender+Confirm Listener :組成消息的生產者
MQ Broker:消息的消費者,包含具體的MQ服務
BIZ DB:業務數據數據庫
MSG DB:消息日志記錄數據庫(0:發送中、1:發送成功、2:發送失敗)
思路:
以最常見的創建訂單業務來舉例,假設訂單創建成功后需要去發短信通知用戶
1、先完成訂單業務數據的存儲,並記錄這條操作日志(發送中)
2、生產者發送一條消息到消費者(異步)
3、消費者成功消費后給Confirm listener發送應答
4、監聽收到消息確認成功后,對消息日志表操作,修改之前的日志狀態(發送成功)
5、在消費端返回應答的過程中,可能發生網絡異常導致生產者未收到應答消息,因此需要一個定時任務去撈取狀態是發送中並已經超時的消息集合
6、將撈取到的日志對應的消息,進行重發
7、定時任務判斷設置的消息最大重投次數,大於最大重投次數就判斷消息發送失敗,更新日志記錄狀態(發送失敗)
項目搭建
Druid數據源配置文件
//druid.properties
## Supplementary connection-pool settings, applied to every data source defined above
# Initial size, minimum idle, maximum active
druid.initialSize=5
druid.minIdle=10
druid.maxActive=300
# Timeout for waiting to obtain a connection
druid.maxWait=60000
# Interval between checks that look for idle connections to close, in milliseconds
druid.timeBetweenEvictionRunsMillis=60000
# Minimum time a connection lives in the pool, in milliseconds
druid.minEvictableIdleTimeMillis=300000
druid.validationQuery=SELECT 1 FROM DUAL
druid.testWhileIdle=true
druid.testOnBorrow=false
druid.testOnReturn=false
# Enable PSCache and set the PSCache size for each connection
druid.poolPreparedStatements=true
druid.maxPoolPreparedStatementPerConnectionSize=20
# Filters used for monitoring statistics; without them the monitoring UI cannot collect SQL statistics. 'wall' is the firewall filter
druid.filters=stat,wall,log4j
# Use the connectProperties attribute to enable mergeSql and slow-SQL recording
druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# Merge the monitoring data of multiple DruidDataSource instances
druid.useGlobalDataSourceStat=true
添加相應的數據源配置類、定時任務配置類、常量類
package com.imooc.mq.config.database;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;

/**
 * Holder for the Druid data-source settings.
 *
 * <p>The connection fields (driver class name, url, username, password) carry no
 * {@code @Value} annotation; the class is annotated with
 * {@code @ConfigurationProperties(prefix = "spring.datasource")}. The pool-tuning
 * fields are injected from {@code druid.*} placeholders, and
 * {@code classpath:druid.properties} is registered as a property source.
 *
 * <p>NOTE(review): {@code druid.maxWait} and {@code druid.useGlobalDataSourceStat}
 * are present in druid.properties but no field here reads them.
 *
 * @Title: DruidDataSourceSettings
 * @Description: Reads the Druid data-source configuration
 * @date 2019/1/2214:31
 */
@Component
@ConfigurationProperties(prefix = "spring.datasource")
@PropertySource("classpath:druid.properties")
public class DruidDataSourceSettings {

    // Connection settings (no @Value; see the class-level @ConfigurationProperties prefix).
    private String driverClassName;
    private String url;
    private String username;
    private String password;

    // Pool settings injected from the druid.* placeholders.
    @Value("${druid.initialSize}")
    private int initialSize;
    @Value("${druid.minIdle}")
    private int minIdle;
    @Value("${druid.maxActive}")
    private int maxActive;
    @Value("${druid.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis;
    @Value("${druid.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis;
    @Value("${druid.validationQuery}")
    private String validationQuery;
    @Value("${druid.testWhileIdle}")
    private boolean testWhileIdle;
    @Value("${druid.testOnBorrow}")
    private boolean testOnBorrow;
    @Value("${druid.testOnReturn}")
    private boolean testOnReturn;
    @Value("${druid.poolPreparedStatements}")
    private boolean poolPreparedStatements;
    @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
    private int maxPoolPreparedStatementPerConnectionSize;
    @Value("${druid.filters}")
    private String filters;
    @Value("${druid.connectionProperties}")
    private String connectionProperties;

    /**
     * Registers a {@link PropertySourcesPlaceholderConfigurer} bean.
     *
     * <p>The method name (and therefore the default bean name) is spelled
     * {@code properdtyConfigure}; it is left as-is here.
     *
     * @return a new placeholder configurer
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){
        return new PropertySourcesPlaceholderConfigurer();
    }

    public String getDriverClassName() {
        return driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getInitialSize() {
        return initialSize;
    }

    public void setInitialSize(int initialSize) {
        this.initialSize = initialSize;
    }

    public int getMinIdle() {
        return minIdle;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    public int getMaxActive() {
        return maxActive;
    }

    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }

    public long getTimeBetweenEvictionRunsMillis() {
        return timeBetweenEvictionRunsMillis;
    }

    public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }

    public long getMinEvictableIdleTimeMillis() {
        return minEvictableIdleTimeMillis;
    }

    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public String getValidationQuery() {
        return validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public boolean isTestWhileIdle() {
        return testWhileIdle;
    }

    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }

    public boolean isTestOnBorrow() {
        return testOnBorrow;
    }

    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public boolean isTestOnReturn() {
        return testOnReturn;
    }

    public void setTestOnReturn(boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }

    public boolean isPoolPreparedStatements() {
        return poolPreparedStatements;
    }

    public void setPoolPreparedStatements(boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }

    public int getMaxPoolPreparedStatementPerConnectionSize() {
        return maxPoolPreparedStatementPerConnectionSize;
    }

    public void setMaxPoolPreparedStatementPerConnectionSize( int maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }

    public String getFilters() {
        return filters;
    }

    public void setFilters(String filters) {
        this.filters = filters;
    }

    public String getConnectionProperties() {
        return connectionProperties;
    }

    public void setConnectionProperties(String connectionProperties) {
        this.connectionProperties = connectionProperties;
    }
}
package com.imooc.mq.config.database;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.sql.SQLException;

import com.alibaba.druid.pool.DruidDataSource;

/**
 * Creates the Druid {@link DataSource} from {@link DruidDataSourceSettings} and
 * exposes a transaction manager on top of it. {@code @EnableTransactionManagement}
 * switches on annotation-driven transactions.
 *
 * @Title: DruidDataSourceConfig
 * @Description: Druid data-source initialisation
 * @date 2019/1/2214:35
 */
@Configuration
@EnableTransactionManagement
public class DruidDataSourceConfig {

    private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);

    /** Driver class name of the most recently built data source. */
    public static String DRIVER_CLASSNAME;

    /** Injected data-source settings. */
    @Autowired
    private DruidDataSourceSettings druidSettings;

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyConfigure() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    /**
     * Builds the Druid connection pool.
     *
     * @return the configured data source
     * @throws SQLException declared because {@code setFilters} can throw it
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        final DruidDataSource pool = new DruidDataSource();

        // Connection identity.
        pool.setDriverClassName(druidSettings.getDriverClassName());
        DRIVER_CLASSNAME = druidSettings.getDriverClassName();
        pool.setUrl(druidSettings.getUrl());
        pool.setUsername(druidSettings.getUsername());
        pool.setPassword(druidSettings.getPassword());

        // Pool sizing.
        pool.setInitialSize(druidSettings.getInitialSize());
        pool.setMinIdle(druidSettings.getMinIdle());
        pool.setMaxActive(druidSettings.getMaxActive());

        // Eviction and validation.
        pool.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
        pool.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
        pool.setValidationQuery(druidSettings.getValidationQuery());
        pool.setTestWhileIdle(druidSettings.isTestWhileIdle());
        pool.setTestOnBorrow(druidSettings.isTestOnBorrow());
        pool.setTestOnReturn(druidSettings.isTestOnReturn());

        // Prepared-statement cache.
        pool.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
        pool.setMaxPoolPreparedStatementPerConnectionSize(
                druidSettings.getMaxPoolPreparedStatementPerConnectionSize());

        // Filters and extra connection properties.
        pool.setFilters(druidSettings.getFilters());
        pool.setConnectionProperties(druidSettings.getConnectionProperties());

        logger.info(" druid datasource config : {} ", pool);
        return pool;
    }

    /**
     * Exposes a transaction manager bound to {@link #dataSource()}.
     *
     * @return the transaction manager
     * @throws Exception if the data source cannot be created
     */
    @Bean
    public PlatformTransactionManager transactionManager() throws Exception {
        final DataSourceTransactionManager manager = new DataSourceTransactionManager();
        manager.setDataSource(dataSource());
        return manager;
    }
}
package com.imooc.mq.config.database;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;

/**
 * Wires MyBatis onto the injected {@link DataSource}.
 *
 * @Title: MybatisDataSourceConfig
 * @Description: Integrates MyBatis with Druid
 * @date 2019/1/2214:39
 */
@Configuration
public class MybatisDataSourceConfig {

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the {@link SqlSessionFactory}, loading every mapper XML under
     * {@code classpath:mapping/} and enabling the MyBatis cache setting.
     *
     * @return the session factory
     * @throws RuntimeException wrapping any failure while loading mappers or building the factory
     */
    @Bean(name="sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean() {
        final SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);

        // Locate the mapper XML files.
        final ResourcePatternResolver mapperLocator = new PathMatchingResourcePatternResolver();
        try {
            factoryBean.setMapperLocations(mapperLocator.getResources("classpath:mapping/*.xml"));
            final SqlSessionFactory factory = factoryBean.getObject();
            factory.getConfiguration().setCacheEnabled(true);
            return factory;
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Exposes a {@link SqlSessionTemplate} backed by the given factory.
     *
     * @param sqlSessionFactory the factory to wrap
     * @return the session template
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
package com.imooc.mq.config.database;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Registers the MyBatis mapper scanner for the {@code com.imooc.mq.mapper} package.
 *
 * <p>NOTE(review): {@code @AutoConfigureAfter(MybatisDataSourceConfig.class)} is meant to
 * load the data-source configuration before this class; confirm it has any effect on a
 * regular (non auto-configuration) {@code @Configuration} class.
 *
 * @Title: MybatisMapperScanerConfig
 * @Description: Scans MyBatis mapper interfaces
 * @date 2019/1/2214:43
 */
@Configuration
@AutoConfigureAfter(MybatisDataSourceConfig.class)
public class MybatisMapperScanerConfig {

    /**
     * Creates the mapper scanner.
     *
     * <p>The method is {@code static} because {@link MapperScannerConfigurer} is a
     * bean-factory post-processor: Spring must create it very early, and a non-static
     * {@code @Bean} method would force this configuration class to be instantiated
     * before annotation processing (e.g. {@code @Autowired}, {@code @Value}) is ready.
     *
     * @return a scanner bound to the {@code sqlSessionFactory} bean by name
     */
    @Bean
    public static MapperScannerConfigurer mapperScannerConfigurer() {
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        // Refer to the factory by bean name so it is resolved lazily, not at scan time.
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.imooc.mq.mapper");
        return mapperScannerConfigurer;
    }
}
package com.imooc.mq.config.task;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Enables {@code @Scheduled} processing and supplies the executor that runs the tasks.
 *
 * @Title: TaskSchedulerConfig
 * @Description: Scheduled-task configuration
 * @date 2019/1/2214:46
 */
@Configuration
@EnableScheduling
public class TaskSchedulerConfig implements SchedulingConfigurer {

    /**
     * Thread pool used for scheduled tasks; {@code shutdown} is invoked when the bean is destroyed.
     *
     * @return a scheduled thread pool with 100 core threads
     */
    @Bean(destroyMethod = "shutdown")
    public Executor taskScheduler(){
        final int poolSize = 100;
        return Executors.newScheduledThreadPool(poolSize);
    }

    /**
     * Points the registrar at the {@link #taskScheduler()} bean.
     *
     * @param taskRegistrar registrar supplied by Spring
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        final Executor scheduler = taskScheduler();
        taskRegistrar.setScheduler(scheduler);
    }
}
package com.imooc.mq.constant; /** * @Title: Constans * @Description: 常量 * @date 2019/1/2214:50 */ public class Constans { /** * 發送中 */ public static final String ORDER_SENDING = "0"; /** * 發送成功 */ public static final String ORDER_SEND_SUCCESS = "1"; /** * 發送失敗 */ public static final String ORDER_SEND_FAILURE = "2"; /** * 分鍾超時單位:min */ public static final int ORDER_TIMEOUT = 1; }
相應的mapper接口和mapper.xml文件配置
package com.imooc.mq.mapper;

import com.imooc.mq.entity.BrokerMessageLog;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

import java.util.Date;
import java.util.List;

/**
 * Mapper for the message-delivery log table.
 *
 * @Title: BrokerMessageLogMapper
 * @Description: Message log mapper
 * @date 2019/1/2214:45
 */
@Repository
public interface BrokerMessageLogMapper {

    /**
     * Queries the messages whose status is 0 (sending) and that have already timed out.
     *
     * @return the matching log records
     */
    List<BrokerMessageLog> query4StatusAndTimeoutMessage();

    /**
     * Records a re-send: the send count is increased by 1.
     *
     * @param messageId  id of the message being re-sent
     * @param updateTime new update timestamp
     */
    void update4ReSend(@Param("messageId")String messageId, @Param("updateTime") Date updateTime);

    /**
     * Stores the final delivery result of a message: success or failure.
     *
     * @param messageId  id of the message
     * @param status     new status value
     * @param updateTime new update timestamp
     */
    void changeBrokerMessageLogStatus(@Param("messageId")String messageId, @Param("status")String status, @Param("updateTime")Date updateTime);

    /**
     * Inserts a log record.
     *
     * @param record the record to insert
     * @return the value returned by the insert statement
     */
    int insertSelective(BrokerMessageLog record);
}
------------------------------------------------------------------
package com.imooc.mq.mapper;

import com.imooc.mq.entity.Order;
import org.springframework.stereotype.Repository;

/**
 * Mapper for orders.
 *
 * <p>NOTE(review): the key parameters here are {@code Integer}, while the {@code Order}
 * entity declares its id as {@code String}; confirm which type is intended.
 *
 * @Title: OrderMapper
 * @Description: Order mapper
 * @date 2019/1/2214:45
 */
@Repository
public interface OrderMapper {

    int insert(Order record);

    int deleteByPrimaryKey(Integer id);

    int insertSelective(Order record);

    Order selectByPrimaryKey(Integer id);

    int updateByPrimaryKeySelective(Order record);

    int updateByPrimaryKey(Order record);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- SQL for com.imooc.mq.mapper.BrokerMessageLogMapper (table broker_message_log). -->
<mapper namespace="com.imooc.mq.mapper.BrokerMessageLogMapper" >
  <!-- Column-to-property mapping for BrokerMessageLog. -->
  <resultMap id="BaseResultMap" type="com.imooc.mq.entity.BrokerMessageLog" >
    <id column="message_id" property="messageId" jdbcType="VARCHAR" />
    <result column="message" property="message" jdbcType="VARCHAR" />
    <result column="try_count" property="tryCount" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="VARCHAR" />
    <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>
  <!-- selectByPrimaryKey, deleteByPrimaryKey and insert are not declared on the
       BrokerMessageLogMapper interface shown in this article. -->
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
    select
    <include refid="Base_Column_List" />
    from broker_message_log
    where message_id = #{messageId,jdbcType=VARCHAR}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message_log
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>
  <insert id="insert" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
    insert into broker_message_log (message_id, message, try_count,
      status, next_retry, create_time,
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR}, #{tryCount,jdbcType=INTEGER},
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <!-- Inserts only the non-null properties; a null property leaves its column out of
       the statement entirely. -->
  <insert id="insertSelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
    insert into broker_message_log
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        message_id,
      </if>
      <if test="message != null" >
        message,
      </if>
      <if test="tryCount != null" >
        try_count,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="nextRetry != null" >
        next_retry,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
      <if test="message != null" >
        #{message,jdbcType=VARCHAR},
      </if>
      <if test="tryCount != null" >
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <!-- updateByPrimaryKeySelective and updateByPrimaryKey are not declared on the
       BrokerMessageLogMapper interface shown in this article. -->
  <update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
    update broker_message_log
    <set >
      <if test="message != null" >
        message = #{message,jdbcType=VARCHAR},
      </if>
      <if test="tryCount != null" >
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
    update broker_message_log
    set message = #{message,jdbcType=VARCHAR},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <!-- Rows still in status '0' whose next_retry time is not later than the database's
       current time. -->
  <select id="query4StatusAndTimeoutMessage" resultMap="BaseResultMap">
    <![CDATA[
    select message_id, message, try_count, status, next_retry, create_time, update_time
    from broker_message_log bml
    where status = '0'
    and next_retry <= sysdate()
    ]]>
  </select>
  <!-- Increments try_count and refreshes update_time; next_retry is not changed here. -->
  <update id="update4ReSend" >
    update broker_message_log bml
    set bml.try_count = bml.try_count + 1,
      bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bml.message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <!-- Sets the status and update_time of one message. -->
  <update id="changeBrokerMessageLogStatus" >
    update broker_message_log bml
    set bml.status = #{status,jdbcType=VARCHAR},
      bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bml.message_id = #{messageId,jdbcType=VARCHAR}
  </update>
</mapper>
-------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- SQL for com.imooc.mq.mapper.OrderMapper (table t_order). -->
<mapper namespace="com.imooc.mq.mapper.OrderMapper" >
  <resultMap id="BaseResultMap" type="com.imooc.mq.entity.Order" >
    <id column="id" property="id" jdbcType="INTEGER" />
    <result column="name" property="name" jdbcType="VARCHAR" />
    <result column="message_id" property="messageId" jdbcType="VARCHAR" />
  </resultMap>
  <!-- Example-criteria fragments. Only Update_By_Example_Where_Clause is included by a
       statement in this file. ${criterion.condition} is substituted as raw text rather
       than bound as a parameter. -->
  <sql id="Example_Where_Clause" >
    <where >
      <foreach collection="oredCriteria" item="criteria" separator="or" >
        <if test="criteria.valid" >
          <trim prefix="(" suffix=")" prefixOverrides="and" >
            <foreach collection="criteria.criteria" item="criterion" >
              <choose >
                <when test="criterion.noValue" >
                  and ${criterion.condition}
                </when>
                <when test="criterion.singleValue" >
                  and ${criterion.condition} #{criterion.value}
                </when>
                <when test="criterion.betweenValue" >
                  and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
                </when>
                <when test="criterion.listValue" >
                  and ${criterion.condition}
                  <foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
                    #{listItem}
                  </foreach>
                </when>
              </choose>
            </foreach>
          </trim>
        </if>
      </foreach>
    </where>
  </sql>
  <sql id="Update_By_Example_Where_Clause" >
    <where >
      <foreach collection="example.oredCriteria" item="criteria" separator="or" >
        <if test="criteria.valid" >
          <trim prefix="(" suffix=")" prefixOverrides="and" >
            <foreach collection="criteria.criteria" item="criterion" >
              <choose >
                <when test="criterion.noValue" >
                  and ${criterion.condition}
                </when>
                <when test="criterion.singleValue" >
                  and ${criterion.condition} #{criterion.value}
                </when>
                <when test="criterion.betweenValue" >
                  and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
                </when>
                <when test="criterion.listValue" >
                  and ${criterion.condition}
                  <foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
                    #{listItem}
                  </foreach>
                </when>
              </choose>
            </foreach>
          </trim>
        </if>
      </foreach>
    </where>
  </sql>
  <sql id="Base_Column_List" >
    id, name, message_id
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where id = #{id,jdbcType=INTEGER}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
    delete from t_order
    where id = #{id,jdbcType=INTEGER}
  </delete>
  <insert id="insert" parameterType="com.imooc.mq.entity.Order" >
    insert into t_order (id, name, message_id
      )
    values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{messageId,jdbcType=VARCHAR}
      )
  </insert>
  <insert id="insertSelective" parameterType="com.imooc.mq.entity.Order" >
    insert into t_order
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        id,
      </if>
      <if test="name != null" >
        name,
      </if>
      <if test="messageId != null" >
        message_id,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        #{id,jdbcType=INTEGER},
      </if>
      <if test="name != null" >
        #{name,jdbcType=VARCHAR},
      </if>
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
    </trim>
  </insert>
  <!-- updateByExampleSelective and updateByExample are not declared on the OrderMapper
       interface shown in this article. -->
  <update id="updateByExampleSelective" parameterType="map" >
    update t_order
    <set >
      <if test="record.id != null" >
        id = #{record.id,jdbcType=INTEGER},
      </if>
      <if test="record.name != null" >
        name = #{record.name,jdbcType=VARCHAR},
      </if>
      <if test="record.messageId != null" >
        message_id = #{record.messageId,jdbcType=VARCHAR},
      </if>
    </set>
    <if test="_parameter != null" >
      <include refid="Update_By_Example_Where_Clause" />
    </if>
  </update>
  <update id="updateByExample" parameterType="map" >
    update t_order
    set id = #{record.id,jdbcType=INTEGER},
      name = #{record.name,jdbcType=VARCHAR},
      message_id = #{record.messageId,jdbcType=VARCHAR}
    <if test="_parameter != null" >
      <include refid="Update_By_Example_Where_Clause" />
    </if>
  </update>
  <update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.Order" >
    update t_order
    <set >
      <if test="name != null" >
        name = #{name,jdbcType=VARCHAR},
      </if>
      <if test="messageId != null" >
        message_id = #{messageId,jdbcType=VARCHAR},
      </if>
    </set>
    where id = #{id,jdbcType=INTEGER}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.Order" >
    update t_order
    set name = #{name,jdbcType=VARCHAR},
      message_id = #{messageId,jdbcType=VARCHAR}
    where id = #{id,jdbcType=INTEGER}
  </update>
</mapper>
package com.imooc.mq.entity;

import java.util.Date;

/**
 * One row of the message-delivery log.
 *
 * @Title: BrokerMessageLog
 * @Description: Message log record
 * @date 2019/1/2214:29
 */
public class BrokerMessageLog {

    // Unique id of the message.
    private String messageId;
    // Message payload as stored in the log.
    private String message;
    // Number of delivery attempts; a boxed Integer, so it may be null.
    private Integer tryCount;
    // Delivery status value.
    private String status;
    // Time after which the message is considered timed out and eligible for retry.
    private Date nextRetry;
    private Date createTime;
    private Date updateTime;

    public BrokerMessageLog() {
    }

    public BrokerMessageLog(String messageId, String message, Integer tryCount, String status, Date nextRetry, Date createTime, Date updateTime) {
        this.messageId = messageId;
        this.message = message;
        this.tryCount = tryCount;
        this.status = status;
        this.nextRetry = nextRetry;
        this.createTime = createTime;
        this.updateTime = updateTime;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getTryCount() {
        return tryCount;
    }

    public void setTryCount(Integer tryCount) {
        this.tryCount = tryCount;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Date getNextRetry() {
        return nextRetry;
    }

    public void setNextRetry(Date nextRetry) {
        this.nextRetry = nextRetry;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
--------------------------------------------------------------
package com.imooc.mq.entity;

import java.io.Serializable;

/**
 * Order entity; also the payload sent through RabbitMQ.
 *
 * <p>NOTE(review): {@code id} is a {@code String} here, while the OrderMapper interface and
 * its XML use {@code Integer} / {@code jdbcType=INTEGER} for the key; confirm the intended type.
 *
 * @Title: Order
 * @Description: Order
 * @date 2019/1/2210:18
 */
public class Order implements Serializable {

    private String id;
    private String name;
    // Unique identifier of the message sent for this order.
    private String messageId;

    public Order() {
    }

    public Order(String id, String name, String messageId) {
        this.id = id;
        this.name = name;
        this.messageId = messageId;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
}
現在開始按照設計思路寫實現代碼:
1、首先我們把最核心的生產者寫好,生產者由基本的消息投遞和確認監聽兩部分組成
package com.imooc.mq.producer;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Publishes order messages and records broker confirmations in the message log.
 *
 * @Title: RabbitOrderSender
 * @Description: Message sender
 * @date 2019/1/2214:52
 */
@Component
public class RabbitOrderSender {

    private static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
     * Invoked with the broker's confirmation for a published message.
     *
     * <p>On a positive confirmation the log row of the message is marked as sent. On a
     * negative confirmation the cause is only logged; the row stays in the "sending"
     * status.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            logger.info("correlationData:{}", correlationData);
            // A confirmation without correlation data cannot be matched to a log row
            // (for example a message published through the same template without a
            // CorrelationData); skip it instead of failing with a NullPointerException.
            if (correlationData == null || correlationData.getId() == null) {
                logger.warn("confirm received without correlation id, ack={}, cause={}", ack, cause);
                return;
            }
            String messageId = correlationData.getId();
            if (ack) {
                // Confirmed: mark the message as sent successfully.
                brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constans.ORDER_SEND_SUCCESS, new Date());
            } else {
                // Not confirmed: choose retry or compensation according to the failure cause.
                logger.error("異常處理{}", cause);
            }
        }
    };

    /**
     * Publishes the order to exchange {@code order-exchange1} with routing key
     * {@code order.ABC}, using the order's message id as the correlation id.
     *
     * @param order the order to publish; its message id links the confirmation to the log row
     * @throws Exception if publishing fails
     */
    public void sendOrder(Order order) throws Exception {
        // The confirm callback reports whether the message reached the broker, i.e. the
        // exchange. The same callback instance is registered on every call.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Unique message id.
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend("order-exchange1", "order.ABC", order, correlationData);
    }
}
2、將定時任務邏輯寫好
package com.imooc.mq.task;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;

/**
 * Scheduled job that re-sends messages still waiting for a confirmation.
 *
 * @Title: RetryMessageTasker
 * @Description: Scheduled task
 * @date 2019/1/2215:45
 */
@Component
public class RetryMessageTasker {

    private static Logger logger = LoggerFactory.getLogger(RetryMessageTasker.class);

    /** A message whose recorded try count has reached this value is marked as failed. */
    private static final int MAX_TRY_COUNT = 3;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
     * Picks up every message that is still in status 0 and has timed out, then either
     * re-sends it or marks it as failed.
     *
     * <p>Each message is handled independently: a failure on one message is logged with
     * its stack trace and the remaining messages are still processed.
     */
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend(){
        logger.info("-----------定時任務開始-----------");
        // Messages in status 0 that have already timed out.
        List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
        if (list == null) {
            return;
        }
        for (BrokerMessageLog messageLog : list) {
            try {
                retryOrFail(messageLog);
            } catch (Exception e) {
                logger.error("-----------異常處理-----------", e);
            }
        }
    }

    /**
     * Handles one timed-out message: marks it failed once the try count reaches
     * {@link #MAX_TRY_COUNT}, otherwise bumps the try count and publishes it again.
     */
    private void retryOrFail(BrokerMessageLog messageLog) throws Exception {
        // tryCount is a boxed Integer and may be null when the row was inserted without
        // a value; treat that as "never retried" instead of failing on unboxing.
        Integer recorded = messageLog.getTryCount();
        int tryCount = recorded == null ? 0 : recorded;

        if (tryCount >= MAX_TRY_COUNT) {
            // Retried too many times: record the final failure.
            brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constans.ORDER_SEND_FAILURE, new Date());
            return;
        }

        // Count this attempt first, then re-send the stored payload.
        brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(), new Date());
        Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);
        rabbitOrderSender.sendOrder(reSendOrder);
    }
}
3、寫好消費者的邏輯,直接用上一篇中的消費者代碼,修改對應的exchange、queue、路由key就好
package com.imooc.mq.consumer;

import com.imooc.mq.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Consumer for order messages.
 *
 * @Title: OrderReceiver
 * @Description: Consumer
 * @date 2019/1/2211:03
 */
@Component
public class OrderReceiver {

    /**
     * Receives an order from queue {@code order-queue1}, which is bound to topic exchange
     * {@code order-exchange1} with routing key {@code order.ABC}, and acknowledges it manually.
     *
     * <p>{@code @RabbitListener} configures the exchange, queue and routing key and declares
     * the binding; {@code @RabbitHandler} marks the method to call when a message arrives.
     *
     * @param order   the message body
     * @param headers the message headers; the delivery tag is read from them
     * @param channel the channel used for the manual acknowledgement
     * @throws Exception if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue1",declare = "true"),
            exchange = @Exchange(name = "order-exchange1",declare = "true",type = "topic"),
            key = "order.ABC"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        // Business handling on the consumer side.
        System.out.println("------收到消息,開始消費------");
        String orderId = order.getId();
        System.out.println("訂單ID:"+orderId);

        // Manual ACK of exactly this delivery (multiple = false).
        Object rawTag = headers.get(AmqpHeaders.DELIVERY_TAG);
        Long tag = (Long) rawTag;
        channel.basicAck(tag, false);
    }
}
4、業務邏輯
package com.imooc.mq.service;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.mapper.OrderMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.DateUtils;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * Order business logic: stores the order, logs the outgoing message and publishes it.
 *
 * @Title: OrderService
 * @Description: Business implementation
 * @date 2019/1/2215:41
 */
@Service
public class OrderService {

    private static Logger logger = LoggerFactory.getLogger(OrderService.class);

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    /**
     * Stores the order, writes a "sending" log row for its message, then publishes the message.
     *
     * <p>Any exception is logged and not re-thrown, so callers are not notified of failures.
     *
     * <p>NOTE(review): the two inserts are not wrapped in a transaction here; confirm whether
     * the order row and the log row should be written atomically.
     *
     * @param order the order to create; its message id becomes the id of the log row
     */
    public void createOrder(Order order) {
        try {
            // Use the current time as the order creation time (simplification for the demo).
            Date orderTime = new Date();
            // Store the business data.
            orderMapper.insert(order);

            // Build the message log row.
            BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
            // Unique message id.
            brokerMessageLog.setMessageId(order.getMessageId());
            // Keep the whole message as JSON.
            brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));
            // Status "sending"; same value as the former literal "0".
            brokerMessageLog.setStatus(Constans.ORDER_SENDING);
            // Start the retry counter at 0 explicitly: insertSelective skips null
            // properties, which would leave try_count to whatever the table defaults to.
            brokerMessageLog.setTryCount(0);
            // Confirmation timeout window: ORDER_TIMEOUT minute(s) after the order time.
            brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constans.ORDER_TIMEOUT));
            brokerMessageLog.setCreateTime(new Date());
            brokerMessageLog.setUpdateTime(new Date());
            brokerMessageLogMapper.insertSelective(brokerMessageLog);

            // Publish the message.
            rabbitOrderSender.sendOrder(order);
        } catch (Exception e) {
            // The exception is passed as the last argument so the stack trace is logged;
            // the former "{}" placeholder was never filled and was printed literally.
            logger.error("訂單業務異常", e);
        }
    }
}
5、測試
/**
 * Creates a test order through the order service.
 *
 * <p>There is no try/catch: an exception thrown by the call must fail the test instead of
 * being printed and ignored.
 */
@Test
public void createOrder(){
    Order order = new Order();
    order.setId("201901228");
    order.setName("測試訂單");
    // Message id: current time in milliseconds, a "$" separator and a random UUID.
    order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
    orderService.createOrder(order);
}
先啟動消費者服務、再啟動生產者服務讓定時任務跑起來,最后啟動測試方法。消息被消費成功后,日志記錄狀態被修改為1。測試消息重投的話需要制造一些異常情況,比如修改消費者端的exchange名稱,使生產者找不到該交換機,拿不到回調,就會重試投遞。