Oracle9i Application Developer's Guide - Advanced Queuing
Release 2 (9.2)

Part Number A96587-01

12
Creating Applications Using JMS

In Chapter 1, "Introduction to Oracle Advanced Queuing", we described a messaging system for an imaginary company, BooksOnLine. In this chapter we consider the features of the Oracle JMS interface to AQ in the context of a sample application based on that scenario.

A Sample Application Using JMS

The operations of a large bookseller, BooksOnLine, are based on an online book ordering system that automates activities across the various departments involved in the entire sale process. The front end of the system is an order entry application where new orders are entered. These incoming orders are processed by an order processing application that validates and records the order. Shipping departments located at regional warehouses are then responsible for ensuring that these orders are shipped in a timely fashion. There are three regional warehouses: one serving the East Region, one serving the West Region, and a third warehouse for shipping International orders. Once an order has been shipped, the order information is routed to a central billing department that handles payment processing. The customer service department, located at its own site, is responsible for maintaining order status and handling inquiries about orders.

This sample application has been devised for the sole purpose of demonstrating the features of Oracle AQ. Our aim in creating this integrated scenario is to make it easier to grasp the possibilities of this technology by locating our explanations within a single context. However, it is not possible within the scope of a single relatively small code sample to demonstrate every possible application of AQ.

General Features of JMS

The following topics are discussed in this section:

J2EE Compliance

In release 9.2, Oracle JMS conforms to the Sun Microsystems JMS 1.0.2b standard. You can define the J2EE compliance mode for an OJMS client at run time. For compliance, set the Java property "oracle.jms.j2eeCompliant" to TRUE as a command line option. For noncompliance, do nothing. FALSE is the default value.
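As a minimal sketch of how a client might check which mode it was started in: the property name comes from the paragraph above, while the class name ComplianceMode and the parsing with Boolean.parseBoolean are assumptions made for illustration. This is not how OJMS itself reads the property.

```java
/** Hypothetical helper; not part of Oracle JMS. */
final class ComplianceMode {
    /** Property name taken from the text above. */
    static final String PROPERTY = "oracle.jms.j2eeCompliant";

    /**
     * Returns true only when the property is set to "true" (any letter case).
     * An unset property means the default, noncompliant mode.
     */
    static boolean isCompliant() {
        return Boolean.parseBoolean(System.getProperty(PROPERTY, "false"));
    }

    public static void main(String[] args) {
        // Run as: java -Doracle.jms.j2eeCompliant=TRUE ComplianceMode
        System.out.println(PROPERTY + " = " + isCompliant());
    }
}
```

Because the property is a JVM-wide setting, the mode applies to every OJMS client running in that JVM.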

New features in release 9.2 support J2EE compliance and are also available in the noncompliant mode. These include support for:

Features of JMSPriority, JMSExpiration, and nondurable subscribers vary depending on which mode you use.

JMSPriority

JMSPriority values depend on whether you are running the default, noncompliant mode or the compliant mode, in which you set the compliance flag to TRUE:

JMSExpiration

JMSExpiration values depend on whether you are running the default, noncompliant mode or the compliant mode, in which you set the compliance flag to TRUE:

Durable Subscribers

Durable subscriber behavior, when subscribers use the same name, depends on whether you are running the default, noncompliant mode or the compliant mode, in which you set the compliance flag to TRUE.

JMS Connection and Session

Connection Factory

A ConnectionFactory encapsulates a set of connection configuration parameters that has been defined by an administrator. A client uses it to create a Connection with a JMS provider. In this case the JMS provider is Oracle JMS, which runs in the Oracle database.

There are two types of ConnectionFactory objects: QueueConnectionFactory and TopicConnectionFactory.

ConnectionFactory objects can be obtained in one of the following ways:

  1. Static methods in AQjmsFactory
  2. Java Naming and Directory Interface (JNDI) lookup from an LDAP directory server

Using AQjmsFactory to Obtain ConnectionFactory Objects

The AQjmsFactory class can be used to obtain a handle to Queue/Topic ConnectionFactory objects.

Example

public static void get_Factory() throws JMSException
{
  QueueConnectionFactory qc_fact = null;

  /* get queue connection factory for database "aqdb", host "sun-123",
     port 5521, driver "thin" */
  qc_fact = AQjmsFactory.getQueueConnectionFactory("sun-123", "aqdb", 5521,
                                                   "thin");
}

Using JNDI to Look Up ConnectionFactory Objects

ConnectionFactory objects can be registered in an LDAP server by a JMS administrator.

The following setup is required to enable JNDI lookup in JMS:

  1. When the Oracle9i server is installed, the database must be registered with the LDAP server. This can be done using the Database Configuration Assistant (DBCA).

    AQ entries in the LDAP server have the following structure:

    [Figure adque446.gif: structure of AQ entries in the LDAP server]

    Connection Factory information is stored under <cn=OracleDBConnections>, while topics and queues are stored under <cn=OracleDBQueues>.

  2. The GLOBAL_TOPIC_ENABLED system parameter for the database must be set to TRUE. This ensures that all queues and topics created in AQ are automatically registered with the LDAP server.

    This parameter can be set by using:

    ALTER SYSTEM SET GLOBAL_TOPIC_ENABLED = TRUE;

  3. After the database has been set up to use an LDAP server, the JMS administrator can register QueueConnectionFactory and TopicConnectionFactory objects in LDAP by using the AQjmsFactory.registerConnectionFactory() method.

    The registration can be done in one of the following ways:

    • Connect directly to the LDAP server. The user must have the GLOBAL_AQ_USER_ROLE to register connection factories in LDAP.

      To connect directly to LDAP, the parameters for the registerConnectionFactory method include the LDAP context, the name of the Queue/Topic ConnectionFactory, hostname, database SID, port number, JDBC driver (thin or oci8) and factory type (queue or topic).

    • Connect to LDAP through the database server. The user can log on to the Oracle9i database first and then have the database update the LDAP entry. The user that logs on to the database must have the AQ_ADMINISTRATOR_ROLE to perform this operation.

      To connect to LDAP through the database server, the parameters for the registerConnectionFactory method include a JDBC connection (to a user having AQ_ADMINISTRATOR_ROLE), the name of the Queue/Topic ConnectionFactory, hostname, database SID, port number, JDBC driver (thin or oci8) and factory type (queue or topic).

After the Connection Factory objects have been registered in LDAP by a JMS administrator, they can be looked up by using JNDI.

Example

Suppose the JMS administrator wants to register an order entry queue connection factory, oe_queue_factory. It can be registered in LDAP as follows:

import java.util.Hashtable;
import javax.naming.Context;
import oracle.jms.AQjmsConstants;
import oracle.jms.AQjmsFactory;

public static void register_Factory_in_LDAP() throws Exception
{
    Hashtable env = new Hashtable(5, 0.75f);
    env.put(Context.INITIAL_CONTEXT_FACTORY, AQjmsConstants.INIT_CTX_FACTORY);

    // aqldapserv is your LDAP host and 389 is your port
    env.put(Context.PROVIDER_URL, "ldap://aqldapserv:389");

    // now authentication info
    // username/password scheme, user is OE, password is OE
    env.put(Context.SECURITY_AUTHENTICATION, "simple");
    env.put(Context.SECURITY_PRINCIPAL, "cn=oe,cn=users,cn=acme,cn=com");
    env.put(Context.SECURITY_CREDENTIALS, "oe");

    /* register queue connection factory for database "aqdb", host "sun-123",
       port 5521, driver "thin" */
    AQjmsFactory.registerConnectionFactory(env, "oe_queue_factory", "sun-123",
                                           "aqdb", 5521, "thin", "queue");
}

After the order entry queue connection factory oe_queue_factory has been registered in LDAP, it can be looked up as follows:

import java.util.Hashtable;
import javax.jms.QueueConnectionFactory;
import javax.naming.Context;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import oracle.jms.AQjmsConstants;

public static void get_Factory_from_LDAP() throws Exception
{
    Hashtable env = new Hashtable(5, 0.75f);
    env.put(Context.INITIAL_CONTEXT_FACTORY, AQjmsConstants.INIT_CTX_FACTORY);

    // aqldapserv is your LDAP host and 389 is your port
    env.put(Context.PROVIDER_URL, "ldap://aqldapserv:389");

    // now authentication info
    // username/password scheme, user is OE, password is OE
    env.put(Context.SECURITY_AUTHENTICATION, "simple");
    env.put(Context.SECURITY_PRINCIPAL, "cn=oe,cn=users,cn=acme,cn=com");
    env.put(Context.SECURITY_CREDENTIALS, "oe");

    DirContext inictx = new InitialDirContext(env);
    // initialize context with the distinguished name of the database server
    inictx = (DirContext)inictx.lookup("cn=db1,cn=OracleContext,cn=acme,cn=com");

    // go to the connection factory holder cn=OracleDBConnections
    DirContext connctx = (DirContext)inictx.lookup("cn=OracleDBConnections");

    // get connection factory "oe_queue_factory"
    QueueConnectionFactory qc_fact =
        (QueueConnectionFactory)connctx.lookup("cn=oe_queue_factory");
}
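The registration and lookup examples build the same JNDI environment by hand. One way to avoid repeating it is a small helper such as the sketch below. The class name LdapEnv and its parameters are hypothetical. In main, the JDK's own LDAP context factory class name is passed as a stand-in, on the assumption that the Oracle client library is not on the classpath; the examples above pass AQjmsConstants.INIT_CTX_FACTORY instead.

```java
import java.util.Hashtable;
import javax.naming.Context;

/** Hypothetical helper that builds the JNDI environment used in the examples above. */
final class LdapEnv {
    /**
     * @param ctxFactory  initial context factory class name
     * @param host        LDAP host name
     * @param port        LDAP port
     * @param principalDn distinguished name of the user to bind as
     * @param password    password for simple authentication
     */
    static Hashtable<String, Object> build(String ctxFactory, String host, int port,
                                           String principalDn, String password) {
        Hashtable<String, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, ctxFactory);
        env.put(Context.PROVIDER_URL, "ldap://" + host + ":" + port);
        // username/password scheme, as in the examples above
        env.put(Context.SECURITY_AUTHENTICATION, "simple");
        env.put(Context.SECURITY_PRINCIPAL, principalDn);
        env.put(Context.SECURITY_CREDENTIALS, password);
        return env;
    }

    public static void main(String[] args) {
        // Stand-in factory name (assumption); host, port and user are the sample values.
        Hashtable<String, Object> env = build("com.sun.jndi.ldap.LdapCtxFactory",
                "aqldapserv", 389, "cn=oe,cn=users,cn=acme,cn=com", "oe");
        System.out.println(env.get(Context.PROVIDER_URL));
    }
}
```

The returned table can be passed wherever the examples use env, for instance to the InitialDirContext constructor.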

Connection

A JMS Connection is a client's active connection to its JMS provider. A Connection performs several critical services:

A JMS Connection to the database can be created by invoking createQueueConnection() or createTopicConnection() on a QueueConnectionFactory or TopicConnectionFactory object, respectively, and passing the username and password as parameters.

Connection Setup

A JMS client typically creates a Connection, Session and a number of MessageProducers and MessageConsumers. In the current version only one open session for each connection is allowed, except in the following cases:

When a Connection is created it is in stopped mode. In this state no messages can be delivered to it. It is typical to leave the Connection in stopped mode until setup is complete. At that point the Connection's start() method is called and messages begin arriving at the Connection's consumers. This setup convention minimizes any client confusion that may result from asynchronous message delivery while the client is still in the process of setup.

It is possible to start a Connection and to perform setup subsequently. Clients that do this must be prepared to handle asynchronous message delivery while they are still in the process of setting up. A MessageProducer can send messages while a Connection is stopped.

Some of the methods that are supported on the Connection object are

Session

A Connection is a factory for Sessions that use its underlying connection to a JMS provider for producing and consuming messages. A JMS Session is a single threaded context for producing and consuming messages. Although it may allocate provider resources outside the Java virtual machine, it is considered a light-weight JMS object.

A Session serves several purposes:

When you use the OCI JDBC driver, you can create multiple sessions for each connection. When you use other JDBC drivers, only one session can be created from one connection.

Because a provider may allocate some resources on behalf of a session outside the JVM, clients should close them when they are not needed. Relying on garbage collection to eventually reclaim these resources may not be timely enough. The same is true for the MessageProducers and MessageConsumers created by a session.

Methods on the Session object include:

The following are some of the extensions to JMS made by Oracle. The Session object has to be cast to AQjmsSession to use any of the extensions.

The following code illustrates how some of the preceding calls are used.

Example Code

public static void bol_example(String ora_sid, String host, int port,
                               String driver)
{
  QueueConnectionFactory    qc_fact   = null;
  QueueConnection           q_conn    = null;
  QueueSession              q_sess    = null;
  AQQueueTableProperty      qt_prop   = null;
  AQQueueTable              q_table   = null;
  AQjmsDestinationProperty  dest_prop = null;
  Queue                     queue     = null;
  BytesMessage              bytes_msg = null;

  try
  {
    /* get queue connection factory */
    qc_fact = AQjmsFactory.getQueueConnectionFactory(host, ora_sid,
                                                     port, driver);

    /* create queue connection */
    q_conn = qc_fact.createQueueConnection("boluser", "boluser");

    /* create queue session */
    q_sess = q_conn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);

    /* start the queue connection */
    q_conn.start();

    qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");

    /* create a queue table */
    q_table = ((AQjmsSession)q_sess).createQueueTable("boluser",
                                                      "bol_ship_queue_table",
                                                      qt_prop);

    dest_prop = new AQjmsDestinationProperty();

    /* create a queue */
    queue = ((AQjmsSession)q_sess).createQueue(q_table, "bol_ship_queue",
                                               dest_prop);

    /* start the queue */
    ((AQjmsDestination)queue).start(q_sess, true, true);

    /* create a bytes message */
    bytes_msg = q_sess.createBytesMessage();
  }
  catch (Exception ex)
  {
    System.out.println("Exception: " + ex);
  }
  finally
  {
    /* close session and connection even if an earlier call failed */
    try { if (q_sess != null) q_sess.close(); }
    catch (JMSException ex) { System.out.println("Exception: " + ex); }

    try { if (q_conn != null) q_conn.close(); }
    catch (JMSException ex) { System.out.println("Exception: " + ex); }
  }
}

JMS Destinations - Queue and Topic

A Destination is an object a client uses to specify the destination where it sends messages, and the source from which it receives messages.

There are two types of destination objects: Queue and Topic. In AQ, these map to a <schema>.<queue> at a specific database. A Queue maps to a single-consumer queue in AQ, and a Topic maps to a multiconsumer queue in AQ.

Destination objects can be obtained in one of the following ways:

  1. Using domain-specific methods in the JMS Session
  2. Java Naming and Directory Interface (JNDI) lookup from an LDAP directory server

Using a JMS Session to Obtain Destination Objects

Destination objects are created from a Session object using domain specific session methods.

Example Code

In the BooksOnLine application, new orders are sent to the queue OE_neworders_que in the OE schema. After creating a JMS connection and session, we can get a handle to the queue as follows:

public Queue get_queue_example(QueueSession jms_session)
{
   Queue queue = null;

   try
   {
      /* get a handle to the OE.OE_neworders_que queue */
      queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
   }
   catch (JMSException ex)
   {
      System.out.println("Exception: " + ex);
   }
   return queue;
}

Using JNDI to Look Up Destination Objects

As described in "Connection Factory", the database can be configured to register schema objects with an LDAP server. If a database has been configured to use LDAP and the GLOBAL_TOPIC_ENABLED parameter has been set to TRUE, then all JMS queues and topics are automatically registered with the LDAP server when they are created.

The administrator can also create aliases to the queues and topics registered in LDAP using the DBMS_AQADM.add_alias_to_ldap PL/SQL procedure.

Queues and topics that are registered in LDAP can be looked up through JNDI using the queue/topic name or one of their aliases.

Example Code

Suppose we have a new orders queue, OE.OE_neworders_que, stored in LDAP. It can be looked up as follows:

public static void get_Queue_from_LDAP() throws Exception
{
    Hashtable env = new Hashtable(5, 0.75f);
    env.put(Context.INITIAL_CONTEXT_FACTORY, AQjmsConstants.INIT_CTX_FACTORY);

    // aqldapserv is your LDAP host and 389 is your port
    env.put(Context.PROVIDER_URL, "ldap://aqldapserv:389");

    // now authentication info
    // username/password scheme, user is OE, password is OE
    env.put(Context.SECURITY_AUTHENTICATION, "simple");
    env.put(Context.SECURITY_PRINCIPAL, "cn=oe,cn=users,cn=acme,cn=com");
    env.put(Context.SECURITY_CREDENTIALS, "oe");

    DirContext inictx = new InitialDirContext(env);
    // initialize context with the distinguished name of the database server
    inictx = (DirContext)inictx.lookup("cn=db1,cn=OracleContext,cn=acme,cn=com");

    // go to the destination holder
    DirContext destctx = (DirContext)inictx.lookup("cn=OracleDBQueues");

    // get the destination OE.OE_neworders_que
    Queue myqueue = (Queue)destctx.lookup("cn=OE.OE_neworders_que");
}

Methods on the Destination Object include:

Example Code

public static void setup_example(TopicSession t_sess)
{
  AQQueueTableProperty     qt_prop   = null;
  AQQueueTable             q_table   = null;
  AQjmsDestinationProperty dest_prop = null;
  Topic                    topic     = null;

  try
  {
    qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
    /* topics require a multiconsumer queue table */
    qt_prop.setMultiConsumer(true);

    /* create a queue table */
    q_table = ((AQjmsSession)t_sess).createQueueTable("boluser",
                                                      "bol_ship_queue_table",
                                                      qt_prop);

    dest_prop = new AQjmsDestinationProperty();

    /* create a topic */
    topic = ((AQjmsSession)t_sess).createTopic(q_table, "bol_ship_queue",
                                               dest_prop);

    /* start the topic */
    ((AQjmsDestination)topic).start(t_sess, true, true);

    /* schedule propagation from topic "bol_ship_queue" to the destination
       dblink "dba" */
    ((AQjmsDestination)topic).schedulePropagation(t_sess, "dba", null,
                                                  null, null, null);

    /* some processing done here */

    /* unschedule propagation */
    ((AQjmsDestination)topic).unschedulePropagation(t_sess, "dba");

    /* stop the topic */
    ((AQjmsDestination)topic).stop(t_sess, true, true, true);

    /* drop topic */
    ((AQjmsDestination)topic).drop(t_sess);

    /* drop queue table */
    q_table.drop(true);

    /* close session; the connection that created it is closed by the caller */
    t_sess.close();
  }
  catch (Exception ex)
  {
    System.out.println("Exception: " + ex);
  }
}

System-Level Access Control in JMS

Oracle8i supports system-level access control for all queuing operations. This feature allows an application designer or DBA to create users as queue administrators. A queue/topic administrator can invoke all JMS interfaces (both administration and operation) on any queue in the database. This simplifies the administrative work, since all administrative scripts for the queues in a database can be managed under one schema. For more information, see "Oracle Enterprise Manager Support".

Example Scenario and Code

In the BooksOnLine (BOL) application, the DBA creates BOLADM, the BooksOnLine Administrator account, as the queue administrator of the database. This allows BOLADM to create, drop, manage, and monitor any queues in the database. If you decide to create PL/SQL packages in the BOLADM schema that can be used by any application to enqueue or dequeue, then you should also grant BOLADM the ENQUEUE_ANY and DEQUEUE_ANY system privileges.

CREATE USER BOLADM IDENTIFIED BY BOLADM;
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM;

((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "BOLADM", false);
((AQjmsSession)t_sess).grantSystemPrivilege("DEQUEUE_ANY", "BOLADM", false);

where t_sess is the session object.

In the application, AQ propagators populate messages from the OE (Order Entry) schema to WS (Western Sales), ES (Eastern Sales) and OS (Worldwide Sales) schemas. The WS, ES and OS schemas in turn populate messages to CB (Customer Billing) and CS (Customer Service) schemas. Hence the OE, WS, ES and OS schemas all host queues that serve as the source queues for the propagators.

When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you need to grant schemas of the source queues enqueue privileges to the destination queues.

To simplify administration, all schemas that host a source queue in the BooksOnLine application are granted the ENQUEUE_ANY system privilege.

((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "OE", false);
((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "WS", false);
((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "ES", false);
((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "OS", false);
where t_sess is the session object.

To propagate to a remote destination queue, the login user (specified in the database link in the address field of the agent structure) should either be granted the ENQUEUE_ANY privilege, or be granted the rights to enqueue to the destination queue. However, you do not need to grant any explicit privileges if the login user in the database link also owns the queue tables at the destination.

Destination-Level Access Control in JMS

Oracle8i supports queue/topic level access control for enqueue and dequeue operations. This feature allows the application designer to protect queues/topics created in one schema from applications running in other schemas. You need to grant only minimal access privileges to the applications that run outside the queue/topic's schema. The supported access privileges on a queue/topic are ENQUEUE, DEQUEUE and ALL. For more information see "Oracle Enterprise Manager Support" in Chapter 4, "Managing AQ".

Example Scenario and Code

The BooksOnLine application processes customer billings in its CB and CBADM schemas. CB (Customer Billing) schema hosts the customer billing application, and the CBADM schema hosts all related billing data stored as queue tables. To protect the billing data, the billing application and the billing data reside in different schemas. The billing application is allowed only to dequeue messages from CBADM_shippedorders_topic, the shipped order topic. It processes the messages, and then enqueues new messages into CBADM_billedorders_topic, the billed order topic.

To protect the queues from any other operations by the application, the following two grant calls are made, where t_sess is the session object:

/* Grant dequeue privilege on the shipped orders topic to the Customer
   Billing application. The CB application retrieves orders that are shipped
   but not billed from the shipped orders topic. */
((AQjmsDestination)cbadm_shippedorders_topic).grantTopicPrivilege(t_sess,
                                                   "DEQUEUE", "CB", false);

/* Grant enqueue privilege on the billed orders topic to the Customer Billing
   application. The CB application is allowed to put billed orders into this
   topic after processing the orders. */
((AQjmsDestination)cbadm_billedorders_topic).grantTopicPrivilege(t_sess,
                                                   "ENQUEUE", "CB", false);

Retention and Message History in JMS

AQ allows users to retain messages in the queue table. This means that SQL can then be used to query these messages for analysis. Messages are often related to each other. For example, if a message is produced as a result of the consumption of another message, the two are related. As the application designer, you may want to keep track of such relationships. Along with retention and message identifiers, AQ lets you automatically create message journals, also called tracking journals or event journals. Taken together, retention, message identifiers, and SQL queries make it possible to build powerful message warehouses.

Example Scenario and Code

Let us suppose that the shipping application needs to determine the average processing times of orders. This includes the time the order has to wait in the back order topic, WS_backorders_topic. By specifying retention as TRUE for the shipping queues and storing the order number in the correlation field of each message, you can write SQL queries to determine the wait time for orders in the shipping application.

For simplicity, we will only analyze orders that have already been processed. The processing time for an order in the shipping application is the difference between the enqueue time in the WS_bookedorders_topic and the enqueue time in the WS_shippedorders_topic.

SELECT  SUM(SO.enq_time - BO.enq_time) / count (*) AVG_PRCS_TIME 
   FROM WS.AQ$WS_orders_pr_mqtab BO , WS.AQ$WS_orders_mqtab SO  
   WHERE SO.msg_state = 'PROCESSED' and BO.msg_state = 'PROCESSED' 
   AND SO.corr_id = BO.corr_id and SO.queue = 'WS_shippedorders_topic'; 
 
/* Average waiting time in the backed order queue: */ 
SELECT SUM(BACK.deq_time - BACK.enq_time)/count (*) AVG_BACK_TIME 
   FROM WS.AQ$WS_orders_mqtab BACK  
   WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_backorders_topic'; 
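Both queries subtract one time column from another. Assuming enq_time and deq_time are Oracle DATE columns, the difference is a fractional number of days, so AVG_PRCS_TIME and AVG_BACK_TIME come back in days. The sketch below shows one way a Java client might convert such a value into a more readable duration. The class name AqIntervals and the sample value are hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper for reading the averages returned by the queries above. */
final class AqIntervals {
    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    /** Converts a fractional number of days into a Duration, rounded to the millisecond. */
    static Duration fromDays(double days) {
        return Duration.ofMillis(Math.round(days * MILLIS_PER_DAY));
    }

    public static void main(String[] args) {
        double avgPrcsTime = 0.25;   // made-up query result: a quarter of a day
        System.out.println("Average processing time: " + fromDays(avgPrcsTime));
    }
}
```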

Supporting Oracle Real Application Clusters in JMS

Oracle Real Application Clusters can be used to improve AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue/dequeue) or topic operations (publish/subscribe) on different queues or topics to occur in parallel.

The AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance.

If the owner instance of a queue table terminates, the queue monitor changes ownership to a suitable instance such as the secondary instance.
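The ownership rule described above can be sketched as a small function. This illustrates the stated preference order only and is not the queue monitor's actual implementation. The class and method names are hypothetical. Returning an empty result when neither preferred instance is available is a simplification, since the text says only that a suitable instance is chosen.

```java
import java.util.OptionalInt;
import java.util.Set;

/** Hypothetical illustration of the primary/secondary preference order. */
final class QueueTableOwner {
    /**
     * Picks the owner instance for a queue table: the primary instance if it
     * is available, otherwise the secondary instance if it is available.
     */
    static OptionalInt choose(int primary, int secondary, Set<Integer> available) {
        if (available.contains(primary)) {
            return OptionalInt.of(primary);
        }
        if (available.contains(secondary)) {
            return OptionalInt.of(secondary);
        }
        return OptionalInt.empty();   // simplification: no preferred instance is up
    }

    public static void main(String[] args) {
        // Queue table with primary instance 2 and secondary instance 1.
        System.out.println(choose(2, 1, Set.of(1, 2)).getAsInt());  // both instances up
        System.out.println(choose(2, 1, Set.of(1)).getAsInt());     // only instance 1 up
    }
}
```

Calling the function again after the owner instance terminates, with that instance removed from the available set, models the failover to the secondary instance.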

AQ propagation is able to make use of Real Application Clusters, although it is transparent to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as the affinities of the respective queue tables. Thus, a job_queue_process associated with the owner instance of a queue table handles the propagation from queues stored in that queue table, thereby minimizing pinging. Additional discussion on this topic can be found under AQ propagation scheduling (see "Scheduling a Queue Propagation" in Chapter 9, "Administrative Interface" and Oracle9i Real Application Clusters Setup and Configuration).

Example Scenario and Code

In the BooksOnLine example, operations on OE_neworders_que and OE_bookedorders_topic at the order entry (OE) site can be made faster if the two destinations are associated with different instances. This is done by creating them in different queue tables and specifying different affinities for the queue tables in the createQueueTable() command.

In the example, the queue table OE_orders_sqtab stores the queue OE_neworders_que, and its primary and secondary instances are 1 and 2 respectively. The queue table OE_orders_mqtab stores the topic OE_bookedorders_topic, and its primary and secondary instances are 2 and 1 respectively. The objective is to let instances 1 and 2 manage the two destinations in parallel. By default, only one instance is available. In this case the owner instance of both queue tables will be set to instance 1. However, if Oracle Real Application Clusters are set up correctly and both instances 1 and 2 are available, then queue table OE_orders_sqtab will be owned by instance 1 and the other queue table will be owned by instance 2. The primary and secondary instance specification of a queue table can be changed dynamically using the alter_queue_table() command as shown in the example that follows. Information about the primary, secondary and owner instance of a queue table can be obtained by querying the view USER_QUEUE_TABLES. See "Selecting Queue Tables in User Schema" in Chapter 10, "Administrative Interface: Views".

/* Create queue tables, topics for OE */

/* creating a queue table to hold queues */
qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_OBJECT_MESSAGE");
qt_prop.setPrimaryInstance(1);
qt_prop.setSecondaryInstance(2);
q_table = ((AQjmsSession)q_sess).createQueueTable("OE", "OE_orders_sqtab",
                                                  qt_prop);

/* creating a queue table to hold topics */
qt1_prop = new AQQueueTableProperty("SYS.AQ$_JMS_OBJECT_MESSAGE");
qt1_prop.setMultiConsumer(true);
qt1_prop.setPrimaryInstance(2);
qt1_prop.setSecondaryInstance(1);
q_table1 = ((AQjmsSession)q_sess).createQueueTable("OE", "OE_orders_mqtab",
                                                   qt1_prop);

dest_prop = new AQjmsDestinationProperty();
queue = ((AQjmsSession)q_sess).createQueue(q_table, "OE_neworders_que",
                                           dest_prop);

dest_prop1 = new AQjmsDestinationProperty();
topic = ((AQjmsSession)q_sess).createTopic(q_table1, "OE_bookedorders_topic",
                                           dest_prop1);

/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance
FROM user_queue_tables;

/* Alter instance affinity of OE queue tables */
q_table.alter("OE_orders_sqtab", 2, 1);
q_table1.alter("OE_orders_mqtab", 1, 2);

Supporting Statistics Views in JMS

Each instance keeps its own AQ statistics information in its own System Global Area (SGA), and does not have knowledge of the statistics gathered by other instances. Then, when a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.

Example Scenario and Code

The GV$AQ view can be queried at any time to see the number of messages in waiting, ready or expired state. The view also displays the average number of seconds messages have been waiting to be processed. The order processing application can use this to dynamically tune the number of order-processing processes. See "Selecting the Number of Messages in Different States for the Whole Database" in Chapter 10, "Administrative Interface: Views".

CONNECT oe/oe 
 
/* Count the number of messages and the average time for which the messages
   have been waiting: */
SELECT READY, AVERAGE_WAIT 
FROM gv$aq Stats, user_queues Qs 
WHERE Stats.qid = Qs.qid and Qs.Name = 'OE_neworders_que'; 

Structured Payload/Message Types in JMS

JMS Messages are composed of the following parts: message headers, message properties, and a message body.

Message Headers

You can use a header-only JMS message. A message body is not required. The message header fields are listed in Table 12-1.

Message Properties

Properties are a mechanism to add optional header fields to a message. Properties allow a client, using message selectors, to have a JMS provider select messages on its behalf using application-specific criteria. Property names are Strings and values can be: boolean, byte, short, int, long, float, double, and string.
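As a minimal sketch of the value types listed above, the following check accepts the Java wrapper objects corresponding to those types and rejects everything else. The class name PropertyValues is hypothetical. A JMS provider performs its own validation, so this is an illustration of the rule rather than Oracle JMS code.

```java
/** Hypothetical check mirroring the property value types listed above. */
final class PropertyValues {
    /** Returns true if the value is a boxed boolean, byte, short, int, long, float, double, or a String. */
    static boolean isAllowed(Object value) {
        return value instanceof Boolean
            || value instanceof Byte
            || value instanceof Short
            || value instanceof Integer
            || value instanceof Long
            || value instanceof Float
            || value instanceof Double
            || value instanceof String;
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(42));                     // int value
        System.out.println(isAllowed("WEST"));                 // String value
        System.out.println(isAllowed(new java.util.Date()));   // not a listed type
    }
}
```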

JMS-defined properties begin with "JMSX".

Oracle JMS specific properties begin with JMS_Oracle. The following properties are Oracle-specific:

A client can add additional header fields to a message by defining properties. These properties can then be used in message selectors to select specific messages.

JMS properties or header fields are set either explicitly by the client or automatically by the JMS provider (these are generally read-only). Some JMS properties are set using the parameters specified in send and receive operations.

Table 12-1 Message Header Fields

Message Header Field | Type | Set by | Use
JMSDestination | Destination | Set by JMS after Send Method has completed | The destination to which the message is sent
JMSDeliveryMode | int | Set by JMS after Send Method has completed | The delivery mode (PERSISTENT)
JMSExpiration | long | Set by JMS after Send Method has completed | The expiration time can be specified for a Message Producer or can be explicitly specified during each send or publish
JMSPriority | int | Set by JMS after Send Method has completed | The message's priority can be specified for a Message Producer or can be explicitly specified during each send or publish
JMSMessageID | String | Set by JMS after Send Method has completed | A value that uniquely identifies each message sent by the provider
JMSTimestamp | long | Set by JMS after Send Method has completed | The time a message is handed to a provider to be sent
JMSCorrelationID | String | Set by JMS client | A field that can be used to link one message with another
JMSReplyTo | Destination | Set by JMS client | A destination set by the client, where a reply to the message should be sent. Should be specified as AQjmsAgent, javax.jms.Queue, or javax.jms.Topic types
JMSType | String | Set by JMS client | Message type identifier
JMSRedelivered | boolean | Set by JMS provider | The message probably was delivered earlier but the client did not acknowledge it at that time

Table 12-2 JMS Defined Message Properties

JMS Defined Message Property  Type    Set by                                         Use
JMSXUserID                    String  Set by JMS after Send Method has completed     The identity of the user sending the message
JMSXAppID                     String  Set by JMS after Send Method has completed     The identity of the application sending the message
JMSXDeliveryCount             int     Set by JMS after Receive Method has completed  The number of message delivery attempts; the first is 1, the second is 2, and so on
JMSXGroupID                   String  Set by JMS client                              The identity of the message group the message is a part of
JMSXGroupSeq                  int     Set by JMS client                              The sequence number of the message within the group; the first message is 1, the second message is 2, and so on
JMSXRcvTimestamp              String  Set by JMS after Receive Method has completed  The time that JMS delivered the message to the consumer
JMSXState                     int     Set by JMS Provider                            Message state set by provider

Table 12-3 Oracle Defined Message Properties

Header Field/Property        Type    Set by               Use
JMS_OracleExcpQ              String  Set by JMS Client    Specifies the name of the exception queue
JMS_OracleDelay              int     Set by JMS Client    Specifies the time (seconds) after which the message should become available to the consumers
JMS_OracleOriginalMessageID  String  Set by JMS Provider  Specifies the message id of the message in source when the messages are propagated from one destination to another

Message Body

JMS provides five forms of message body:

The AQ$_JMS_MESSAGE Type

This type can store JMS messages of all the JMS-specified message types: JMSStream, JMSBytes, JMSMap, JMSText, and JMSObject. A queue table created with the AQ$_JMS_MESSAGE payload type can therefore hold messages of any of these types.

Stream Message

A StreamMessage is used to send a stream of Java primitives. It is filled and read sequentially. It inherits from Message and adds a stream message body. Its methods are based largely on those found in java.io.DataInputStream and java.io.DataOutputStream.

The primitive types can be read or written explicitly using methods for each type. They may also be read or written generically as objects. To use Stream Messages, create the queue table with the SYS.AQ$_JMS_STREAM_MESSAGE or AQ$_JMS_MESSAGE payload types.

Stream messages support the following conversion table. A value written as the row type can be read as the column type.

Table 12-4 Stream Message Conversion

          boolean  byte  short  char  int  long  float  double  String  byte[]
boolean   X        -     -      -     -    -     -      -       X       -
byte      -        X     X      -     X    X     -      -       X       -
short     -        -     X      -     X    X     -      -       X       -
char      -        -     -      X     -    -     -      -       X       -
int       -        -     -      -     X    X     -      -       X       -
long      -        -     -      -     -    X     -      -       X       -
float     -        -     -      -     -    -     X      X       X       -
double    -        -     -      -     -    -     -      X       X       -
String    X        X     X      X     X    X     X      X       X       -
byte[]    -        -     -      -     -    -     -      -       -       X
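As a reading aid, the stream message conversion table can be encoded as a simple lookup. The following is a minimal sketch in plain Java; StreamConversionSketch and canRead are hypothetical names and do not exist in the Oracle JMS library. Each row string copies one row of the table: the row is the type a value was written as, the column is the type it is read as.

```java
import java.util.Arrays;
import java.util.List;

/** Lookup over the stream message conversion table; hypothetical helper, not an Oracle JMS class. */
class StreamConversionSketch {
    private static final List<String> TYPES = Arrays.asList(
        "boolean", "byte", "short", "char", "int", "long",
        "float", "double", "String", "byte[]");

    // One string per written type; 'X' marks a column type the value can be read as.
    private static final String[] ROWS = {
        "X-------X-", // boolean
        "-XX-XX--X-", // byte
        "--X-XX--X-", // short
        "---X----X-", // char
        "----XX--X-", // int
        "-----X--X-", // long
        "------XXX-", // float
        "-------XX-", // double
        "XXXXXXXXX-", // String
        "---------X"  // byte[]
    };

    /** Returns true if a value written as writtenAs may be read as readAs. */
    static boolean canRead(String writtenAs, String readAs) {
        int row = TYPES.indexOf(writtenAs);
        int col = TYPES.indexOf(readAs);
        if (row < 0 || col < 0) {
            throw new IllegalArgumentException("Unknown type");
        }
        return ROWS[row].charAt(col) == 'X';
    }
}
```

For example, canRead("byte", "int") is true because a byte can be widened to an int, while canRead("int", "short") is false because the table allows no narrowing conversions.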

Bytes Message

A BytesMessage is used to send a message containing a stream of uninterpreted bytes. It inherits Message and adds a bytes message body. The receiver of the message supplies the interpretation of the bytes. Its methods are based largely on those found in java.io.DataInputStream and java.io.DataOutputStream.

This message type is for client encoding of existing message formats. If possible, one of the other self-defining message types should be used instead.

The primitive types can be written explicitly using methods for each type. They may also be written generically as objects. To use Bytes Messages, create the queue table with SYS.AQ$_JMS_BYTES_MESSAGE or AQ$_JMS_MESSAGE payload types.

Map Message

A MapMessage is used to send a set of name-value pairs where names are Strings and values are Java primitive types. The entries can be accessed sequentially or randomly by name. The order of the entries is undefined. It inherits from Message and adds a map message body. The primitive types can be read or written explicitly using methods for each type. They may also be read or written generically as objects.

To use Map Messages, create the queue table with the SYS.AQ$_JMS_MAP_MESSAGE or AQ$_JMS_MESSAGE payload types. Map messages support the following conversion table. A value written as the row type can be read as the column type.

Table 12-5 Map Message Conversion

          boolean  byte  short  char  int  long  float  double  String  byte[]
boolean   X        -     -      -     -    -     -      -       X       -
byte      -        X     X      -     X    X     -      -       X       -
short     -        -     X      -     X    X     -      -       X       -
char      -        -     -      X     -    -     -      -       X       -
int       -        -     -      -     X    X     -      -       X       -
long      -        -     -      -     -    X     -      -       X       -
float     -        -     -      -     -    -     X      X       X       -
double    -        -     -      -     -    -     -      X       X       -
String    X        X     X      X     X    X     X      X       X       -
byte[]    -        -     -      -     -    -     -      -       -       X
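To illustrate how a map body combines access by name with the conversions in the table, here is a small plain-Java sketch. MapBodySketch is a hypothetical stand-in for a map message body and implements only a few typed accessors; it is not the javax.jms.MapMessage interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy name-value body with typed accessors; hypothetical, not javax.jms.MapMessage. */
class MapBodySketch {
    private final Map<String, Object> entries = new HashMap<>();

    void setInt(String name, int value)       { entries.put(name, Integer.valueOf(value)); }
    void setFloat(String name, float value)   { entries.put(name, Float.valueOf(value)); }
    void setString(String name, String value) { entries.put(name, value); }

    /** Reads an entry as long: allowed for byte, short, int, long, and String entries. */
    long getLong(String name) {
        Object v = entries.get(name);
        if (v instanceof Byte || v instanceof Short
                || v instanceof Integer || v instanceof Long) {
            return ((Number) v).longValue();
        }
        if (v instanceof String) {
            return Long.parseLong((String) v);
        }
        throw new IllegalStateException("Cannot read " + name + " as long");
    }

    /** Reads an entry as String: allowed for every type except byte[]. */
    String getString(String name) {
        Object v = entries.get(name);
        if (v == null) {
            return null;
        }
        if (v instanceof byte[]) {
            throw new IllegalStateException("Cannot read " + name + " as String");
        }
        return String.valueOf(v);
    }
}
```

An entry written with setInt can be read back with getLong or getString, whereas reading a float entry with getLong fails, matching the float row of the table.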

Text Message

A TextMessage is used to send a message containing a java.lang.String. It inherits from Message and adds a text message body. The text information can be read or written using the methods getText() and setText(...). To use Text Messages, create the queue table with the SYS.AQ$_JMS_TEXT_MESSAGE or AQ$_JMS_MESSAGE payload types.

Object Message

An ObjectMessage is used to send a message that contains a serializable Java object. It inherits from Message and adds a body containing a single Java reference. Only serializable Java objects can be used. If a collection of Java objects must be sent, one of the collection classes provided in JDK 1.2 can be used. The objects can be read or written using the methods getObject() and setObject(...). To use Object Messages, create the queue table with the SYS.AQ$_JMS_OBJECT_MESSAGE or AQ$_JMS_MESSAGE payload types.

Example Code

public void enqueue_new_orders(QueueSession jms_session, BolOrder new_order)
{
   QueueSender   sender;
   Queue         queue;
   ObjectMessage obj_message;

   try
   {
       /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
       sender = jms_session.createSender(queue);

       /* wrap the serializable order in an ObjectMessage and tag it as RUSH */
       obj_message = jms_session.createObjectMessage();
       obj_message.setJMSCorrelationID("RUSH");
       obj_message.setObject(new_order);

       /* send the message, then commit the session */
       sender.send(obj_message);
       jms_session.commit();
    }
    catch (JMSException ex)
    {
      System.out.println("Exception: " + ex);
    }
}
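The requirement that an object message body be serializable, and the suggestion to use a collection class for several objects, can be tried out without a queue. The sketch below uses only java.io; ObjectBodySketch and roundTrip are hypothetical names, and the round trip merely approximates what happens to a payload between setObject and getObject.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Serializes a payload and reads it back; hypothetical helper, not part of Oracle JMS. */
class ObjectBodySketch {
    static Serializable roundTrip(Serializable payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                // Fails with NotSerializableException if any referenced object is not serializable
                out.writeObject(payload);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Serializable) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Passing a java.util.ArrayList of serializable elements returns an equal but distinct list, which is why a collection is a convenient way to send several objects in one message.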

AdtMessage

An AdtMessage is used to send a message that contains a Java object that maps to an Oracle Object type. These objects inherit from Message and add a body containing a Java object that implements the CustomDatum or ORAData interface.

See Also:

Oracle9i Java Developer's Guide for information about the CustomDatum and ORAData interfaces

To use AdtMessage, create the queue table with payload type as the Oracle Object Type. The AdtMessage payload can be read and written using the getAdtPayload and setAdtPayload methods.

You can also use an AdtMessage to send messages to queues of type SYS.XMLType. You must use the oracle.xdb.XMLType class to create the message.

Using Message Properties with Different Message Types

Payload Used by JMS Examples

/*
 *  BolOrder - payload for BooksOnline example
 *
 */

import java.lang.*;
import java.io.*;
import java.util.*;

public class BolOrder implements Serializable
{

  int             orderno;
  String          status;
  String          type;
  String          region;
  BolCustomer     customer;
  String          paymentmethod;
  BolOrderItem[]  itemlist;
  String          ccnumber;
  Date            orderdate;

  public BolOrder(int orderno, BolCustomer customer)
  {
    this.customer   = customer;
    this.orderno    = orderno;
  }

  public int getOrderNo()
  {
    return orderno;
  }

  public String getStatus()
  {
    return status;
  }


  public void setStatus(String new_status)
  {
    status = new_status;
  }


  public String getRegion()
  {
    return region;
  }


  public void setRegion(String region)
  {
    this.region = region;
  }

  public BolCustomer getCustomer()
  {
    return customer;
  }


  public String getPaymentmethod()
  {
    return paymentmethod;
  }

  public void setPaymentmethod(String paymentmethod)
  {
    this.paymentmethod = paymentmethod;
  }

  public BolOrderItem[] getItemList()
  {
    return itemlist;
  }

  public void setItemList(BolOrderItem[] itemlist)
  {
    this.itemlist = itemlist;
  }

  public String getCCnumber()
  {
    return ccnumber;
  }

  public void setCCnumber(String ccnumber)
  {
    this.ccnumber = ccnumber;
  }

  public Date getOrderDate()
  {
    return orderdate;
  }

  public void setOrderDate(Date orderdate)
  {
    this.orderdate = orderdate;
  }

}

/*
 *  BolOrderItem - order item type  for BooksOnline example
 *
 */

import java.lang.*;
import java.io.*;
import java.util.*;

public class BolOrderItem implements Serializable
{

  BolBook     item;
  int         quantity;

  public BolOrderItem(BolBook book, int quantity)
  {  
    item          = book;
    this.quantity = quantity;

  }

  public BolBook getItem()
  {
    return item;
  }

  public int getQuantity()
  {
    return quantity;
  }
}


/*
 *  BolBook - book type  for BooksOnline example
 *
 */

import java.lang.*;
import java.io.*;
import java.util.*;

public class BolBook implements Serializable
{

  String    title;
  String    authors;
  String    isbn;
  float     price;
 
  public BolBook(String title)
  {  
    this.title   = title;
  }

  public BolBook(String title, String authors, String isbn, float price)
  {  
    this.title   = title;
    this.authors = authors;
    this.isbn    = isbn;
    this.price   = price;

  }

  public String getISBN()
  {
    return isbn;
  }

  public String getTitle()
  {
    return title;
  }

  public String getAuthors()
  {
    return authors;
  }

  public float getPrice()
  {
    return price;
  }

}
/*
 *  BolCustomer - customer type  for BooksOnline example
 *
 */

import java.lang.*;
import java.io.*;
import java.util.*;

public class BolCustomer implements Serializable
{

  int          custno;
  String       custid;
  String       name;
  String       street;
  String       city;
  String       state;
  int          zip;
  String       country;

  public BolCustomer(int custno, String name)
  {

    this.custno  = custno;
    this.name    = name;
  }

  public BolCustomer(int custno, String custid, String name, String  street,
           String city, String state, int zip, String country)
  {

    this.custno  = custno;
    this.custid  = custid;
    this.name    = name;
    this.street  = street;
    this.city    = city;
    this.state   = state;
    this.zip     = zip;
    this.country = country;

  }

  public int getCustomerNo()
  {
    return custno;
  }

  public String getCustomerId()
  {
    return custid;
  }

  public String getName()
  {
    return name;
  }

  public String getStreet()
  {
    return street;
  }

  public String getCity()
  {
    return city;
  }

  public String getState()
  {
    return state;
  }

  public int getZipcode()
  {
    return zip;
  }

  public String getCountry()
  {
    return country;
  }
}

JMS Point-to-Point Model Features

Queues

In the point-to-point model, clients exchange messages using queues - from one point to another. These queues are used by message producers and consumers to send and receive messages.

An administrator creates single-consumer queues by means of the createQueue method in AQjmsSession. A client may obtain a handle to a previously created queue using the getQueue method on AQjmsSession.

These queues are described as single-consumer queues because a message can be consumed by only a single consumer. Put another way: a message can be consumed exactly once. This raises the question: What happens when there are multiple processes or operating system threads concurrently dequeuing from the same queue? Since a locked message cannot be dequeued by a process other than the one that has created the lock, each process will dequeue the first unlocked message at the head of the queue.
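The exactly-once behavior with several concurrent dequeuers can be simulated in plain Java. This sketch is only an analogy: it uses the atomic poll() of java.util.concurrent.ConcurrentLinkedQueue in place of AQ's message locking, and the class SingleConsumerQueueSketch is hypothetical.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Simulates several consumers draining one queue; hypothetical, not an AQ class. */
class SingleConsumerQueueSketch {
    /** Returns {distinct messages delivered, duplicate deliveries}. */
    static int[] drain(int messageCount, int consumerCount) {
        final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        final Set<Integer> delivered = ConcurrentHashMap.newKeySet();
        final AtomicInteger duplicates = new AtomicInteger();

        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++) {
            consumers[c] = new Thread(() -> {
                Integer msg;
                // poll() removes the head atomically, so each message goes to one thread only
                while ((msg = queue.poll()) != null) {
                    if (!delivered.add(msg)) {
                        duplicates.incrementAndGet();
                    }
                }
            });
            consumers[c].start();
        }
        for (Thread t : consumers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { delivered.size(), duplicates.get() };
    }
}
```

Calling drain(1000, 4) reports 1000 distinct deliveries and no duplicates: every consumer takes whatever message is currently first and unclaimed, as each dequeuing process does in the description above.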

Before a queue can be used, it must be enabled for enqueue and dequeue using the start call in AQjmsDestination.

After processing, the message is removed if the retention time of the queue is 0, or is retained for a specified retention time. As long as the message is retained, it can be either

Queue Sender

A client uses a QueueSender to send messages to a queue. A QueueSender is created by passing a queue to a session's createSender method. A client also has the option of creating a QueueSender without supplying a queue. In that case a queue must be specified on every send operation.

A client can specify a default delivery mode, priority and time-to-live for all messages sent by the QueueSender. Alternatively, the client can define these options on a per message basis.

Example Code

In the BooksOnline application, new orders are to be sent to the new_orders_queue. After creating a JMS connection and session, we create a sender:

public void enqueue_new_orders(QueueSession jms_session, BolOrder new_order)
{
   QueueSender   sender;
   Queue         queue;
   ObjectMessage obj_message;   

   try
   {
       /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
       sender = jms_session.createSender(queue);
       obj_message = jms_session.createObjectMessage();
       obj_message.setJMSCorrelationID("RUSH");   
       obj_message.setObject(new_order);
       sender.send(obj_message);
       jms_session.commit();   
    }
    catch (JMSException ex)
    {
      System.out.println("Exception: " + ex); 
    }
}

Queue Receiver

A client uses a QueueReceiver to receive messages from a queue. A QueueReceiver is created using the session's createReceiver method. A QueueReceiver can be created with a message selector. This allows the client to restrict messages delivered to the consumer to those that match the selector.

The selector for queues containing payloads of type TextMessage, StreamMessage, BytesMessage, ObjectMessage, MapMessage can contain any expression that has a combination of one or more of the following:

For queues containing AdtMessages the selector must be a SQL expression on the message payload contents or message ID or priority or correlation ID.
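Conceptually, a message selector is a predicate that the provider evaluates against each message's header fields and properties. The sketch below models that idea in plain Java with a java.util.function.Predicate; it does not parse selector strings, it flattens header fields and properties into one map for simplicity, and SelectorSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Models a selector as a predicate over header fields and properties; hypothetical. */
class SelectorSketch {
    /** Builds a message as a flat map of header fields and properties (a simplification). */
    static Map<String, Object> message(String correlationId, int priority, String region) {
        Map<String, Object> m = new HashMap<>();
        m.put("JMSCorrelationID", correlationId);
        m.put("JMSPriority", priority);
        m.put("Region", region);
        return m;
    }

    /** Stand-in for the selector string "JMSCorrelationID = 'RUSH' AND JMSPriority < 3". */
    static final Predicate<Map<String, Object>> RUSH_HIGH_PRIORITY =
        m -> "RUSH".equals(m.get("JMSCorrelationID"))
             && ((Integer) m.get("JMSPriority")).intValue() < 3;

    /** Returns only the messages that the selector accepts, in queue order. */
    static List<Map<String, Object>> select(List<Map<String, Object>> queue,
                                            Predicate<Map<String, Object>> selector) {
        List<Map<String, Object>> matches = new ArrayList<>();
        for (Map<String, Object> m : queue) {
            if (selector.test(m)) {
                matches.add(m);
            }
        }
        return matches;
    }
}
```

A receiver created with such a selector would see only the matching messages; the others stay on the queue for other consumers.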

Example Scenario and Code

In the BooksOnline application, new orders are retrieved from the new_orders_queue. These orders are then published to the OE.OE_bookedorders_topic. After creating a JMS connection and session, you create a receiver to receive messages:

public void get_new_orders(QueueSession jms_session)
{
   QueueReceiver   receiver;
   Queue           queue;
   ObjectMessage   obj_message;
   BolOrder        new_order;
   BolCustomer     customer;
   String          state;
   String          cust_name;

   try
   {
     /* get a handle to the new_orders queue */
     queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

     receiver = jms_session.createReceiver(queue);

     for(;;)
     {
       /* wait up to 10 milliseconds for a message to show up in the queue */
       obj_message = (ObjectMessage)receiver.receive(10);

       /* receive returns null if the timeout expires; poll again */
       if (obj_message == null)
         continue;

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       /* properties of a received message are read-only until they are
          cleared; the message body (the order) is left untouched */
       obj_message.clearProperties();

       /* determine customer region and assign a shipping region */
       if((state.equals("CA")) || (state.equals("TX")) ||
          (state.equals("WA")) || (state.equals("NV")))
         obj_message.setStringProperty("Region", "WESTERN");
       else
         obj_message.setStringProperty("Region", "EASTERN");

       cust_name = customer.getName();

       obj_message.setStringProperty("Customer", cust_name);

       /* publish the order to the booked orders topic; the versions of
          book_rush_order and book_new_order shown later in this chapter
          also take a TopicSession argument */
       if("RUSH".equals(obj_message.getJMSCorrelationID()))
         book_rush_order(obj_message);
       else
         book_new_order(obj_message);

       jms_session.commit();
     }
   }
   catch (JMSException ex)
   {
     System.out.println("Exception: " + ex);
   }
}
   

Queue Browser

A client uses a QueueBrowser to view messages on a queue without removing them. The browser methods return a java.util.Enumeration that is used to scan the queue's messages. The first call to nextElement gets a snapshot of the queue. A QueueBrowser may also optionally lock messages as it is scanning them. This is similar to a "SELECT ... FOR UPDATE" command on the message, and it prevents other consumers from removing the messages while they are being scanned.

A QueueBrowser can also be created with a message selector. This allows the client to restrict messages delivered to the browser to those that match the selector.

The selector for queues containing payloads of type TextMessage, StreamMessage, BytesMessage, ObjectMessage, MapMessage can contain any expression that has a combination of one or more of the following:

For queues containing AdtMessages the selector must be a SQL expression on the message payload contents or messageID or priority or correlationID.
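The difference between browsing and receiving can be shown with a small in-memory model. BrowseSketch is a hypothetical class, not the JMS QueueBrowser; as a simplification it takes its snapshot when browse() is called rather than on the first call to nextElement, and it does not model locking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Enumeration;

/** In-memory queue with destructive receive and non-destructive browse; hypothetical. */
class BrowseSketch {
    private final Deque<String> queue = new ArrayDeque<>();

    void send(String msg) {
        queue.addLast(msg);
    }

    /** Destructive read: removes and returns the head, or null if the queue is empty. */
    String receive() {
        return queue.pollFirst();
    }

    int depth() {
        return queue.size();
    }

    /** Non-destructive read: enumerates a snapshot, leaving the queue unchanged. */
    Enumeration<String> browse() {
        return Collections.enumeration(new ArrayList<>(queue));
    }
}
```

After two sends, scanning the enumeration visits both messages and depth() is still 2; only receive() reduces the depth.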

Example Scenario and Code

In the BooksOnline application, new orders are put into the new_orders_queue. A client can then browse selected messages.

public void browse_rush_orders(QueueSession jms_session)
{
   QueueBrowser    browser;
   Queue           queue;
   ObjectMessage   obj_message;   
   BolOrder        new_order;
   Enumeration     messages;
   String          customer_name;   

   try
   {
     /* get a handle to the new_orders queue */   
     queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
     
     /* create a Browser to look at RUSH orders in USA */
     browser = jms_session.createBrowser(queue, 
                               "JMSCorrelationID = 'RUSH' and country = 'USA' ");
     
     for (messages = browser.getEnumeration() ; messages.hasMoreElements() ;)
     {
       obj_message = (ObjectMessage)messages.nextElement();
       
       new_order = (BolOrder)obj_message.getObject();
       
       customer_name = new_order.getCustomer().getName();
       System.out.println("Customer " +  customer_name + 
           " has placed a RUSH order");
     }
     
     browser.close(); 
   }   
   catch (Exception ex)
   {
     System.out.println("Exception " + ex);
   }
}

JMS Publish-Subscribe Model Features

The following topics are discussed in this section:

Topic

JMS has various features that allow you to develop an application based on a publish-subscribe model. The aim of this application model is to enable flexible and dynamic communication between applications functioning as publishers and applications playing the role of subscribers. The specific design point is that the applications playing these different roles should be decoupled in their communication. They should interact based on messages and message content.

In distributing messages, publisher applications do not have to explicitly handle or manage message recipients. This allows for the dynamic addition of new subscriber applications to receive messages without changing any publisher application logic. Subscriber applications receive messages based on message content without regard to which publisher applications are sending messages. This allows the dynamic addition of publisher applications without changing any subscriber application logic. Subscriber applications specify interest by defining a rule-based subscription on message properties or the message content of a topic. The system automatically routes messages by computing recipients for published messages using the rule-based subscriptions.
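The routing step, computing recipients from rule-based subscriptions, can be sketched in plain Java as follows. TopicRoutingSketch is a hypothetical model in which each rule is a predicate over string-valued message properties; it is not how AQ stores or evaluates subscriptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Computes recipients of a published message from subscription rules; hypothetical. */
class TopicRoutingSketch {
    // Subscriber name -> rule over the message properties, kept in subscription order
    private final Map<String, Predicate<Map<String, String>>> subscriptions =
        new LinkedHashMap<>();

    void subscribe(String subscriberName, Predicate<Map<String, String>> rule) {
        subscriptions.put(subscriberName, rule);
    }

    /** The publisher never names recipients; they follow from the rules alone. */
    List<String> recipientsFor(Map<String, String> messageProperties) {
        List<String> recipients = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, String>>> e : subscriptions.entrySet()) {
            if (e.getValue().test(messageProperties)) {
                recipients.add(e.getKey());
            }
        }
        return recipients;
    }
}
```

Adding a subscriber is a call to subscribe and requires no change to the code that publishes, which is the decoupling the model aims for.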

In the publish-subscribe model, messages are published to and received from topics. A topic is created using the createTopic method in an AQjmsSession. A client may obtain a handle to a previously created topic using the getTopic method in AQjmsSession.

You use the publish-subscribe model of communication in JMS by taking the following steps:

Example Scenario

In the BooksOnline application all booked orders are published to the OE_bookedorders_topic. Orders for customers in the eastern region are routed to the ES.ES_bookedorders_topic and those for the western region are routed to the WS.WS_bookedorders_topic. There is also another application that subscribes to the OE_bookedorders_topic to track messages for some important customers. Refer to the code examples in the following sections.

Durable Subscriber

Durable Subscribers are instituted in either of the following ways:

The selector for topics containing payloads of type TextMessage, StreamMessage, BytesMessage, ObjectMessage, MapMessage can contain any expression that has a combination of one or more of the following:

For topics containing AdtMessages the selector must be a SQL expression on the message payload contents or priority or correlationID.

The syntax for the selector is described in detail in Oracle9i Supplied Java Packages Reference, createDurableSubscriber.

Remote subscribers are defined using the createRemoteSubscriber call. The remote subscriber may be a specific consumer at the remote topic or all subscribers at the remote topic.

A remote subscriber is defined using the AQjmsAgent structure. An AQjmsAgent consists of a name and address. The name refers to the consumer_name at the remote topic. The address refers to the remote topic:

<schema>.<topic_name>[@dblink]
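The address format (schema, a dot, the topic name, and an optional database link introduced by @) can be split with a regular expression. The helper below is a hypothetical illustration in plain Java; AQjmsAgent itself takes the address as a single string, and the sketch assumes that schema and topic names contain neither a dot nor an @ sign.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Splits a remote topic address into its parts; hypothetical, not part of Oracle JMS. */
class AgentAddressSketch {
    // schema "." topic_name, optionally followed by "@" dblink
    private static final Pattern ADDRESS =
        Pattern.compile("([^.@]+)\\.([^.@]+)(?:@(.+))?");

    /** Returns {schema, topic, dblink}; dblink is null when the address has none. */
    static String[] parse(String address) {
        Matcher m = ADDRESS.matcher(address);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                "Not of the form schema.topic[@dblink]: " + address);
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }
}
```

For the address WS.WS_bookedorders_topic, parse returns the schema WS, the topic WS_bookedorders_topic, and a null database link.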

In the BooksOnline application there is one local subscriber SUBS1 and two remote subscribers -

Example Code

 public void create_booked_orders_subscribers(TopicSession jms_session)
{
   Topic            topic;
   TopicSubscriber  tsubs;
   AQjmsAgent       agt_east;
   AQjmsAgent       agt_west;

   try
   {
   
     /* get a handle to the OE_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("OE", 
                    "OE_bookedorders_topic");
     
     
 /* Create local subscriber - to track messages for some customers */
     tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
             "JMSPriority < 3 AND Customer = 'MARTIN'", 
                   false);
     
 /* Create remote subscribers in the western and eastern region */   
     agt_west = new AQjmsAgent("West_Shipping", "WS.WS_bookedorders_topic");
     
     ((AQjmsSession)jms_session).createRemoteSubscriber(topic, agt_west, 
                     "Region = 'WESTERN'");

     agt_east = new AQjmsAgent("East_Shipping", "ES.ES_bookedorders_topic");
     
     ((AQjmsSession)jms_session).createRemoteSubscriber(topic, agt_east, 
                     "Region = 'EASTERN'");

 /* schedule propagation between bookedorders_topic and 
   WS_bookedorders_topic, ES.ES_bookedorders_topic */   
     ((AQjmsDestination)topic).schedulePropagation(jms_session, 
                      "WS.WS_bookedorders_topic", 
                         null, null, null, null);

     ((AQjmsDestination)topic).schedulePropagation(jms_session, 
                   "ES.ES_bookedorders_topic", 
                      null, null, null, null);
   }   
   catch (Exception ex)
   {
     System.out.println("Exception " + ex);
   }
      
}

Topic Publisher

Messages are published using TopicPublisher:

A TopicPublisher is created by passing a Topic to a session's createPublisher method. A client also has the option of creating a TopicPublisher without supplying a Topic. In this case, a Topic must be specified on every publish operation. A client can specify a default delivery mode, priority and time-to-live for all messages sent by the TopicPublisher. It can also specify these options on a per message basis.

Example Scenario and Code

In the BooksOnline application, booked orders are published to the OE.OE_bookedorders_topic:

public void book_new_order(TopicSession jms_session, ObjectMessage obj_message)
{
   TopicPublisher  publisher;   
   Topic           topic;

   try
   {
     /* get a handle to the booked_orders topic */   
     topic = ((AQjmsSession) jms_session).getTopic("OE",
                     "OE_bookedorders_topic");
     
     publisher = jms_session.createPublisher(topic);   
     
     /* the publisher was created for this topic, so no topic is passed here */
     publisher.publish(obj_message);
     
     jms_session.commit();   
   }
   catch (JMSException ex)
   {
     System.out.println("Exception: " + ex);
   }

}

In the BooksOnline application, each shipping region receives messages from the corresponding booked orders topic (WS_bookedorders_topic or ES_bookedorders_topic). The local subscriber SUBS1 receives messages from the OE_bookedorders_topic.

public void get_martins_orders(TopicSession jms_session)
{
   Topic            topic;
   TopicSubscriber  tsubs;
   ObjectMessage    obj_message;
   BolCustomer      customer;
   BolOrder         new_order;
   String           state;
   int              i = 0;
   
   try
   {
     /* get a handle to the OE_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("OE", 
                  "OE_bookedorders_topic");
   
     /* Create local subscriber - to track messages for some customers */
     tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
                "JMSPriority < 3 AND Customer = 'MARTIN'",
                   false);

     /* process 10 messages */
     for(i=0; i<10; i++)
     {
       /* wait for a message to show up in the topic */
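       obj_message = (ObjectMessage)tsubs.receive(10);

       /* receive returns null if no message arrives within the timeout */
       if (obj_message == null)
         break;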
       
       new_order = (BolOrder)obj_message.getObject();
       
       customer = new_order.getCustomer();
       state    = customer.getState();
       
       System.out.println("Order: " + i + " for customer " + 
           customer.getName()); 
       jms_session.commit();      
     }    
   }
   catch (Exception ex)
   {
     System.out.println("Exception " + ex); 
   }
}

Recipient Lists

In the JMS publish-subscribe model, clients can specify explicit recipient lists instead of having messages sent to all the subscribers of the topic. These recipients may or may not be existing subscribers of the topic. The recipient list overrides the subscription list on the topic for this message. The concept of recipient lists is an Oracle extension to JMS.
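The override rule can be stated in a few lines of plain Java. RecipientListSketch is a hypothetical model that only decides who receives one message; it does not publish anything, and it treats a null or empty recipient list as "no list given", which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Decides the recipients of one message; hypothetical, not part of Oracle JMS. */
class RecipientListSketch {
    static List<String> deliverTo(List<String> subscribers, List<String> recipientList) {
        // An explicit recipient list replaces the subscription list for this message,
        // even if some recipients are not subscribers of the topic.
        if (recipientList != null && !recipientList.isEmpty()) {
            return new ArrayList<>(recipientList);
        }
        // Without a recipient list, the message goes to the topic's subscribers.
        return new ArrayList<>(subscribers);
    }
}
```

With subscribers SUBS1, West_Shipping, and East_Shipping and a recipient list of SUBS1 and Fedex_Shipping, only SUBS1 and Fedex_Shipping receive the message, although Fedex_Shipping is not a subscriber.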

Example Scenario and Code

Suppose we want to send high priority messages only to SUBS1 and Fedex_Shipping in the Eastern region instead of publishing them to all the subscribers of the OE_bookedorders_topic:

public void book_rush_order(TopicSession jms_session, 
             ObjectMessage obj_message)
{
     
   TopicPublisher  publisher;   
   Topic           topic;
   AQjmsAgent[]    recp_list = new AQjmsAgent[2];   

   try
   {
 /* get a handle to the booked_orders topic */   
     topic = ((AQjmsSession) jms_session).getTopic("OE",
                     "OE_bookedorders_topic");
     
     publisher = jms_session.createPublisher(null);   
     
     recp_list[0] = new AQjmsAgent("SUBS1", null);
     recp_list[1] = new AQjmsAgent("Fedex_Shipping", 
               "ES.ES_bookedorders_topic");
     
     publisher.setPriority (1);
     ((AQjmsTopicPublisher)publisher).publish(topic, obj_message, recp_list);
     
     jms_session.commit();   

   }
   catch (Exception ex)
   {
     System.out.println("Exception: " + ex);
   }
}

TopicReceiver

If the recipient name is explicitly specified in the recipient list, but that recipient is not a subscriber to the topic, then messages sent to it can be received by creating a TopicReceiver. TopicReceiver is an Oracle extension to JMS.

A TopicReceiver can also be created with a message selector. This allows the client to restrict messages delivered to the recipient to those that match the selector.

The syntax for the selector for TopicReceiver is the same as that for QueueReceiver.

Example Scenario and Code

public void ship_rush_orders(TopicSession jms_session)
{
   Topic            topic;
   TopicReceiver    trec;
   ObjectMessage    obj_message;
   BolCustomer      customer;
   BolOrder         new_order;
   String           state;
   int              i = 0;   

   try
   {
     /* get a handle to the ES_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("ES", 
                    "ES_bookedorders_topic");
   
     /* Create a topic receiver for the recipient Fedex_Shipping */
     trec = ((AQjmsSession)jms_session).createTopicReceiver(topic, 
                         "Fedex_Shipping", 
                         null);
     
     /* process 10 messages */
     for(i = 0; i < 10; i++)
     {
       /* wait for a message to show up in the topic */
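       obj_message = (ObjectMessage)trec.receive(10);

       /* receive returns null if no message arrives within the timeout */
       if (obj_message == null)
         break;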
       
       new_order = (BolOrder)obj_message.getObject();
       
       customer = new_order.getCustomer();
       state    = customer.getState();
       
       System.out.println("Rush Order for customer " + 
           customer.getName()); 
       jms_session.commit();      
     }    
   }
   catch (Exception ex)
   {
     System.out.println("Exception ex: " + ex);
   }
}

For remote subscribers, if the subscriber name at the remote topic has been explicitly specified in the createRemoteSubscriber call, then a TopicReceiver can be used to receive messages:

public void get_westernregion_booked_orders(TopicSession jms_session)
{
   Topic            topic;
   TopicReceiver    trec;
   ObjectMessage    obj_message;
   BolCustomer      customer;
   BolOrder         new_order;
   String           state;
   int              i = 0;   

   try
   {
 /* get a handle to the WS_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("WS", 
                    "WS_bookedorders_topic");
   
     /* Create a topic receiver for the remote subscriber West_Shipping */
     trec = ((AQjmsSession)jms_session).createTopicReceiver(topic, 
                         "West_Shipping", 
                         null);

 /* process 10 messages */
     for(i = 0; i < 10; i++)
     {
  /* wait for a message to show up in the topic */
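       obj_message = (ObjectMessage)trec.receive(10);

       /* receive returns null if no message arrives within the timeout */
       if (obj_message == null)
         break;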
       
       new_order = (BolOrder)obj_message.getObject();
       
       customer = new_order.getCustomer();
       state    = customer.getState();
       
       System.out.println("Received Order for customer " + 
           customer.getName()); 
       jms_session.commit();      
     }    
   }
   catch (Exception ex)
   {
     System.out.println("Exception ex: " + ex); 
   }

}

If the subscriber name is not specified in the createRemoteSubscriber call, clients have to use durable subscribers at the remote site to receive messages.

Topic Browser

A client uses a TopicBrowser to view messages on a topic without removing them. The browser methods return a java.util.Enumeration that is used to scan the topic's messages. The first call to nextElement gets a snapshot of the topic. A TopicBrowser may also optionally lock messages as it is scanning them. This is similar to a SELECT ... FOR UPDATE command on the message, and it prevents other consumers from removing the messages while they are being scanned.

A TopicBrowser can also be created with a message selector. This allows the client to restrict messages delivered to the browser to those that match the selector.

The selector for the TopicBrowser can take any of the following forms:

For topics containing AdtMessages, the selector must be a SQL expression on the message payload contents or messageID or priority or correlationID.

As with any consumer for topics, only durable subscribers are allowed to create topic browsers.

TopicBrowsers also support a purge feature. This allows a client using a topic browser to discard all messages that have been seen during the current browse operation on the topic. A purge is equivalent to a destructive receive of all of the seen messages (as if performed using a TopicSubscriber).

For the purpose of a purge, a message is considered seen if it has been returned to the client using a call to the nextElement() operation on the java.util.Enumeration for the topic browser. Messages that have not yet been seen by the client will not be discarded during a purge. A purge operation may be performed multiple times on the same topic browser.

As with all other JMS messaging operations, the effect of a purge becomes stable when the JMS session used to create the TopicBrowser is committed. If the operations on the session are rolled back, the effects of the purge operation are also undone.

Example Scenario and Code

In the BooksOnLine application, all booked orders are published to the OE_booked_orders_topic. A client can then browse selected messages.

import oracle.jms.TopicBrowser;
// ...
public void browse_rush_orders(TopicSession jms_session)
{
    TopicBrowser    browser;
    Topic           topic;
    ObjectMessage   obj_message;
    BolOrder        new_order;
    Enumeration     messages;
    String          customer_name;

    try
    {
        /* get a handle to the OE_booked_orders_topic topic */
        topic = ((AQjmsSession) jms_session).getTopic("OE",
            "OE_booked_orders_topic");

        /* create a Browser to look at RUSH orders; browsing a topic is an
           AQ extension, so the session is cast to AQjmsSession */
        browser = ((AQjmsSession) jms_session).createBrowser(
            topic, "SUBS1", "JMSCorrelationID = 'RUSH'");

        int count = 0;
        for (messages = browser.getEnumeration() ; messages.hasMoreElements() ;)
        {
            obj_message = (ObjectMessage)messages.nextElement();
            new_order = (BolOrder)obj_message.getObject();

            customer_name = new_order.getCustomer().getName();
            System.out.println("Customer " +  customer_name +
                " has placed a RUSH order");

            ++count;
        }

        /* purge messages seen during this browse if there are too many */
        if (count > 100)
        {
            browser.purgeSeen();
        }

        browser.close();
    }
    catch (Exception ex)
    {
        System.out.println("Exception " + ex);
    }
} 

JMS Message Producer Features

Priority and Ordering of Messages

The message ordering dictates the order in which messages are received from a queue or topic. The ordering method is specified when the queue table for the queue or topic is created (see "Creating a Queue Table" in Chapter 9, "Administrative Interface"). Currently, AQ supports ordering on two of the message attributes: priority and enqueue time.

When combined, they lead to four possible ways of ordering:

FIFO Ordering of Messages

If enqueue time is chosen as the ordering criterion, then messages are received in the order of their enqueue time. The enqueue time is assigned to the message by AQ at message publish/send time. This is also the default ordering.

Priority Ordering of Messages

If priority ordering is chosen, each message will be assigned a priority. Priority can be specified as a message property at publish/send time by the Message Producer. The messages will be received in the order of the priorities assigned.

First-In, First-Out (FIFO) Priority Ordering

A FIFO-priority topic/queue can also be created by specifying both the priority and the enqueue time as the sort order of the messages. A FIFO-priority topic/queue behaves like a priority queue, except if two messages are assigned the same priority, they will be received in the order of their enqueue time.

Enqueue Time Followed by Priority

Messages with the same enqueue time will be received according to their priorities. If the ordering criteria of two messages are the same, then the order in which they are received is indeterminate. However, AQ does ensure that messages sent/published in the same session with the same ordering criteria will be received in the order they were sent.
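The four orderings can be pictured as comparators over the two sort attributes. The following is a minimal sketch in plain Java, not AQ code: the Msg class, the comparator names, and the sample values are hypothetical, and it assumes purely for illustration that a smaller priority value is received first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderingSketch {

    /** Hypothetical stand-in for a message: only the two sort attributes. */
    static final class Msg {
        final String name;
        final int priority;
        final long enqTime;

        Msg(String name, int priority, long enqTime) {
            this.name = name;
            this.priority = priority;
            this.enqTime = enqTime;
        }
    }

    // Assumption for illustration: a smaller priority value is received first.
    static final Comparator<Msg> FIFO =
        Comparator.comparingLong((Msg m) -> m.enqTime);
    static final Comparator<Msg> PRIORITY =
        Comparator.comparingInt((Msg m) -> m.priority);
    // Priority first, enqueue time breaks ties.
    static final Comparator<Msg> FIFO_PRIORITY = PRIORITY.thenComparing(FIFO);
    // Enqueue time first, priority breaks ties.
    static final Comparator<Msg> ENQ_TIME_THEN_PRIORITY = FIFO.thenComparing(PRIORITY);

    /** Returns the message names in the order a consumer would see them. */
    static List<String> receiveOrder(List<Msg> msgs, Comparator<Msg> order) {
        List<Msg> sorted = new ArrayList<>(msgs);
        sorted.sort(order);
        List<String> names = new ArrayList<>();
        for (Msg m : sorted) {
            names.add(m.name);
        }
        return names;
    }

    /** Three sample messages: A is enqueued first but has the largest priority value. */
    static List<Msg> sample() {
        List<Msg> msgs = new ArrayList<>();
        msgs.add(new Msg("A", 3, 100L));
        msgs.add(new Msg("B", 1, 200L));
        msgs.add(new Msg("C", 1, 300L));
        return msgs;
    }

    public static void main(String[] args) {
        System.out.println(receiveOrder(sample(), FIFO));           // [A, B, C]
        System.out.println(receiveOrder(sample(), FIFO_PRIORITY));  // [B, C, A]
    }
}
```

In the FIFO-priority case, B and C share a priority, so their enqueue times decide which is received first.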

Example Scenario and Code

Using the BooksOnLine application, a customer can request one of the following:

Priority can be specified at the Message Producer level using the setPriority call, or during the send or publish call. The latter overrides the former.

The Order Entry application uses a FIFO queue to store new orders. New orders are processed by the order entry application and published to the booked orders topic. The order entry application will retrieve messages from the new orders queue in the order of their enqueue time. It uses a FIFO-priority topic to store booked orders. Booked orders are propagated to the regional booked orders topics. At each region, orders in these regional booked orders topics are processed in the order of the shipping priorities. The following calls create the FIFO-priority topic for the Order Entry application to store booked orders.

public static void createPriorityTopic(TopicSession jms_session)
{
    AQQueueTableProperty       qt_prop;
    AQQueueTable               pr_qtable;   
    AQjmsDestinationProperty   dest_prop;
    Topic                      bookedorders_topic;   

    try
    {

 /* Create a priority queue table for OE */
   qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_OBJECT_MESSAGE");
   qt_prop.setComment("Order Entry Priority " + 
             "MultiConsumer Orders queue table");
   qt_prop.setCompatible("8.1");
   qt_prop.setMultiConsumer(true);

 /* Set a FIFO-priority order */
   qt_prop.setSortOrder("priority, enq_time");

   pr_qtable = ((AQjmsSession)jms_session).createQueueTable("OE",
                     "OE_orders_pr_mqtab", qt_prop);
     
 /* Create a topic in this queue table */
   dest_prop = new AQjmsDestinationProperty();
   
   bookedorders_topic =((AQjmsSession)jms_session).createTopic(pr_qtable, 
              "OE_bookedorders_topic", dest_prop);
    /* Enable enqueue and dequeue on the topic */
   ((AQjmsDestination)bookedorders_topic).start(jms_session, true, true);


  
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex);
    }
}

/* When an order arrives, the order entry application can use the following
   procedure to publish the order into its booked orders topic. A shipping
   priority is specified for each order: */
public static void order_enqueue(TopicSession jms_session, String book_title,
                 int book_qty, int order_num, int cust_no, String cust_name,
                 int ship_priority, String cust_state, String cust_country,
                 String cust_order_type)
{
    BolOrder          order;
    BolCustomer       cust_data;
    BolBook           book_data;
    BolOrderItem[]    item_list;
    Topic             topic;
    ObjectMessage     obj_message;
    TopicPublisher    tpub;

    try
    {
        book_data = new BolBook(book_title);
        cust_data = new BolCustomer(cust_no, cust_name);
        order = new BolOrder(order_num, cust_data);

        item_list = new BolOrderItem[1];
        item_list[0] = new BolOrderItem(book_data, book_qty);
        order.setItemList(item_list);

        /* get a handle to the OE bookedorders_topic */
        topic = ((AQjmsSession)jms_session).getTopic("OE",
                         "OE_bookedorders_topic");

        /* Create the topic publisher */
        tpub = jms_session.createPublisher(topic);

        obj_message = jms_session.createObjectMessage();
        obj_message.setObject(order);

        /* Send message - specify priority */
        tpub.publish(topic, obj_message, DeliveryMode.PERSISTENT,
                     ship_priority, 0);

        jms_session.commit();
    }
    catch (Exception ex)
    {
        System.out.println("Exception ex: " + ex);
    }
}

Time Specification - Delay

Messages can be sent/published to a queue/topic with Delay. The delay represents a time interval after which the message becomes available to the Message Consumer. A message specified with a delay is in a waiting state until the delay expires and the message becomes available. Delay for a message is specified as a message property (JMS_OracleDelay). This property is not specified in the JMS standard. It is an AQ extension to JMS message properties.

Delay processing requires the AQ background process, the queue monitor, to be started. Note also that receiving by msgid overrides the delay specification.

Example Scenario and Code

In the BooksOnLine application, delay can be used to implement deferred billing. The billing application defines a queue in which shipped orders that are not billed immediately are placed with a delay. For example, a certain class of customer accounts, such as corporate customers, may not be billed for 15 days. The billing application dequeues incoming shipped order messages (from the shipped orders queue) and if the order is for a corporate customer, this order is enqueued into a deferred billing queue with a delay. Delay works similarly for publish, though a scenario has not been provided.

public static void defer_billing(QueueSession jms_session, 
             BolOrder deferred_order)
{
    Queue             def_bill_q;   
    ObjectMessage     obj_message;
    QueueSender       qsender;

    try
    {
   /* get a handle to the deferred billing queue */
   def_bill_q = ((AQjmsSession)jms_session).getQueue("CBADM", 
                   "deferbilling_que");

   /* Create the QueueSender */
   qsender = jms_session.createSender(def_bill_q);

   obj_message = jms_session.createObjectMessage();
   obj_message.setObject(deferred_order);

   /* Set Delay as 15 days 
    * Delay is specified in seconds
         */
   obj_message.setIntProperty("JMS_OracleDelay", 15*60*60*24);

   qsender.send(obj_message);

   jms_session.commit();

    }
    catch (Exception ex)
    {
   System.out.println("Exception " + ex);
    }
    
}

Time Specification - Expiration

Producers of messages can specify expiration limits, or Time-to-Live (coded as TimeToLive) for messages. This defines the period of time the message is available for a Message Consumer.

Time-to-Live can be specified at send/publish time or using the setTimeToLive method of a Message Producer, with the former overriding the latter. Note that the AQ background process, the queue monitor, must be running to implement Time-to-Live.
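The examples in this chapter give Delay in seconds and Time-to-Live in milliseconds, so the two values are easy to confuse. The following is a minimal sketch in plain Java of computing both from a number of days; the class and method names are hypothetical and it makes no AQ or JMS calls.

```java
import java.util.concurrent.TimeUnit;

public class TimeSpecSketch {

    /**
     * Delay in seconds, as an int because the chapter's example sets
     * JMS_OracleDelay with setIntProperty. Math.toIntExact throws
     * ArithmeticException rather than silently overflowing.
     */
    static int delaySeconds(long days) {
        return Math.toIntExact(TimeUnit.DAYS.toSeconds(days));
    }

    /** Time-to-live in milliseconds, kept as a long throughout. */
    static long timeToLiveMillis(long days) {
        return TimeUnit.DAYS.toMillis(days);
    }

    public static void main(String[] args) {
        System.out.println(delaySeconds(15));     // 1296000
        System.out.println(timeToLiveMillis(7));  // 604800000
    }
}
```

Doing the arithmetic in long matters for longer periods: 25 days is 2,160,000,000 milliseconds, which exceeds the largest int value (2,147,483,647), so an all-int expression such as 25*60*60*24*1000 would overflow.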

Example Scenario

In the BooksOnLine application, TimeToLive can be used to control the amount of time that is allowed to process a back order. The shipping application places orders for books that are not available on a back order topic. If the shipping policy is that all back orders must be shipped within a week, then messages can be published into the back order topic with an expiration of one week. In this case, any back orders that are not processed within one week are moved to the exception topic with the message state set to EXPIRED. This can be used to flag any orders that have not been shipped according to the back order shipping policy.

Example Code

/* Re-enqueue a back order into a back_order Topic and set a timeToLive of 
   7 days; 
   All back orders must be processed in 7 days or they are moved to the 
   exception queue */ 
public static void requeue_back_order(TopicSession jms_session, 
                  String sale_region, BolOrder back_order)
{
    Topic             back_order_topic;   
    ObjectMessage     obj_message;
    TopicPublisher    tpub;
    long              timetolive;

    try
    {
   /* Look up a back order topic based on the region */
   if(sale_region.equals("WEST"))
   {
       back_order_topic = ((AQjmsSession)jms_session).getTopic("WS", 
                     "WS_backorders_topic");
   }
   else if(sale_region.equals("EAST"))
   {
       back_order_topic = ((AQjmsSession)jms_session).getTopic("ES", 
                     "ES_backorders_topic");
   }
   else
   {
       back_order_topic = ((AQjmsSession)jms_session).getTopic("OS", 
                     "OS_backorders_topic");
   }

   obj_message = jms_session.createObjectMessage();
   obj_message.setObject(back_order);

   tpub = jms_session.createPublisher(null);


   /* Set message expiration to 7 days: */ 
   timetolive = 7*60*60*24*1000;          // specified in  milliseconds 


   /* Publish the message */
   tpub.publish(back_order_topic, obj_message, DeliveryMode.PERSISTENT,
           1, timetolive);


   jms_session.commit();
    }
    catch (Exception ex)
    {
   System.out.println("Exception :" + ex); 
    }
}

Message Grouping

Messages belonging to a queue/topic can be grouped to form a set that can only be consumed by one consumer at a time. This requires the queue/topic be created in a queue table that is enabled for transactional message grouping (see "Creating a Queue Table", Chapter 9, "Administrative Interface"). All messages belonging to a group have to be created in the same transaction and all messages created in one transaction belong to the same group. Using this feature, you can segment a complex message into simple messages. This is an AQ extension and not part of the JMS specification.

For example, messages directed to a queue containing invoices could be constructed as a group of messages starting with the header message, followed by messages representing details, followed by the trailer message. Message grouping is also very useful if the message payload contains complex large objects such as images and video that can be segmented into smaller objects.

The general message properties (priority, delay, expiration) for the messages in a group are determined solely by the message properties specified for the first message (head) of the group irrespective of which properties are specified for subsequent messages in the group.

The message grouping property is preserved across propagation. However, it is important to note that the destination topic to which messages have to be propagated must also be enabled for transactional grouping. There are also some restrictions you need to keep in mind if the message grouping property is to be preserved while dequeuing messages from a queue enabled for transactional grouping (see "Dequeue Methods" and "Modes of Dequeuing" for additional information).

Example Scenario

In the BooksOnLine application, message grouping can be used to handle new orders. Each order contains a number of books ordered one by one in succession. Items ordered over the Web exhibit similar behavior.

In the example that follows, each send corresponds to an individual book that is part of an order, and the group/transaction represents a complete order. Only the first message contains customer information. Note that the OE_neworders_que is defined in the queue table OE_orders_sqtab which has been enabled for transactional grouping.

Example Code

public static void createMsgGroupQueueTable(QueueSession jms_session)
{
    AQQueueTableProperty       sqt_prop;
    AQQueueTable               sq_table;   
    AQjmsDestinationProperty   dest_prop;
    Queue                      neworders_q;   

    try
    {
   /* Create a single-consumer orders queue table 
         * with message grouping = TRANSACTIONAL 
         */
   sqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
   sqt_prop.setComment("Order Entry Single-Consumer Orders queue table");
   sqt_prop.setCompatible("8.1");
   sqt_prop.setMessageGrouping(AQQueueTableProperty.TRANSACTIONAL);

   sq_table = ((AQjmsSession)jms_session).createQueueTable("OE", 
                  "OE_orders_sqtab", sqt_prop);

   /* Create new orders queue for OE */
   dest_prop = new AQjmsDestinationProperty();
   neworders_q = ((AQjmsSession)jms_session).createQueue(sq_table, 
                            "OE_neworders_que", 
                        dest_prop);   
     
    }
    catch (Exception ex)
    {
   System.out.println("Exception: " + ex); 
    }
}

/* This method sends an order to the specified queue */
public static void enqueue_order(QueueSession jms_session, Queue queue,
                  int order_num, String cust_name, int cust_id, 
             int book_qty, String book_title)
{
   QueueSender       sender;
   ObjectMessage     obj_message;   
   BolOrder          order;
   BolCustomer       cust_data=null;
   BolBook           book_data;
   BolOrderItem[]    item_list;

   try
   {
     book_data = new BolBook(book_title);
     
     if(cust_name != null)
     {    
       cust_data = new BolCustomer(cust_id, cust_name);
     }

     order = new BolOrder(order_num, cust_data);
     
     item_list = new BolOrderItem[1];
     item_list[0] = new BolOrderItem(book_data, book_qty);
     
     order.setItemList(item_list);
     
     
     sender = jms_session.createSender(queue);
     
     obj_message = jms_session.createObjectMessage();
     
     obj_message.setObject(order);
     
     sender.send(obj_message);
   }
   catch (Exception ex)
   {
     System.out.println("Exception ex: " + ex); 
   }
}   

/* Enqueue groups of orders */
public static void enqueue_order_groups(QueueSession jms_session)
{
  Queue neworders_q;

  try
  {
    neworders_q = ((AQjmsSession)jms_session).getQueue("OE",
                         "OE_neworders_que"); 

    /* Enqueue first group */   
    enqueue_order(jms_session, neworders_q, 1, "John", 1000, 2,
        "John's first book");
    
    enqueue_order(jms_session, neworders_q, 1, null, 0, 1, 
        "John's second book");
    
    jms_session.commit();
    
    /* Enqueue second group */   
    enqueue_order(jms_session, neworders_q, 2, "Mary", 1001, 1,
        "Mary's first book");
    
    enqueue_order(jms_session, neworders_q, 2, null, 0, 1, 
        "Mary's second book");
    
    enqueue_order(jms_session, neworders_q, 2, null, 0, 1, 
        "Mary's third book");
    
    jms_session.commit();
    
    /* Enqueue third group */   
    enqueue_order(jms_session, neworders_q, 3, "Scott", 1002, 1,
        "Scott's first book");
    
    enqueue_order(jms_session, neworders_q, 3, null, 0, 2, 
        "Scott's second book");
    
    enqueue_order(jms_session, neworders_q, 3, null, 0, 2,
        "Scott's third book");
        
    jms_session.commit();
  }
  catch (Exception ex)
  {
    System.out.println("Exception ex: " + ex);
  }

}

JMS Message Consumer Features

Receiving Messages

A JMS application can receive messages by creating a message consumer. Messages can be received synchronously using the receive call or asynchronously using a Message Listener.

There are three modes of receive: block until a message arrives, block for a specified maximum time, and nonblocking.

Example Code: Block Until a Message Arrives

public BolOrder get_new_order1(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;
        
      try
      {
      /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* wait for a message to show up in the queue */
       obj_message = (ObjectMessage)qrec.receive();
       
       new_order = (BolOrder)obj_message.getObject();
       
       customer = new_order.getCustomer();
       state    = customer.getState();
      
       System.out.println("Order:  for customer " + 
                           customer.getName()); 
  
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Example Code: Block for a Maximum of 60 Seconds

public BolOrder get_new_order2(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;
        
      try
      {
            /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* wait for up to 60 seconds for a message to show up in the queue */
       obj_message = (ObjectMessage)qrec.receive(60000);

       /* receive returns null if the timeout expires with no message */
       if (obj_message != null)
       {
         new_order = (BolOrder)obj_message.getObject();

         customer = new_order.getCustomer();
         state    = customer.getState();

         System.out.println("Order:  for customer " +
                             customer.getName());
       }
  
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Example Code: Nonblocking

public BolOrder poll_new_order3(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;
        
      try
      {
            /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* check for a message in the queue without waiting */
       obj_message = (ObjectMessage)qrec.receiveNoWait();

       /* receiveNoWait returns null if no message is available */
       if (obj_message != null)
       {
         new_order = (BolOrder)obj_message.getObject();

         customer = new_order.getCustomer();
         state    = customer.getState();

         System.out.println("Order:  for customer " +
                             customer.getName());
       }
  
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }
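The three receive modes have the same shape as the take and poll calls on a java.util.concurrent.BlockingQueue, which can be a convenient way to try out the null handling without a database. The following is a minimal sketch under that analogy only: the class, the method names, and the in-memory queue of strings are hypothetical and do not use the JMS or AQ APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReceiveModesSketch {

    /** Analogue of receive(): blocks until an element is available. */
    static String receive(BlockingQueue<String> q) {
        try {
            return q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Analogue of receive(timeout): waits at most timeoutMillis, then returns null. */
    static String receive(BlockingQueue<String> q, long timeoutMillis) {
        try {
            return q.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Analogue of receiveNoWait(): returns at once, null if nothing is available. */
    static String receiveNoWait(BlockingQueue<String> q) {
        return q.poll();
    }

    /** Callers of the timed and nonblocking modes must be ready for null. */
    static String describe(String order) {
        return order == null ? "No order available" : "Order: " + order;
    }

    public static void main(String[] args) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        System.out.println(describe(receiveNoWait(q)));   // No order available
        System.out.println(describe(receive(q, 10)));     // No order available
        q.add("order-1");
        System.out.println(describe(receive(q)));         // Order: order-1
    }
}
```

Only the fully blocking mode can assume a message was returned; the other two need a null check before the payload is read.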

Message Navigation in Receive

When a consumer does the first receive in its session, it gets the first message in the queue or topic. Subsequent receives get the next message, and so on. The default behavior works well for FIFO queues and topics, but not for priority ordered queues. If a high priority message arrives for the consumer, this client program will not receive the message until it has cleared the messages that were already there for it.

To give the consumer better control in navigating the queue for its messages, the AQ navigation modes are made available to it as JMS extensions. These modes can be set at the TopicSubscriber, the QueueReceiver, or the TopicReceiver.

For transaction grouping

Note that the transaction grouping property may be negated if messages are received in the following ways:

If in navigating through the queue, the program reaches the end of the queue while using the NEXT_MESSAGE or NEXT_TRANSACTION option, and you have specified a blocking receive, then the navigating position is automatically changed to the beginning of the queue.

By default, a QueueReceiver, TopicReceiver, or TopicSubscriber uses FIRST_MESSAGE for the first receive call, and NEXT_MESSAGE for the subsequent receive calls.

Example Scenario

The get_new_orders() procedure retrieves orders from the OE_neworders_que. Each transaction refers to an order, and each message corresponds to an individual book in that order. The procedure loops through the messages to retrieve the book orders. It resets the position to the beginning of the queue using the FIRST_MESSAGE option before the first receive. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an exception indicating that all messages in the current group/transaction have been fetched, it changes the navigation option to next transaction and gets the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.

Example Code

public void get_new_orders(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order;
      String           state;
      int              new_orders = 1;

      try
      {

         /* get a handle to the new_orders queue */
         queue = ((AQjmsSession) jms_session).getQueue("OE","OE_neworders_que");
         qrec = jms_session.createReceiver(queue);

         /* set navigation to first message */
         ((AQjmsQueueReceiver)qrec).setNavigationMode(
             AQjmsConstants.NAVIGATION_FIRST_MESSAGE);

         while (new_orders != 0)
         {
           try
           {
             /* check for the next message in the queue without blocking */
             obj_message = (ObjectMessage)qrec.receiveNoWait();

             if (obj_message == null)   /* no more orders in the queue */
             {
               System.out.println(" No more orders ");
               new_orders = 0;
             }
             else
             {
               new_order = (BolOrder)obj_message.getObject();
               customer = new_order.getCustomer();

               /* only the first message of a group carries customer data */
               if (customer != null)
               {
                 state = customer.getState();
                 System.out.println("Order: for customer " +
                                    customer.getName());
               }

               /* Now get the next message */
               ((AQjmsQueueReceiver)qrec).setNavigationMode(
                   AQjmsConstants.NAVIGATION_NEXT_MESSAGE);
             }
           }
           catch (AQjmsException ex)
           {
             /* 25235: all messages in the current transaction were fetched */
             if (ex.getErrorNumber() == 25235)
             {
               System.out.println("End of transaction group");
               ((AQjmsQueueReceiver)qrec).setNavigationMode(
                   AQjmsConstants.NAVIGATION_NEXT_TRANSACTION);
             }
             else
               throw ex;
           }
         }
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
   }

Modes for Receiving Messages

For Point-to-Point Mode

Aside from the normal receive, which allows the dequeuing client to delete the message from the queue, JMS provides an interface that allows the JMS client to browse its messages in the queue. A QueueBrowser can be created using the createBrowser method from QueueSession.

If a message is browsed, it remains available for further processing. Note that after a message has been browsed there is no guarantee that the message will be available to the JMS session again, because a receive call from a concurrent session might remove the message.

To prevent a viewed message from being removed by a concurrent JMS client, you can view the message in the locked mode. To do this, you need to create a QueueBrowser with the locked mode using the AQ extension to the JMS interface. The lock that a locked-mode browser holds on a message is released when the session performs a commit or a rollback.

To remove the message viewed by a QueueBrowser, the session must create a QueueReceiver and use the JMSMessageID as the selector.

Example Code

Refer to the QueueBrowser example in the Point-to-Point features section.

Remove-No-Data

The MessageConsumer can remove the message from the queue or topic without retrieving the message using the receiveNoData call. This is useful when the application has already examined the message, perhaps using the QueueBrowser. This mode allows the JMS client to avoid the overhead of retrieving the payload from the database, which can be substantial for a large message.

Example Scenario and Code

In the following scenario from the BooksOnLine example, international orders destined for Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so that no other concurrent user removes the message) using the QueueBrowser, and the customer country (message payload) is checked. If the customer country is Mexico or Canada, the message is deleted from the queue using the remove-with-no-data mode (since the payload is already known). Otherwise, the lock on the message is released by the commit call. Note that the receive call uses the message identifier obtained from the locked mode browse.

 public void process_international_orders(QueueSession jms_session)
  {
   QueueBrowser    browser;
   Queue           queue;
   ObjectMessage   obj_message;
   BolOrder        new_order;
   Enumeration     messages;
   String          customer_name;
   String          customer_country;
   QueueReceiver   qrec;
   String          msg_sel;

   try
   {
 /* get a handle to the new_orders queue */
     queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

 /* create a Browser in locked mode so that no concurrent client
    can remove a message while it is being examined */
     browser = ((AQjmsSession)jms_session).createBrowser(queue, null, true);

     for (messages = browser.getEnumeration() ; messages.hasMoreElements() ;)
     {
       obj_message = (ObjectMessage)messages.nextElement();
       
       new_order = (BolOrder)obj_message.getObject();

       customer_name = new_order.getCustomer().getName();

       customer_country = new_order.getCustomer().getCountry();
       
       if ("Canada".equals(customer_country) ||
           "Mexico".equals(customer_country))
       {
         System.out.println("Order for Canada or Mexico");
         msg_sel = "JMSMessageID = '" + obj_message.getJMSMessageID() + "'";
         qrec = jms_session.createReceiver(queue, msg_sel);
         ((AQjmsQueueReceiver)qrec).receiveNoData();
       }
     }

     /* commit the removals and release the locks taken by the browser */
     jms_session.commit();
   }catch (JMSException ex)
    { System.out.println("Exception " + ex);

    }
  }

Retry With Delay Interval

Max Retries

If the transaction receiving the message from a queue/topic fails, it is regarded as an unsuccessful attempt to remove the message. AQ records the number of failed attempts to remove the message in the message history.

In addition, AQ allows the application to specify, at the queue/topic level, the maximum number of retries supported on messages. If the number of failed attempts to remove a message exceeds this number, the message is moved to the exception queue and is no longer available to applications.

Retry Delay

If the transaction receiving a message aborted, this could be because of a 'bad' condition, for example, an order that could not be fulfilled because there were insufficient books in stock. Since inventory updates are made every 12 hours, it makes sense to retry after that time. If an order was not filled after 4 attempts, this could indicate that there is a problem.

AQ allows users to specify a retry_delay along with max_retries. This means that a message that has undergone a failed attempt at retrieval becomes visible in the queue for dequeue again only after the retry_delay interval. Until then it is in the WAITING state. The AQ background process, the time manager, enforces the retry delay property.

The maximum retries and retry delay are properties of the queue/topic which can be set when the queue/topic is created or using the alter method on the queue/topic. The default value for MAX_RETRIES is 5.
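The two properties together bound how long a message keeps being retried. As a rough worked example using the figures from the inventory scenario (4 retries, 12 hours apart), the following plain Java sketch computes that window. The class and method names are hypothetical, and the calculation deliberately ignores the time spent processing each attempt.

```java
import java.time.Duration;

public class RetryWindowSketch {

    /** Approximate time covered by maxRetries attempts spaced retryDelay apart. */
    static Duration retryWindow(int maxRetries, Duration retryDelay) {
        return retryDelay.multipliedBy(maxRetries);
    }

    public static void main(String[] args) {
        Duration window = retryWindow(4, Duration.ofHours(12));
        System.out.println(window.toHours() + " hours");  // 48 hours
        System.out.println(window.toDays() + " days");    // 2 days
    }
}
```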

Example Scenario and Code

If an order cannot be filled because of insufficient inventory, the transaction processing the order is aborted. The booked_orders topic is set up with max_retries = 4 and retry_delay = 12 hours. Thus, if an order is not filled within two days, it is moved to an exception queue.

public BolOrder process_booked_order(TopicSession jms_session)
  {
    Topic            topic;
    TopicSubscriber  tsubs;
    ObjectMessage    obj_message;
    BolCustomer      customer;
    BolOrder         booked_order = null;
    String           country;
    int              i = 0;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Create local subscriber - to track messages for Western Region  */
      tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
                                       "Region = 'Western' ",
                                                   false);

       /* wait up to 10 milliseconds for a message to show up in the topic */
       obj_message = (ObjectMessage)tsubs.receive(10);

       /* receive returns null if no message arrived within the timeout */
       if (obj_message == null)
       {
          return null;
       }

       booked_order = (BolOrder)obj_message.getObject();

       customer = booked_order.getCustomer();
       country    = customer.getCountry();

       /* compare string contents with equals, not object identity with == */
       if ("US".equals(country))
       {
          jms_session.commit();
       }
       else
       {
          /* the rollback counts as a failed attempt to remove the message */
          jms_session.rollback();
          booked_order = null;
       }
    }catch (JMSException ex)
    { System.out.println("Exception " + ex) ;}
     
     return booked_order;
   }

Asynchronously Receiving Message Using Message Listener

Message Listener for a Message Consumer

The JMS client can receive messages asynchronously by setting the MessageListener using the setMessageListener method available with the Consumer.

When a message arrives for the message consumer, the onMessage method of the message listener is invoked with the message. The message listener can commit or abort the receipt of the message. The message listener will not receive messages if the JMS Connection has been stopped. The receive call must not be used to receive messages once the message listener has been set for the consumer.

Example

The application processing the new orders queue can be set up for asynchronously receiving messages from the queue.

 public class OrderListener implements MessageListener
   {
      QueueSession   the_sess;
    
      /* constructor */
      OrderListener(QueueSession  my_sess)
      {
        the_sess = my_sess;
      }
      
      /* message listener interface */
      public void onMessage(Message m)
      {
        ObjectMessage    obj_msg;
        BolCustomer      customer;
        BolOrder         new_order = null;

        try {
          /* cast to JMS Object Message */
          obj_msg = (ObjectMessage)m;

          /*  Print some useful information */
          new_order = (BolOrder)obj_msg.getObject();
          customer = new_order.getCustomer();
          System.out.println("Order:  for customer " +  customer.getName()); 

          /* call the process order method 
           * NOTE: we are assuming it is defined elsewhere
           */
          process_order(new_order);
      
          /* commit the asynchronous receipt of the message */
           the_sess.commit();
        }catch (JMSException ex)
        { System.out.println("Exception " + ex) ;}
     
      }
   }

     public void setListener1(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      MessageListener  ourListener;
        
      try
      {
       /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       /* create a queue receiver */
       qrec = jms_session.createReceiver(queue);

       /* create the message listener */
       ourListener = new OrderListener(jms_session);
 
       /* set the message listener for the receiver */
       qrec.setMessageListener(ourListener);
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
   }

Message Listener for All Consumers on a Session

The JMS client can receive messages asynchronously for all the consumers of the session by setting the MessageListener at the session.

When a message arrives for any of the message consumers of the session, the onMessage method of the message listener is invoked with the message. The message listener can commit or abort the receipt of the message. The message listener will not receive messages if the JMS connection has been stopped. No other mode for receiving messages must be used in the session once the message listener has been set.

Example Scenario and Code

In the customer service component of the BooksOnLine example, messages from different databases arrive at the customer service topics, indicating the state of the order. The customer service application monitors the topics and whenever there is a message about a customer order, it updates the order status in the order_status_table. The application uses the session listener to monitor the different topics. Whenever there is a message in any of the topics, the onMessage method of the session MessageListener is invoked.

/* define our message listener class */
   public class CustomerListener implements MessageListener
   {
      TopicSession   the_sess;
    
 /* constructor */
      CustomerListener(TopicSession  my_sess)
      {
        the_sess = my_sess;
      }
      
      /* message listener interface */
      public void onMessage(Message m)
      {
        ObjectMessage    obj_msg;
        BolCustomer      customer;
        BolOrder         new_order = null;

        try
   {
          /* cast to JMS Object Message */
          obj_msg = (ObjectMessage)m;

     /*  Print some useful information */
          new_order = (BolOrder)obj_msg.getObject();
          customer = new_order.getCustomer();
          System.out.println("Order:  for customer " +  customer.getName()); 

          /* call the update status method 
           * NOTE: we are assuming it is defined elsewhere
           */
          update_status(new_order, new_order.getStatus());
      
     /* commit the asynchronous receipt of the message */
          the_sess.commit();
       }catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
   }

 }
 public void  monitor_status_topics(TopicSession jms_session) 
  {
    Topic[]             topic = new Topic[4];
    TopicSubscriber[]   tsubs= new TopicSubscriber[4];
   
    try
    {
      /* get handles to the customer service topics and subscribe to each */
      topic[0] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_bookedorders_topic");
      tsubs[0] = jms_session.createDurableSubscriber(topic[0], "BOOKED_ORDER");

      topic[1] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_billedorders_topic");
      tsubs[1] = jms_session.createDurableSubscriber(topic[1], "BILLED_ORDER");

      topic[2] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_backdorders_topic");
      tsubs[2] = jms_session.createDurableSubscriber(topic[2], "BACKED_ORDER");

      topic[3] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_shippedorders_topic");
      tsubs[3] = jms_session.createDurableSubscriber(topic[3], "SHIPPED_ORDER");

      MessageListener  mL = new CustomerListener(jms_session);
      
 /* set the session's message listener */
      jms_session.setMessageListener(mL);

    }catch(JMSException ex)
     { System.out.println("Exception: " + ex); }
  }

AQ Exception Handling

AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES, EXPIRATION, MAX_RETRIES and RETRY_DELAY.

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. However, an application that intends to handle these expired or unserviceable messages can receive/remove them from the exception queue.

To retrieve messages from exception queues, the JMS client must use the point-to-point interface. The exception queue for messages intended for a topic must be created in a queue table with multiple consumers enabled. Like any other queue, the exception queue must be enabled for receiving messages using the start method in the AQOracleQueue class. You will get an exception if you try to enable it for enqueue.

The exception queue is specified through an Oracle-specific (provider-specific) message property called "JMS_OracleExcpQ", which can be set on the message before sending or publishing it. If an exception queue is not specified, the default exception queue is used. If the queue or topic is created in a queue table named, say, QTAB, the default exception queue is called AQ$_QTAB_E. The default exception queue is created automatically when the queue table is created.
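
The naming rule for the default exception queue can be written down in a few lines. This is only a sketch of the rule as stated here (queue table QTAB gives AQ$_QTAB_E); the class and its methods are hypothetical helpers and do not exist in the Oracle JMS library.

```java
/** Illustrative helper (not an Oracle API) for exception queue names. */
final class ExceptionQueueNames {

    /** Default exception queue for a queue table, for example QTAB -> AQ$_QTAB_E. */
    static String defaultFor(String queueTable) {
        return "AQ$_" + queueTable + "_E";
    }

    /** Uses the value supplied for JMS_OracleExcpQ if there is one, else the default. */
    static String resolve(String jmsOracleExcpQ, String queueTable) {
        return jmsOracleExcpQ != null ? jmsOracleExcpQ : defaultFor(queueTable);
    }

    public static void main(String[] args) {
        // No property set on the message: the default queue is used
        System.out.println(resolve(null, "QTAB"));
        // Property set on the message: the named queue is used
        System.out.println(resolve("WS.WS_back_orders_excp_que", "WS_orders_mqtab"));
    }
}
```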

Messages are moved to the exception queues by AQ under the following conditions:

Example Scenarios

The section retry with delay interval has an example with MAX_RETRIES. In the BooksOnLine application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 7 days, it is placed in an exception queue for special processing. We implement this using the Time-to-Live property of messages in conjunction with exception queues.

  1. Create the exception queue WS_back_orders_excp_que
    public void create_excp_que(TopicSession jms_session)
        {
          AQQueueTable     q_table;
          Queue            excpq;
    
          try {
              /* create the exception queue in the queue table with multiple
               * consumer flag true
               */
              q_table = ((AQjmsSession)jms_session).getQueueTable("WS",
                                                         "WS_orders_mqtab");

              AQjmsDestinationProperty dest_prop = new AQjmsDestinationProperty();

              dest_prop.setQueueType(AQjmsDestinationProperty.EXCEPTION_QUEUE);
              excpq = ((AQjmsSession)jms_session).createQueue(q_table,
                                                   "WS_back_orders_excp_que",
                                                   dest_prop);

              /* start the exception queue for receiving (dequeuing) messages only */
              ((AQjmsDestination)excpq).start(jms_session, false, true);
    
              }
          catch (JMSException ex)
          { System.out.println("Exception  " + ex); }
        }   
    
    
  2. Publish message on back orders queue with exception queue set to WS_back_orders_excp_que
    
     public static void requeue_back_order(TopicSession jms_session, 
                      String sale_region, BolOrder back_order)
     {
        Topic             back_order_topic;   
        ObjectMessage     obj_message;
        TopicPublisher    tpub;
        long              timetolive;
    
        try
        {
            back_order_topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                  "WS_backorders_topic");
            obj_message = jms_session.createObjectMessage();
            obj_message.setObject(back_order);

            /* set exception queue */
            obj_message.setStringProperty("JMS_OracleExcpQ",
                                          "WS.WS_back_orders_excp_que");

            tpub = jms_session.createPublisher(null);

            /* Set message expiration to 7 days, specified in milliseconds;
             * the L suffix keeps the arithmetic in long */
            timetolive = 7L*60*60*24*1000;

            /* Publish the message */
            tpub.publish(back_order_topic, obj_message, DeliveryMode.PERSISTENT,
                         1, timetolive);
            jms_session.commit();
        }
        catch (Exception ex)
        {
       System.out.println("Exception :" + ex); 
        }
    }
    
    
  3. Receive expired messages from the exception queue using the point-to-point interface
    public BolOrder get_expired_order(QueueSession jms_session)
       {
          Queue            queue;
          QueueReceiver    qrec;
          ObjectMessage    obj_message;
          BolCustomer      customer;
          BolOrder         exp_order = null;
            
          try
          {
           /* get a handle to the exception queue */
           queue = ((AQjmsSession) jms_session).getQueue("WS",
                                                 "WS_back_orders_excp_que");
    
           qrec = jms_session.createReceiver(queue);
    
           /* wait for a message to show up in the queue */
           obj_message = (ObjectMessage)qrec.receive();
           
           exp_order = (BolOrder)obj_message.getObject();
           
           customer = exp_order.getCustomer();
          
           System.out.println("Expired Order:  for customer " + 
                               customer.getName()); 
      
          }
          catch (JMSException ex)
          {
            System.out.println("Exception: " + ex);
          }
          return exp_order;
       }
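
The time-to-live passed to publish is expressed in milliseconds, so the 7-day rule becomes a fairly large number. The sketch below, which uses only the standard library and hypothetical helper names, shows one way to compute it and why the arithmetic is safer in long: seven days still fits in an int, but a longer period such as 30 days does not.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative helper (not an Oracle API) for time-to-live values. */
final class TimeToLive {

    /** Time-to-live in milliseconds, computed in long arithmetic. */
    static long ofDays(long days) {
        return TimeUnit.DAYS.toMillis(days);
    }

    /** The same formula in int arithmetic; it wraps around for large values. */
    static int naiveOfDays(int days) {
        return days * 60 * 60 * 24 * 1000;
    }

    public static void main(String[] args) {
        System.out.println(ofDays(7));            // 604800000
        System.out.println(naiveOfDays(7));       // 604800000, still fits in an int
        System.out.println(ofDays(30));           // 2592000000
        System.out.println(naiveOfDays(30) < 0);  // true: the int product overflowed
    }
}
```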
    

JMS Propagation

Remote Subscribers

This feature enables applications to communicate with each other without having to be connected to the same database.

AQ allows a remote subscriber, that is a subscriber at another database, to subscribe to a topic. When a message published to the topic meets the criterion of the remote subscriber, AQ will automatically propagate the message to the queue/topic at the remote database specified for the remote subscriber.

The snapshot (job_queue) background process performs propagation. Propagation is performed using database links and Oracle Net Services.

There are two ways to implement remote subscribers:

There are two kinds of remote subscribers:

Case 1

The remote subscriber is a topic. This occurs when no name is specified for the remote subscriber in the AQjmsAgent object and the address is a topic. The message satisfying the subscriber's subscription is propagated to the remote topic. The propagated message is now available to all the subscriptions of the remote topic that it satisfies.

Case 2

The remote subscriber is a specific recipient at the remote database. If the name of the remote recipient is specified (in the AQjmsAgent object), then the message satisfying the subscription is propagated to the remote database for that recipient only. The recipient at the remote database uses the TopicReceiver interface to retrieve its messages. The remote subscription can also be for a point-to-point queue.

Example Scenario for Case 1

Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a dblink dblink_oe_ws from database db1, the order entry database, to the western shipping database db2. The WS_bookedorders_topic at db2 is a remote subscriber to the OE_bookedorders_topic in db1.

Example Scenario for Case 2

Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a dblink dblink_oe_ws from the local order entry database db1 to the western shipping database db2. The agent "Priority" at WS_bookedorders_topic in db2 is a remote subscriber to the OE_bookedorders_topic in db1. Messages propagated to the WS_bookedorders_topic are for "Priority" only.

public void remote_subscriber(TopicSession jms_session)
   {
     Topic            topic;
     ObjectMessage    obj_message;
     AQjmsAgent       remote_sub;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                   "OE_bookedorders_topic");
      /* create the remote subscriber, name unspecified and address
       * the topic WS_bookedorders_topic at db2 
       */
      remote_sub = new AQjmsAgent(null,
                                  "WS.WS_bookedorders_topic@dblink_oe_ws");

 /* subscribe for western region orders */
      ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub, 
"Region = 'Western' ");
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
    catch (java.sql.SQLException  ex1)
    {System.out.println("SQL Exception :" + ex1); }
  }
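
The address given to AQjmsAgent in this example has the shape schema.destination@dblink. As a rough illustration of how the three parts relate, the following hypothetical parser splits such an address; it is not an Oracle class, and it assumes that the schema is always present and that the first dot separates schema from destination.

```java
/** Illustrative parser (not an Oracle API) for addresses of the form
 *  schema.destination@dblink. */
final class AgentAddress {
    final String schema;
    final String destination;
    final String dblink;

    private AgentAddress(String schema, String destination, String dblink) {
        this.schema = schema;
        this.destination = destination;
        this.dblink = dblink;
    }

    /** Splits at the first '.' and at the first '@' that follows it. */
    static AgentAddress parse(String address) {
        int dot = address.indexOf('.');
        int at = address.indexOf('@', dot + 1);
        if (dot <= 0 || at <= dot + 1 || at == address.length() - 1) {
            throw new IllegalArgumentException(
                "expected schema.destination@dblink: " + address);
        }
        return new AgentAddress(address.substring(0, dot),
                                address.substring(dot + 1, at),
                                address.substring(at + 1));
    }

    public static void main(String[] args) {
        AgentAddress a = parse("WS.WS_bookedorders_topic@dblink_oe_ws");
        System.out.println(a.schema + " / " + a.destination + " / " + a.dblink);
    }
}
```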
   

Database db2 - shipping database: The WS_bookedorders_topic has two subscribers, one for priority shipping and the other for normal shipping. The messages from the Order Entry database are propagated to the Shipping database and delivered to the correct subscriber. Priority orders have a message priority of 1.

public void  get_priority_messages(TopicSession jms_session)
   {
     Topic            topic;
     TopicSubscriber  tsubs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

       /* Create local subscriber - for priority messages */
      tsubs = jms_session.createDurableSubscriber(topic, "PRIORITY",
                                       " JMSPriority = 1 ", false);

      obj_message = (ObjectMessage) tsubs.receive();
      
      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Priority Order:  for customer " +  
customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }

  public void  get_normal_messages(TopicSession jms_session)
   {
     Topic            topic;
     TopicSubscriber  tsubs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

       /* Create local subscriber - for normal messages; it needs its own
        * name so that it does not replace the "PRIORITY" subscription */
      tsubs = jms_session.createDurableSubscriber(topic, "NORMAL",
                                       " JMSPriority > 1 ", false);

      obj_message = (ObjectMessage) tsubs.receive();
      
      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Normal Order:  for customer " +  customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }
 

public void remote_subscriber1(TopicSession jms_session)
   {
     Topic            topic;
     ObjectMessage    obj_message;
     AQjmsAgent       remote_sub;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                   "OE_bookedorders_topic");
      /* create the remote subscriber, name "Priority"  and address
       * the topic WS_bookedorders_topic at db2 
       */
      remote_sub = new AQjmsAgent("Priority",
                                  "WS.WS_bookedorders_topic@dblink_oe_ws");

      /* subscribe for western region orders */
      ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub, 
"Region = 'Western' ");
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
    catch (java.sql.SQLException  ex1)
    {System.out.println("SQL Exception :" + ex1); }
  }


   Remote database:
   database db2 - Western Shipping database.
/* get messages for subscriber priority */
   public void  get_priority_messages1(TopicSession jms_session)
   {
     Topic            topic;
     TopicReceiver    trecs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* create a local receiver "Priority" for the remote subscription
       * to WS_bookedorders_topic 
       */
      trecs = ((AQjmsSession)jms_session).createTopicReceiver(topic, "Priority", 
null);

      obj_message = (ObjectMessage) trecs.receive();
      
      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Priority Order:  for customer " +  
customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }

Scheduling Propagation

Propagation must be scheduled using the schedule_propagation method for every topic from which messages are propagated to target destination databases.

A schedule indicates the time frame during which messages can be propagated from the source topic. This time frame may depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore has to be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue facility to handle propagation.

The administrative calls for propagation scheduling provide great flexibility for managing the schedules (see "Scheduling a Queue Propagation", Chapter 9, "Administrative Interface"). The duration or propagation window parameter of a schedule specifies the time frame during which propagation has to take place. If the duration is unspecified then the time frame is an infinite single window. If a window has to be repeated periodically then a finite duration is specified along with a next_time function that defines the periodic interval between successive windows.
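
To make the window behaviour concrete, here is a deliberately simplified model of a repeating propagation window. It is a sketch under stated assumptions, not how the database evaluates schedules: next_time is reduced to a fixed period in seconds measured from the start of one window to the start of the next, time is counted in seconds from the start of the first window, and the class name is hypothetical.

```java
/** Illustrative model (not an Oracle API) of propagation windows. */
final class PropagationWindow {

    /**
     * Returns true if second t falls inside a propagation window.
     * A null duration models an unspecified duration: one infinite window.
     */
    static boolean isOpen(long t, Long durationSeconds, long periodSeconds) {
        if (t < 0) {
            return false;                     // before the first window starts
        }
        if (durationSeconds == null) {
            return true;                      // infinite single window
        }
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        return t % periodSeconds < durationSeconds;
    }

    public static void main(String[] args) {
        // A 5-minute window that repeats every hour
        System.out.println(isOpen(100, 300L, 3600));   // inside the first window
        System.out.println(isOpen(400, 300L, 3600));   // between windows
        System.out.println(isOpen(3700, 300L, 3600));  // inside the second window
    }
}
```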

The latency parameter for a schedule is relevant only when a queue does not have any messages to be propagated. This parameter specifies the time interval within which a queue has to be rechecked for messages. Note that if the latency parameter is to be enforced, then the job_queue_interval parameter for the job_queue_processes should be less than or equal to the latency parameter. The propagation schedules defined for a queue can be changed or dropped at anytime during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active then it will take a few seconds for the calls to be executed.

Job queue processes must be started for propagation to take place. At least 2 job queue processes must be started. The dblinks to the destination database must also be valid. The source and destination topics of the propagation must be of the same message type. The remote topic must be enabled for enqueue. The user of the dblink must also have enqueue privileges to the remote topic.

Example Code

 public void  schedule_propagation(TopicSession jms_session)
  {
     Topic            topic;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Schedule propagation immediately with duration of 5 minutes and
       * latency 20 sec */
      ((AQjmsDestination)topic).schedulePropagation(jms_session, "dba", null,
                                       new Double(5*60), null, new Double(20));
    }catch (JMSException ex)
    {System.out.println("Exception: " + ex);}
  }

  Propagation schedule parameters can also be altered.


  /* alter duration to 10 minutes and latency to zero */
  public void  alter_propagation(TopicSession jms_session)
  {
     Topic            topic;
    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Alter the schedule: duration of 10 minutes and latency of zero */
    ((AQjmsDestination)topic).alterPropagationSchedule(jms_session, "dba",
                        new Double(10*60), null, new Double(0));
    }catch (JMSException ex)
    {System.out.println("Exception: " + ex);}
  }

Enhanced Propagation Scheduling Capabilities

Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules--such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle instance handling a schedule (relevant if Real Application Clusters are being used)--can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.

For each schedule, detailed propagation statistics are maintained:

These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.

Propagation has built-in support for handling failures and reporting errors. For example, if the database link specified is invalid, or the remote database is unavailable, or the remote topic/queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure. If a schedule continuously encounters failures, the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window then the next retry is attempted at the start time of the next window.

A maximum of 16 retry attempts are made, after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log. At any time it is possible to check whether a schedule encountered failures and, if so, how many successive failures were encountered, the error message indicating the cause of the failure, and the time at which the last failure was encountered. By examining this information, an administrator can fix the failure and enable the schedule.
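
The retry timing can be sketched as follows. The text gives the first three delays (30, 60 and 120 seconds) and the limit of 16 attempts; the sketch assumes that the delay keeps doubling for the remaining attempts, which is an assumption rather than a documented rule. The class and method names are hypothetical, and times are plain seconds.

```java
/** Illustrative model (not an Oracle API) of propagation retry timing. */
final class PropagationBackoff {
    static final int MAX_RETRIES = 16;
    static final long FIRST_DELAY_SECONDS = 30;

    /** Delay before the given retry (1-based), assuming it doubles each time. */
    static long delaySeconds(int retry) {
        if (retry < 1 || retry > MAX_RETRIES) {
            throw new IllegalArgumentException("retry must be 1.." + MAX_RETRIES);
        }
        return FIRST_DELAY_SECONDS << (retry - 1);
    }

    /**
     * Time of the next attempt. If the backoff would land after the end of
     * the current window, the retry moves to the start of the next window.
     */
    static long nextAttempt(long failureTime, int retry,
                            long windowEnd, long nextWindowStart) {
        long candidate = failureTime + delaySeconds(retry);
        return candidate > windowEnd ? nextWindowStart : candidate;
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 4; retry++) {
            System.out.println("retry " + retry + " after "
                               + delaySeconds(retry) + " seconds");
        }
    }
}
```

Under the doubling assumption the sixteenth delay is 30 * 2^15 = 983,040 seconds, so in practice later retries tend to be governed by the window boundaries rather than by the backoff itself.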

During a retry if propagation is successful then the number of failures is reset to 0. Propagation has built-in support for Real Application Clusters and is transparent to the user and the administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table where the source topic resides. If at anytime there is a failure at an instance and the queue table that stores the topic is migrated to a different instance, the propagation job is also automatically migrated to the new instance. This will minimize the pinging between instances and thus offer better performance. Propagation has been designed to handle any number of concurrent schedules.

Note that the number of job_queue_processes is limited to a maximum of 36, and some of these may be used to handle non-propagation related jobs. Hence, propagation has built-in support for multitasking and load balancing. The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue process can be skewed based on the arrival rate of messages in the different source topics. If one process is overburdened with several active schedules while another is less loaded with many passive schedules, propagation automatically redistributes the schedules among the processes such that they are loaded uniformly.

Example Scenario

In the BooksOnLine example, the OE_bookedorders_topic is busy since messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.

Example Code

CONNECT OE/OE; 
/*  get averages */
select avg_time, avg_number, avg_size from user_queue_schedules; 
 
/*  get totals */
select total_time, total_number, total_bytes from user_queue_schedules; 
 
/*  get maximums for a window */
select max_number, max_bytes from user_queue_schedules; 
 
/*  get current status information of schedule */
select process_name, session_id, instance, schedule_disabled  
   from user_queue_schedules; 
 
/*  get information about last and next execution */
select last_run_date, last_run_time, next_run_date, next_run_time 
   from user_queue_schedules; 
 
/*  get last error information if any */
select failures, last_error_msg, last_error_date, last_error_time  
   from user_queue_schedules; 

Exception Handling During Propagation

When a system error such as a network failure occurs, AQ continues to attempt to propagate messages using an exponential back-off algorithm. In some situations that indicate application errors, AQ marks messages as UNDELIVERABLE if there is an error in propagating the message.

Examples of such errors are when the remote queue/topic does not exist or when there is a type mismatch between the source queue/topic and the remote queue/topic. In such situations users must query the DBA_QUEUE_SCHEDULES view to determine the last error that occurred during propagation to a particular destination. The trace files in the $ORACLE_HOME/log directory can provide additional information about the error.

Message Transformation with JMS AQ

The following topics are discussed in this section:

Defining Message Transformations

Transformations can be defined to map messages of one format to another. Transformations are useful when applications that use different formats to represent the same information have to be integrated. Transformations can be SQL expressions and PL/SQL functions.

The transformations can be created using the DBMS_TRANSFORM.create_transformation procedure. Transformation can be specified for the following operations:

The Message Transformation feature is an AQ extension to the standard JMS interface.

Example Scenario

In the BooksOnLine example, we will consider the order entry and shipping applications. For these examples, we will use topics with ADT type payloads.

Example Code

Assume that the Order entry topic OE.OE_bookedorders_topic has a payload of type OE.OE_ORDER.

create or replace TYPE OE_order as OBJECT ( 
             orderno         NUMBER, 
             status          VARCHAR2(30), 
             ordertype       VARCHAR2(30), 
             orderregion     VARCHAR2(30), 
             customer        CUSTOMER_TYP, 
             paymentmethod   VARCHAR2(30), 
             creditcard#     VARCHAR2(30),
             items           ORDERITEMLIST_VARTYP, 
             order_date      DATE,
             total           NUMBER);              

The Western Shipping topic WS_bookedorders_topic has payload of type WS.WS_ORDER:

create or replace TYPE WS_Order AS OBJECT (
             customer_name   VARCHAR2(100), 
             address         VARCHAR2(1000),
             city            VARCHAR2(1000),
             state           VARCHAR2(1000),
             country         VARCHAR2(1000),
             zipcode         VARCHAR2(1000),
             orderno         NUMBER, 
             status          VARCHAR2(30), 
             ordertype       VARCHAR2(30), 
             items           ORDERITEMLIST_VARTYP, 
             order_date      VARCHAR2(10));

The Java classes (which implement the CustomDatum interface) can be generated for these types using the JPublisher utility.

We will define a transformation that defines the mapping from OE.OE_Order to WS.WS_ORDER as:

begin
  dbms_transform.create_transformation(
    schema         => 'OE',
    name           => 'OE2WS',
    from_schema    => 'OE',
    from_type      => 'OE_order',
    to_schema      => 'WS',
    to_type        => 'WS_order',
    transformation => 'WS.WS_order(source.user_data.customer.name,
                source.user_data.customer.street,
                source.user_data.customer.city,
                source.user_data.customer.state,
                source.user_data.customer.country,
                source.user_data.customer.zipcode,
                source.user_data.orderno,
                source.user_data.status,
                source.user_data.ordertype,
                source.user_data.items,
                TO_CHAR(source.user_data.order_date, ''MM:DD:YYYY''))');
end;
/
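
For readers more at home in Java than in SQL, the effect of the OE2WS mapping can be pictured as a function that flattens the nested customer into the target type and turns the order date into a string. The sketch below is only an analogy: the classes are hypothetical stand-ins and not the JPublisher-generated classes, the items collection is omitted for brevity, and the Java pattern MM:dd:yyyy is used as the counterpart of the Oracle format mask MM:DD:YYYY.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Illustrative Java analogue of the OE2WS mapping; all classes are hypothetical. */
final class Oe2WsSketch {

    static final class Customer {
        final String name, street, city, state, country, zipcode;
        Customer(String name, String street, String city,
                 String state, String country, String zipcode) {
            this.name = name; this.street = street; this.city = city;
            this.state = state; this.country = country; this.zipcode = zipcode;
        }
    }

    static final class OeOrder {
        final long orderNo;
        final String status, orderType;
        final Customer customer;
        final LocalDate orderDate;
        OeOrder(long orderNo, String status, String orderType,
                Customer customer, LocalDate orderDate) {
            this.orderNo = orderNo; this.status = status; this.orderType = orderType;
            this.customer = customer; this.orderDate = orderDate;
        }
    }

    static final class WsOrder {
        final String customerName, address, city, state, country, zipcode;
        final long orderNo;
        final String status, orderType, orderDate;
        WsOrder(String customerName, String address, String city, String state,
                String country, String zipcode, long orderNo, String status,
                String orderType, String orderDate) {
            this.customerName = customerName; this.address = address;
            this.city = city; this.state = state; this.country = country;
            this.zipcode = zipcode; this.orderNo = orderNo; this.status = status;
            this.orderType = orderType; this.orderDate = orderDate;
        }
    }

    // Java counterpart of the Oracle format mask 'MM:DD:YYYY'
    private static final DateTimeFormatter ORDER_DATE =
        DateTimeFormatter.ofPattern("MM:dd:yyyy");

    /** Flattens the customer fields and formats the date, field by field. */
    static WsOrder transform(OeOrder src) {
        Customer c = src.customer;
        return new WsOrder(c.name, c.street, c.city, c.state, c.country, c.zipcode,
                           src.orderNo, src.status, src.orderType,
                           src.orderDate.format(ORDER_DATE));
    }

    public static void main(String[] args) {
        Customer c = new Customer("A. Reader", "1 Main St", "Portland",
                                  "OR", "US", "97201");
        OeOrder oe = new OeOrder(42, "BOOKED", "NORMAL", c, LocalDate.of(2002, 3, 7));
        System.out.println(transform(oe).orderDate);  // 03:07:2002
    }
}
```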

Sending Messages to a Destination Using a Transformation

A transformation can be supplied when sending/publishing a message to a queue/topic. The transformation will be applied before putting the message into the queue/topic.

The application can specify a transformation using the setTransformation method of AQjmsQueueSender and AQjmsTopicPublisher.

Example Code

Suppose that the orders processed by the order entry application should be published to the WS_bookedorders_topic.

The transformation OE2WS (defined in the previous section) is supplied so that the messages are inserted into the topic in the correct format.

public void ship_booked_orders(TopicSession    jms_session,
                               AQjmsAdtMessage adt_message)
{
    TopicPublisher  publisher;
    Topic           topic;

    try
    {
        /* get a handle to the WS_bookedorders_topic */
        topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                     "WS_bookedorders_topic");
        publisher = jms_session.createPublisher(topic);

        /* set the transformation in the publisher */
        ((AQjmsTopicPublisher)publisher).setTransformation("OE2WS");

        publisher.publish(topic, adt_message);
    }
    catch (JMSException ex)
    {
        System.out.println("Exception :" + ex);
    }
}

Receiving Messages from a Destination Using a Transformation

A transformation can be applied when receiving a message from a queue or topic. The transformation is applied to the message before it is returned to the JMS application.

The transformation can be specified using the setTransformation() method of the AQjmsQueueReceiver, AQjmsTopicSubscriber, and AQjmsTopicReceiver interfaces.

Example Code

Suppose that the Western Shipping application retrieves messages from the OE_bookedorders_topic. It specifies the transformation 'OE2WS' to retrieve each message as the WS_order ADT.

Assume that the WSOrder Java class has been generated by JPublisher to map to the Oracle object type WS.WS_order.

public AQjmsAdtMessage retrieve_booked_orders(TopicSession jms_session)
{
    AQjmsTopicReceiver  receiver;
    Topic               topic;
    Message             msg = null;

    try
    {
        /* get a handle to the OE_bookedorders_topic */
        topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                     "OE_bookedorders_topic");

        /* create a receiver for WShip */
        receiver = (AQjmsTopicReceiver)
            ((AQjmsSession)jms_session).createTopicReceiver(topic,
                                      "WShip", null, WSOrder.getFactory());

        /* set the transformation in the receiver */
        receiver.setTransformation("OE2WS");

        /* wait up to 10 milliseconds for a message */
        msg = receiver.receive(10);
    }
    catch (JMSException ex)
    {
        System.out.println("Exception :" + ex);
    }

    return (AQjmsAdtMessage)msg;
}

Specifying Transformations for Topic Subscribers

A transformation can also be specified when creating topic subscribers using the createDurableSubscriber call. The transformation is applied to the retrieved message before it is returned to the subscriber. If the subscriber named in the createDurableSubscriber call already exists, its transformation is set to the specified transformation.
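The rule that re-creating an existing subscriber resets its transformation can be pictured with a small model. This is a hedged sketch in plain Java, not the Oracle JMS API: DurableSubscriberSketch, its methods, and the string-based messages are hypothetical and exist only to show the "create or update" behavior described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/* Conceptual model only; none of these types belong to oracle.jms. */
class DurableSubscriberSketch {

    /* subscriber name -> transformation applied on delivery */
    private final Map<String, Function<String, String>> subscribers =
        new HashMap<>();

    /* Registers the subscriber, or replaces the transformation of an
       existing subscriber that has the same name. */
    void createDurableSubscriber(String name,
                                 Function<String, String> transformation) {
        subscribers.put(name, transformation);
    }

    /* Returns the stored message in the format the subscriber asked for. */
    String deliver(String name, String storedMessage) {
        Function<String, String> transformation = subscribers.get(name);
        if (transformation == null) {
            throw new IllegalArgumentException("unknown subscriber: " + name);
        }
        return transformation.apply(storedMessage);
    }

    int subscriberCount() {
        return subscribers.size();
    }

    public static void main(String[] args) {
        DurableSubscriberSketch topic = new DurableSubscriberSketch();

        topic.createDurableSubscriber("WShip", s -> "v1:" + s);
        /* same name again: no second subscriber, the transformation changes */
        topic.createDurableSubscriber("WShip", s -> "v2:" + s);

        System.out.println(topic.subscriberCount());            /* prints 1 */
        System.out.println(topic.deliver("WShip", "order-1"));  /* prints v2:order-1 */
    }
}
```

Note that in this model the stored message is left untouched; only the copy handed to the subscriber is converted, which matches the receive-side placement of the transformation.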

Example Code

The Western Shipping application subscribes to the OE_bookedorders_topic with the transformation 'OE2WS'. This transformation is applied to the messages, and the returned message is of Oracle object type WS.WS_order.

Assume that the WSOrder Java class has been generated by JPublisher to map to the Oracle object type WS.WS_order:

public AQjmsAdtMessage retrieve_booked_orders(TopicSession jms_session)
{
    TopicSubscriber     subscriber;
    Topic               topic;
    AQjmsAdtMessage     msg = null;

    try
    {
        /* get a handle to the OE_bookedorders_topic */
        topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                     "OE_bookedorders_topic");

        /* create a subscriber with the transformation OE2WS */
        subscriber = ((AQjmsSession)jms_session).createDurableSubscriber(topic,
                     "WShip", null, false, WSOrder.getFactory(), "OE2WS");

        /* wait up to 10 milliseconds for a message */
        msg = (AQjmsAdtMessage)subscriber.receive(10);
    }
    catch (JMSException ex)
    {
        System.out.println("Exception :" + ex);
    }

    return msg;
}

Specifying Transformations for Remote Subscribers

AQ allows a remote subscriber, that is, a subscriber at another database, to subscribe to a topic.

Transformations can be specified when creating remote subscribers using the createRemoteSubscriber call. This enables propagation of messages between topics of different formats. When a message published at a topic meets the criterion of a remote subscriber, AQ automatically propagates the message to the queue or topic at the remote database specified for the remote subscriber. If a transformation is also specified, AQ applies the transformation to the message before propagating it to the queue or topic at the remote database.
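The propagation rule has two steps: test the subscriber's criterion, then transform and forward. The following minimal sketch models those steps in plain Java. It is not the Oracle JMS API and performs no real propagation: RemotePropagationSketch, RemoteSubscriber, SourceTopic, and the string messages are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/* Conceptual model only; none of these types belong to oracle.jms. */
class RemotePropagationSketch {

    /* A remote subscriber: a selection rule, a format conversion, and
       a list standing in for the topic at the remote database. */
    static final class RemoteSubscriber<S, T> {
        final Predicate<S> criterion;
        final Function<S, T> transformation;
        final List<T> remoteTopic = new ArrayList<>();

        RemoteSubscriber(Predicate<S> criterion, Function<S, T> transformation) {
            this.criterion = criterion;
            this.transformation = transformation;
        }

        void offer(S message) {
            /* only matching messages are propagated, and only after
               they have been converted to the remote format */
            if (criterion.test(message)) {
                remoteTopic.add(transformation.apply(message));
            }
        }
    }

    /* The local topic: every published message is offered to each remote. */
    static final class SourceTopic<S> {
        private final List<RemoteSubscriber<S, ?>> remotes = new ArrayList<>();

        void addRemoteSubscriber(RemoteSubscriber<S, ?> remote) {
            remotes.add(remote);
        }

        void publish(S message) {
            for (RemoteSubscriber<S, ?> remote : remotes) {
                remote.offer(message);
            }
        }
    }

    public static void main(String[] args) {
        SourceTopic<String> oeTopic = new SourceTopic<>();
        RemoteSubscriber<String, String> wShip =
            new RemoteSubscriber<String, String>(
                s -> s.startsWith("WEST"), s -> "WS(" + s + ")");
        oeTopic.addRemoteSubscriber(wShip);

        oeTopic.publish("WEST:42");
        oeTopic.publish("EAST:43");   /* does not meet the criterion */

        System.out.println(wShip.remoteTopic);   /* prints [WS(WEST:42)] */
    }
}
```

Applying the transformation on the source side, as this model does, means the remote topic only ever sees messages in its own format.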

Example Code

A remote subscriber is created at the OE.OE_bookedorders_topic so that messages are automatically propagated to the WS.WS_bookedorders_topic. The transformation OE2WS is specified when creating the remote subscriber so that the messages reaching the WS_bookedorders_topic have the correct format.

Assume that the WSOrder Java class has been generated by JPublisher to map to the Oracle object type WS.WS_order:

public void create_remote_sub(TopicSession jms_session)
{
     AQjmsAgent          subscriber;
     Topic               topic;

     try
     {
       /* get a handle to the OE_bookedorders_topic */
       topic = ((AQjmsSession)jms_session).getTopic("OE", 
                                                "OE_bookedorders_topic");
        
       subscriber = new AQjmsAgent("WShip", "WS.WS_bookedorders_topic");

       ((AQjmsSession )jms_session).createRemoteSubscriber(topic, 
                                subscriber, null, WSOrder.getFactory(),"OE2WS");
     }
     catch (JMSException ex)
     {
       System.out.println("Exception :" + ex);
     }
}

Copyright © 1996, 2002 Oracle Corporation.

All Rights Reserved.