Oracle Advanced Queues - missing messages and best approach to debug

1.1k Views Asked by At

Env: Oracle 12c

I currently have an Oracle Advanced Queue system setup as follows:

NAME                  QUEUE_TABLE     QID       QUEUE_TYPE      MAX_RETRIES  RETRY_DELAY  RETENTION
---------------       --------------- -------   --------------- -----------  -----------  ---------
MY_WORK_Q             MY_WORK_QT      2518333   NORMAL_QUEUE    100          0            0 
MY_WORK_QT_E          MY_WORK_QT      2518332   EXCEPTION_QUEUE 0            0            0

I also have registered a callback that calls a PL/SQL package procedure.

For some reason, I seem to have a situation where I am losing messages or messages are not being dequeued.

Based on this, I have the following questions:

  1. Have I setup my actual queue - MY_WORK_Q with MAX_RETRIES and other info correctly?
  2. Is there a way to check messages that have been enqueued?
  3. Is there a way to check messages that have been de-queued?
  4. Is there a means of checking the EXCEPTION_QUEUE/Exception table to see if lost messages have reached there?

I just can't see why I am losing messages within my queueing system and what I could check to see what might be causing the issue.

I also increased my MAX_RETRIES to 100 but still seem to be having issues.

Update

It seems like I am getting the error ORA-25263: no message in queue.

I am not sure if this is related to a timing issue on the dbms_aq.enqueue or dbms_aq.dequeue calls but on the dbms_aq.dequeue I have this set:

l_dequeue_options.wait := dbms_aq.no_wait;

Do I require say a 10 second wait instead of no_wait? I am unsure if this possible and whether the wait needs to be on the enqueue or dequeue steps.

1

There are 1 best solutions below

0
Kevin Seymour On

Here is a simple example that works. Perhaps you can use it to identify your issue?

-- DROP TYPE some_type_t FORCE;

CREATE OR REPLACE TYPE some_type_t AS OBJECT (
  a NUMBER,
  b VARCHAR(100),
  c DATE
);

CREATE OR REPLACE PROCEDURE some_type_callback(CONTEXT  IN RAW,
                                               reginfo  IN sys.aq$_reg_info,
                                               descr    IN sys.aq$_descriptor,
                                               payload  IN RAW,
                                               payloadl IN NUMBER) AS

  -- Local variables
  v_dequeue_options    dbms_aq.dequeue_options_t;
  v_message_properties dbms_aq.message_properties_t;
  v_message_handle     RAW(26);
  v_some_type          some_type_t;
BEGIN

  -- Set the dequeue options from the descriptor
  v_dequeue_options.consumer_name := descr.consumer_name;
  v_dequeue_options.msgid         := descr.msg_id;

  -- Dequeue the message
  dbms_aq.dequeue(queue_name         => descr.queue_name,
                  dequeue_options    => v_dequeue_options,
                  message_properties => v_message_properties,
                  payload            => v_some_type,
                  msgid              => v_message_handle);
END some_type_callback;
/

SELECT *
  FROM user_errors e
 WHERE e.name = 'SOME_TYPE_CALLBACK';

BEGIN
 -- dbms_aqadm.drop_queue_table(queue_table => 'some_type_qt',
 --                             force       => TRUE);

  dbms_aqadm.create_queue_table(queue_table        => 'some_type_qt',
                                queue_payload_type => 'some_type_t',
                                multiple_consumers => TRUE);

  dbms_aqadm.create_queue(queue_name     => 'some_type_q',
                          queue_table    => 'some_type_qt',
                          retention_time => 86400); -- 1 day

  dbms_aqadm.start_queue(queue_name => 'some_type_q');

  dbms_aqadm.add_subscriber(queue_name => 'some_type_q',
                            subscriber => sys.aq$_agent(NAME     => 'some_type_qs',
                                                        address  => NULL,
                                                        protocol => NULL));

  dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('some_type_q:some_type_qs',
                                                          dbms_aq.namespace_aq,
                                                          'plsql://some_type_callback',
                                                          hextoraw('FF'))),
                   1);
END;
/

SELECT *
  FROM aq$some_type_qt;
-- nothing

DECLARE
  v_some_type some_type_t;
  eopt        dbms_aq.enqueue_options_t;
  mprop       dbms_aq.message_properties_t;
  enq_msgid   RAW(16);
BEGIN
  v_some_type := some_type_t(a => 42,
                             b => 'forty-two',
                             c => to_date('1/1/2942',
                                          'mm/dd/yyyy'));

  dbms_aq.enqueue(queue_name         => 'some_type_q',
                  enqueue_options    => eopt,
                  message_properties => mprop,
                  payload            => v_some_type,
                  msgid              => enq_msgid);
END;
/

SELECT *
  FROM aq$some_type_qt;
-- msg_state = READY => PROCESSED

One thing that will definitely help you is setting retention_time on your queue table. You can then use the aq$ reference for the queue table to look at the messages. The msg_state column will show READY for a fresh message and PROCESSED for a message that was consumed. There are some other columns on there that can be helpful: retry_count for example.

If you are getting ORA-25263 it seems instead of handling the one message you are getting provoked for in your callback that you are trying to read a different message and clashing with another job that was started to consume the queue. This is resolved by the two lines I have in my callback before calling dequeue.

If you need to trigger when a message arrives and then preserve message order you need to add some locking and additional complexity to your callback. You could take a look at Metalink 225810.1 for examples on how to do this.