oracle 隊列


 

Oracle 高級隊列(AQ)

 

適用對象:初步了解oracle高級隊列人群

 

 

注意事項:

序號

注意事項

1

JMS監聽部分基本參照以下網站內容:

http://blog.csdn.net/scorpio3k/article/details/49406209

2

本文僅為按作者本身的項目經歷編寫,不包含全部oracle高級隊列功能

 

 

目錄

Oracle 高級隊列(AQ). 0

一.前言... 3

二.功能概述... 3

三.創建Oracle高級隊列... 4

1.Oracle高級隊列所需權限... 4

2.創建隊列結構—TYPE. 4

3.創建隊列表... 5

4.創建隊列... 5

5.隊列管理... 5

6.隊列創建管理完整步驟... 7

四.JMS監聽並處理Oracle 高級隊列... 10

1. 准備工作:. 10

2. 創建連接參數類:. 10

3. 創建消息轉換類:. 10

4. 主類進行消息處理:. 11

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

1 前言

一般系統的應用可以分為:立即要執行和可以延遲要執行的事情,區分這個很重要。為了提高系統的性能,縮短系統等待時間,引入隊列技術。

隊列是一種能將應用程序的處理工作有效地划分為前台任務和后台任務的技術。當處理容量允許時,這種技術通過存儲消息、確定消息處理的優先順序和向應用程序提交消息來發揮作用。它使你能夠平衡本地計算機的負荷,或將任務分配到遠程計算機。

為了減少用戶的等待時間,應用程序可以讓說明需要后台處理的消息排入隊列。然后就可以從頁面的呈遞過程中去掉該處理任務。由一個后台進程來讀取並隊列處理這些消息,或者甚至可以交由一個單獨的系統來處理它們。

隊列可以實現各個系統之間的數據共享,消息通信。

 

2 功能概述

書寫本文的目的:利用Oracle高級隊列實現pl/sql代碼,為其它語言實現高級隊列的功能作接口。

Oracle高級隊列有一下好處:

(1)高級隊列管理是Oracle數據庫的一個特性,它提供消息隊列管理功能。這是一個非常可靠、安全和可伸縮的消息管理系統,因為它使用與其他基於Oracle技術的應用程序相同的數據庫特性。

(2)高級隊列管理的一個很大優點是它可以通過pl/sql、java或c來訪問,這樣你就可以把來自一個java servlet的消息入隊列和使pl/sql存儲過程中的相同消息出隊列。

(3)高級隊列管理的另一個優點是你可以利用這一軟件通過Oracle net services (sql*net)、http(s)和smtp,在遠程節點之間傳播消息。高級隊列甚至可以通過消息網關與非Oracle的消息管理系統(如ibm mqseries)相集成。

(4)Oracle高級隊列管理提供了單消費者隊列和多消費者隊列。單消費者隊列只面向單一的接收者。多消費者隊列可以被多個接收者使用。當把消息放入多消費者隊列時,應用程序的程序員必須顯式地在消息屬性中指定這些接收者,或者建立決定每條消息的接收者的基於規則的訂閱過程。

Oracle 高級隊列具體開發步驟如下:

(1)首先確定應用的需求,是否適合使用高級隊列?使用高級隊列預計提高性能的預期值

(2)賦予數據庫賬戶相應aq權限。

(3)確定隊列包體結構,即創建type。

(4)創建隊列表及隊列。

(5)隊列管理

3 創建Oracle高級隊列

3.1 Oracle高級隊列所需權限 

賦予權限和角色:

grant connect, resource to賬戶名;

grant aq_user_role to賬戶名;

grant aq_administrator_role to 賬戶名;

grant execute on sys.dbms_aqadm to賬戶名;

grant execute on sys.dbms_aq to賬戶名;

grant execute on sys.dbms_aqin to賬戶名;

grant execute on sys.dbms_aqjms to賬戶名;

 

3.2 創建隊列結構—TYPE

create or replace type 類型名稱 as object

(

  字段一     字段類型,

  字段二     字段類型,

  字段三     字段類型,

  字段四     字段類型,

  ………………………… 

字段n      字段類型

);

 

 

 

3.3 創建隊列表

begin

  sys.dbms_aqadm.create_queue_table

 (

  queue_table        => '隊列表名',  

  queue_payload_type => ' type類型',            --之前定義的type類型

  sort_list          => 'priority,enq_time',    --按優先級和入列時間排序

  multiple_consumers => false,                  --多消費者

  comment            => '自己加注解',  

  auto_commit        => false                   --手動控制事務

 );

end;

 

3.4 創建隊列

begin

  sys.dbms_aqadm.create_queue

 (

  queue_name     => '隊列名',

  queue_table    => '隊列表名',                      --之前創建的隊列表

  queue_type     => sys.dbms_aqadm.normal_queue,

  max_retries    => 3,                               --取隊列失敗后重試次數

  retry_delay    => 1,                               --重試前等待

  retention_time => 0                                 --取隊列后保持時間,不保持

 );

end;

 

3.5 隊列管理

3.5.1 啟動隊列:

begin

  sys.dbms_aqadm.start_queue

  (queue_name => '隊列名',enqueue => true ,dequeue => true );

end;

 

 

3.5.2 插入隊列:

declare

  v_message    隊列類型(type);

  v_msgid      raw(16);

  v_options    dbms_aq.enqueue_options_t;

  v_properties dbms_aq.message_properties_t;

  v_recipients dbms_aq.aq$_recipient_list_t;

begin

  v_message := task_c(字段一  => 字符串,

                          字段二  => 字符串,

                          字段三  => 字符串,

                          字段四  => 字符串,

                          ………………………

                          字段n  => 字符串);

  v_properties.priority :=數字;                   --該消息的優先級別,默認為1

  v_options.visibility  := dbms_aq.immediate;     --立即入列

  dbms_aq.enqueue(queue_name         => '隊列名',

                  enqueue_options    => v_options,

                  message_properties => v_properties,

                  payload            => v_message,

                  msgid              => v_msgid);

end;

 

3.5.3 暫停隊列:

begin

  sys.dbms_aqadm.stop_queue ( queue_name => '隊列名');

end;

 

3.5.4 刪除隊列:

begin

  sys.dbms_aqadm.drop_queue ( queue_name => '隊列名');

end;

 

3.5.5 刪除隊列表:

begin

  sys.dbms_aqadm.drop_queue_table (queue_table   =>  '隊列表名');

end;

 

 

3.6 隊列創建管理完整步驟

在aquser賬戶中創建高級隊列,高級隊列結構為callid,msg_id,report_time,sms_report,respurl,send_times,隊列名稱為sms_queue。

 

開發步驟:

 

(1)賦予權限:

grant connect, resource to aquser;

grant aq_user_role to aquser;

grant aq_administrator_role to aquser;

grant execute on sys.dbms_aqadm to aquser;

grant execute on sys.dbms_aq to aquser;

grant execute on sys.dbms_aqin to aquser;

grant execute on sys.dbms_aqjms to aquser;

 

(2)創建隊列結構(type):

create or replace type sms_queue_type as object

(

  callid      varchar2(1024),

  msg_id      varchar2(1024),

  report_time varchar2(1024),

  sms_report  varchar2(1024),

  respurl     varchar2(1024),

  send_times  varchar2(1024)

);

 

(3)創建隊列表:

begin

  sys.dbms_aqadm.create_queue_table

 (

  queue_table        => 'sms_queue_table',  

  queue_payload_type => 'sms_queue_type',            

  sort_list          => 'priority,enq_time',   

  multiple_consumers => false,                  

  comment            => 'queue for test',  

  auto_commit        => false                  

 );

end;

 

 

(4)創建隊列:

begin

  sys.dbms_aqadm.create_queue

 (

  queue_name     => 'sms_queue',

  queue_table    => 'sms_queue_table',                 

  queue_type     => sys.dbms_aqadm.normal_queue,

  max_retries    => 3,                              

  retry_delay    => 1,                              

  retention_time => 0                                

 );

end;

 

(5)啟動隊列:

begin

  sys.dbms_aqadm.start_queue

  (queue_name => 'sms_queue',enqueue => true ,dequeue => true );

end;

 

(6)建立入隊存儲:

CREATE OR REPLACE procedure sms_enqueue(in_callid      varchar2,

                                        in_msg_id      varchar2,

                                        in_report_time varchar2,

                                        in_sms_report  varchar2,

                                        in_respurl     varchar2,

                                        in_send_times  number,

                                        out_result     out varchar2,

                                        out_sqlerrm    out varchar2) as

  /*聲明變量*/

  v_Message    SMS_QUEUE_TYPE;

  v_MsgId      RAW(16);

  v_options    DBMS_AQ.ENQUEUE_OPTIONS_T;

  v_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

  v_Recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T;

  v_sqlerrm    varchar2(512);

begin

 

  /*對列字段賦值*/

  v_Message := SMS_QUEUE_TYPE(callid      => in_callid,

                              msg_id      => in_msg_id,

                              report_time => in_report_time,

                              sms_report  => in_sms_report,

                              respurl     => in_respurl,

                              send_times  => in_send_times);

 

 

  /*讓消息立即進入隊列*/

  v_options.visibility := DBMS_AQ.IMMEDIATE;

 

  /*入隊操作*/

  dbms_aq.enqueue(queue_name         => 'sms_queue',

                  enqueue_options    => v_options,

                  message_properties => v_properties,

                  payload            => v_Message,

                  msgid              => v_MsgId);

 

  /*異常處理*/

exception

  WHEN OTHERS THEN

    v_sqlerrm   := SQLERRM;

    out_result  := '-1';

    out_sqlerrm := v_sqlerrm;

end;

 

(6)刪除隊列:

begin

  sys.dbms_aqadm.stop_queue ( queue_name => 'sms_queue');

end;

 

begin

  sys.dbms_aqadm.drop_queue ( queue_name => 'sms_queue');

end;

 

begin

  sys.dbms_aqadm.drop_queue_table (queue_table   =>  'sms_queue_table');

end;

 

drop type voicechange.sms_queue_type;

 

 

 

 

4 JMS監聽並處理Oracle 高級隊列

4.1 准備工作:

Java使用JMS進行相應的處理,需要使用Oracle提供的jar,在Oracle安裝目錄可以找到:在linux中可以使用find命令進行查找,例如:find `pwd` -name 'jmscommon.jar'

需要的jar為:

app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar

app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar

app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar

app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar

app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar

 

4.2 創建連接參數類:

實際使用時可以把參數信息配置在properties文件中,使用spring進行注入。

package org.kevin.jms;

c class JmsConfig

{

    public String username = "數據庫賬戶名";

    public String password = "數據庫賬戶密碼";

    public String jdbcUrl = "jdbc:oracle:thin:@數據庫TNS:端口號:名稱";

    public String queueName = "監聽的隊列名稱";

}

 

4.3 創建消息轉換類:

package org.kevin.jms;

import java.sql.SQLException;

import oracle.jdbc.driver.OracleConnection;

import oracle.jdbc.internal.OracleTypes;

import oracle.jpub.runtime.MutableStruct;

import oracle.sql.CustomDatum;

import oracle.sql.CustomDatumFactory;

import oracle.sql.Datum;

import oracle.sql.STRUCT;

@SuppressWarnings("deprecation")

public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {

    public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";

    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

 

    MutableStruct _struct;

    // 12表示字符串

    static int[] _sqlType = { 12 };

    static CustomDatumFactory[] _factory = new CustomDatumFactory[1];

    static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();

    public static CustomDatumFactory getFactory() {

        return _MessageFactory;

    }

    public QUEUE_MESSAGE_TYPE() {

        _struct = new MutableStruct(new Object[1], _sqlType, _factory);

    }

    public Datum toDatum(OracleConnection c) throws SQLException {

        return _struct.toDatum(c, _SQL_NAME);

    }

    public CustomDatum create(Datum d, int sqlType) throws SQLException {

        if (d == null)

            return null;

        QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();

        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);

        return o;

    }

    public String getContent() throws SQLException {

        return (String) _struct.getAttribute(0);

    }

}

 

4.4 主類進行消息處理:

package org.kevin.jms;

import java.util.Properties;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.Session;

 

import oracle.jms.AQjmsAdtMessage;

import oracle.jms.AQjmsDestination;

import oracle.jms.AQjmsFactory;

import oracle.jms.AQjmsSession;

public class Main {

    public static void main(String[] args) throws Exception {

        JmsConfig config = new JmsConfig();

        QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,

                new Properties());

        QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);

        AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        conn.start();

        Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);

        MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);

 

        consumer.setMessageListener(new MessageListener() {

            @Override

            public void onMessage(Message message) {

                System.out.println("ok");

                AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;

                try {

                    QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();

                    System.out.println(payload.getContent());

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        });

        Thread.sleep(1000000);

    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM