Chapter 33. Application Server Integration and Java EE

HornetQ can be easily installed in JBoss Application Server 5.1 or later. For details on installing HornetQ in the JBoss Application Server please refer to quick-start guide.

Since HornetQ also provides a JCA adaptor, it should also be possible to integrate HornetQ as a JMS provider in other JEE compliant app servers. For instructions on how to integrate a remote JCA adaptor into another application sever, please consult that application server's instructions.

A JCA Adapter basically controls the inflow of messages to Message Driven Beans and the outflow of messages sent from other JEE components, e.g. EJBs and Servlets.

This section explains the basics behind configuring the different JEE components in the AS.

33.1. Configuring Message Driven Beans

The delivery of messages to an MDB using HornetQ is configured on the JCA Adapter via a configuration file ra.xml which can be found under in the jms-ra.rar archive of directory. By default this is configured to consume messages using an InVM connector from the instance of HornetQ running within the application server. A full list of what is configurable is found later in this chapter.

All MDB's however need to have the destination type and the destination configured. The following example shows how this can be done via annotations.

@MessageDriven(name = "MDBExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
                     })
public class MDBExample implements MessageListener
{
   public void onMessage(Message message)...
}

In this example you can see that the MDB will consume messages from a queue that is mapped into JNDI with the binding queue/testQueue. This queue must be preconfigured in the usual way using the HornetQ configuration files.

33.1.1. Using Container Managed Transactions

When an MDB is using Container Managed Transactions (CMT), the delivery of the message is done within the scope of a JTA transaction. The commit or rollback of this transaction is controlled by the container itself. If the transaction is rolled back then the message delivery semantics will kick in (by default this is to try and redeliver the message up to 10 times before sending to a DLQ). Using annotations this would be configured as follows:

@MessageDriven(name = "MDB_CMP_TxRequiredExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
                     })
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
public class MDB_CMP_TxRequiredExample implements MessageListener
{
   public void onMessage(Message message)...
}

The TransactionManagement annotation tells the container to treat this MDB to use Container Managed Persistence. The TransactionAttribute annotation tells the container that a JTA transaction is required for this MDB. Note that the only other valid value for this is TransactionAttributeType.NOT_SUPPORTED which tells the container that this MDB does not support JTA transactions and one should not be created.

It is also possible to inform the container that it must rollback the transaction by calling setRollbackOnly on the MessageDrivenContext. The code for this would look something like:

   @Resource
   MessageDrivenContext ctx;

   public void onMessage(Message message)
   {
      try
      {
         //something here fails
      }
      catch (Exception e)
      {
         ctx.setRollbackOnly();
      }
   }

If you don't want the over head of an xa transaction being created every time but you would still like the message delivered within a transaction (i.e. you are only using a JMS resource) then you can configure the MDB to use a local transaction. This would be configured as such:

@MessageDriven(name = "MDB_CMP_TxLocalExample",
               activationConfig =
                     {
                           @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                           @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                           @ActivationConfigProperty(propertyName = "useLocalTx", propertyValue = "true")
                     })
@TransactionManagement(value = TransactionManagementType.CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
public class MDB_CMP_TxLocalExample implements MessageListener
{
   public void onMessage(Message message)...
}

33.1.2. Using Bean Managed Transactions

Message driven beans can also be configured to use Bean Managed Transactions (BMT). In this case a User Transaction is created. This would be configured as follows:

@MessageDriven(name = "MDB_BMPExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge")
                     })
@TransactionManagement(value= TransactionManagementType.BEAN)
public class MDB_BMPExample implements MessageListener
{
   public void onMessage(Message message)
}

When using Bean Managed Transactions the message delivery to the MBD will occur outside the scope of the user transaction and use the acknowledge mode specified by the user with the acknowledgeMode property. There are only 2 acceptable values for this Auto-acknowledge and Dups-ok-acknowledge.Not that because the message delivery is outside the scope of the transaction a failure within the MDB will not cause the message to be redelivered.

A user would control the lifecycle of the transaction something like the following:

   @Resource
   MessageDrivenContext ctx;

   public void onMessage(Message message)
   {
      UserTransaction tx;
      try
      {
         TextMessage textMessage = (TextMessage)message;

         String text = textMessage.getText();

         UserTransaction tx = ctx.getUserTransaction();

         tx.begin();
         //do some stuff within the transaction
         tx.xommit();

      }
      catch (Exception e)
      {
         tx.rollback();
      }
   }

