Chapter 33. JMS桥(Bridge)

HornetQ提供了JMS消息桥服务。

桥的作用是从一个消息源队列或话题(topic)接收消息,然后将它们发送到一个目标队列或话题。通常源和 目的不在同一台服务器上。

作为消息源的服务器与目的服务器不必在同一个集群内。通过桥的作用,两台服务器可以通过非可靠的网络连接 起来,比如WAN。

桥可以作为单独的服务部署,或者部署于HornetQ单独服务器内,或者部署在JBoss应用服务器中。源或目的可以 在同一个VM中,也可以在其它的VM中。

桥还可以在HornetQ服务器与其它JMS 1.1 兼容的服务器之间进行消息的传递。

Note

还要将JMS桥与核心桥混淆。JMB桥可以连接两个JMS 1.1兼容的服务器,它使用的是JMS接口。 而核心桥(在Chapter 36, 核心桥中描述)使用核心API将两个HornetQ实例连接 起来。核心桥的性能通常要比JMS桥的性能高,所以尽可能使用核心桥。另外核心桥可以不用XA 就可以实现一次并只有一次的消息传递保证。

桥可以适当处理连接故障。当源的连接或目的的连接发生故障时,例如网络故障,桥将不断重试连接直到连接 恢复为止。当连接恢复后,桥会继续正常工作。

桥还可以有一个可选的JMS选择器,它可以使桥只接收选择器选择的消息。

可以配置桥从队列还是从话题中接收消息。如果配置成从话题中接收消息,还以设定是以非持久订阅的方式接收,还是 以持久订阅的方式接收。

通常桥是通过一个bean配置文件由JBoss Micro Container部署到JBoss应用服务器中。下面的就是一 个桥的bean文件例子。这个桥将同一服务器上的两个目标连接起来。

<?xml version="1.0" encoding="UTF-8"?>

<deployment xmlns="urn:jboss:bean-deployer:2.0">

       <bean name="JMSBridge" class="org.hornetq.api.jms.bridge.impl.JMSBridgeImpl">
           <!-- HornetQ must be started before the bridge -->
           <depends>HornetQServer</depends>
           <constructor>
               <!-- Source ConnectionFactory Factory -->
               <parameter>
                   <inject bean="SourceCFF"/>
               </parameter>
               <!-- Target ConnectionFactory Factory -->
               <parameter>
                   <inject bean="TargetCFF"/>
               </parameter>
               <!-- Source DestinationFactory -->
               <parameter>
                   <inject bean="SourceDestinationFactory"/>
               </parameter>
               <!-- Target DestinationFactory -->
               <parameter>
                   <inject bean="TargetDestinationFactory"/>
               </parameter>
               <!-- Source User Name (no username here) -->
               <parameter><null /></parameter>
               <!-- Source Password (no password here)-->
               <parameter><null /></parameter>
               <!-- Target User Name (no username here)-->
               <parameter><null /></parameter>
               <!-- Target Password (no password here)-->
               <parameter><null /></parameter>
               <!-- Selector -->
               <parameter><null /></parameter>
               <!-- Failure Retry Interval (in ms) -->
               <parameter>5000</parameter>
               <!-- Max Retries -->
               <parameter>10</parameter>
               <!-- Quality Of Service -->
               <parameter>ONCE_AND_ONLY_ONCE</parameter>
               <!-- Max Batch Size -->
               <parameter>1</parameter>
               <!-- Max Batch Time (-1 means infinite) -->
               <parameter>-1</parameter>
               <!-- Subscription name (no subscription name here)-->
               <parameter><null /></parameter>
               <!-- Client ID  (no client ID here)-->
               <parameter><null /></parameter>
               <!-- Add MessageID In Header -->
               <parameter>true</parameter>
               <!-- register the JMS Bridge in the AS MBeanServer -->
               <parameter>
                   <inject bean="MBeanServer"/>
               </parameter>
               <parameter>org.hornetq:service=JMSBridge</parameter>
             </constructor>
           <property name="transactionManager">
               <inject bean="RealTransactionManager"/>
           </property>
       </bean>

       <!-- SourceCFF describes the ConnectionFactory used to connect to the 
            source destination -->
       <bean name="SourceCFF" 
            class="org.hornetq.api.jms.bridge.impl.JNDIConnectionFactoryFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/ConnectionFactory</parameter>
           </constructor>  
       </bean>

       <!-- TargetCFF describes the ConnectionFactory used to connect to the 
        target destination -->
       <bean name="TargetCFF" 
            class="org.hornetq.api.jms.bridge.impl.JNDIConnectionFactoryFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/ConnectionFactory</parameter>
           </constructor>  
       </bean>

       <!-- SourceDestinationFactory describes the Destination used as the source -->
       <bean name="SourceDestinationFactory" 
            class="org.hornetq.api.jms.bridge.impl.JNDIDestinationFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/queue/source</parameter>
           </constructor>  
       </bean>

       <!-- TargetDestinationFactory describes the Destination used as the target -->
       <bean name="TargetDestinationFactory" 
            class="org.hornetq.api.jms.bridge.impl.JNDIDestinationFactory">
           <constructor>
               <parameter>
                   <inject bean="JNDI" />
               </parameter>
               <parameter>/queue/target</parameter>
           </constructor>  
       </bean>
       
       <!-- JNDI is a Hashtable containing the JNDI properties required -->
       <!-- to connect to the sources and targets JMS resrouces         -->       
      <bean name="JNDI" class="java.util.Hashtable">
         <constructor class="java.util.Map">
            <map class="java.util.Hashtable" keyClass="String"
                                             valueClass="String">
               <entry>
                  <key>java.naming.factory.initial</key>
                  <value>org.jnp.interfaces.NamingContextFactory</value>
               </entry>
               <entry>
                  <key>java.naming.provider.url</key>
                  <value>jnp://localhost:1099</value>
               </entry>
               <entry>
                  <key>java.naming.factory.url.pkgs</key>
                  <value>org.jboss.naming:org.jnp.interfaces"</value>
               </entry>
            </map>
         </constructor>
      </bean>

      <bean name="MBeanServer" class="javax.management.MBeanServer">
         <constructor factoryClass="org.jboss.mx.util.MBeanServerLocator"
                      factoryMethod="locateJBoss"/>
      </bean>
</deployment>

33.1. JMS桥的配置参数

桥的主要的bean是JMSBridge。所有的配置参数需要传递给这个bean的 构造函数。

Note

如果不想指定某个参数的值(例如匿名认证或没有选择器),将该参数设为<null />即可。

  • 源连接工厂的工厂(Source ConnectionFactory Factory)

    这个参数注入一个SourceCFFbean(由bean文件定义)。它被 用来创建ConnectionFactory

  • 目标连接工厂的工厂(Target ConnectionFactory Factory)

    这个参数注入一个TargetCFFbean(由bean文件定义)。它被 用来创建目的ConnectionFactory

  • 源目标工厂(Source DestinationFactory)

    这个参数注入一个SourceDestinationFactorybean(由 bean文件定义)。它用来创建 目标(Destination)

  • 目的目标工厂(Target DestinationFactory)

    这个参数注入一个TargetDestinationFactorybean(由 bean文件定义)。它用来创建目的 目标(Destination)

  • 源用户名(Source User Name)

    用于创建到的连接的用户名

  • 源密码(Source Password)

    用于创建连接的密码

  • 目的用户名(Target User Name)

    用于创建目的连接的用户名

  • 目的密码(Target Password)

    t用于创建目的连接的密码

  • 选择器(Selector)

    这是一个JMS的选择器表达式,它用于从源目标接收消息。只有与选择器相匹配的消息才会被桥 转发到目的目标

    选择器必须符合JMS 选择器语法

  • 故障重试间隔(Failure Retry Interval)

    代表当桥发现连接故障时在每两次重试连接之间所要等待的时间间隔,单位毫秒

  • 最大重试次数(Max Retries)

    表示桥在发现连接故障时所进行的最大重试次数。超过这个次数,桥就放弃重试。 -1代表一直重试下去

  • 服务质量(Quality Of Service)

    这个参数代表所需要的服务质量模式

    有效的值为:

    • AT_MOST_ONCE

    • DUPLICATES_OK

    • ONCE_AND_ONLY_ONCE

    有关这些模式的解释,参见Section 33.4, “服务质量”

  • 最大批量(Max Batch Size)

    表示桥一次性从源目标最多接收多少消息,并将它们一次发往目的地。它的值必须是 >= 1

  • 最大批时间(Max Batch Time)

    代表桥在将一批消息发向目的之前等待的最大毫秒数。这个时间过后,即使接收的消息数小于 MaxBatchSize,桥也会开始向目的发送消息。它的值必须是 -1 (代表永远等待)或>= 1

  • 订阅名(Subscription Name)

    如果源的目标是一个话题(topic),你想使用持久的订阅来接收消息的话,这个参数可以指定 订阅名。

  • 客户ID(Client ID)

    如果源的目标是一个话题(topic),你想使用持久的订阅来接收消息的话,这个参数可以指定 JMS的客户ID。它用于创建/查找持久订阅。

  • 在消息头添加MessageID(Add MessageID In Header)

    如果值为true,原始的消息ID在发往目的是回到消息的名为HORNETQ_BRIDGE_MSG_ID_LIST的头中。如果一个消息被桥转发了多次, 则每次转发的消息ID都添加在这个头中。这用于分布式请求/回答的消息模式。

    Note

    当收到一个消息时,通过它的相关ID(coorelation id)可以发送一个回答。这样 在消息发送方得到这个回答消息时,它可以与原消息相关联起来。

  • MBean服务器(MBean Server)

    要使用JMX管理JMS桥,需指定JMS桥所要注册的MBeanServer(如JVM Platform MBeanServer 或 JBoss 应用服务器的MBeanServer)

  • ObjectName

    设置了MBeanServer后,你还需要设置JMS桥MBean注册用的ObjectName(必须是唯一的)

33.2. 源和目的的连接工厂

源工目的的连接工厂分别用于创建到源和到目的的连接。

上面的配置例子中使用的是HornetQ提供的默认实现。它使用JNDI查找连接工厂。对于其它的应用服务器 或JMS提供者,需要实现相应的实现,即实现org.hornetq.jms.bridge.ConnectionFactoryFactory接口。

33.3. 源和目的的目标工厂

它们用来创建或查找相应的目标。

上面例子中,我们使用了HornetQ的默认实现,从JNDI中查找相应的对象。

要提供新的实现,只要实现接口org.hornetq.jms.bridge.DestinationFactory即可。

33.4. 服务质量

下面给是桥的三种服务质量的详细说明。

33.4.1. AT_MOST_ONCE

这种QoS模式表示的是消息最多送达目标一次。在消息发往目的之前,消息就会被通知。因此, 如果在消息被源删除但并未到达目的时发生故障,消息有可能丢失。所以说消息的 发送最多一次。

这个模式适用于持久或非持久的消息。

33.4.2. DUPLICATES_OK

在这个QoS模式下,消息从源接收后再发送到目的,之后才对源进行消息通知。这样如果在发送成功之后 消息通知前的时间内发生故障的话,在故障恢复时同一个消息可能被再次传递。結果可能是在目的处 该消息收到了两次。

这个模式适用于持久或非持久的消息。

33.4.3. ONCE_AND_ONLY_ONCE

这个模式保证消息从源发送到目的一次,并且只有一次。(有时这个模式又称为“只一次”)。若源与目的处于 同一个HornetQ服务器中,这个模式通过本地事务来保证消息的发送和通知。如果是在不同的服务器上, 则会使用一个JTA的事务将发送和接收包括其中。这里使用的JTA事务是JBoss的实现,它包含有一个 完整的事务恢复管理器,所以能提供高度可靠的持久性。如果使用JTA则桥的所有连接工厂必须是 XAConnectionFactory。这种模式的效率通常是最低的,因为它需要额外记录事务的日志。

这个模式只适用于持久性消息。

Note

某些情况下可以不使用ONCE_AND_ONLY_ONCE模式,而同样可以保证“一次且只一次”的效果。 这是通过使用DUPLICATES_OK模式,加上在目的端应用程序来检测重复的消息,如果有则将其丢弃。 一些JMS服务器本身提供自动重复消息检测的功能,这样节省了在应用层实现的工作。在应用层常见 的实现方法是将接收到的消息的ID存放到缓存文件中,然后与每个新到的消息进行对比。这种方式 可能在某段时间内有效,所以它不如ONCE_AND_ONLY_ONCE那样严格,它视具体情况也可能是一个 不错的选择。

33.4.4. 例子

参见Section 11.3.5, “JMS 桥(Bridge)”。这个例子展示了如何在JBoss应用服务器中配置并使用 JMS桥从一处目标将消息转发到另一个目标。

关于如何在两个单独HornetQ服务器间使用桥的例子,请参见Section 11.1.20, “JMS桥(Bridge)”