Version

    JMS Message Listeners

    JMS Message Listeners allow you to listen for incoming JMS messages. You can specify the source of the messages (JMS Topic or JMS Queue) and a task that will be executed for incoming messages.

    JMS messaging requires a JMS API (javax.jms-api-2.0.jar) and third-party libraries. All these libraries must be available on classpath of an application server and classpath of Worker. Libraries must be added before starting the CloverDX Server.

    JMS Message Listeners on Worker

    JMS Message Listener always runs on Server Core, but you can use it to execute jobs on Worker. In such a case, make sure that the required .jar libraries are available on the Worker’s classpath as well. For more information, see Adding Libraries to the Worker’s Classpath.

    In Cluster, you can either explicitly specify which node will listen to JMS or not. If unspecified, all nodes will register as listeners. In the case of JMS Topic, all nodes will receive all messages and trigger the tasks (multiple instances) or, in the case of JMS Queue, a random node will consume the message and run the task (just one instance).

    JMS Message Listener Modes

    JMS Message Listener supports two modes which influence its life cycle. You can choose between these modes by enabling/disabling the Trigger new task for every message option:

    Table 52. JMS Message Listener Modes
    Mode Description

    Non-batch (default)

    Trigger new task for every message: enabled

    The listener connects to the JMS source (queue/topic) and responds to each incoming message by triggering the selected task (e.g. graph or jobflow). The connection to the source is not interrupted during the process.

    Batch

    Trigger new task for every message: disabled

    The listener connects to the JMS source (queue/topic) and waits for a message. Once the message is received, it stays in the queue, the listener disconnects from the source and triggers the selected task (e.g. graph or jobflow). Once the task is finished, the listener reconnects to the source and waits for another message. This way, the triggered task can use JMSReader component to connect to the same source and consume large number of messages, increasing performance.

    The listener is disconnected from the source for the duration of the triggered task, up to the task.jms.callback.timeout duration.

    Attributes of JMS Message Listener

    Below is the list of attributes which can be set up for JMS Message Listeners:

    Table 53. Attributes of JMS Message Listener
    Attribute Description

    Initialize by

    This attribute is useful only in a Cluster environment. It is a node ID or list of node IDs where the listener should be initialized. If it is not set, the listener is initialized on all nodes in the Cluster.

    In the Cluster environment, each JMS event listener has Initialize by attribute which may be used to specify the Cluster node which will consume messages from the queue/topic. There are the following possibilities:

    • All nodes: All Cluster nodes with status "ready" consume messages concurrently.

    • One of selected nodes: Just one node ID specified - Only the specified node may consume messages, however the node status must be "ready". When the node isn’t ready, messages aren’t consumed by any Cluster node.

    • One of selected nodes: Multiple node IDs specified - Just one of the specified nodes consumes messages at a time. If the node fails for any reason (or its status isn’t "ready"), any other "ready" node from the list continues with consuming messages.

    In a standalone environment, the Initialize by attribute is ignored.

    JNDI Access

    Initial context

    Default or custom

    Initial context factory class

    A full class name of the javax.naming.InitialContext implementation. Each JMS provider has its own implementation. For example, Apache MQ has org.apache.activemq.jndi.ActiveMQInitialContextFactory. If it is empty, the Server uses a default initial context.

    The specified class must be on the application-server classpath. It is usually included in one library with a JMS API implementation for each specific JMS provider.

    Broker URL

    A URL of a JMS message broker

    Listen To

    Connection factory

    A JNDI name of a connection factory. It depends on the name of the registered jndi resource, e.g. java:comp/env/jms/MyOwnConnectionFactory

    Username

    A username for a connection to a JMS message broker. For some providers it is enough to define username in the JNDI resource definition and this attribute can be left empty.

    Password

    A password for a connection to JMS message broker. For some providers it is enough to define password in the JNDI resource definition and this attribute can be left empty.

    Queue/Topic

    A JNDI name of a message queue/topic. It depends on the name of the registered jndi resource, e.g. java:comp/env/jms/MyQueue

    Durable subscriber

    If false, the message consumer is connected to the broker as 'non-durable', so it receives only messages which are sent while the connection is active. Other messages are lost.

    If the attribute is true, the consumer is subscribed as 'durable' so it receives even messages which are sent while the connection is inactive. The broker stores such messages until they can be delivered or until the expiration is reached.

    This switch is useful only for Topics destinations, because Queue destinations always store messages until they can be delivered or the expiration is reached.

    Please note that consumer is inactive e.g. during server restart and during short moment when the user updates the "JMS message listener" and it must be re-initialized. So during these intervals, the message in the Topic may get lost if the consumer does not have the durable subscription.

    If the subscription is durable, client must have ClientId specified. This attribute can be set in different ways in dependence on JMS provider. E.g. for ActiveMQ, it is set as a URL parameter tcp://localhost:1244?jms.clientID=TestClientID.

    Subscriber name

    Available only when Durable subscriber is true. By default, a durable subscriber name is generated automatically in the subscr_[clusterNodeId]_[listenerId] format; therefore, a subscriber has a different name on each Cluster node. Using this attribute, you can specify a custom subscriber name that will be identical on all Cluster nodes.

    Message selector

    This query string can be used as a specification of conditions for filtering incoming messages. Syntax is well described on Java EE API web site. It has different behavior depending on the type of consumer (queue/topic):

    Queue: Messages that are filtered out remain in the queue.

    Topic: Messages filtered out by a Topic subscriber’s message selector will never be delivered to the subscriber. From the subscriber’s perspective, they do not exist.

    Message Processing

    Number of consumers

    How many consumers will be initialized on every cluster node registered as a listener.

    Groovy code

    A Groovy code may be used for additional message processing and/or for refusing a message. Both features are described below.

    Optional Groovy code

    Groovy code may be used for additional message processing or for refusing a message.

    • Additional message processing: Groovy code may modify/add/remove values stored in the containers "properties" and "data".

    • Refuse/acknowledge the message: If the Groovy code returns Boolean.FALSE, the message is refused. Otherwise, the message is acknowledged. A refused message may be redelivered, however the JMS broker should configure a limit for redelivering messages. If the Groovy code throws an exception, it’s considered a coding error and the JMS message is NOT refused because of it. So, if the message refusal is to be directed by some exception, it must be handled in Groovy.

    Table 54. Variables accessible in Groovy code
    Type Key Description

    javax.jms.Message

    msg

    Instance of a JMS message

    java.util.Properties

    properties

    See below for details. It contains values (String or converted to String) read from a message and it is passed to the task which may then use them. For example, the execute graph task passes these parameters to the executed graph.

    java.util.Map<String, Object>

    data

    See below for details. Contains values (Object, Stream, etc.) read or proxied from the message instance and it is passed to the task which may then use them. For example, the execute graph task passes it to the executed graph as dictionary entries.

    javax.servlet.ServletContext

    servletContext

    An instance of ServletContext.

    com.cloveretl.server.api.ServerFacade

    serverFacade

    An instance of serverFacade usable for calling CloverDX Server core features.

    java.lang.String

    sessionToken

    SessionToken needed for calling serverFacade methods

    Message data available for further processing

    A JMS message is processed and the data it contains is stored into two data structures: Properties and Data.

    Table 55. Properties Elements
    Key Description

    JMS_PROP_[property key]

    For each message property, one entry is created where "key" is made of the JMS_PROP_ prefix and property key.

    JMS_MAP_[map entry key]

    If the message is an instance of MapMessage, for each map entry, one entry is created where "key" is made of the JMS_MAP_ prefix and map entry key. Values are converted to String.

    JMS_TEXT

    If the message is an instance of TextMessage, this property contains content of the message.

    JMS_MSG_CLASS

    A class name of a message implementation.

    JMS_MSG_CORRELATIONID

    Correlation ID is either a provider-specific message ID or an application-specific String value.

    JMS_MSG_DESTINATION

    The JMSDestination header field contains the destination to which the message was sent.

    JMS_MSG_MESSAGEID

    JMSMessageID is a String value that should function as a unique key for identifying messages in a historical repository. The exact scope of uniqueness is provider-defined. It should at least cover all messages for a specific installation of a provider, where an installation is some connected set of message routers.

    JMS_MSG_REPLYTO

    A destination to which a reply to this message should be sent.

    JMS_MSG_TYPE

    A message type identifier supplied by the client when the message was sent.

    JMS_MSG_DELIVERYMODE

    The DeliveryMode value specified for this message.

    JMS_MSG_EXPIRATION

    The time the message expires, which is the sum of the time-to-live value specified by the client and the GMT at the time of the send.

    JMS_MSG_PRIORITY

    The JMS API defines ten levels of priority value (0 = lowest, 9 = highest). In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority.

    JMS_MSG_REDELIVERED

    true if this message is being redelivered.

    JMS_MSG_TIMESTAMP

    The time a message was handed off to a provider to be sent. It is not the time the message was actually transmitted, because the actual send may occur later due to transactions or other client-side queueing of messages.

    Note that all values in the “Properties” structure are stored as a String type – however they are numbers or text.

    For backwards compatibility, all listed properties can also be accessed using lower-case keys; however, it is a deprecated approach.

    Table 56. "Data" elements
    Key Description

    JMS_DATA_MSG

    An instance of javax.jms.Message.

    JMS_DATA_STREAM

    An instance of java.io.InputStream. Accessible only for TextMessage, BytesMessage, StreamMessage, ObjectMessage (only if a payload object is an instance of String). Strings are encoded in UTF-8.

    JMS_DATA_TEXT

    An instance of String. Only for TextMessage and ObjectMessage, where a payload object is an instance of String.

    JMS_DATA_OBJECT

    An instance of java.lang.Object - message payload. Only for ObjectMessage.

    The Data container is passed to a task that can use it, depending on its implementation. For example, the task execute graph passes it to the executed graph as dictionary entries.

    In a Cluster environment, you can explicitly specify node IDs, which can execute the task. However, if the data payload is not serializable and the receiving and executing node differ, an error will be thrown as the Cluster cannot pass the data to the executing node.

    Inside a graph or a jobflow, data passed as dictionary entries can be used in some component attributes. For example, the File URL attribute would look like: "dict:JMS_DATA_STREAM:discrete" for reading the data directly from the incoming JMS message using a proxy stream.

    If the graph is executed on Worker, the dictionary entries must be serializable; otherwise, they cannot be passed to the graph.

    For backwards compatibility, all listed dictionary entries can also be accessed using lower-case keys; however, it is a deprecated approach.

    Using JMS Message Listener with RabbitMQ

    Connecting to RabbitMQ queues is possible using client library: https://github.com/rabbitmq/rabbitmq-jms-client

    The JNDI resource for RabbitMQ connection factory requires additional configuration to work seamlessly with JMS Message Listener. Example JNDI resource definitions for connection factory and a queue, for Tomcat:

    <Resource name="jms/MyConnectionFactory"
              type="javax.jms.ConnectionFactory"
              factory="com.rabbitmq.jms.admin.RMQObjectFactory"
              username="guest"
              password="guest"
              virtualHost="/"
              host="hostname_of_broker_goes_here"
              port="5672"
              onMessageTimeoutMs="3600000"
              keepTextMessageType="true"
              terminationTimeout="0"
              channelsQos="1"
              />
    
    <Resource name="jms/MyQueue"
              type="javax.jms.Queue"
              factory="com.rabbitmq.jms.admin.RMQObjectFactory"
              destinationName="whatever"
              amqp="true"
              amqpQueueName="MyQueueName"
              amqpExchangeName=""
              amqpRoutingKey="MyQueueName"
              />

    Explanation of additional configuration properties:

    • onMessageTimeoutMs - Lets the JMS listener wait until triggered graph finishes before processing next message. This configuration is needed for proper function of the listener.

    • channelsQos - Prevents processing of prefetched messages after disabling the listener. This configuration is needed for proper function of batch mode.

    • terminationTimeout - Prevents delay between finish of triggered task and processing of next message. This configuration is needed for proper function of batch mode.

    • keepTextMessageType - Allows sending of TextMessage instead of BytesMessage type. This configuration is needed only if you use Send JMS Message task.

    To find official descriptions of the configuration properties and more information, see RabbitMQ documentation.

    JMS is a complex topic that goes beyond the scope of this document. For more detailed information about JMS, refer to the Oracle website: https://docs.oracle.com/javaee/7/tutorial/jms-concepts.htm#BNCDQ