Designing and implementing applications for correlator-integrated messaging for JMS
This section describes guidelines for designing and implementing applications that make use of correlator-integrated messaging for JMS.
Correlator-integrated messaging for JMS can be used with or without the correlator’s state persistence feature. In a persistent correlator, all reliability modes can be used (both reliable and unreliable messaging), but in a non-persistent correlator only BEST_EFFORT
(unreliable) messaging is supported, and attempts to add senders or receivers using any other reliability mode will result in an error.
In a persistent correlator, information about all senders and receivers is always stored in the recovery datastore. This includes unreliable as well as reliable senders and receivers, and statically defined as well as dynamically added ones. As a result, persistent Apama applications never need to re-create previously added JMS senders and receivers after recovery; they are restored automatically, even BEST_EFFORT (unreliable) senders and receivers. For reliable senders and receivers, no messages or duplicate detection information will be lost after a crash or restart.
Because sender and receiver information is stored in the database, it is not permitted to shut down a persistent correlator and then make changes such as removing static senders and receivers from the configuration file before restarting. If the ability to remove senders and receivers is required, they must be added dynamically using EPL rather than from the configuration file. However, you can add new senders and receivers to the configuration files between restarts, provided the identifiers do not clash with any previously defined static or dynamic sender or receiver.
It is never possible to change the configuration of dynamic senders or receivers after they are created. For static senders and receivers this is also mostly prohibited, with the exception that the destination of a static receiver defined explicitly in the configuration file can be changed between restarts of the correlator (provided the receiverId
and dupDetectionDomainId
remain the same).
To retain maximum flexibility, Apama recommends that customers follow the industry-standard practice of using JNDI names for queues and topics. This means that it is always possible to configure any necessary redirections to allow the same logical (JNDI) name to be used in different deployment environments, such as testing and production (for dynamic as well as static receivers).
There is no restriction on changing the connection factory or JNDI server details between restarts of a persistent correlator. By using the same JNDI names (or if necessary, queue and topic names) in all environments, but different isolated JMS and JNDI servers for production and testing, it is possible to avoid unintended interactions between the production and test environments. At the same time, this keeps the two configurations very similar and allows production datastores to be examined in the test environment if necessary.
This topic describes the details of how JMS sending integrates with correlator persistence. This information is intended for advanced users.
When sending JMS messages in a persistent correlator using any reliability mode other than BEST_EFFORT
, all events sent to a JMS sender are queued inside the correlator until the next persist cycle begins. The events cannot be passed to JMS until the EPL application state changes that caused them to be sent have been persisted, otherwise the downstream receiver might see an inconsistent set of events in the case of a failure and recovery. In addition, messages sent using any of the reliable modes are sent with the JMS PERSISTENT
delivery mode flag by default, and are guaranteed to remain in the correlator’s persistence store until they have been successfully sent to the JMS broker (or until the send failed for a reason other than connection loss).
Unique identifiers are generated and assigned to each message when they are sent, and persisted with the events to allow downstream receivers to perform EXACTLY_ONCE
duplicate detection if desired (note, this assumes the uniqueMessageId
is mapped into the JMS message in some fashion).
Once the next persist cycle has completed and both the events and the application state that caused them have been committed to disk, the events can be sent to JMS. After messages have been successfully sent to the JMS broker, they are lazily removed from the correlator’s in-memory and on-disk data structures. The latency of sent messages is therefore dependent on the time taken for the correlator to perform a persist cycle (including the persist interval, the time required to take a snapshot of the correlator’s state and commit it to disk, and any retries if the correlator cannot take the snapshot immediately), plus any time spent waiting to fill the batch of events to be sent (although this is usually relatively small). Note that if a message send fails and it is not due to the JMS connection being lost, then after a small number of retries it will be dropped with an ERROR
message in the log. If a send fails because the connection is down, the correlator simply waits for it to come up again in all cases.
When sending messages in a persistent correlator using BEST_EFFORT
, the behavior is different. In this case, messages are passed to JMS immediately without waiting for a correlator persistence cycle. This results in lower latency, but also means it is possible for a client receiving JMS messages sent by the correlator to see inconsistent output in the event of a correlator failure. For example, the correlator might send one set of messages with unique identifiers (for example, from integer.incrementCounter()
) but on restart send similar messages in a different order, while responses to the first set of messages may still be received, resulting in mismatches between the requests and the responses being processed.
Important:
Consider carefully what behavior is required by your application, and use one of the reliable modes instead of BEST_EFFORT
if you need to avoid inconsistent output.
This topic, for advanced users, describes how JMS receiving integrates with correlator persistence.
When receiving in AT_LEAST_ONCE
or EXACTLY_ONCE
mode, messages are taken from the JMS queue or topic in batches (using JMS CLIENT_ACKNOWLEDGE
mode). The resulting Apama events are persisted in the reliable receive datastore (which is separate from the correlator’s recovery datastore) and then acknowledged back to JMS before the next batch of messages is received. After a batch of events finishes being asynchronously committed to the datastore, it is added to the input queue of each context. When the correlator next completes a persist cycle, all events that had at least been added to the input queue by the beginning of the persist cycle have been (or will be) reliably passed to the application. This means that in AT_LEAST_ONCE
mode they can be removed from the receive datastore immediately.
If EXACTLY_ONCE
is being used and the event was mapped with a non-empty uniqueMessageId
from the JMS message, the uniqueMessageId
and other metadata are stored both in memory and in the on-disk reliable datastore, and are kept there until the associated uniqueMessageId
is expired from the duplicate detector. Note however, that as an optimization, because the persisted event strings are no longer needed once the event has been included in the correlator state database, any particularly long event strings may become null in the database. The latency of received messages is therefore dependent on the time spent waiting for other messages to be received to fill the batch, and the time taken to commit the batch to the receive datastore.
When a persistent correlator is restarted and recovers its state from the recovery datastore, no new JMS messages will be received from the broker until recovery is complete, that is, until the correlator has called the onConcludeRecovery() action on all EPL monitors that define it. However, EPL monitors may see a small number of JMS messages that were received and added to the input queue before the correlator was restarted. To be safe, any required listeners in non-persistent monitors should be set up in onBeginRecovery().
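For example, a minimal sketch of a non-persistent monitor that sets up its listeners in onBeginRecovery() as well as in onload() might look like this (MyMappedEvent is a hypothetical event type assumed to be produced by a JMS receiver's mapping configuration):

monitor ReceiveFromJMS {
    action onload() {
        setupListeners();
    }

    // In a persistent correlator this action is called during recovery;
    // setting up listeners here ensures that JMS messages that were already
    // on the input queue before the restart are not missed.
    action onBeginRecovery() {
        setupListeners();
    }

    action setupListeners() {
        on all MyMappedEvent() as e {
            // process the received event
        }
    }
}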
Since a batch of messages is acknowledged to the JMS broker as soon as they have been written to the Apama reliable receive datastore, there is no relationship between JMS message acknowledgment to the broker and when the correlator begins or completes a correlator state datastore persistence cycle. The maximum number of messages that may be received from the JMS broker but not yet acknowledged is limited by the configured maxBatchSize
(typically this is 1000 messages).
Apama applications that receive JMS messages can prevent message loss without using correlator persistence by controlling when the application acknowledges the received messages. See Receiving messages with APP_CONTROLLED acknowledgements.
Apama applications that use JMS senders with BEST_EFFORT
reliability can prevent message loss without using correlator persistence by waiting for acknowledgements that all messages sent to a JMS sender context have been sent to the JMS broker. See Sending messages reliably with application flushing notifications.
Apama applications that receive JMS messages can prevent message loss without using correlator persistence by controlling when the application acknowledges the received messages. To do this, use APP_CONTROLLED
reliability mode. With APP_CONTROLLED
reliability mode, an application can tie the sending of the JMS acknowledgement to application-defined strategies for preserving the effect of the messages. For example, an application might need to ensure JMS messages are not acknowledged to the broker until any output resulting from them has been written to a database, a distributed MemoryStore, a downstream JMS destination, or a connected correlator.
An alternative to using APP_CONTROLLED
reliability mode is to use correlator persistence with reliability mode set to AT_LEAST_ONCE
. See Using correlator persistence with correlator-integrated messaging for JMS.
When reliability mode is set to APP_CONTROLLED
, applications are still entirely responsible for handling duplicate messages as well as any message reordering that occurs. Applications must be able to cope with any message duplication or reordering caused by the JMS provider implementation or failures in the sender, receiver or broker.
Note: If a license file cannot be found, the correlator is limited to BEST_EFFORT
only messaging. See Running Apama without a license file.
In an Apama application, a receiver that is using APP_CONTROLLED reliability mode goes through the following cycle:

1. Receive a batch of messages, as determined by the maxBatchSize and maxBatchIntervalMillis receiver settings. Note that regardless of the value of maxBatchIntervalMillis, the receiver will not be suspended while no events are being received. See Advanced configuration bean properties.
2. Suspend receiving and send a JMSAppControlledReceivingSuspended event to the context that is handling the messages.
3. Wait until the application calls JMSReceiver.appControlledAckAndResume() to acknowledge the message batch and resume receiving. The cycle then starts again.

Following is a simple example of the application logic for responding to JMSAppControlledReceivingSuspended events and allowing the message batch to be acknowledged after the messages have been suitably handed off to another system:
on all JMSAppControlledReceivingSuspended(receiverId="myReceiver")
{
    on MyFinishedPersistingReceivedEvents(requestId=persistReceivedEventsSomehow())
    {
        jms.getReceiver("myReceiver").appControlledAckAndResume();
    }
}
The code below shows an example of using APP_CONTROLLED
receiving, together with flush acknowledgements from the JMS sender. See Sending messages reliably with application flushing notifications. With this strategy, received JMS messages are acknowledged to the JMS broker only after the context gets an acknowledgement from the JMS sender that all the associated output messages have been sent to the JMS broker.
on all JMSAppControlledReceivingSuspended(receiverId="myReceiver")
{
    on JMSSenderFlushed(requestId =
            jmsConnection.getSender("mySender").requestFlush())
    {
        jms.getReceiver("myReceiver").appControlledAckAndResume();
    }
}
It is important to use the same context to process the messages from a given receiver and to call appControlledAckAndResume()
.
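For example, in the same fragment style as the examples above (MyMappedEvent is a hypothetical event type assumed to be produced by the receiver's mapping configuration), both listeners can be set up from the same monitor instance, and therefore the same context:

// Process each mapped event as it arrives on this context
on all MyMappedEvent() as e {
    // ... hand the event off to another system ...
}

// When the receiver suspends, acknowledge the batch and resume receiving
// from this same context
on all JMSAppControlledReceivingSuspended(receiverId="myReceiver")
{
    jms.getReceiver("myReceiver").appControlledAckAndResume();
}

In a real application, the acknowledgement would typically be deferred until the batch has been handed off successfully, as shown in the examples above.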
To improve the throughput of an APP_CONTROLLED
receiver, try adjusting the maxBatchSize
and maxBatchIntervalMillis
receiver settings. The goal is to balance the time spent receiving JMS messages and the time spent committing the results. If the batches are too small then throughput can decrease. If the batches are too large then latency can increase and the JMS broker could use excessive memory to hold the unacknowledged messages.
It is possible to use the APP_CONTROLLED
reliability mode for a receiver in a persistence-enabled correlator. In this case, process the messages and call appControlledAckAndResume()
from a non-persistent monitor. Acknowledgements cannot be controlled from a persistent monitor because the JMS acknowledgement would get out of sync with the monitor state after recovery. If you try to call appControlledAckAndResume()
from a persistent monitor an exception will be thrown.
Note: JMS messages that result in mapping failures cannot be handled by the EPL application so they are usually acknowledged automatically.
Applications that use BEST_EFFORT
reliability to send JMS messages can prevent message loss without using persistent monitors. To do this, each time an application sends a message to the JMS sender channel it also keeps the state required to re-generate the message. Periodically, the application requests the JMS sender to flush a batch of messages to the JMS broker. After all messages in this batch are sent to a JMS broker, the JMS sender sends a flush acknowledgement to the context that requested flushing. When the application receives the flush acknowledgement it executes an application-defined strategy for clearing state associated with the messages that have been sent to the JMS sender channel. This protects the application against failure of the correlator host.
Note: Messages are still asynchronously sent to the JMS broker even when no flushing has been requested. Requesting a flush simply gives the application the ability to be notified when the messages have been handed off to the JMS broker.
The typical behavior of an application that sends messages reliably without using correlator persistence is as follows:
1. Continuously send messages to the JMS sender channel.
At the same time, the application must keep track of the messages that have been sent to the sender channel but not yet flushed to the JMS broker. These are referred to as outstanding messages.
Also, the application must reliably keep whatever state is required to re-generate each message. It is important to ensure that the application would not lose data if the outstanding messages were lost due to failure of the correlator node. This is typically achieved by delaying acknowledgement of the incoming JMS messages, Apama events or database/MemoryStore transactions that are generating the sent messages.
2. Request JMS sender to flush outstanding messages.
Periodically, for example every 1000 outstanding messages, the application requests that the sender flush the outstanding messages to the JMS broker. This is accomplished by invoking the JMSSender.requestFlush() action. After the messages have been sent to the JMS broker, a JMSSenderFlushed acknowledgement event is sent to the context that requested flushing.
The application should set up a listener for the JMSSenderFlushed
event whose requestId
field is equal to the requestId
generated by the requestFlush()
action. Also, this listener needs a reference to whatever state corresponds to this batch of outstanding messages. For example, this might be a transaction id.
You must determine how many messages to send before flushing the batch. Flushing each message is not advised as it would add a noticeable performance overhead. However, you do not want to flush messages so infrequently that excessive memory or buffer space is required to hold the state associated with the outstanding messages.
Be sure to implement any required mechanism for downstream receivers to deal with duplicate messages. Typically, an application does this by adding a unique id to each message.
3. Continue sending events to the JMS sender channel.
In many cases, it is fine to continue sending new events to the sender channel while waiting for acknowledgement that previous batches have been flushed. That is, it is okay to have multiple batches in flight to the JMS broker at any one time. This improves throughput but is more complicated to implement. Whether it is possible to have multiple flushes in flight simultaneously in your specific application depends on what the application needs to do when it receives a JMSSenderFlushed
acknowledgement event.
4. Application receives a flush notification event.
When the JMS sender has finished processing all events in a batch that is being flushed to a JMS broker, it sends a JMSSenderFlushed
event to the context that invoked the requestFlush()
action. At this point, the messages are the responsibility of the JMS broker and they are safe from loss even if the correlator or other nodes fail.
The application should now remove any state associated with the messages in this batch. For example, the application can acknowledge the incoming messages that generated the messages sent to the JMS broker, or commit a database or MemoryStore transaction, or send an event that allows some other component to clear associated state from its buffers.
While this feature allows a well-designed application to prevent message loss in the case of a correlator failure, it cannot prevent message loss due to invalid mapping rules or non-existent JMS destinations. Such failures are recorded in the correlator log, but any messages associated with these failures are still included in the next flush acknowledgement, even though sending them to the JMS broker resulted in a failure.
If a recoverable failure occurs, such as loss of connection to the JMS broker, Apama keeps trying to send the messages until the connection is restored. While this might result in a long delay before the flush acknowledgement can be sent, no messages are lost. The flush acknowledgement is therefore an indication that the message batch has been fully processed by the correlator’s JMS sender to the best of its ability. The flush notification is not a guarantee that every message in the batch was successfully delivered to the broker. For example, problems in the application or in the mapping configuration might have prevented successful delivery to the JMS broker.
Sending messages reliably without using correlator persistence is available only for senders that are using BEST_EFFORT
reliability mode. Senders that are using AT_LEAST_ONCE
or EXACTLY_ONCE
reliability mode use the correlator’s persistence feature and so have no need for manual send notifications.
A call to the requestFlush()
action in a persistent monitor throws an exception. Allowing this call would cause the JMS acknowledgement state to be out of sync with the monitor state after recovery.
The code below provides an example of sending messages reliably with flushing acknowledgements.
using com.apama.correlator.jms.JMSSender;
using com.apama.correlator.jms.JMSConnection;

monitor FlushMessagesToJMSBroker {
    ...

    // Each time the application sends an event to the JMS sender
    // channel, increment the number of messages sent but not flushed.
    send MyEvent() to jmsConnection.getSender("mySender").getChannel();
    sendsSinceLastFlush := sendsSinceLastFlush + 1;

    if sendsSinceLastFlush = 1000 then {
        // Stash the state needed to re-send messages in case of correlator
        // failure. After receiving a flush acknowledgement, this state can
        // be cleared. In this example, keep a transaction id for a database.
        integer transactionAssociatedWithFlushRequest := currentTransaction;

        // Optionally, allow multiple flushes to be in flight concurrently.
        currentTransaction := startNewTransaction();
        sendsSinceLastFlush := 0;

        // Request the JMS sender to flush messages to the JMS broker.
        // Listen for the flush acknowledgement event and clear the saved
        // state when the listener fires.
        on JMSSenderFlushed(requestId =
                jmsConnection.getSender("mySender").requestFlush())
        {
            commitTransaction(transactionAssociatedWithFlushRequest);
        }
    }
}
When using sender flushing, an application can optionally set the JMS sender messageDeliveryMode
property to PERSISTENT
. This ensures that the messages are protected from loss by the JMS broker. See jms:sender
properties in XML configuration bean reference.
When using the EXACTLY_ONCE
, AT_LEAST_ONCE
or APP_CONTROLLED
reliability mode, Apama’s correlator-integrated messaging for JMS provides a “reliable” way to send messages into and out of the correlator such that in the event of a failure, any received messages whose effects were not persisted to stable storage will be redelivered and processed again, and that the events received from the correlator by external systems are consistent with the persisted and recovered state.
When using the EXACTLY_ONCE reliability mode, correlator-integrated messaging for JMS guarantees no message duplication within a specifically configured window size. The window size, for example, might be set to the last 2000 events or the events received in the last two minutes. Note that even with the help of Apama’s EXACTLY_ONCE functionality, JMS message duplicate detection is not a simple or automatic process and requires careful design. Customers are strongly encouraged to architect their applications to be tolerant of duplicate messages and to use the simpler AT_LEAST_ONCE reliability mode instead of EXACTLY_ONCE when possible. (Using the APP_CONTROLLED reliability mode for receivers is an advanced alternative.)

Message ordering is not guaranteed across all messages. Applications that require ordering for a group of related messages can use the JMSXGroupSeq and JMSXGroupID message properties to request that the chosen JMS provider implementation provide ordering for that group. It is not possible to provide ordering across all messages without forcing use of a single consumer, which would reduce throughput scalability.

Care must be taken when designing, configuring and testing the application to ensure it can cope with significant fluctuations in message rates, as well as serious failures such as network or component failures that last for several minutes, hours or days. Consider using JMS message expiry to avoid flooding queues with unnecessary or stale messages on recovery after a long period of down time.
The correlator input log can be used in applications that use most correlator-integrated messaging for JMS features, including sending, receiving and listening for status events. The input log includes a record of all events that were received from JMS, so there is no need for JMS to be explicitly enabled with the --jmsConfig option when performing replay. Instead, the resulting input log can be extracted and used in the normal way, without the --jmsConfig option. Attempting to perform replay with correlator-integrated messaging for JMS enabled is not supported and is likely to fail, especially with reliable receivers in a persistent correlator.
Note that the “dynamic” capabilities of correlator-integrated messaging for JMS do not currently work in a replay correlator (because an EPL plug-in is used behind the scenes), so if you need to retain the possibility of using an input log you must not use dynamic senders and receivers or call the JMSSender.getOutstandingEvents()
method.
Apama provides an EXACTLY_ONCE
receiver reliability setting that allows a finite number of duplicate messages to be detected and dropped before they get to the correlator. This setting can be used to reduce the chance of duplicates; however with JMS, duplicate detection is a complex process. Therefore, customers are strongly encouraged to architect their applications to be tolerant of duplicate messages and use the simpler AT_LEAST_ONCE
reliability mode instead of EXACTLY_ONCE
when possible.
Configuring duplicate detection is an inexact science given that it depends considerably on the behavior of the sender(s) for a queue, and requires careful architecture and sizing to ensure robust operation in normal use and expected error cases. Moreover, it is not possible to guarantee that duplicate messages will never be seen without an infinite buffer of duplicates. Give particular attention to architectures where multiple sender processes are writing to the same queue, especially if it is possible that one sender may send a duplicate of a message it has taken over from another, failed sender that has not recorded the fact that it had already processed and sent out that message.
Duplicate detection is a trade-off between probability of an old duplicate not being recognized as such, and the amount of memory and disk required, which will also have an impact on latency and throughput.
Selecting the right value for the dupDetectionExpiryTimeSecs
is a very important aspect of ensuring that the duplicate detection process will operate reliably — detecting duplicates where necessary without running out of memory when something goes wrong. The expiry time used for the duplicate detector should take into account how the JMS provider will deal with several consecutive process or connection failures on the receive side, especially if the JMS provider temporarily holds back messages for failed connections in an attempt to work around temporary network problems. Be sure to consult the documentation for the JMS provider being used to understand how it handles connection failures. It is a good idea to conduct tests to see what happens when the connection between the JMS broker and the correlator goes down. When testing, consider using the “rMaxDeliverySecs=
” value from the “JMS Status:
” line in the correlator log to help understand the minimum expiry time needed to catch redelivered duplicates. Note, however, this is only useful if the JMS provider reliably sets the JMSRedelivered
flag when performing a redelivery. A good rule of thumb is to use an expiry time of two to three times the broker’s redelivery timeout.
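For example, if the JMS broker redelivers unacknowledged messages after 60 seconds, this rule of thumb suggests setting dupDetectionExpiryTimeSecs to somewhere in the region of 120 to 180 seconds (the 60-second figure is purely illustrative).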
Note that although space within the reliable receive (duplicate detection) datastore is reclaimed and reused when older duplicates expire, the file size will not be reduced. There is currently no mechanism for reducing the amount of disk space used by the database, so the on-disk size may grow, bounded by the peak duplicate detector size, but will not shrink.
Messages that are subject to duplicate detection contain:

- uniqueMessageId - an application-level identifier which functions as the key for determining whether a message is functionally equivalent (or identical) to a message already processed, and should therefore be ignored.
- messageSourceId - an optional application-specific string which acts as a key to uniquely identify upstream message senders. This could be a standard GUID (globally unique identifier) string. If provided, the messageSourceId is used to control the expiry of uniqueMessageIds from the duplicate detection cache, allowing dupDetectionPerSourceExpiryWindowSize messages to be kept per messageSourceId. This massively improves the reliability of the duplicate detection while keeping the window size relatively small, since if one sender fails then recovers several hours later, there is no danger of another (non-failed) sender filling up the duplicate detection cache in the meantime and expiring the ids of the first sender, causing its duplicates to go undetected.

The key configuration options for duplicate detection are:
- dupDetectionPerSourceExpiryWindowSize - The number of messages that will be kept in each duplicate detection domain per messageSourceId (if messageSourceId is set on each message by the upstream system; messages without a messageSourceId will all be grouped together into one window for the entire dupDetectionDomainId). This property is specified on the global JmsReceiverSettings bean. It is usually configured based on the characteristics of the upstream JMS sender, and the maximum number of in-doubt messages that it might resend in the case of a failure. The default value in this release is 2000. It can be set to 0 to disable the fixed-size per-sender expiry window.
- dupDetectionExpiryTimeSecs - The time for which uniqueMessageIds will be remembered before they expire. This property is specified on the global JmsReceiverSettings bean. The default value in this release is 2 minutes. It can be set to 0 to disable the time-based expiry window (which makes it easier to have a fixed bound on the database size, though this is not an option if the JMS provider itself causes duplicates by redelivering messages after a timeout due to network problems).
- dupDetectionDomainId - An application-specific string which acts as a key to group together receivers that form a duplicate detection domain, for example, a set of receivers that must be able to drop duplicate messages with the same uniqueMessageId (which may be from one, or multiple, upstream senders). This property is specified on the jms:receiver bean. By default, the duplicate detection domain is always the same as the JMS destination name and connectionId, so cross-receiver duplicate detection would happen only if multiple receivers in the same connection are concurrently listening to the same queue; duplicates would not be detected if sent to a different queue name, or if sent to the same queue name on a different connection, or if JNDI is used to configure the receiver but the underlying JMS name referenced by the JNDI name changes. Also note that if the message streams processed by each receiver were being partitioned using a message selector, unnecessary duplicate detection would be performed in this case. The duplicate detection domain name can be specified on a per-receiver basis to increase, reduce or change the set of receivers across which duplicate detection will be performed. Common values are:
  - dupDetectionDomainId=connectionId+":"+jmsDestinationName - the default for queues.
  - dupDetectionDomainId=jmsDestinationName - if using receivers to access the same queue from multiple separate connections.
  - dupDetectionDomainId=jndiDestinationName - if using JNDI to configure receiver names, and needing the ability to change the queue or topic that the JNDI name points to.
  - dupDetectionDomainId=connectionId+":"+receiverId - the default for topics; also used if each receiver should check for duplicates independently of other receivers. This is useful if receivers are already using message selectors to partition the message stream, which implies that cross-receiver duplicates are not possible.
  - dupDetectionDomainId=<application-defined-name> - if using multiple receivers per selector-partitioned message stream. The name is likely to be related to the message selector expression.

Duplicate detection only works if the upstream JMS sender has specified a uniqueMessageId for each message (the uniqueMessageId is typically provided as a message property, but could alternatively be embedded within the message body if the mapper is configured to extract it). Any messages that do not have this identifier will not be subject to duplicate detection. The uniqueMessageId
string is expected to be unique across all messages within the configured dupDetectionDomainId
(for example, queue
), including messages with different messageSourceId
s. By default, sent JMS messages would have a uniqueMessageId
of seqNo:messageSourceId
, where seqNo
is a contiguous sequence number that is unique for the sender, for example:
uniqueMessageId=1:mymachinename1.domain:1234:567890:S01
uniqueMessageId=2:mymachinename1.domain:1234:567890:S01
uniqueMessageId=3:mymachinename1.domain:1234:567890:S01
uniqueMessageId=1:mymachinename2.domain:4321:987654:S01
uniqueMessageId=2:mymachinename2.domain:4321:987654:S01
uniqueMessageId=1:mymachinename2.domain:4321:987654:S02
uniqueMessageId=2:mymachinename2.domain:4321:987654:S02
...
To reliably perform duplicate detection if there are multiple senders writing to the same queue (without the Apama receiver having to configure a very large and therefore costly time window to prevent premature expiry of ids from a sender that has failed and produces no messages for a while then recovers, possibly sending duplicates as it does so), the upstream senders should be configured to send with a globally-unique messageSourceId
identifying the message source/sender, which should also be configured in the mapping layer of the receiver.
Apama’s duplicate detection involves a set of fixed-size per-sourceId queues; when a queue is full, the oldest items are expired to a shared queue ordered by timestamp (the time received by the correlator’s JMS receiver) whose items are expired based on a time window. The receiver settings controlling the duplicate detection window sizes are therefore dupDetectionPerSourceExpiryWindowSize and dupDetectionExpiryTimeSecs:

- uniqueMessageIds are expired from the per-source queue (and moved to the time-based queue) when the queue is full of newer ids, or when a newer message whose uniqueMessageId is already in the queue for that source is received.
- uniqueMessageIds are expired from the time-based queue (and removed from the database permanently) when they are older than the newest item in the time-based queue by more than dupDetectionExpiryTimeSecs.
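As a concrete illustration (the sender count is purely illustrative): with the default dupDetectionPerSourceExpiryWindowSize of 2000 and five senders each providing a distinct messageSourceId to the same duplicate detection domain, up to 10,000 uniqueMessageIds can be held in the per-source queues at any one time, in addition to any ids that have been moved to the shared time-based queue within the last dupDetectionExpiryTimeSecs.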
When designing an application that uses correlator-integrated messaging for JMS it may be relevant to consider the following topics that relate to performance issues.
- There are no guarantees about maximum latency. Persistent JMS messages inevitably incur significant latency compared to unreliable messaging, and Apama’s support for JMS is focused on throughput rather than latency. Messages can be held up unexpectedly by many factors, such as the JMS provider, connection failures, waiting a long time for the receive-side commit transaction, the broker acknowledge() call taking a long time, or waiting a long time for the correlator to do an in-memory copy of its state.
- Multiple receivers on the same queue may improve performance. But consider that “For PTP, JMS does not specify the semantics of concurrent QueueReceivers for the same Queue; however JMS does not prohibit a provider from supporting this. Therefore, message delivery to multiple QueueReceivers will depend on the JMS provider’s implementation. Applications that depend on delivery to multiple QueueReceivers are not portable”.
- Check the correlator log for WARN and ERROR messages, which may indicate your application or configuration has a connection problem that may be responsible for the performance problem.
- Check whether the correlator has DEBUG logging enabled or is logging all messages. Either of these will obviously cause a big performance hit. Apama recommends running the correlator at INFO log level; this avoids excessive logging, but still retains sufficient information that may be indispensable for tracking problems.
- Consider the format of the JMS messages being used; mapping large or complex messages (such as XML documents) is typically much more expensive than using a simple MapMessage or a TextMessage containing an Apama event string.
- Check the correlator status lines in the log; a large number of events on the input queues ("iq=") is a strong indicator that the application may not be consuming messages fast enough from JMS.
- Use the logPerformanceBreakdown setting in JmsSenderSettings and JmsReceiverSettings to provide detailed low-level information about which aspects of sending and receiving are the most costly. This may indicate whether the main bottleneck, and hence the main optimization target, is in the message mapping or in the actual sending or receiving of messages. If mapping is not the main problem, it may be possible to achieve an improvement by customizing some of the advanced sender and receiver properties such as maxBatchSize and maxBatchIntervalMillis.
- If mapping is the main cost, consider setting maxExtraMappingThreads to perform the mapping of received JMS messages on one or more separate threads. This is especially useful when dealing with large or complex XML messages.
- When performance testing, engine_connect can be used to create a fast channel from the receiver back to the sender, and the test system can be set to send Apama events to the sender channel every 0.5 seconds so it knows how many events have been received so far. This allows better performance testing with a bound on the maximum number of outstanding messages (sent but not yet received) to prevent the broker being overwhelmed.

The JmsSenderSettings
and JmsReceiverSettings
configuration objects both contain a property called logPerformanceBreakdown
which can be set to true
to enable measurement of the time taken to perform the various operations required for sending and receiving, with messages logged periodically at INFO
level with a summary of measurements taken since the last log message. The default logging interval is once per minute.
Although this property should not be enabled in a production system where performance is a priority because the gathering of the performance data adds unnecessary overhead, it can be indispensable during development and testing for demonstrating what each sender and receiver thread is spending its time doing. To produce more useful statistics, note that the first batch of messages sent or received after connection may be ignored (which will affect all statistics logged, including the number of messages received and throughput). All times are measured using the standard Java System.nanoTime()
method, which should provide the most accurate time measurements the operating system can achieve, though not usually to nano second accuracy. For more information on the logPerformanceBreakdown
property, see XML configuration bean reference.
Each receiver performance log message has a low-level breakdown of the percentage of thread time spent on various aspects of processing each message and message batch, as well as a summary line stating the (approximate) throughput rate over the previous measurement interval, and an indication of the minimum, mean (average) and maximum number of events in each batch that was received.
The items that may appear in the detailed breakdown are:
- RECEIVING - time spent in the JMS provider’s MessageConsumer.receive() method call for each message received.
- MAPPING - time spent mapping each JMS message to the corresponding Apama event. If maxExtraMappingThreads is set to a non-zero value, then this is the time spent waiting for remaining message mapping jobs to complete on their background thread(s) at the end of each batch.
- DB_WAIT - (only for reliable receive modes) time spent waiting for background reliable receive database operations (writes, deletes, etc.) to complete, per batch.
- DB_COMMIT - (only for reliable receive modes) time spent committing (synching) received messages to disk at the end of each batch.
- APP_CONTROLLED_BLOCKING - for receivers that are using APP_CONTROLLED reliability mode, this is the time spent waiting for the EPL monitor to call the appControlledAckAndResume() action. A monitor calls this action after it finishes processing a batch of messages from the receiver.
- ENQUEUING - (only for BEST_EFFORT and APP_CONTROLLED receive modes) time spent adding received messages to each public context’s input queue.
- JMS_ACK - time spent in the JMS provider’s Message.acknowledge() method call at the end of processing each batch of messages.
- R_TIMEOUTS - the total time spent waiting for the JMS provider to complete MessageConsumer.receive() method calls that timed out without returning a message from the queue or topic, per batch. This indicates either that Apama is receiving messages faster than they are added to the queue or topic, or that the JMS provider is not executing the receive (timeout) call very efficiently or is failing to return control at the end of the requested timeout period.
- FLOW_CONTROL - the total time spent (before each batch) blocking until the EPL application increases the flow control window size by calling JMSReceiverFlowControlMarker.updateFlowControlWindow(...). In normal usage, this should be negligible unless some part of the system has failed or the application is not updating the flow control window correctly.
- TOTAL - aggregates the total time taken to process each batch of received messages.

Each sender performance log message has a low-level breakdown of the percentage of thread time spent on various aspects of processing each message and message batch, as well as a summary line stating the approximate throughput rate over the previous measurement interval, and an indication of the minimum, mean (average) and maximum number of events in each batch that was sent.
The items that may appear in the detailed breakdown are:
- MAPPING - time spent mapping each Apama event to the corresponding JMS message. This includes the time spent looking up any JMS queue, topic, or JNDI destination names, unless cached.
- SENDING - time spent in the JMS provider’s MessageProducer.send() method call for each message.
- JMS_COMMIT - time spent in the JMS provider’s Session.commit() method call for each batch of sent messages (only if a JMS TRANSACTED_SESSION is being used to speed up send throughput).
- WAITING - the total time spent waiting for the first Apama event to be passed from EPL to the JMS runtime for sending, per batch. This is affected by what the EPL code is doing and, for reliable sender modes, also by the (dynamically tuned) period of successful correlator persist cycles.
- BATCHING - the total time spent waiting for enough Apama events to fill each send batch, after the first event has been passed to the JMS runtime.
- TOTAL - aggregates the total time taken to process each batch of sent messages.

Sometimes it is necessary to specify Java system properties to configure a JMS provider’s client library, or to change JVM options such as the maximum memory heap size. Because these settings inevitably affect all JMS providers that the correlator is connecting to, Java options must be specified on the correlator command line rather than in a JMS connection’s configuration file.
Each Java option to be passed to the correlator should be prefixed with -J
on the command line, for example, -J-Dpropname=propvalue -J-Xmx512m
. To set Java options when starting the correlator from Apama Plugin for Eclipse, edit the Apama launch configuration for your project as described below.
To edit the launch configuration:

1. In the Project Explorer, right-click the project name and select Run As > Run Configurations. The Run Configurations dialog is displayed.
2. In the Run Configurations dialog, in the Project field, make sure that your project is selected.
3. On the Run Configurations dialog's Components tab, select the correlator to use and click Edit. The Correlator Configuration dialog is displayed.
4. In the Correlator Configuration dialog, in the Extra command line arguments field, add the system property (for example, -J-Dpropname=propvalue -J-Xmx512m) and click OK.
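When the correlator is started directly from the command line, the same options are simply appended to the command, for example (the property name and value are illustrative):

correlator -J-Dpropname=propvalue -J-Xmx512m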