Aggregator Producer

Aggregator Producer

The Aggregator Producer is used for combining several different messages into one message. This producer uses a number of pluggable functionalities for this.

The functionalities determine how the aggregator producer decides to make a composite message, how to determine which incoming messages belong together and where incoming messages are temporarily stored.

The Aggregator producer will receive messages and inspect certain headers. Whenever these headers are present, the message will be collected in order to be aggregated:

  • sequenceSize : total size of set that needs to be aggregator
  • sequenceNumber: the number/positition of this message in the total set
  • correlationId: the unique ID to correlate these messages together to become one set.

The producer will wait for all messages to arrive, regardeless of the order they arrive in. It will automatically correlate these messaging using the sequenceSize and sequenceNumber to place them in the correct order for Aggregation. When all messages are received, the aggregator will put these in order and place all individual message parts into a new ConnectMessage containing 'sequenceSize' messageparts, one for each originally correlated message and this new message will be sent to the Output Flow.

The aggregator will wait for this process to complete as specified in the Timeout property, 0 meaning that it will wait indefinitely. Once timeout has passed, the incomplete message will be sent to the Discard Flow.

In the table below, you will find an explanation of these properties. All attributes with a ‘*’ are mandatory.

Attribute

Description

Name*

By default, we fill this out with the technical ‘tag’, followed by a serial number. Changing the name is optional.

Enabled

Check this box if you want this service to be enabled.

Output Flow*

Reference to a flow were the complete aggregated message is sent to after aggregation.

Discard Flow

Reference to a flow were an incomplete message is sent to after a time-out.

Correlation Property

When no correlation-strategy is set a different correlation property than the default correlationId can be set.

Timeout

Time-out in milliseconds for the incomplete messages to be sent to discard flow. If 0 (default) no timeout is used. Incomplete messages will reside till restart.

MessagePart

Name of the MessagePart in a ConnectMessage where the content of the file is being stored.

Advanced properties

In the next list you will find the Advanced options.

Attribute Description
Aggregator Reference

Reference to an aggregator bean. The bean must be an extension of the AbstractMyESBAggregator. When not set the default MyESBAggregator is used. This implementation will add all found messages in group as separate messageparts to a ConnectMessage. The part id's will be msgprt[idx]. The type of the part will be according to the message type. Headers of individual messages will be set on ConnectMessagePart. A composition of all headers will be set on the ConnectMessage.

MessageStore Name of the MessagePart in a ConnectMessage where the content of the file is being stored.
Correlation Strategy

Reference to a correlation strategy bean. The bean must be an extension of the AbstractMyESBCorrelationStrategy. When not set a default MyESBPropertyCorrelationStrategy is used with default correlation-property correlationId.

Release Strategy

Reference to a release strategy bean. The bean must be an extension of the AbstractMyESBReleaseStrategy. When not seta default SequenceSizeReleaseStrategy is used.

This implementation consults the sequenceNumber and sequenceSize properties of each arriving message to decide when a message group is complete and ready to be aggregated

Header properties

If you want to use the aggregator the following properties must be available in the header of the message.

Properties value
correlationId Unique ID that connects the messages / files with each other.
sequenceNumber Place in the sequence.
sequenceSize Total sequence size.