Chapter 32. Application Server Integration and Java EE

HornetQ can be easily installed in JBoss Application Server 5.1 or later. For details on installing HornetQ in the JBoss Application Server, please refer to the quick-start guide.

Since HornetQ also provides a JCA adapter, it should also be possible to integrate HornetQ as a JMS provider in other JEE-compliant application servers. For instructions on how to integrate a remote JCA adapter into another application server, please consult that application server's documentation.

A JCA Adapter basically controls the inflow of messages to Message Driven Beans and the outflow of messages sent from other JEE components, e.g. EJBs and Servlets.

This section explains the basics behind configuring the different JEE components in the AS.

32.1. Configuring Message Driven Beans

The delivery of messages to an MDB using HornetQ is configured on the JCA Adapter via the configuration file ra.xml, which can be found in the jms-ra.rar archive or directory. By default the adapter is configured to consume messages using an InVM connector from the instance of HornetQ running within the application server. A full list of what is configurable is found later in this chapter.

All MDBs, however, need to have the destination type and the destination configured. The following example shows how this can be done via annotations.

@MessageDriven(name = "MDBExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
                     })
@ResourceAdapter("hornetq-ra.rar")
public class MDBExample implements MessageListener
{
   public void onMessage(Message message)...
}

In this example you can see that the MDB will consume messages from a queue that is mapped into JNDI with the binding queue/testQueue. This queue must be preconfigured in the usual way using the HornetQ configuration files.

The ResourceAdapter annotation is used to specify which resource adapter should be used. To use it you will need to import org.jboss.ejb3.annotation.ResourceAdapter, which is in jboss-ejb3-ext-api.jar, available from the JBoss repository. Alternatively, you can use a deployment descriptor and add something like the following to jboss.xml:

<message-driven>
   <ejb-name>ExampleMDB</ejb-name>
   <resource-adapter-name>hornetq-ra.rar</resource-adapter-name>
</message-driven>

You can also rename the hornetq-ra.rar directory to jms-ra.rar, in which case neither the annotation nor the extra descriptor information is needed. If you do this you will need to edit the jms-ds.xml datasource file and change the rar-name element.

All the examples shipped with the HornetQ distribution use the annotation.

32.1.1. Using Container Managed Transactions

When an MDB is using Container Managed Transactions (CMT), the delivery of the message is done within the scope of a JTA transaction. The commit or rollback of this transaction is controlled by the container itself. If the transaction is rolled back then the message delivery semantics will kick in (by default this is to try and redeliver the message up to 10 times before sending to a DLQ). Using annotations this would be configured as follows:

@MessageDriven(name = "MDB_CMP_TxRequiredExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
                     })
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMP_TxRequiredExample implements MessageListener
{
   public void onMessage(Message message)...
}

The TransactionManagement annotation tells the container that this MDB uses Container Managed Transactions. The TransactionAttribute annotation tells the container that a JTA transaction is required for this MDB. Note that the only other valid value for this is TransactionAttributeType.NOT_SUPPORTED, which tells the container that this MDB does not support JTA transactions and that one should not be created.

It is also possible to inform the container that it must roll back the transaction by calling setRollbackOnly on the MessageDrivenContext. The code for this would look something like:

   @Resource
   MessageDrivenContext ctx;

   public void onMessage(Message message)
   {
      try
      {
         //something here fails
      }
      catch (Exception e)
      {
         ctx.setRollbackOnly();
      }
   }
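
The redelivery behaviour described above can be pictured with a small stand-alone model. The sketch below is not HornetQ or container code: RedeliverySketch, its deliver method and the MAX_DELIVERY_ATTEMPTS constant are hypothetical names, and the limit of 10 simply mirrors the default mentioned earlier. The listener returns false to stand for a rollback, for example after ctx.setRollbackOnly() has been called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of redelivery after a rollback; an illustration only, not HornetQ code. */
final class RedeliverySketch
{
   /** Assumed limit, mirroring the default of 10 delivery attempts. */
   static final int MAX_DELIVERY_ATTEMPTS = 10;

   /** Stand-in for the dead letter queue (DLQ). */
   final List<String> deadLetterQueue = new ArrayList<String>();

   /**
    * Delivers the message until the listener succeeds or the limit is reached.
    * The listener returns true for "commit" and false for "roll back".
    * Returns the number of delivery attempts that were made.
    */
   int deliver(String message, Predicate<String> listener)
   {
      for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++)
      {
         if (listener.test(message))
         {
            return attempt; // committed, the message is consumed
         }
         // rolled back: the message becomes eligible for redelivery
      }
      deadLetterQueue.add(message); // attempts exhausted
      return MAX_DELIVERY_ATTEMPTS;
   }

   public static void main(String[] args)
   {
      RedeliverySketch container = new RedeliverySketch();
      int attempts = container.deliver("order-1", m -> false); // always rolls back
      System.out.println(attempts + " attempts, DLQ = " + container.deadLetterQueue);
   }
}
```

In a real deployment the attempt limit and the dead letter destination are part of the HornetQ server configuration, not of the MDB.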

If you don't want the overhead of an XA transaction being created every time, but would still like the message delivered within a transaction (i.e. you are only using a JMS resource), then you can configure the MDB to use a local transaction. This would be configured as such:

@MessageDriven(name = "MDB_CMP_TxLocalExample",
               activationConfig =
                     {
                           @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                           @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                           @ActivationConfigProperty(propertyName = "useLocalTx", propertyValue = "true")
                     })
@TransactionManagement(value = TransactionManagementType.CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMP_TxLocalExample implements MessageListener
{
   public void onMessage(Message message)...
}

32.1.2. Using Bean Managed Transactions

Message driven beans can also be configured to use Bean Managed Transactions (BMT). In this case a User Transaction is created. This would be configured as follows:

@MessageDriven(name = "MDB_BMPExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge")
                     })
@TransactionManagement(value= TransactionManagementType.BEAN)
@ResourceAdapter("hornetq-ra.rar")
public class MDB_BMPExample implements MessageListener
{
   public void onMessage(Message message)...
}

When using Bean Managed Transactions the message delivery to the MDB will occur outside the scope of the user transaction and will use the acknowledge mode specified by the user with the acknowledgeMode property. There are only 2 acceptable values for this: Auto-acknowledge and Dups-ok-acknowledge. Note that because the message delivery is outside the scope of the transaction, a failure within the MDB will not cause the message to be redelivered.

A user would control the lifecycle of the transaction something like the following:

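   @Resource
   MessageDrivenContext ctx;

   public void onMessage(Message message)
   {
      // obtain the transaction before the try block so it is in scope for the rollback
      UserTransaction tx = ctx.getUserTransaction();
      try
      {
         TextMessage textMessage = (TextMessage)message;

         String text = textMessage.getText();

         tx.begin();
         //do some stuff within the transaction
         tx.commit();

      }
      catch (Exception e)
      {
         try
         {
            tx.rollback();
         }
         catch (Exception rollbackFailure)
         {
            // rollback can itself fail, e.g. if the transaction was never begun
         }
      }
   }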

32.1.3. Using Message Selectors with MDBs

It is also possible to use MDBs with message selectors. To do this, simply define your message selector as follows:

@MessageDriven(name = "MDBMessageSelectorExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                        @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "color = 'RED'")
                     })
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSelectorExample implements MessageListener
{
   public void onMessage(Message message)....
}

32.2. Sending Messages from within JEE components

The JCA adapter can also be used for sending messages. The Connection Factory to use is configured by default in the jms-ds.xml file and is mapped to java:/JmsXA. Using this from within a JEE component will mean that the sending of the message will be done as part of the JTA transaction being used by the component.

This means that if the sending of the message fails, the overall transaction would roll back and the message would be redelivered. Here's an example of this from within an MDB:

@MessageDriven(name = "MDBMessageSendTxExample",
               activationConfig =
                     {
                        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
                     })
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSendTxExample implements MessageListener
{
   @Resource(mappedName = "java:JmsXA")
   ConnectionFactory connectionFactory;

   @Resource(mappedName = "queue/replyQueue")
   Queue replyQueue;

   public void onMessage(Message message)
   {
      Connection conn = null;
      try
      {
         //We know the client is sending a text message so we cast
         TextMessage textMessage = (TextMessage)message;

         //get the text from the message.
         String text = textMessage.getText();

         System.out.println("message " + text);

         conn = connectionFactory.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer producer = sess.createProducer(replyQueue);

         producer.send(sess.createTextMessage("this is a reply"));

      }
      catch (Exception e)
      {
         e.printStackTrace();
      }
      finally
      {
         if(conn != null)
         {
            try
            {
               conn.close();
            }
            catch (JMSException e)
            { 
            }
         }
      }
   }
}

In JBoss Application Server you can use the JMS JCA adapter for sending messages from EJBs (including Session, Entity and Message Driven Beans), Servlets (including JSPs) and custom MBeans.

32.3. Configuring the JCA Adaptor

The Java Connector Architecture (JCA) Adapter is what allows HornetQ to be integrated with JEE components such as MDBs and EJBs. It configures how components such as MDBs consume messages from the HornetQ server and also how components such as EJBs or Servlets can send messages.

The HornetQ JCA adapter is deployed via the jms-ra.rar archive. The configuration of the Adapter is found in this archive under META-INF/ra.xml.

The configuration will look something like the following:

<resourceadapter>
      <resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
      <config-property>
         <description>The transport type</description>
         <config-property-name>ConnectorClassName</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</config-property-value>
      </config-property>
      <config-property>
         <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
         <config-property-name>ConnectionParameters</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>serverid=0</config-property-value>
      </config-property>

      <outbound-resourceadapter>
         <connection-definition>
            <managedconnectionfactory-class>org.hornetq.ra.HornetQRAManagedConnectionFactory</managedconnectionfactory-class>

            <config-property>
               <description>The default session type</description>
               <config-property-name>SessionDefaultType</config-property-name>
               <config-property-type>java.lang.String</config-property-type>
               <config-property-value>javax.jms.Queue</config-property-value>
            </config-property>
            <config-property>
               <description>Try to obtain a lock within specified number of seconds; less
               than or equal to 0 disable this functionality</description>
               <config-property-name>UseTryLock</config-property-name>
               <config-property-type>java.lang.Integer</config-property-type>
               <config-property-value>0</config-property-value>
            </config-property>

            <connectionfactory-interface>org.hornetq.ra.HornetQRAConnectionFactory</connectionfactory-interface>
            <connectionfactory-impl-class>org.hornetq.ra.HornetQRAConnectionFactoryImpl</connectionfactory-impl-class>
            <connection-interface>javax.jms.Session</connection-interface>
            <connection-impl-class>org.hornetq.ra.HornetQRASession</connection-impl-class>
         </connection-definition>
         <transaction-support>XATransaction</transaction-support>
         <authentication-mechanism>
            <authentication-mechanism-type>BasicPassword
            </authentication-mechanism-type>
            <credential-interface>javax.resource.spi.security.PasswordCredential
            </credential-interface>
         </authentication-mechanism>
         <reauthentication-support>false</reauthentication-support>
      </outbound-resourceadapter>

      <inbound-resourceadapter>
         <messageadapter>
            <messagelistener>
               <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
               <activationspec>
                  <activationspec-class>org.hornetq.ra.inflow.HornetQActivationSpec
                  </activationspec-class>
                  <required-config-property>
                      <config-property-name>destination</config-property-name>
                  </required-config-property>
               </activationspec>
            </messagelistener>
         </messageadapter>
      </inbound-resourceadapter>

   </resourceadapter>

There are 3 main parts to this configuration.

  1. A set of global properties for the Adapter

  2. The configuration for the outbound part of the adapter. This is used for creating JMS resources within EE components.

  3. The configuration of the inbound part of the adapter. This is used for controlling the consumption of messages via MDBs.

32.3.1. Adapter Global properties

The first element you see is resourceadapter-class which should be left unchanged. This is the HornetQ resource adapter class.

After that there is a list of configuration properties. This will be where most of the configuration is done. The first 2 configure the transport used by the adapter and the rest configure the connection factory itself.

Note

All connection factory properties will use the defaults when not provided. The exception is reconnectAttempts, which defaults to -1, signifying that the connection should attempt to reconnect indefinitely on connection failure. This is only used when the adapter is configured to connect to a remote server, as an InVM connector can never fail.

The following table explains what each property is for.

Table 32.1. Global Configuration Properties

Property Name                 Property Type  Property Description
ConnectorClassName            String         The connector class name. See Chapter 16, Configuring the Transport for information on available connectors.
ConnectionParameters          String         The transport configuration. These values must be in the form of key=val;key=val; and will be specific to the connector used.
useLocalTx                    boolean        True will enable local transaction optimisation.
UseXA                         boolean        Whether XA should be used.
UserName                      String         The user name to use when making a connection.
Password                      String         The password to use when making a connection.
BackUpTransportType           String         The backup transport to use on failure.
TransportConfiguration        String         The backup transport configuration.
DiscoveryGroupAddress         String         The discovery group address to use to autodetect a server.
DiscoveryGroupPort            integer        The port to use for discovery.
DiscoveryRefreshTimeout       long           The timeout, in milliseconds, to refresh.
DiscoveryInitialWaitTimeout   long           The initial time to wait for discovery.
LoadBalancingPolicyClassName  String         The load balancing policy class to use.
PingPeriod                    long           The period, in milliseconds, to ping the server for failure.
ConnectionTTL                 long           The time to live for the connection.
CallTimeout                   long           The call timeout, in milliseconds, for each packet sent.
DupsOKBatchSize               integer        The batch size of message acks to use if Dups ok is used.
TransactionBatchSize          integer        The batch size to use for sending messages within a transaction.
ConsumerWindowSize            integer        The window size for the consumer's internal buffer.
ConsumerMaxRate               integer        The max rate a consumer can receive.
ConfirmationWindowSize        integer        The window size for confirmations.
ProducerMaxRate               integer        The max rate a producer can send messages.
MinLargeMessageSize           integer        The size a message can be, in bytes, before it is sent as a multi-part large message.
BlockOnAcknowledge            boolean        If true then block on acknowledge of messages.
BlockOnNonDurableSend         boolean        If true then block when sending non-durable messages.
BlockOnDurableSend            boolean        If true then block when sending durable messages.
AutoGroup                     boolean        If true then auto group messages.
PreAcknowledge                boolean        Whether to pre-acknowledge messages before sending to the consumer.
reconnectAttempts             integer        How many attempts to make at reconnecting; the default is -1.
RetryInterval                 long           How long to wait, in milliseconds, before retrying a failed connection.
RetryIntervalMultiplier       double         Used for calculating the retry interval.
FailoverOnServerShutdown      boolean        If true the client will reconnect to another server if available.
ClientID                      String         The client ID of the connection.
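
As an illustration of the key=val;key=val; form expected by ConnectionParameters, the following sketch splits such a string into its individual settings. It is a stand-alone example under stated assumptions and not the parsing code used by the adapter: the class name ConnectionParametersSketch and its parse method are hypothetical, and the host key in main is only an example value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for the key=val;key=val; form; not the adapter's own code. */
final class ConnectionParametersSketch
{
   /** Splits the string on ';' and each pair on the first '=', skipping blank segments. */
   static Map<String, String> parse(String raw)
   {
      Map<String, String> params = new LinkedHashMap<String, String>();
      for (String pair : raw.split(";"))
      {
         String trimmed = pair.trim();
         if (trimmed.isEmpty())
         {
            continue; // e.g. stray whitespace between separators
         }
         int eq = trimmed.indexOf('=');
         if (eq <= 0)
         {
            throw new IllegalArgumentException("Expected key=val but got: " + trimmed);
         }
         params.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
      }
      return params;
   }

   public static void main(String[] args)
   {
      // "host" is a hypothetical key; which keys are valid depends on the connector used
      System.out.println(parse("host=localhost;port=5445;"));
   }
}
```

Which keys are accepted depends entirely on the connector named in ConnectorClassName.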

32.3.2. Adapter Outbound configuration

The outbound configuration should remain unchanged, as it defines the connection factories that are used by Java EE components. These connection factories can be defined inside a configuration file whose name matches *-ds.xml. You'll find a default jms-ds.xml configuration under the hornetq.sar directory in the JBoss AS deployment. The connection factories defined in the config file inherit their properties from the main ra.xml configuration, but these can also be overridden. The following example shows how to define one.

Please note that this configuration applies only when the HornetQ resource adapter is installed in the JBoss Application Server. If you are using another JEE application server, please refer to your application server's documentation for how to do this.

<tx-connection-factory>
      <jndi-name>RemoteJmsXA</jndi-name>
      <xa-transaction/>
      <rar-name>jms-ra.rar</rar-name>
      <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
      <config-property name="SessionDefaultType" type="String">javax.jms.Topic</config-property>
      <config-property name="ConnectorClassName" type="String">org.hornetq.integration.transports.netty.NettyConnectorFactory</config-property>
      <config-property name="ConnectionParameters" type="String">port=5445</config-property>
      <max-pool-size>20</max-pool-size>
</tx-connection-factory>

In this example the connection factory will be bound to JNDI with the name RemoteJmsXA and can be looked up in the usual way using JNDI or defined within the EJB or MDB as such:

@Resource(mappedName="java:RemoteJmsXA")
private ConnectionFactory connectionFactory;

The config-property elements are what override those in the ra.xml config. Any of the elements pertaining to the connection factory can be overridden here.
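
The inherit-then-override relationship can be modelled as a simple map merge. The sketch below is only a model of the precedence described here, not how the application server resolves the properties internally; OverrideSketch and its effective method are hypothetical names, and the values are taken from the ra.xml and tx-connection-factory examples in this chapter.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Models "ra.xml defaults, overridden by *-ds.xml"; an illustration only. */
final class OverrideSketch
{
   /** Returns the ra.xml values with any *-ds.xml values laid on top. */
   static Map<String, String> effective(Map<String, String> raDefaults, Map<String, String> dsOverrides)
   {
      Map<String, String> result = new LinkedHashMap<String, String>(raDefaults);
      result.putAll(dsOverrides); // an override replaces the inherited value
      return result;
   }

   public static void main(String[] args)
   {
      Map<String, String> ra = new LinkedHashMap<String, String>();
      ra.put("ConnectorClassName", "org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
      ra.put("SessionDefaultType", "javax.jms.Queue");
      ra.put("UseTryLock", "0");

      Map<String, String> ds = new LinkedHashMap<String, String>();
      ds.put("ConnectorClassName", "org.hornetq.integration.transports.netty.NettyConnectorFactory");
      ds.put("SessionDefaultType", "javax.jms.Topic");
      ds.put("ConnectionParameters", "port=5445");

      // UseTryLock is inherited unchanged; the other entries come from the override
      System.out.println(effective(ra, ds));
   }
}
```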

32.3.3. Adapter Inbound configuration

The inbound configuration should again remain unchanged. It controls how messages are forwarded to MDBs. It is possible to override properties on the MDB by adding an activation configuration to the MDB itself. This could be used to configure the MDB to consume from a different server. The next section demonstrates overriding the configuration.

32.4. High Availability JNDI (HA-JNDI)

If you are using JNDI to look-up JMS queues, topics and connection factories from a cluster of servers, it is likely you will want to use HA-JNDI so that your JNDI look-ups will continue to work if one or more of the servers in the cluster fail.

HA-JNDI is a JBoss Application Server service which allows you to use JNDI from clients without them having to know the exact JNDI connection details of every server in the cluster. This service is only available if using a cluster of JBoss Application Server instances.

To use it, supply the following properties when connecting to JNDI:

Hashtable<String, String> jndiParameters = new Hashtable<String, String>();
jndiParameters.put("java.naming.factory.initial", 
    "org.jnp.interfaces.NamingContextFactory");
jndiParameters.put("java.naming.factory.url.pkgs", 
    "org.jboss.naming:org.jnp.interfaces");

initialContext = new InitialContext(jndiParameters);
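
One way to avoid mistyping the property names is to use the constants defined on javax.naming.Context, which is part of the JDK. The following is a minimal sketch that only builds the environment table; the class name HaJndiEnvironmentSketch is hypothetical, and creating the InitialContext itself is left out because it requires the JBoss naming classes on the classpath.

```java
import java.util.Hashtable;
import javax.naming.Context;

/** Builds the JNDI environment using the standard Context constants. */
final class HaJndiEnvironmentSketch
{
   static Hashtable<String, String> environment()
   {
      Hashtable<String, String> env = new Hashtable<String, String>();
      // Context.INITIAL_CONTEXT_FACTORY is "java.naming.factory.initial"
      env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
      // Context.URL_PKG_PREFIXES is "java.naming.factory.url.pkgs"
      env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
      return env;
   }

   public static void main(String[] args)
   {
      // With the JBoss client libraries available, this table would be
      // passed to new InitialContext(...)
      System.out.println(environment());
   }
}
```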

For more information on using HA-JNDI, see the JBoss Application Server clustering documentation.

32.5. XA Recovery

XA recovery deals with system or application failures to ensure that the results of a transaction are applied consistently to all resources affected by the transaction, even if any of the application processes or the machine hosting them crash or lose network connectivity. For more information on XA Recovery, please refer to JBoss Transactions.

When HornetQ is integrated with JBoss AS, it can take advantage of JBoss Transactions to provide recovery of messaging resources. If messages are involved in an XA transaction, then in the event of a server crash the recovery manager will ensure that the transactions are recovered and the messages will either be committed or rolled back (depending on the transaction outcome) when the server is restarted.

32.5.1. XA Recovery Configuration

To enable HornetQ's XA Recovery, the Recovery Manager must be configured to connect to HornetQ to recover its resources. The following property must be added to the jta section of conf/jbossts-properties.xml of JBoss AS profiles:

<properties depends="arjuna" name="jta">
   ...
                     
   <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HornetQ1"
                value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;[connection configuration]"/>
</properties>
            

The [connection configuration] contains all the information required to connect to a HornetQ node, in the form [connector factory class name],[user name], [password], [connector parameters].

  • [connector factory class name] corresponds to the name of the ConnectorFactory used to connect to HornetQ. Values can be org.hornetq.core.remoting.impl.invm.InVMConnectorFactory or org.hornetq.integration.transports.netty.NettyConnectorFactory.

  • [user name] is the user name to create a client session. It is optional.

  • [password] is the password to create a client session. It is mandatory only if the user name is specified.

  • [connector parameters] is a list of comma-separated key=value pairs which are passed to the connector factory (see Chapter 16, Configuring the Transport for a list of the transport parameters).
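
To make the layout of the property value concrete, the following sketch assembles it from its four parts. It is an illustration only and not part of HornetQ: the class RecoveryConfigSketch and its propertyValue method are hypothetical names, and the output format is inferred from the examples in this section (empty user name and password slots are kept whenever connector parameters follow).

```java
/** Assembles the XAResourceRecovery property value; an illustration only. */
final class RecoveryConfigSketch
{
   static final String RECOVERY_CLASS = "org.hornetq.jms.server.recovery.HornetQXAResourceRecovery";

   /** user, password and connectorParams may be null or empty when not needed. */
   static String propertyValue(String connectorFactory, String user, String password, String connectorParams)
   {
      boolean hasUser = user != null && !user.isEmpty();
      boolean hasParams = connectorParams != null && !connectorParams.isEmpty();
      if (hasUser && (password == null || password.isEmpty()))
      {
         throw new IllegalArgumentException("A password is mandatory when a user name is given");
      }

      StringBuilder value = new StringBuilder(RECOVERY_CLASS).append(';').append(connectorFactory);
      if (hasUser || hasParams)
      {
         // the user and password slots are kept, possibly empty, so that
         // the connector parameters stay in fourth position
         value.append(", ").append(hasUser ? user : "");
         value.append(", ").append(hasUser ? password : "");
      }
      if (hasParams)
      {
         value.append(", ").append(connectorParams);
      }
      return value.toString();
   }

   public static void main(String[] args)
   {
      System.out.println(propertyValue(
         "org.hornetq.integration.transports.netty.NettyConnectorFactory", null, null, "port=8888"));
   }
}
```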

Note

HornetQ must have a valid acceptor which corresponds to the connector specified in conf/jbossts-properties.xml.

32.5.1.1. Configuration Settings

If HornetQ is configured with a default in-vm acceptor:

<acceptor name="in-vm">
    <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
                

the corresponding configuration in conf/jbossts-properties.xml is:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
   value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>        			
                

If it is now configured with a netty acceptor on a non-default port:

<acceptor name="netty">
    <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
    <param key="port" value="8888"/>
</acceptor>
                

the corresponding configuration in conf/jbossts-properties.xml is:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
       value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.integration.transports.netty.NettyConnectorFactory, , , port=8888"/>        			                    
                

Note

Note the additional commas, which skip the user name and password before the connector parameters.

If recovery must connect with the user name admin and the password adminpass, the configuration would be:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
       value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.integration.transports.netty.NettyConnectorFactory, admin, adminpass, port=8888"/>


Configuring HornetQ with an in-vm acceptor and configuring the Recovery Manager with an in-vm connector is the recommended way to enable XA Recovery.

32.5.2. Example

See Section 11.3.8, “XA Recovery” which shows how to configure XA Recovery and recover messages after a server crash.