八.利用springAMQP實現異步消息隊列的日志管理


  經過前段時間的學習和鋪墊,已經對spring amqp有了大概的了解。俗話說學以致用,今天就利用springAMQP來完成一個日志管理模塊。大概的需求是這樣的:系統中有很多地方需要記錄操作日志,比如登錄、退出、查詢等,如果將記錄日志這個操作摻雜在主要的業務邏輯當中,勢必會增加響應的時間,對客戶來說是一種不好的體驗。所以想到用異步消息隊列來進行優化。系統處理完主要業務邏輯之后,將日志的相關實體發布到特定Queue下,然后設置一個監聽器,監該Queue的消息並做處理。客戶不用等待日志的處理就可直接返回。

  大概的業務流程如下圖所示。

  

 

  1.首先建立日志的數據表和實體,數據表起名為t_log。實體如下。主要包含操作者,操作的事件,操作時間等幾個主要參數。  

package com.xdx.entity;

import java.util.Date;

public class TLog {
    private Integer logId;

    private String operator;

    private String event;

    private Date createTime;

    private Integer isDel;

    public TLog(String operator, String event) {
        this.operator = operator;
        this.event = event;
    }

    public TLog() {
    }

    public Integer getLogId() {
        return logId;
    }

    public void setLogId(Integer logId) {
        this.logId = logId;
    }

    public String getOperator() {
        return operator;
    }

    public void setOperator(String operator) {
        this.operator = operator == null ? null : operator.trim();
    }

    public String getEvent() {
        return event;
    }

    public void setEvent(String event) {
        this.event = event == null ? null : event.trim();
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Integer getIsDel() {
        return isDel;
    }

    public void setIsDel(Integer isDel) {
        this.isDel = isDel;
    }
}

  2.編寫保存日志的方法,很簡單,就是一個數據庫的save過程。

  

package com.xdx.service;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

import com.xdx.dao.BaseDao;
import com.xdx.entity.TLog;

@Service
public class LogService {
    @Resource(name = "baseDao")
    private BaseDao<TLog, Integer> baseDao;

    public Integer saveLog(TLog log) {
        Integer result = baseDao.addT("TLogMapper.insertSelective", log);
        return result;
    }
}

  其中的TLogMapper.insertSelective代碼如下:

 <insert id="insertSelective" parameterType="com.xdx.entity.TLog" >
    insert into t_log
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="logId != null" >
        log_id,
      </if>
      <if test="operator != null" >
        operator,
      </if>
      <if test="event != null" >
        event,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="isDel != null" >
        is_del,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="logId != null" >
        #{logId,jdbcType=INTEGER},
      </if>
      <if test="operator != null" >
        #{operator,jdbcType=VARCHAR},
      </if>
      <if test="event != null" >
        #{event,jdbcType=VARCHAR},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="isDel != null" >
        #{isDel,jdbcType=INTEGER},
      </if>
    </trim>
  </insert>

  3.接下來就跟我們的spring amqp有關了,首先要在pom.xml中引入相關的jar包。 

    <!-- spring-rabbitMQ -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <!-- spring -amqp -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

  4.編寫主配置文件。在項目新建一個com.xdx.spring_rabbit包。關於rabbit的所有代碼都寫在這邊。

  編寫一個抽象的rabbit的主配置文件,之所以這樣做是為了以后擴展方便,讓不同的異步消息隊列的業務可以繼承並擴展它。如下所示。

  主配置的文件主要是配置了連接Rabbit服務的基本信息,並且指定了消息轉換器是json轉換器。

package com.xdx.spring_rabbit;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

/**
 * 抽象類,rabbitMQ的主配置類
 * 
 * @author xdx
 *
 */
public abstract class AbstractRabbitConfiguration {

    @Value("${amqp.port:5672}")
    private int port = 5672;

    protected abstract void configureRabbitTemplate(RabbitTemplate template);

    /**
     * 由於connectionFactory會與項目中的redis的connectionFactory命名沖突,
     * 所以這邊改名為rabbit_connectionFactory
     * 
     * @return
     */
    @Bean
    public ConnectionFactory rabbit_connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                "192.168.1.195");
        connectionFactory.setUsername("xdx");
        connectionFactory.setPassword("xxxx");
        connectionFactory.setPort(port);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbit_connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbit_connectionFactory());
        return rabbitAdmin;
    }
}

  5.編寫我們這個日志項目需要用到的配置文件,繼承上述的抽象類,在該配置文件中,我們具體指定Exchange,RouteKey,Queue,Binding以及監聽器這些要素。

package com.xdx.spring_rabbit;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 日志管理的Rabbit配置類的具體實現類
 * 
 * @author xdx
 *
 */
@Configuration
public class LogRabbitConfiguration extends AbstractRabbitConfiguration {
    protected static String LOG_EXCHANGE_NAME = "warrior.exchange.log";// topic
                                                                        // exchange的名稱
    protected static String LOG_QUEUE_NAME = "warrior.queue.log";// 接收消息的queue
    protected static String LOG_ROUTING_KEY = LOG_QUEUE_NAME;
    @Autowired
    private LogRabbitRecHandler logRabbitRecHandler;// 監聽器的委托類,委托其處理接收到的消息

