JMS監聽Oracle AQ


  • 該文檔中,jdk版本1.8,java項目為maven構建的springboot項目,並使用了定時任務來做AQ監聽的重連功能,解決由於外部原因導致連接斷裂之后,需要手動重啟項目才能恢復連接的問題

  • github源碼位置

  • gitee源碼位置

一、創建隊列

1.1.管理員登錄執行

  • 管理員登錄,執行授權操作,oracle使用隊列需要單獨的授權,默認未開啟,須手動開啟,授權命令如下,username使用自己的用戶名即可
GRANT EXECUTE ON SYS.DBMS_AQ to 'username';
GRANT EXECUTE ON SYS.DBMS_AQADM to 'username';
GRANT EXECUTE ON SYS.DBMS_AQ_BQVIEW to 'username';
GRANT EXECUTE ON SYS.DBMS_AQIN to 'username';
GRANT EXECUTE ON SYS.DBMS_JOB to 'username';

1.2.用戶登錄執行執行

1.2.1. 創建消息負荷payload

  • 創建的此type用來封裝隊列所帶的,根據實際需求進行創建
CREATE OR REPLACE TYPE TYPE_QUEUE_INFO AS OBJECT
(
  param_1             VARCHAR2(100),
  param_2             VARCHAR2(100)
)

1.2.2. 創建隊列表

  • 創建對列表,並指定隊列數據的類型,隊列表名自定義即可,數據類型使用上面剛創建的type
begin
  sys.dbms_aqadm.create_queue_table(
    queue_table => 'QUEUE_TABLE',
    queue_payload_type => 'TYPE_QUEUE_INFO',
    sort_list => 'ENQ_TIME',
    compatible => '10.0.0',
    primary_instance => 0,
    secondary_instance => 0);
end;

1.2.3. 創建隊列並啟動

  • 創建名稱為QUEUE_TEST的隊列,並指定對列表名【同一個oracle用戶下,可以有多個對列表,同一個對列表中,可以有多個隊列】
begin
  sys.dbms_aqadm.create_queue(
    queue_name => 'QUEUE_TEST',
    queue_table => 'QUEUE_TABLE',
    queue_type => sys.dbms_aqadm.normal_queue,
    max_retries => 5,
    retry_delay => 0,
    retention_time => 0);
end;
  • 剛創建的隊列的狀態默認是未開啟的,需要手動開啟一下,同理,存在刪除、停止等操作
begin
  -- 啟動隊列
  sys.dbms_aqadm.start_queue(
      queue_name => 'QUEUE_TEST'
  );
  
  -- 暫停隊列
  --sys.dbms_aqadm.STOP_QUEUE(
  --    queue_name => 'QUEUE_TEST'
  --);
  
  -- 刪除隊列
  --sys.dbms_aqadm.DROP_QUEUE(
  --    queue_name => 'QUEUE_TEST'
  --);
  
  -- 刪除對列表
  --sys.dbms_aqadm.DROP_QUEUE_TABLE(
  --    queue_table => 'QUEUE_TABLE'
  --);
end;

1.2.4. 創建存儲過程

  • 儲存過程的作用為把數據加載到隊列中,生成的新的隊列會自動添加進綁定的對列表中,等待消費者進行消費
CREATE OR REPLACE PROCEDURE pro_queue(param_1 VARCHAR2, param_2 VARCHAR2) as
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            TYPE_QUEUE_INFO;
begin
  -- 封裝最終消息
  o_payload := TYPE_QUEUE_INFO(param_1, param_2);
  -- 入隊操作,指定隊列
  dbms_aq.enqueue(queue_name         => 'QUEUE_TEST',
                  enqueue_options    => r_enqueue_options,
                  message_properties => r_message_properties,
                  payload            => o_payload,
                  msgid              => v_message_handle);

  -- 出隊操作
  --dbms_aq.enqueue(queue_name => 'QUEUE_TEST',
  --                dequeue_options => r_dequeue_options,
  --                message_properties => r_message_properties,
  --                payload => o_payload,
  --                msgid => v_message_handle);
end pro_queue;

二、Java中JMS的使用

2.1. 項目配置

2.1.1. maven

<dependency>
      <groupId>com.oracle</groupId>
      <artifactId>jmscommon</artifactId>
      <version>1.2</version>
</dependency>
<dependency>
	<groupId>com.oracle</groupId>
	<artifactId>orai18n</artifactId>
	<version>1.2</version>
</dependency>
<dependency>
	<groupId>com.oracle</groupId>
	<artifactId>jta</artifactId>
	<version>1.2</version>
</dependency>
<dependency>
	<groupId>com.oracle</groupId>
	<artifactId>aqapi_g</artifactId>
	<version>1.2</version>
</dependency>

2.1.2. yml

spring:
  datasource:
    url: jdbc:oracle:thin:@ip:port/sid
    username: **
    password: **    
queue:
  aq:
    # 該隊列是否可用,用來控制隊列的加載和重連,不可省略
    enable: true
    # 隊列名稱,不可省略
    name: QUEUE_TEST
    # 隊列重連的定時任務對應的時間表達式,不可省略
    cron: 0 */1 * * * ?

2.2. AQ初始化

  • 在項目啟動結束后立即運行此類,會根據所配置的隊列名稱監聽對應的隊列
package com.wangqq.jms;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * @Title: MessageAQInit.java
 * @Description: AQ 初始化
 * @author wangqq
 * @date 2020年6月28日 下午3:45:23
 * @version 1.0
 */
@Component
public class MessageAQInit implements CommandLineRunner {

    @Autowired
    private MessageAQConfig aqConfig;
    @Autowired
    private MessageAQListener listener;

    @Override
    public void run(String... args) throws RuntimeException {
        // 檢查消息隊列是否啟用
        if (aqConfig.enable) {
            // 設置AQ的消息監聽器
            MessageAQConnection.setListener(listener);
            // 初始化AQ連接
            if (!MessageAQConnection.initFactory(aqConfig)) {
                throw new RuntimeException("Message Oracle AQ initialization failed!");
            }
            // 建立連接
            if (!MessageAQConnection.establishConnection(aqConfig)) {
                throw new RuntimeException("Message Oracle AQ connection failed!");
            }
        }
    }
}

2.3. 配置信息類

  • 配置類,將yml的配置文件轉為java對象【時間表達式在代碼中不會以對象屬性的方式被使用,因此在該類中沒有設置】
package com.wangqq.jms;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @Title: MessageAQConfig.java
 * @Description: ORACLE 消息隊列配置
 * @author wangqq
 * @date 2020年6月28日 下午3:36:08
 * @version 1.0
 */
@Component
public class MessageAQConfig {
    
    /** 是否開啟MessageAq功能 */
    @Value("${queue.aq.enable}")
    public Boolean enable;
    
    /** 數據庫用戶名 */
	@Value("${spring.datasource.username}")
	public String userName;
	
	/** 數據庫密碼 */
	@Value("${spring.datasource.password}")
	public String password;
	
	/** 數據庫地址url */
	@Value("${spring.datasource.url}")
	public String url;
	
	/** 隊列名稱 */
	@Value("${queue.aq.name}")
	public String queue;
}

2.4. AQ 連接工廠類

  • AQ 鏈接的核心類,根據配置對象以及注入的監聽對象,動態監聽AQ隊列
package com.wangqq.jms;

import javax.jms.Queue;
import javax.jms.Session;

import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsConnection;
import oracle.jms.AQjmsConnectionFactory;
import oracle.jms.AQjmsConsumer;
import oracle.jms.AQjmsSession;

/**
 * @Title: MessageAQConnection.java
 * @Description: AQ 連接
 * @author wangqq
 * @date 2020年6月28日 下午3:50:32
 * @version 1.0
 */
@Slf4j
public class MessageAQConnection {

    private static AQjmsConnectionFactory aQjmsConnectionFactory;

    private static AQjmsConsumer aQjmsConsumer;

    private static AQjmsSession aQjmsSession;

    private static AQjmsConnection aQjmsConnection;

    private static MessageAQListener listener;

    /**
     * 設置JMS監聽器
     * 
     * @param messageAqJmsListener
     * @author wangqq
     * @date 2020年7月6日 上午8:33:57
     */
    public static void setListener(MessageAQListener messageAqJmsListener) {
        listener = messageAqJmsListener;
    }

    /**
     * 初始化 AQ 連接 Factory
     *
     * @param aqConfig 消息隊列配置
     * @return 是否成功
     */
    public static boolean initFactory(MessageAQConfig aqConfig) {
        try {
            aQjmsConnectionFactory = new AQjmsConnectionFactory();
            aQjmsConnectionFactory.setJdbcURL(aqConfig.url);
            aQjmsConnectionFactory.setUsername(aqConfig.userName);
            aQjmsConnectionFactory.setPassword(aqConfig.password);
            return true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return false;
        }
    }

    /**
     * 連接消息隊列
     *
     * @param aqConfig 消息隊列配置
     * @return 是否成功
     */
    public static boolean establishConnection(MessageAQConfig aqConfig) {
        try {
            aQjmsConnection = (AQjmsConnection) aQjmsConnectionFactory.createConnection();
            aQjmsSession = (AQjmsSession) aQjmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            aQjmsConnection.start();
            Queue queue = aQjmsSession.getQueue(aqConfig.userName, aqConfig.queue);
            aQjmsConsumer = (AQjmsConsumer) aQjmsSession.createConsumer(queue, null, MessageORAData.getFactory(), null,
                    false);
            aQjmsConsumer.setMessageListener(listener);
            return true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return false;
        }
    }

    /**
     * 關閉消息隊列連接
     *
     * @return 是否成功
     */
    public static boolean closeConnection() {
        try {
            aQjmsConsumer.close();
            aQjmsSession.close();
            aQjmsConnection.close();
            return true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return false;
        }
    }
}

2.5. 創建AQ 數據承載類

  • 用來接收oracle隊列中所帶的參數,基本保證與數據庫中的type格式相同即可
package com.wangqq.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Title: Test.java
 * @Description: AQ 數據承載類
 * @author wangqq
 * @date 2021-01-20 16:19:16
 * @version 1.0
 */
@Data
@Builder
public class Test {
    
    private String param_1;
    
    private String param_2;
    
}

2.6. 數據類型轉換

  • 將oracleAq所承載的數據,轉化為我們自己需要的實例對象,及上述中的Test對象
package com.wangqq.jms;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Struct;

import com.wangqq.bean.Test;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;

/**
 * @Title: MessageORAData.java
 * @Description: 數據類型轉換類
 * @author synjones
 * @date 2018年12月3日 上午11:29:50
 * @version 1.0
 */
@Slf4j
@NoArgsConstructor
public class MessageORAData implements ORAData, ORADataFactory {

    private Object[] rawData = new Object[8];
    
    private static final MessageORAData MESSAGE_FACTORY = new MessageORAData();

    public static ORADataFactory getFactory() {
        return MESSAGE_FACTORY;
    }

    @Override
    public ORAData create(Datum datum, int sqlType) throws SQLException {
        if (datum == null) {
            return null;
        } else {
            try {
                MessageORAData payOraData = new MessageORAData();
                Struct aStruct = (Struct) datum;
                payOraData.rawData = aStruct.getAttributes();
                return payOraData;
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                return null;
            }
        }
    }

    @Override
    public Datum toDatum(Connection arg0) throws SQLException {
        return null;
    }

    /**
     * 消息內容解析並封裝
     * 
     * @return
     * @author wangqq
     * @date 2020年7月6日 上午8:38:01
     */
    public Test getContent() {
        try {
            return Test.builder()
                .param_1(rawData[0] == null ? null : rawData[0].toString())
                .param_2(rawData[0] == null ? null : rawData[0].toString())
                .build();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }
}

2.7. AQ 監聽

package com.wangqq.jms;

import javax.jms.Message;
import javax.jms.MessageListener;

import org.springframework.stereotype.Component;

import com.wangqq.bean.Test;

import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;

/**
 * @Title: JMSListener.java
 * @Description: JMS監聽ORACLEAQ的隊列消息
 * @author wangqq
 * @date 2020年6月28日 上午11:23:42
 * @version 1.0
 */
@Slf4j
@Component
public class MessageAQListener implements MessageListener {

    
    @Override
    public void onMessage(Message message1) {
        AQjmsAdtMessage adtMessage = (AQjmsAdtMessage)message1;
        try {
            MessageORAData payload = (MessageORAData)adtMessage.getAdtPayload();
            // 獲取消息內容
            Test test = payload.getContent();
            
            System.out.println(test.toString());
            
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

2.8. AQ 監控任務, 在AQ斷開后重連

  • 通過定時任務,定時查詢是否有入隊時間在5分鍾之內的隊列未被消費【隊列入隊后,會在對列表中產生一條數據,消費之后該數據會被清除掉】,若存在,則說明監聽異常,需要重新創建連接監聽隊列
  • 數據庫對列表中的入隊時間在本次測試中為0時區的時間,故而在代碼中轉換了一下時區,否則無法根據入隊時間查詢數據
package com.wangqq.jms;

import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.wangqq.mapper.MessageAqMapper;
import com.wangqq.util.DateUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * @Title: MessageAQMonitor.java
 * @Description: AQ 監控任務, 在AQ斷開后重連
 * @author wangqq
 * @date 2020年6月28日 下午4:35:31
 * @version 1.0
 */
@Slf4j
@Component
public class MessageAQMonitor {

    @Autowired
    private MessageAQConfig aqConfig;
    @Autowired
    private MessageAqMapper aqMapper;

    @Scheduled(cron = "${queue.aq.cron}")
    private void monitorJob() {
        // 檢查消息隊列是否啟用
        if (!aqConfig.enable) {
            return;
        }
        // 獲取當前時間,並向前推5分鍾
        String formatDateTime = DateUtil.formatDate(new Date(System.currentTimeMillis() - 300000));
        // 將該時間轉為0時區的時間【數據庫中存儲的隊列時間為0時區的時間】
        String zeroZoneTime = DateUtil.timeConvert(formatDateTime, "+08:00", "+00:00", "yyyy-MM-dd HH:mm:ss");
        // 查詢是否存在5分鍾以前的隊列未被消費
        int selectCount = aqMapper.selectCount(aqConfig.queue, zeroZoneTime);
        if (selectCount != 0) {
            // 若存在,則重新啟動監聽
            if (MessageAQConnection.closeConnection()) {
                log.info("--> AQ connection has been closed.");
                if (MessageAQConnection.establishConnection(aqConfig)) {
                    log.info("--> AQ connection has been re-established.");
                }
            }
        }
    }
}

2.9. 隊列表中隊列數量的查詢

  • 根據隊列名稱和入隊時間,查詢在入隊時間之后入對的隊列數量
package com.wangqq.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

/**
 * @Title: MessageAqMapper.java
 * @Description: oracleAQ的查詢
 * @author wangqq
 * @date 2020年6月28日 下午4:04:50
 * @version 1.0
 */
@Mapper
public interface MessageAqMapper {

    /**
     * 
     * 查詢數據庫中的隊列表中符合條件的隊列的條數
     *
     * @param qName         隊列名稱
     * @param minDatetime   隊列入隊的最小時間
     * @return
     * @author wangqq
     * @date 2020-07-10 15:44:43
     */
    @Select("select count(msgid) from T_QUEUE_TABLE t where t.q_name = #{qName,jdbcType=VARCHAR} "
            + "and to_char(cast(t.enq_time AS DATE), 'yyyy-MM-dd HH24:mi:ss') < #{minDatetime,jdbcType=VARCHAR}")
    int selectCount(String qName, String minDatetime);

}

2.10. 日期工具類

package com.wangqq.util;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/**
 * @Title: DateUtil.java
 * @Description: 日期工具類
 * @author wangqq
 * @date 2018年10月29日 下午5:27:21
 * @version 1.0
 */
public class DateUtil {

	/**
	 * 字符串轉date,默認格式yyyy-MM-dd HH:mm:ss
	 * 
	 * @param source
	 * @return
	 */
	public static Date parseDate(String source) {
		return parseDate(source, "yyyy-MM-dd HH:mm:ss");
	}

	/**
	 * 字符串轉date
	 * 
	 * @param source
	 * @param pattern
	 *            格式
	 * @return
	 */
	public static Date parseDate(String source, String pattern) {
		if (source == null || source.equals("")) {
			return null;
		}
		SimpleDateFormat sdf = new SimpleDateFormat(pattern);
		try {
			return sdf.parse(source);
		} catch (ParseException e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 格式化日期,默認格式yyyy-MM-dd HH:mm:ss
	 * 
	 * @param date
	 * @return
	 */
	public static String formatDate(Date date) {
		return formatDate(date, "yyyy-MM-dd HH:mm:ss");
	}

	/**
	 * 格式化日期
	 * 
	 * @param date
	 * @param pattern
	 *            格式
	 * @return
	 */
	public static String formatDate(Date date, String pattern) {
		if (date == null) {
			return null;
		}
		SimpleDateFormat sdf = new SimpleDateFormat(pattern);
		return sdf.format(date);
	}

	/**
     * 時區 時間轉換方法:將傳入的時間(可能為其他時區)轉化成目標時區對應的時間
     * @param sourceTime 時間格式必須為:yyyy-MM-dd HH:mm:ss
     * @param sourceId 入參的時間的時區id 比如:+08:00
     * @param targetId 要轉換成目標時區id 比如:+09:00
     * @param reFormat 返回格式 默認:yyyy-MM-dd HH:mm:ss
     * @return string 轉化時區后的時間
     */
    public static String timeConvert(String sourceTime, String sourceId,
            String targetId,String reFormat){
        //校驗入參是否合法
        if (null == sourceId || "".equals(sourceId) || null == targetId
                || "".equals(targetId) || null == sourceTime
                || "".equals(sourceTime)){
            return null;
        }
        
        if(reFormat == null || "".equals(reFormat)){
            reFormat = "yyyy-MM-dd HH:mm:ss";
        }
        
        //校驗 時間格式必須為:yyyy-MM-dd HH:mm:ss
        String reg = "^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$";
        if (!sourceTime.matches(reg)){
            return null;
        }
        
        try{
            //時間格式
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //根據入參原時區id,獲取對應的timezone對象
            TimeZone sourceTimeZone = TimeZone.getTimeZone("GMT"+sourceId);
            //設置SimpleDateFormat時區為原時區(否則是本地默認時區),目的:用來將字符串sourceTime轉化成原時區對應的date對象
            df.setTimeZone(sourceTimeZone);
            //將字符串sourceTime轉化成原時區對應的date對象
            java.util.Date sourceDate = df.parse(sourceTime);
            
            //開始轉化時區:根據目標時區id設置目標TimeZone
            TimeZone targetTimeZone = TimeZone.getTimeZone("GMT"+targetId);
            //設置SimpleDateFormat時區為目標時區(否則是本地默認時區),目的:用來將字符串sourceTime轉化成目標時區對應的date對象
            df.setTimeZone(targetTimeZone);
            //得到目標時間字符串
            String targetTime = df.format(sourceDate);
            
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            java.util.Date date = sdf.parse(targetTime);
            sdf = new SimpleDateFormat(reFormat);
            
            return sdf.format(date);
        }
        catch (ParseException e){
            e.printStackTrace();
        }
        return null;
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM