Chapter 7. 使用JMS

很多用户喜欢使JMS,因此HornetQ提供了JMS服务。

JMS是一个普遍使用API标准,绝大多数的消息系统都提供JMS接口。如果你对JMS还不熟悉,建议你先参考一下 Sun的 JMS 教程

HornetQ还提供了许多的JMS的示例程序(examples)。比如简单的JMS Queue和Topic的示例,就很适合初学者做为了 解HornetQ JMS的起点。Chapter 11, 例子对这些示例作了详细的说明。

下面我们将带领读者一步步地配置HornetQ的JMS服务,并创建一个简单的JMS程序。我们还将展示如何在没有JNDI的情况下 来使用HornetQ中的JMS。

7.1. 一个简单的订购系统

本章我们将用一个简单的订购系统做为一个例子。尽管它十分简单,但是它能够很好地向大家展示JMS的设置和使用。

本例中有一个名为 OrderQueueJMS队列,还将有一个 MessageProducer 用来向队列发送订购消息。发送到队列的消息由一个 MessageConsumer 来接收。

我们所用的队列是持久(durable)的队列,也就是说这个队列不受服务器故障的影响。当服务器 发生故障重新启动后,这个队列仍然存在。我们需要把这个队列事先部署好。办法就是将队列写到JMS的配置文件中。当服务启动 时将配置文件中的队列自动部署好。

7.2. JMS服务的配置

hornetq-jms.xml文件包含了需要创建与部署的JMS Queue,Topic和ConnectionFactory 的实例。该文件必须要指定在classpath中。从这个文件中部署好的对象都可以用JNDI来找到。

JMS客户端可以利用JMS ConnectionFactory对象来创建与服务器的连接。ConnectionFactory中有关于服务器地址的 信息以及各种参数。通常使用这些参数的默认值即可。

这里我们将要在服务器端部署一个JMS队列和一个JMS ConnectionFactory (连接工厂)。当然完全可以部署多个JMS对象。 下面给出了具体的配置内容:

<configuration xmlns="urn:hornetq" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq ../schemas/hornetq-jms.xsd ">
    
    <connection-factory name="ConnectionFactory">
        <connectors>
           <connector-ref connector-name="netty"/>
        </connectors>
        <entries>
            <entry name="ConnectionFactory"/>           
        </entries>
    </connection-factory>
    
    <queue name="OrderQueue">
        <entry name="queues/OrderQueue"/>
    </queue>
    
</configuration> 
        

在本文件中我们部署了一个名为 ConnectionFactory 的一个连接工厂,并且将其绑定到 JNDI中。如果需要可以将一个连接工厂绑定为多个名称。只需要将绑定的名字加入到 entry 中即可。

Note

在JMS ConnectionFactory的配置中引用了一个名为 nettyconnector。 它实际上指向的是HornetQ核心中部署的一个连接器(connector)。它的配置在HornetQ的核心配置文件 hornetq-configuration.xml 中。它定义了采用何种传输与服务器连接。

7.3. JNDI的配置

当客户端使用JNDI时需要定义一些JNDI的参数。这些参数主要用来确定JNDI服务的地址。这些参数通常保存在 一个名为 jndi.properties 的文件中。这个文件需要在客户端的classpath中。或者你 可以在创建JNDI的InitialContext时将这些参数传进去。想了解全面的JNDI知识,可以参见 Sun JNDI 教程

要与JBoss的JNDI Server进行通迅,需要指定以下的JNDI参数:

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=jnp://myhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces                        
        

其中的 myhost 是 JNDI server的主机名或IP地址。 1099是端口号,根据不同的配置, 端口号也可能不同。

在默认的单独方式(standalone)配置中,JNDI服务端口等参数定义在hornetq-beans.xml 文件中的 JNDIServer bean下,如:

<bean name="JNDIServer" class="org.jnp.server.Main">
    <property name="namingInfo">
        <inject bean="Naming"/>
    </property>
    <property name="port">1099</property>
    <property name="bindAddress">localhost</property>
    <property name="rmiPort">1098</property>
    <property name="rmiBindAddress">localhost</property>
</bean>                        
        

Note

如果你的JNDI服务器与客户端不在同一台机器上,一定不要忘记将bindAddress改成相应的地址, 千万不能用localhost

Note

只有当HornetQ作为独立服务器运行时 才可以配置JNDIServer bean。当HornetQ运行于JBoss应用服务器中时,由于JBOSS服务器已经提供了 JNDI服务,所以就不需要再进行配置了。

7.4. 程序代码

下面给出的例子中的代码:

首先我们创建一个JNDI的Initial Context:

InitialContect ic = new InitialContext();

下面我们查找 connection factory:

ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");

然后查找 Queue:

Queue orderQueue = (Queue)ic.lookup("/queues/OrderQueue");

接下来用拿到的ConnectionFactory建立JMS连接:

Connection connection = cf.createConnection();

再创建一个非事务的、AUTO_ACKNOWLEDGE方式的JMS Session:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

创建一个 MessageProducer 以向队列发送订单消息:

MessageProducer producer = session.createProducer(orderQueue);

创建一个 MessageConsumer 以从队列中接收订单消息:

MessageConsumer consumer = session.createConsumer(orderQueue);

要启动连接,以使消息能传递给接收者:

connection.start();

发送一个简单的TextMessage:

TextMessage message = session.createTextMessage("This is an order");
producer.send(message);

之后接收这个消息:

TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());
        

看起来就是这么简单。 在HornetQ有发布包中有很多各种各样的JMS例子供用户参考。

Warning

请注意,JMS的连接(connection)、会话(session)、生产者(producer)和消费者(consumer) 对象是可以重用的。

如果每发送或接收一个消息都要重新创建这些JMS对象,是不符合设计模式的要求的。这样做会造成应用程序 的性能很差。这方面的内容在Chapter 46, 性能调优中将会进一步的讨论。

7.5. 不使用JNDI而直接创建JMS的对象

尽管采用JNDI对 JMS 的各种管理对象(Administered Objects) (即JMS Queue, Topic and ConnectionFactory)是很常用的方法,但在有些 情况时JNDI不可用,或者你不需要用JNDI时,如何还能正常使用JMS呢?

HornetQ允许你不通过JNDI也能使用JMS。HornetQ支持直接创建JMS的各种对象而无需JNDI的存在。

Chapter 11, 例子中包括有这样的例子供读者参考。

下面我们就将上述那个简单的例子重写,以抛开对JNDI的依赖:

我们通过HornetQJMSClient类来方便地创建JMS的ConnectionFactory。注意这里要提供各种连接参数和定义 所用的传输方式。有关连接器(connector)的信息参见Chapter 16, 传输层的配置

              
TransportConfiguration transportConfiguration = 
                     new TransportConfiguration(NettyConnectorFactory.class.getName());                
ConnectionFactory cf = HornetQJMSClient.createConnectionFactory(transportConfiguration);
        

同样利用HornetQJMSClient类创建JMS Queue对象:

Queue orderQueue = HornetQJMSClient.createQueue("OrderQueue");

然后用连接工厂创建 JMS 连接:

Connection connection = cf.createConnection();

还有非事务的\AUTO_ACKNOWLEDGE方式的 JMS 会话(session):

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

以及用于发送消息的MessageProducer:

MessageProducer producer = session.createProducer(orderQueue);

和接收消息的 MessageConsumer:

MessageConsumer consumer = session.createConsumer(orderQueue);

启动连接:

connection.start();

创建一个简单的 TextMessage 并将其发送到队列:

TextMessage message = session.createTextMessage("This is an order");
producer.send(message);

接收消息:

TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());
        

7.6. Client ID的设置

在建立持久的订阅(subscription)时,JMS客户需要有一个客户ID (client id)。我们可以通过配置 connection factory来定义它。(其中的 client-id项)。这样所有通过这个 connection factory来创建的连接都具有这个客户ID。

7.7. 设置DUPS_OK的Batch Size

如果JMS的通知模式为DUPS_OK,我们可以配置接收者(consumer)以使得它以批为单位 发送通知,而不是一个一个地发通知。这样做可以节省很多带宽,效率高。配置的方法是设置connection factory下 的dups-ok-batch-size项。单位是字节(byte)。默认值是1024 * 1024 bytes = 1 MiB。

7.8. 设置事务(Transaction)的Batch Size

当在一个事务内接收消息时,可能通过配置使接收者采用批量的方式发送通知,而不是一个一个的发送。这样也可以节省带宽。 配置方法是设置connection factory下的transaction-batch-size项。 单位是字节(byte)。默认值是1024 * 1024。