    /**
     * 設置Exchange為LOG_EXCHANGE_NAME,RoutingKey為LOG_ROUTING_KEY,這樣將信息發送到
     * Exchange為LOG_EXCHANGE_NAME,RouteKey為LOG_ROUTING_KEY的通道中
     */
    @Override
    protected void configureRabbitTemplate(RabbitTemplate template) {
        System.err.println("創建一個RabbitTemplate,名字是 " + template);
        template.setExchange(LOG_EXCHANGE_NAME);
        template.setRoutingKey(LOG_ROUTING_KEY);
    }

    /**
     * 用於接收日志消息的Queue,默認綁定自己的名稱
     * 
     * @return
     */
    @Bean
    public Queue logQueue() {
        return new Queue(LOG_QUEUE_NAME);
    }

    /**
     * 定義一個topExchange
     * 
     * @return
     */
    @Bean
    public TopicExchange logExchange() {
        return new TopicExchange(LOG_EXCHANGE_NAME);
    }

    /**
     * 定義一個綁定日志接收的Queue的binding
     * 
     * @return
     */
    @Bean
    public Binding logQueueBinding() {
        return BindingBuilder.bind(logQueue()).to(logExchange())
                .with(LOG_ROUTING_KEY);
    }

    /**
     * 這個bean為監聽適配器,用於日志消息,並交由logRabbitRecHandler處理
     * 
     * @return
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(logRabbitRecHandler,
                jsonMessageConverter());
    }

    /**
     * 這個bean用於監聽服務端發過來的消息,監聽的Queue為logQueue(),
     * 因為該Queue綁定了logExchange和logRouteKey, 所以它可以接收到我們發送的日志消息
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
                rabbit_connectionFactory());
        container.setConcurrentConsumers(5);
        container.setQueues(logQueue());
        container.setMessageListener(messageListenerAdapter());
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return container;
    }
}

  6.封裝發送消息的接口,如下所示。這是一個泛型的接口,目的是為了傳入不同的消息類型。

package com.xdx.spring_rabbit;
/**
 * 定義一個泛型接口,用於發送消息,T為要發送的消息類型
 * @author xdx
 *
 * @param <T>
 */
public interface RabbitSend<T> {
    void send(T t);
}

  7.實現這個發送消息的接口。在這個實現類中,我們注入了之前生成的RabbitTemplate對象。用於發送消息。

package com.xdx.spring_rabbit;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import com.xdx.entity.TLog;

/**
 * 用於發送日志消息的通用實現類
 * 
 * @author xdx
 *
 */
@Component("logRabbitSend")
public class LogRabbitSend implements RabbitSend<TLog> {
    @Resource(name = "rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(TLog log) {
        rabbitTemplate.convertAndSend(log);
        System.err.println("發送消息:" + log);
    }
}

  8.封裝監聽器委托對象的接口,該接口用於處理監聽器監聽到的消息。同意是一個泛型的類,如下所示。

package com.xdx.spring_rabbit;

/**
 * 用於處理監聽到的消息的消息處理器接口,T為接收到的消息的類型
 * 
 * @author xdx
 *
 * @param <T>
 */
public interface RabbitRecHandler<T> {
    void handleMessage(T t);
}

  9.實現上述委托對象的接口,如下所示。在該接口中,我們注入了日志處理類的對象。用於儲存日志信息到數據庫。

package com.xdx.spring_rabbit;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import com.xdx.entity.TLog;
import com.xdx.service.LogService;
@Component("logRabbitRecHandler")
public class LogRabbitRecHandler implements RabbitRecHandler<TLog> {
    @Resource(name="logService")
    private LogService logService;

    @Override
    public void handleMessage(TLog log) {
        System.err.println("開始存儲日志"+log.getOperator()+","+log.getEvent());
        logService.saveLog(log);
    }
}

  10.最后,我們在具體的業務類中調用消息發送的接口,就可以實現日志消息的發送了。如下所示。

@Controller
public class AdminController {
   @Resource(name = "logRabbitSend")
    private LogRabbitSend logRabbitSend;
    
    @RequestMapping("admin")
    public ModelAndView admin(HttpSession session,String adminName, String password) throws Exception {
        List<Map<String,Object>>adminMap=adminService.getAllAdminMap();
        ModelAndView mv = new ModelAndView();
        //登錄操作的主要邏輯代碼……
        session.setAttribute("adminName", admin.getAdminName());
        session.setAttribute("realName", admin.getRealName());
       TLog log=new TLog(adminName, "登錄系統"); logRabbitSend.send(log); return mv;
    }
}

  運行我們的系統,我們先看看RabbitMQ的后台。看到了我們定義的Exchange和Queue等元素。

 

 

 

  運行AdmintController類中的admin方法,登錄系統,我們發現確實已經發送了消息,並且消息被監聽到,然后存儲到了數據庫。

  控制台打印出來的消息為:

  數據庫存入的記錄為:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM