Chapter 19. Flow Control

Flow control limits the flow of data between a client and a server, or between one server and another, so that neither side is overwhelmed with data.

19.1. Consumer Flow Control

This controls the flow of data between the server and the client as the client consumes messages. For performance reasons, clients normally buffer messages before delivering them to the consumer, either via the receive() method or asynchronously via a message listener. If the consumer cannot process messages as fast as they are delivered and stored in the internal buffer, messages keep building up and the client may eventually run out of memory.

19.1.1. Window-Based Flow Control

By default, HornetQ consumers buffer messages from the server in a client-side buffer before the client consumes them. This improves performance: otherwise, every time the client consumed a message, HornetQ would have to go to the server to request the next message, which would then be sent to the client if one was available.

A network round trip would be involved for every message, which would considerably reduce performance.

To prevent this, HornetQ pre-fetches messages into a buffer on each consumer. The total maximum size of messages (in bytes) that will be buffered on each consumer is determined by the consumer-window-size parameter.

By default, the consumer-window-size is set to 1 MiB (1024 * 1024 bytes).
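As a rough illustration of what a byte-bounded window means, the sketch below models a client-side buffer that accepts messages only while the total buffered size stays within the window. It is a simplified model, not HornetQ's implementation: the class PrefetchBufferSketch and its methods are hypothetical names, and the handling of -1 (unbounded) and 0 (no buffering) follows the values described later in this section.

```java
/** Simplified, hypothetical model of a client-side buffer bounded in bytes. */
final class PrefetchBufferSketch {
    // -1 = unbounded, 0 = no buffering, >0 = maximum buffered bytes
    private final int windowSize;
    private long bufferedBytes;

    PrefetchBufferSketch(int windowSize) {
        if (windowSize < -1) {
            throw new IllegalArgumentException("window size must be >= -1");
        }
        this.windowSize = windowSize;
    }

    /** Returns true if the message fits in the window and was buffered. */
    boolean offer(int messageBytes) {
        if (windowSize == 0) {
            return false; // nothing is ever buffered on the client
        }
        if (windowSize > 0 && bufferedBytes + messageBytes > windowSize) {
            return false; // window full: the message stays on the server
        }
        bufferedBytes += messageBytes;
        return true;
    }

    /** Called once the consumer has processed a buffered message. */
    void consume(int messageBytes) {
        bufferedBytes -= messageBytes;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```

With the default window of 1024 * 1024 bytes, a second 600 KiB message would be refused until the first one has been consumed.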

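The value can be:

-1 for an unbounded buffer

0 to not buffer any messages

>0 for a buffer with the given maximum size in bytes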

Setting the consumer window size can considerably improve performance depending on the messaging use case. As an example, let's consider the two extremes:

Fast consumers

Fast consumers can process messages as fast as they consume them (or even faster).

To allow fast consumers, set the consumer-window-size to -1. This will allow unbounded message buffering on the client side.

Use this setting with caution: it can overflow the client memory if the consumer is not able to process messages as fast as it receives them.

Slow consumers

Slow consumers take significant time to process each message, so it is desirable to prevent messages from being buffered on the client side; they can then be delivered to another consumer instead.

Consider a situation where a queue has two consumers, one of which is very slow. Messages are delivered in a round-robin fashion to both consumers, and the fast consumer processes all of its messages very quickly until its buffer is empty. At this point there are still messages waiting to be processed in the buffer of the slow consumer, which prevents them from being processed by the fast consumer. The fast consumer therefore sits idle when it could be processing those messages.

To allow slow consumers, set the consumer-window-size to 0 (for no buffer at all). This will prevent the slow consumer from buffering any messages on the client side. Messages will remain on the server side ready to be consumed by other consumers.

Setting this to 0 can give deterministic distribution between multiple consumers on a queue.
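The two-consumer scenario above can be made concrete with a small simulation. This is only a sketch under simplifying assumptions: time is measured in abstract ticks, the buffered case assumes each client buffer is large enough to hold its whole round-robin share up front, and the class and method names are hypothetical.

```java
/** Hypothetical simulation: one fast and one slow consumer sharing a queue. */
final class WindowDistributionSketch {

    /** Buffered case: messages are split round-robin into the client buffers up front. */
    static long drainTimeBuffered(int messages, long fastCost, long slowCost) {
        int fastShare = (messages + 1) / 2; // fast consumer gets the first message
        int slowShare = messages / 2;
        // The queue is drained only when the slower of the two has finished its share.
        return Math.max(fastShare * fastCost, slowShare * slowCost);
    }

    /** Unbuffered case: each message goes to whichever consumer becomes free first. */
    static long drainTimeUnbuffered(int messages, long fastCost, long slowCost) {
        long fastFreeAt = 0;
        long slowFreeAt = 0;
        for (int i = 0; i < messages; i++) {
            if (fastFreeAt <= slowFreeAt) {
                fastFreeAt += fastCost;
            } else {
                slowFreeAt += slowCost;
            }
        }
        return Math.max(fastFreeAt, slowFreeAt);
    }
}
```

For 10 messages, with the fast consumer needing 1 tick and the slow one 10 ticks per message, the buffered case takes 50 ticks while the unbuffered case takes 10, because the fast consumer picks up the messages the slow one would otherwise be holding.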

Most consumers cannot be clearly identified as fast or slow but fall somewhere in between. In that case, the optimal value of consumer-window-size depends on the messaging use case and has to be found by benchmarking, but a value of 1 MiB is fine in most cases.

19.1.1.1. Using Core API

If the HornetQ Core API is used, the consumer window size is specified by the ClientSessionFactory.setConsumerWindowSize() method and some of the ClientSession.createConsumer() methods.

19.1.1.2. Using JMS

If JNDI is used to look up the connection factory, the consumer window size is configured in hornetq-jms.xml:

<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty-connector"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>       
   </entries>
      
   <!-- Set the consumer window size to 0 to have *no* buffer on the client side -->
   <consumer-window-size>0</consumer-window-size>
</connection-factory>
            

If the connection factory is directly instantiated, the consumer window size is specified by the HornetQConnectionFactory.setConsumerWindowSize() method.

Please see Section 11.1.32, “No Consumer Buffering” for an example which shows how to configure HornetQ to prevent consumer buffering when dealing with slow consumers.

19.1.2. Rate limited flow control

It is also possible to control the rate at which a consumer can consume messages. This is a form of throttling and can be used to make sure that a consumer never consumes messages at a rate faster than the rate specified.

The rate must be a positive integer to enable this functionality; it is the maximum desired message consumption rate, in messages per second. Setting it to -1 disables rate limited flow control. The default value is -1.
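To show how a messages-per-second limit with -1 as the "disabled" value might behave, here is a minimal pacing limiter. It is one simple way to throttle, not a description of how HornetQ implements rate limiting; RateLimiterSketch and reserve() are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic.

```java
/** Hypothetical pacing limiter: spaces messages at least 1/maxRate seconds apart. */
final class RateLimiterSketch {
    private final boolean enabled;
    private final long intervalNanos;                // minimum spacing between messages
    private long nextSlotNanos = Long.MIN_VALUE;     // earliest time for the next message

    RateLimiterSketch(int maxRate) {
        if (maxRate == 0 || maxRate < -1) {
            throw new IllegalArgumentException("rate must be positive, or -1 to disable");
        }
        this.enabled = maxRate != -1;
        this.intervalNanos = enabled ? 1_000_000_000L / maxRate : 0L;
    }

    /**
     * Reserves the next slot and returns how many nanoseconds the caller
     * should wait, starting at nowNanos, before handling the next message.
     */
    long reserve(long nowNanos) {
        if (!enabled) {
            return 0L;
        }
        nextSlotNanos = Math.max(nextSlotNanos, nowNanos);
        long wait = nextSlotNanos - nowNanos;
        nextSlotNanos += intervalNanos;
        return wait;
    }
}
```

A caller would typically pass System.nanoTime() and sleep for the returned duration before consuming the next message. With a rate of 10, three back-to-back calls at the same instant yield waits of 0, 100 and 200 milliseconds.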

Please see Section 11.1.10, “Message Consumer Rate Limiting” for a working example of limiting consumer rate.

19.1.2.1. Using Core API

If the HornetQ core API is being used, the rate can be set via the ClientSessionFactory.setConsumerMaxRate(int consumerMaxRate) method or, alternatively, via some of the ClientSession.createConsumer() methods.

19.1.2.2. Using JMS

If JNDI is used to look up the connection factory, the max rate can be configured in hornetq-jms.xml:

<connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty-connector"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>       
      </entries>
      <!-- We limit consumers created on this connection factory to consume messages
           at a maximum rate of 10 messages per sec -->
      <consumer-max-rate>10</consumer-max-rate>
 </connection-factory>

If the connection factory is directly instantiated, the max rate can be set via the HornetQConnectionFactory.setConsumerMaxRate(int consumerMaxRate) method.

Note

Rate limited flow control can be used in conjunction with window based flow control. Rate limited flow control only affects how many messages a client can consume per second, not how many messages are in its buffer. So with a low rate limit and a large window, the client's internal buffer would soon fill up with messages.

Please see Section 11.1.10, “Message Consumer Rate Limiting” for an example which shows how to configure HornetQ to limit the rate at which a consumer consumes messages.
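The interaction described in the note can be estimated with simple arithmetic: a full buffer holds roughly window size divided by average message size messages, and at the configured rate these take that many messages divided by the rate to drain. The helper below is a hypothetical sketch of that calculation; the average message size is an assumed input, not something HornetQ reports through this method.

```java
/** Hypothetical helper: how long a full client buffer takes to drain at a given rate. */
final class BufferBacklogSketch {

    static double backlogSeconds(long windowBytes, long avgMessageBytes, int maxRate) {
        if (avgMessageBytes <= 0 || maxRate <= 0) {
            throw new IllegalArgumentException("message size and rate must be positive");
        }
        // Number of messages that fit in a full window.
        double bufferedMessages = (double) windowBytes / avgMessageBytes;
        // Time to consume them at maxRate messages per second.
        return bufferedMessages / maxRate;
    }
}
```

For example, a 1 MiB window, 1 KiB messages and a limit of 10 messages per second give 1024 buffered messages, or about 102 seconds of work that no other consumer can pick up.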

19.2. Producer flow control

HornetQ can also limit the amount of data sent from a client to a server to prevent the server being overwhelmed.

19.2.1. Window based flow control

In a similar way to consumer window based flow control, HornetQ producers can, by default, only send messages to an address as long as they have sufficient credits to do so. The number of credits required to send a message is given by the size of the message.

As producers run low on credits they request more from the server; when the server sends them more credits, they can send more messages.

The number of credits a producer requests in one go is known as the window size.

The window size therefore determines the number of bytes that can be in flight at any one time before more credits need to be requested. This prevents the remoting connection from getting overloaded.
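The credit mechanism can be sketched from the producer's point of view as follows. This is a simplified model rather than HornetQ's implementation: the class name is hypothetical, a message costs as many credits as it has bytes, and the server is assumed to grant a full window immediately whenever one is requested.

```java
/** Hypothetical model of a producer that pays for sends with byte credits. */
final class ProducerCreditsSketch {
    private final int windowSize; // credits requested from the server in one go
    private long credits;         // credits currently held by the producer
    private int creditRequests;   // number of round trips made to obtain credits

    ProducerCreditsSketch(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Sends one message, requesting further windows of credits when needed. */
    void send(int messageBytes) {
        while (credits < messageBytes) {
            // Assumption: the server grants the whole window straight away.
            credits += windowSize;
            creditRequests++;
        }
        credits -= messageBytes;
    }

    long credits() {
        return credits;
    }

    int creditRequests() {
        return creditRequests;
    }
}
```

Sending ten 300-byte messages with a window of 1024 needs three credit requests in this model; a larger window means fewer requests but more bytes potentially in flight.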

19.2.1.1. Using Core API

If the HornetQ core API is being used, window size can be set via the ClientSessionFactory.setProducerWindowSize(int producerWindowSize) method.

19.2.1.2. Using JMS

If JNDI is used to look up the connection factory, the producer window size can be configured in hornetq-jms.xml:

               <connection-factory name="ConnectionFactory">
                  <connectors>
                     <connector-ref connector-name="netty-connector"/>
                  </connectors>
                  <entries>
                     <entry name="ConnectionFactory"/>       
                  </entries>
                  <producer-window-size>10</producer-window-size>
               </connection-factory>

If the connection factory is directly instantiated, the producer window size can be set via the HornetQConnectionFactory.setProducerWindowSize(int producerWindowSize) method.

19.2.1.3. Blocking producer window based flow control

Normally the server will always give the same number of credits as have been requested. However, it is also possible to set a maximum size on any address, and the server will never send more credits than could cause the address's upper memory limit to be exceeded.

For example, if there is a JMS queue called "myqueue", its maximum memory size could be set to 10MiB, and the server will control the number of credits sent to any producers sending messages to myqueue such that the total size of the messages in the queue never exceeds 10MiB.

When the address gets full, producers will block on the client side until more space frees up on the address, i.e. until messages are consumed from the queue thus freeing up space for more messages to be sent.

We call this blocking producer flow control, and it's an efficient way to prevent the server running out of memory due to producers sending more messages than can be handled at any time.

It is an alternative approach to paging, which does not block producers but instead pages messages to storage.
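The server side of blocking producer flow control can be pictured as a credit grant that is capped by the space left on the address. The sketch below is a simplified, hypothetical model (the class and method names are not HornetQ APIs): it treats granted credits as already occupying space, and frees space when messages are consumed.

```java
/** Hypothetical model of an address that caps credit grants at a maximum size. */
final class AddressCreditsSketch {
    private final long maxSizeBytes;
    private long committedBytes; // bytes already granted to producers or held in queues

    AddressCreditsSketch(long maxSizeBytes) {
        if (maxSizeBytes <= 0) {
            throw new IllegalArgumentException("max size must be positive");
        }
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Grants at most the requested credits, never exceeding the address limit. */
    long grant(long requestedBytes) {
        long available = Math.max(0L, maxSizeBytes - committedBytes);
        long granted = Math.min(requestedBytes, available);
        committedBytes += granted;
        return granted; // 0 means the producer has to wait (block)
    }

    /** Called when messages are consumed, freeing space on the address. */
    void consumed(long bytes) {
        committedBytes = Math.max(0L, committedBytes - bytes);
    }
}
```

With a limit of 100000 bytes, two requests for 60000 credits are answered with 60000 and 40000; further requests receive nothing until consumers free up space.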

To configure an address with a maximum size and tell the server that you want to block producers for this address if it becomes full, you need to define an AddressSettings (Section 25.3, “Configuring Queues Via Address Settings”) block for the address and specify max-size-bytes and address-full-policy.

The address block applies to all queues registered to that address, i.e. the total memory for all queues bound to that address will not exceed max-size-bytes. In the case of JMS topics, this means the total memory of all subscriptions in the topic won't exceed max-size-bytes.

Here's an example:

               <address-settings>
                  <address-setting match="jms.queue.exampleQueue">            
                     <max-size-bytes>100000</max-size-bytes>
                     <address-full-policy>BLOCK</address-full-policy>   
                  </address-setting>
               </address-settings>

The above example would set the max size of the JMS queue "exampleQueue" to 100000 bytes and would block any producers sending to that address to prevent that max size from being exceeded.

Note the policy must be set to BLOCK to enable blocking producer flow control.

Please note that the default value for address-full-policy is PAGE. Please see the chapter on paging for more information on paging.

19.2.2. Rate limited flow control

HornetQ also allows the rate at which a producer can emit messages to be limited, in units of messages per second. By specifying such a rate, HornetQ will ensure that the producer never produces messages at a rate higher than that specified.

The rate must be a positive integer to enable this functionality; it is the maximum desired message production rate, in messages per second. Setting it to -1 disables rate limited flow control. The default value is -1.

Please see Section 11.1.36, “Message Producer Rate Limiting” for a working example of limiting producer rate.

19.2.2.1. Using Core API

If the HornetQ core API is being used, the rate can be set via the ClientSessionFactory.setProducerMaxRate(int producerMaxRate) method or, alternatively, via some of the ClientSession.createProducer() methods.

19.2.2.2. Using JMS

If JNDI is used to look up the connection factory, the max rate can be configured in hornetq-jms.xml:

<connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty-connector"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>       
      </entries>
      <!-- We limit producers created on this connection factory to produce messages
           at a maximum rate of 10 messages per sec -->
      <producer-max-rate>10</producer-max-rate>
 </connection-factory>

If the connection factory is directly instantiated, the max rate can be set via the HornetQConnectionFactory.setProducerMaxRate(int producerMaxRate) method.