33.1.3. Using Message Selectors with MDB's

It is also possible to use MDB's with message selectors. To do this simple define your message selector as follows:

@MessageDriven(name = "MDBMessageSelectorExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                        @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "color = 'RED'")
                     })
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
public class MDBMessageSelectorExample implements MessageListener
{
   public void onMessage(Message message)....
}

33.2. Sending Messages from within JEE components

The JCA adapter can also be used for sending messages. The Connection Factory to use is configured by default in the jms-ds.xml file and is mapped to java:/JmsXA. Using this from within a JEE component will mean that the sending of the message will be done as part of the JTA transaction being used by the component.

This means that if the sending of the message fails the overall transaction would rollback and the message redelivered. Heres an example of this from within an MDB:

@MessageDriven(name = "MDBMessageSendTxExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
                     })
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
public class MDBMessageSendTxExample implements MessageListener
{
   @Resource(mappedName = "java:JmsXA")
   ConnectionFactory connectionFactory;

   @Resource(mappedName = "queue/replyQueue")
   Queue replyQueue;

   public void onMessage(Message message)
   {
      Connection conn = null;
      try
      {
         //Step 9. We know the client is sending a text message so we cast
         TextMessage textMessage = (TextMessage)message;

         //Step 10. get the text from the message.
         String text = textMessage.getText();

         System.out.println("message " + text);

         conn = connectionFactory.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer producer = sess.createProducer(replyQueue);

         producer.send(sess.createTextMessage("this is a reply"));

      }
      catch (Exception e)
      {
         e.printStackTrace();
      }
      finally
      {
         if(conn != null)
         {
            try
            {
               conn.close();
            }
            catch (JMSException e)
            { 
            }
         }
      }
   }
   }

In JBoss Application Server you can use the JMS JCA adapter for sending messages from EJBs (including Session, Entity and Message Driven Beans), Servlets (including jsps) and custom MBeans.

33.3. Configuring the JCA Adaptor

The Java Connector Architecture (JCA) Adapter is what allows HornetQ to be integrated with JEE components such as MDB's and EJB's. It configures how components such as MDB's consume messages from the HornetQ server and also how components such as EJB's or Servlet's can send messages.

The HornetQ JCA adapter is deployed via the jms-ra.rar archive. The configuration of the Adapter is found in this archive under META-INF/ra.xml.

The configuration will look something like the following:

<resourceadapter>
      <resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
      <config-property>
         <description>The transport type</description>
         <config-property-name>ConnectorClassName</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>org.hornetq.core.remoting.impl.invm.InVMConnectorF
         actory</config-property-value>
      </config-property>
      <config-property>
         <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
         <config-property-name>ConnectionParameters</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>hornetq.remoting.invm.serverid=0</config-property-value>
      </config-property>

      <outbound-resourceadapter>
         <connection-definition>
            <managedconnectionfactory-class>org.hornetq.ra.HornetQRAManagedConnection
            Factory</managedconnectionfactory-class>

            <config-property>
               <description>The default session type</description>
               <config-property-name>SessionDefaultType</config-property-name>
               <config-property-type>java.lang.String</config-property-type>
               <config-property-value>javax.jms.Queue</config-property-value>
            </config-property>
            <config-property>
               <description>Try to obtain a lock within specified number of seconds; less
               than or equal to 0 disable this functionality</description>
               <config-property-name>UseTryLock</config-property-name>
               <config-property-type>java.lang.Integer</config-property-type>
               <config-property-value>0</config-property-value>
            </config-property>

            <connectionfactory-interface>org.hornetq.ra.HornetQRAConnectionFactory
            </connectionfactory-interface>
            <connectionfactororg.hornetq.ra.HornetQConnectionFactoryImplonFactoryImpl
            </connectionfactory-impl-class>
            <connection-interface>javax.jms.Session</connection-interface>
            <connection-impl-class>org.hornetq.ra.HornetQRASession
            </connection-impl-class>
         </connection-definition>
         <transaction-support>XATransaction</transaction-support>
         <authentication-mechanism>
            <authentication-mechanism-type>BasicPassword
            </authentication-mechanism-type>
            <credential-interface>javax.resource.spi.security.PasswordCredential
            </credential-interface>
         </authentication-mechanism>
         <reauthentication-support>false</reauthentication-support>
      </outbound-resourceadapter>

      <inbound-resourceadapter>
         <messageadapter>
            <messagelistener>
               <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
               <activationspec>
                  <activationspec-class>org.hornetq.ra.inflow.HornetQActivationSpec
                  </activationspec-class>
                  <required-config-property>
                      <config-property-name>destination</config-property-name>
                  </required-config-property>
               </activationspec>
            </messagelistener>
         </messageadapter>
      </inbound-resourceadapter>

   </resourceadapter>

There are 3 main parts to this configuration.

  1. A set of global properties for the Adapter

  2. The configuration for the outbound part of the adapter. This is used for creating JMS resources within EE components.

  3. The configuration of the inbound part of the adapter. This is used for controlling the consumption of messages via MDB's.

33.3.1. Adapter Global properties

The first element you see is resourceadapter-class which should be left unchanged. This is the HornetQ resource adapter class.

After that there is a list of configuration properties. This will be where most of the configuration is done. The first 2 configure the transport used by the adapter and the rest configure the connection itself.

The following table explains what each property is for.

Table 33.1. Global Configuration Properties

Property NameProperty TypeProperty Description
ConnectorClassNameStringThe Connector class name see Chapter 16, Configuring the Transport for info on available connectors
ConnectionParametersStringThe transport configuration. These values must be in the form of key=val;key=val; and will be specific to the connector used
useLocalTxbooleanTrue will enable local transaction optimisation.
UseXAbooleanWhether XA should be used
UserNameStringThe user name to use when making a connection
PasswordStringThe password to use when making a connection
BackUpTransportTypeStringThe back up transport to use on failure.
TransportConfigurationStringThe back up transport configuration
DiscoveryGroupAddressStringThe discovery group address to use to autodetect a server
DiscoveryGroupPortintegerThe port to use for discovery
DiscoveryRefreshTimeoutlongThe timeout, in milli seconds, to refresh.
DiscoveryInitialWaitTimeoutlongThe initial time to wait for discovery.
LoadBalancingPolicyClassNameStringThe load balancing policy class to use.
PingPeriodlongThe period, in milliseconds, to ping the server for failure.
ConnectionTTLlongThe time to live for the connection.
CallTimeoutlongthe call timeout, in milli seconds, for each packet sent.
DupsOKBatchSizeintegerThe batch size of message acks to use if Dups ok is used.

continued..

TransactionBatchSizeintegerThe batch size to use for sending messages within a transaction
ConsumerWindowSizeintegerThe window size for the consumers internal buffer.
ConsumerMaxRateintegerThe max rate a consumer can receive.
ProducerWindowSizeintegerThe window size for the sending of messages.
ProducerMaxRateintegerThe max rate a producer can send messages.
MinLargeMessageSizeintegerThe size a message can be, in bytes, before it is sent as a multi part large message.
BlockOnAcknowledgebooleanIf true then block on acknowledge of messages.
BlockOnNonPersistentSendbooleanIf true then block when sending non persistent messages
BlockOnPersistentSendbooleanIf true then block when sending persistent messages
AutoGroupbooleanIf true then auto group messages
MaxConnectionsintegerThe max number of connections.
PreAcknowledgebooleanWhether to pre acknowledge messages before sending to consumer
RetryIntervallongHow long to wait , in milli seconds, before retrying a failed connection
RetryIntervalMultiplierdoubleUsed for calculating the retry interval
FailoverOnServerShutdownbooleanIf true client will reconnect to another server if available
ClientIDStringThe client ID of the connection

33.3.2. Adapter Outbound configuration

The outbound configuration should remain unchanged as they define connection factories that are used by Java EE components. These Connection Factories can be defined inside a configuration file that matches the name *-ds.xml. You'll find a default jms-ds.xml configuration under the messaging.sar directory in the Jboss AS deployment. The connection factories defined in the config file inherit their properties from the main ra.xml configuration but can also be overridden, the following example show how to define one.

Please note that this configuration only applies to install the HornetQ resource adapter in the JBoss Application Server. If you are using another JEE application server please refer to your application servers documentation for how to do this.

<tx-connection-factory>
      <jndi-name>RemoteJmsXA</jndi-name>
      <xa-transaction/>
      <rar-name>jms-ra.rar</rar-name>
      <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory
</connection-definition>
      <config-property name="SessionDefaultType" type="String">javax.jms.Topic
      </config-property>
      <config-property name="ConnectorClassName" type="String">
        org.hornetq.integration.transports.netty.NettyConnectorFactory
      </config-property>
      <config-property name="ConnectionParameters" type="String">
          hornetq.remoting.netty.port=5445</config-property>
      <max-pool-size>20</max-pool-size>
</tx-connection-factory>

In this example the connection factory will be bound to JNDI with the name RemoteJmsXA and can be looked up in the usual way using JNDI or defined within the EJB or MDB as such:

@Resource(mappedName="java:RemoteJmsXA")
private ConnectionFactory connectionFactory;

The config-property elements are what over rides those in the ra.xml config. Any of the elements pertaining to the connection factory can be over ridden here.

33.3.3. Adapter Inbound configuration

The inbound configuration should again remain unchanged. This controls what forwards messages onto MDB's. It is possible to override properties on the MDB by adding an activation configuration to the MDB itself. This could be used to configure the MDB to consume from a different server. The next section demonstrates over riding the configuration.

33.4. High Availability JNDI (HA-JNDI)

If you are using JNDI to look-up JMS queues, topics and connection factories from a cluster of servers, it is likely you will want to use HA-JNDI so that your JNDI look-ups will continue to work if one or more of the servers in the cluster fail.

HA-JNDI is a JBoss Application Server service which allows you to use JNDI from clients without them having to know the exact JNDI connection details of every server in the cluster. This service is only available if using a cluster of JBoss Application Server instances.

To use it use the following properties when connecting to JNDI.

Hashtable<String, String> jndiParameters = new Hashtable<String, String>();
jndiParameters.put("java.naming.factory.initial", 
    "org.jnp.interfaces.NamingContextFactory");
jndiParameters.put("java.naming.factory.url.pkgs=", 
    "org.jboss.naming:org.jnp.interfaces");

initialContext = new InitialContext(jndiParameters);

For more information on using HA-JNDI see the JBoss Application Server clustering documentation

33.5. The JMS Bridge

HornetQ includes a fully functional message bridge.

The function of the bridge is to consume messages from a source queue or topic, and send them to a target queue or topic, typically on a different server.

The source and target servers do not have to be in the same cluster which makes bridging suitable for reliably sending messages from one cluster to another, for instance across a WAN, and where the connection may be unreliable.

A bridge is deployed inside a JBoss AS instance. The instance can be the same instance as either the source or target server. Or could be on a third, separate JBoss AS instance.

The bridge can also be used to bridge messages from other non HornetQ JMS servers, as long as they are JMS 1.1 compliant.

Note

Don't confuse a JMS bridge with a core bridge. A JMS bridge can be used to bridge any two JMS 1.1 compliant JMS providers and uses the JMS API. A core bridge (described in Chapter 36, Core Bridges) is used to bridge any two HornetQ instances and uses the core API. Always use a core bridge if you can in preference to a JMS bridge. The core bridge will typically provide better performance than a JMS bridge. Also the core bridge can provide once and only once delivery guarantees without using XA.

The bridge has built-in resilience to failure so if the source or target server connection is lost, e.g. due to network failure, the bridge will retry connecting to the source and/or target until they come back online. When it comes back online it will resume operation as normal.

The bridge can be configured with an optional JMS selector, so it will only consume messages matching that JMS selector

It can be configured to consume from a queue or a topic. When it consumes from a topic it can be configured to consume using a non durable or durable subscription

The bridge is deployed by the JBoss Micro Container via a beans configuration file. This would typically be deployed inside the JBoss Application Server and the following example shows an example of a beans file that bridges 2 destinations which are actually on the same server.

<?xml version="1.0" encoding="UTF-8"?>

<deployment xmlns="urn:jboss:bean-deployer:2.0">

       <bean name="JMSBridge" class="org.hornetq.jms.bridge.impl.JMSBridgeImpl">
           <!-- HornetQ must be started before the bridge -->
           <depends>HornetQServer</depends>
           <constructor>
               <!-- Source ConnectionFactory Factory -->
               <parameter>
                   <inject bean="SourceCFF"/>
               </parameter>
               <!-- Target ConnectionFactory Factory -->
               <parameter>
                   <inject bean="TargetCFF"/>
               </parameter>
               <!-- Source DestinationFactory -->
               <parameter>
                   <inject bean="SourceDestinationFactory"/>
               </parameter>
               <!-- Target DestinationFactory -->
               <parameter>
                   <inject bean="TargetDestinationFactory"/>
               </parameter>
               <!-- Source User Name (no username here) -->
               <parameter><null /></parameter>
               <!-- Source Password (no password here)-->
               <parameter><null /></parameter>
               <!-- Target User Name (no username here)-->
               <parameter><null /></parameter>
               <!-- Target Password (no password here)-->
               <parameter><null /></parameter>
               <!-- Selector -->
               <parameter><null /></parameter>
               <!-- Failure Retry Interval (in ms) -->
               <parameter>5000</parameter>
               <!-- Max Retries -->
               <parameter>10</parameter>
               <!-- Quality Of Service -->
               <parameter>ONCE_AND_ONLY_ONCE</parameter>
               <!-- Max Batch Size -->
               <parameter>1</parameter>
               <!-- Max Batch Time (-1 means infinite) -->
               <parameter>-1</parameter>
               <!-- Subscription name (no subscription name here)-->
               <parameter><null /></parameter>
               <!-- Client ID  (no client ID here)-->
               <parameter><null /></parameter>
               <!-- Add MessageID In Header -->
               <parameter>true</parameter>
           </constructor>
           <property name="transactionManager">
               <inject bean="RealTransactionManager"/>
           </property>
       </bean>

       <!-- SourceCFF describes the ConnectionFactory used to connect to the 
            source destination -->
       <bean name="SourceCFF" 
            class="org.hornetq.jms.bridge.impl.JNDIConnectionFactoryFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/ConnectionFactory</parameter>
           </constructor>  
       </bean>

       <!-- TargetCFF describes the ConnectionFactory used to connect to the 
        target destination -->
       <bean name="TargetCFF" 
            class="org.hornetq.jms.bridge.impl.JNDIConnectionFactoryFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/ConnectionFactory</parameter>
           </constructor>  
       </bean>

       <!-- SourceDestinationFactory describes the Destination used as the source -->
       <bean name="SourceDestinationFactory" 
            class="org.hornetq.jms.bridge.impl.JNDIDestinationFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/queue/source</parameter>
           </constructor>  
       </bean>

       <!-- TargetDestinationFactory describes the Destination used as the target -->
       <bean name="TargetDestinationFactory" 
            class="org.hornetq.jms.bridge.impl.JNDIDestinationFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/queue/target</parameter>
           </constructor>  
       </bean>
       
       <!-- JNDI is a Hashtable containing the JNDI properties required -->
       <!-- to connect to the sources and targets JMS resrouces         -->       
      <bean name="JNDI" class="java.util.Hashtable">
         <constructor class="java.util.Map">
            <map class="java.util.Hashtable" keyClass="String"
                                             valueClass="String">
               <entry>
                  <key>java.naming.factory.initial</key>
                  <value>org.jnp.interfaces.NamingContextFactory</value>
               </entry>
               <entry>
                  <key>java.naming.provider.url</key>
                  <value>jnp://localhost:1099</value>
               </entry>
               <entry>
                  <key>java.naming.factory.url.pkgs</key>
                  <value>org.jboss.naming:org.jnp.interfaces"</value>
               </entry>
            </map>
         </constructor>
      </bean>

</deployment>

33.5.1. JMS Bridge Parameters

The main bean deployed is the JMSBridge bean. The bean is configurable by the parameters passed to its constructor.

Note

To let a parameter be unspecified (for example, if the authentication is anonymous or no message selector is provided), use <null /> for the unspecified parameter value.

  • Source Connection Factory Factory

    This injects the SourceCFF bean (also defined in the beans file). This bean is used to create the source ConnectionFactory

  • Target Connection Factory Factory

    This injects the TargetCFF bean (also defined in the beans file). This bean is used to create the target ConnectionFactory

  • Source Destination Factory Factory

    This injects the SourceDestinationFactory bean (also defined in the beans file). This bean is used to create the source Destination

  • Target Destination Factory Factory

    This injects the TargetDestinationFactory bean (also defined in the beans file). This bean is used to create the target Destination

  • Source User Name

    this parameter is the username for creating the source connection

  • Source Password

    this parameter is the parameter for creating the source connection

  • Target User Name

    this parameter is the username for creating the target connection

  • Target Password

    this parameter is the password for creating the target connection

  • Selector

    This represents a JMS selector expression used for consuming messages from the source destination. Only messages that match the selector expression will be bridged from the source to the target destination

    Note

    Ut is always more efficient to apply selectors on source topic subscriptions to source queue consumers

    The selector expression must follow the JMS selector syntax

  • Failure Retry Interval

    This represents the amount of time in ms to wait between trying to recreate connections to the source or target servers when the bridge has detected they have failed

  • Max Retries

    This represents the number of times to attempt to recreate connections to the source or target servers when the bridge has detected they have failed. The bridge will give up after trying this number of times. -1 represents 'try forever'

  • Quality Of Service

    This parameter represents the desired quality of service mode

    Possible values are:

    • AT_MOST_ONCE

    • DUPLICATES_OK

    • ONCE_AND_ONLY_ONCE

    See Section 33.5.4, “Quality Of Service” for a explanation of these modes.

  • Max Batch Size

    This represents the maximum number of messages to consume from the source destination before sending them in a batch to the target destination. Its value must >= 1

  • Max Batch Time

    This represents the maximum number of milliseconds to wait before sending a batch to target, even if the number of messages consumed has not reached MaxBatchSize. Its value must be -1 to represent 'wait forever', or >= 1 to specify an actual time

  • Subscription Name

    If the source destination represents a topic, and you want to consume from the topic using a durable subscription then this parameter represents the durable subscription name

  • Client ID

    If the source destination represents a topic, and you want to consume from the topic using a durable subscription then this attribute represents the the JMS client ID to use when creating/looking up the durable subscription

  • Add MessageID In Header

    If true, then the original message's message ID will be appended in the message sent to the destination in the header HORNETQ_BRIDGE_MSG_ID_LIST. If the message is bridged more than once, each message ID will be appended. This enables a distributed request-response pattern to be used

    Note

    when you receive the message you can send back a response using the correlation id of the first message id, so when the original sender gets it back it will be able to correlate it.

33.5.2. Source and Target Connection Factories

The source and target connection factory factories are used to create the connection factory used to create the connection for the source or target server.

The configuration example above uses the default implementation provided by HornetQ that looks up the connection factory using JNDI. For other Application Servers or JMS providers a new implementation may have to be provided. This can easily be done by implementing the interface org.hornetq.jms.bridge.ConnectionFactoryFactory.

33.5.3. Source and Target Destination Factories

Again, similarly, these are used to create or lookup up the destinations.

In the configuration example above, we have used the default provided by HornetQ that looks up the destination using JNDI.

A new implementation can be provided by implementing org.hornetq.jms.bridge.DestinationFactory interface.

33.5.4. Quality Of Service

The quality of service modes used by the bridge are described here in more detail.

33.5.4.1. AT_MOST_ONCE

With this QoS mode messages will reach the destination from the source at most once. The messages are consumed from the source and acknowledged before sending to the destination. Therefore there is a possibility that if failure occurs between removing them from the source and them arriving at the destination they could be lost. Hence delivery will occur at most once.

This mode is available for both persistent and non persistent messages.

33.5.4.2. DUPLICATES_OK

With this QoS mode, the messages are consumed from the source and then acknowledged after they have been successfully sent to the destination. Therefore there is a possibility that if failure occurs after sending to the destination but before acknowledging them, they could be sent again when the system recovers. I.e. the destination might receive duplicates after a failure.

This mode is available for both persistent and non persistent messages.

33.5.4.3. ONCE_AND_ONLY_ONCE

This QoS mode ensures messages will reach the destination from the source once and only once. (Sometimes this mode is known as "exactly once"). If both the source and the destination are on the same HornetQ server instance then this can be achieved by sending and acknowledging the messages in the same local transaction. If the source and destination are on different servers this is achieved by enlisting the sending and consuming sessions in a JTA transaction. The JTA transaction is controlled by JBoss Transactions JTA * implementation which is a fully recovering transaction manager, thus providing a very high degree of durability. If JTA is required then both supplied connection factories need to be XAConnectionFactory implementations. This is likely to be the slowest mode since it requires extra persistence for the transaction logging.

This mode is only available for persistent messages.

Note

For a specific application it may possible to provide once and only once semantics without using the ONCE_AND_ONLY_ONCE QoS level. This can be done by using the DUPLICATES_OK mode and then checking for duplicates at the destination and discarding them. Some JMS servers provide automatic duplicate message detection functionality, or this may be possible to implement on the application level by maintaining a cache of received message ids on disk and comparing received messages to them. The cache would only be valid for a certain period of time so this approach is not as watertight as using ONCE_AND_ONLY_ONCE but may be a good choice depending on your specific application.

33.5.4.4. Example

Please see Section 11.3.4, “JMS Bridge” which shows how to configure and use a JMS Bridge to send messages to the source destination and consume them from the target destination.

33.6. XA Recovery

XA recovery deals with system or application failures to ensure that of a transaction are applied consistently to all resources affected by the transaction, even if any of the application processes or the machine hosting them crash or lose network connectivity. For more information on XA Recovery,please refer to JBoss Transactions.

When HornetQ is integrated with JBoss AS, it can take advantage of JBoss Transactions to provide recovery of messaging resources. If messages are involved in a XA transaction, in the event of a server crash, the recovery manager will ensure that the transactions are recovered and the messages will either be committed or rolled back (depending on the transaction outcome) when the server is restarted.

33.6.1. XA Recovery Configuration

To enable HornetQs XA Recovery, the Recovery Manager must be configured to connect to HornetQ to recover its resources. The following property must be added to the jta section of conf/jbossts-properties.xml of JBoss AS profiles:

<properties depends="arjuna" name="jta">
   ...
                     
   <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HornetQ1"
                value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;[connection configuration]"/>
</properties>
            

The [connection configuration] contains all the information required to connect to HornetQ node under the form [connector factory class name],[user name], [password], [connector parameters].

  • [connector factory class name] corresponds to the name of the ConnectorFactory used to connect to HornetQ. Values can be org.hornetq.core.remoting.impl.invm.InVMConnectorFactory or org.hornetq.integration.transports.netty.NettyConnectorFactory

  • [user name] is the user name to create a client session. It is optional

  • [password] is the password to create a client session. It is mandatory only if the user name is specified

  • [connector parameters] is a list of comma-separated key=value pair which are passed to the connector factory (see Chapter 16, Configuring the Transport for a list of the transport parameters).

Note

HornetQ must have a valid acceptor which corresponds to the connector specified in conf/jbossts-properties.xml.

33.6.1.1. Configuration Settings

If HornetQ is configured with a default in-vm acceptor:

<acceptor name="in-vm">
    <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
                

the corresponding configuration in conf/jbossts-properties.xml is:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
   value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>        			
                

If it is now configured with a netty acceptor on a non-default port:

<acceptor name="netty">
    <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
    <param key="hornetq.remoting.netty.port" value="8888" type="Integer"/>
</acceptor>
                

the corresponding configuration in conf/jbossts-properties.xml is:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
       value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.integration.transports.netty.NettyConnectorFactory, , , hornetq.remoting.netty.port=8888"/>        			                    
                

Note

Note the additional commas to skip the user and password before connector parameters

If the recovery must use admin, adminpass, the configuration would have been:

                    <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
                           value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.integration.transports.netty.NettyConnectorFactory, admin, adminpass, hornetq.remoting.netty.port=8888"/>        			                    
                

Configuring HornetQ with an invm acceptor and configuring the Recovery Manager with an invm connector is the recommended way to enable XA Recovery.

33.6.2. Example

See Section 11.3.8, “XA Recovery” which shows how to configure XA Recovery and recover messages after a server crash.