
Spring Integration based Dispatcher-Worker with Worker Queues

In the back-office world, the central concept in most systems is the Trade. A Trade has many events (e.g. Inception, Amend, Novation, Termination). Events from different trades can generally be processed in parallel because they have no interdependencies. Events from the same trade, however, cannot be processed in parallel because they modify the same internal instance of a Trade.

A useful pattern for this kind of scenario is dispatcher-worker with worker queues. Each worker has a job queue which it processes in a sequential fashion. Each job queue only contains events for a single trade. This allows parallel processing across trades while maintaining sequential processing on events for a single trade.
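
To make the pattern concrete before bringing in Spring Integration, here is a minimal plain-JDK sketch. It assumes events are simple strings and uses one thread per trade for clarity; `WorkerQueueDispatcher`, `dispatch` and `shutdownAndGetProcessed` are hypothetical names invented for this illustration and are not part of the implementation discussed in this post.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical dispatcher: one job queue and one worker thread per trade id. */
class WorkerQueueDispatcher {
    // Sentinel that tells a worker to stop; assumes no real event uses this value.
    private static final String STOP = "__STOP__";

    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    private final Map<String, Thread> workers = new ConcurrentHashMap<>();
    // Events in the order each worker processed them, keyed by trade id.
    private final Map<String, List<String>> processed = new ConcurrentHashMap<>();

    /** Routes an event to the queue owned by its trade, creating the worker on first use. */
    void dispatch(String tradeId, String event) {
        queues.computeIfAbsent(tradeId, this::startWorker).add(event);
    }

    private BlockingQueue<String> startWorker(String tradeId) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> log = new CopyOnWriteArrayList<>();
        processed.put(tradeId, log);
        Thread worker = new Thread(() -> {
            try {
                // Take events one at a time, so a single trade is never processed concurrently.
                for (String e = queue.take(); !e.equals(STOP); e = queue.take()) {
                    log.add(e); // stand-in for applying the event to the trade
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "worker-" + tradeId);
        workers.put(tradeId, worker);
        worker.start();
        return queue;
    }

    /** Lets every worker drain its queue, stops it, and returns the processing logs. */
    Map<String, List<String>> shutdownAndGetProcessed() {
        queues.values().forEach(q -> q.add(STOP));
        for (Thread t : workers.values()) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        WorkerQueueDispatcher dispatcher = new WorkerQueueDispatcher();
        dispatcher.dispatch("T1", "Inception");
        dispatcher.dispatch("T2", "Inception");
        dispatcher.dispatch("T1", "Amend");
        dispatcher.dispatch("T1", "Termination");
        System.out.println(dispatcher.shutdownAndGetProcessed());
    }
}
```

Events for `T1` and `T2` may interleave with each other, but each trade's own events are applied in the order they were dispatched.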


I've developed a simple version of this concept using Spring Integration. The first step is to create a Router that routes inbound trade events into channels that are specific to a trade. If the channel doesn't exist, the Router creates a new one and registers it with the Spring framework.

public String dispatch(CustomMessage inputMessage) {
  String channelName = inputMessage.getId() + CHANNEL_SUFFIX;

  synchronized (channelName.intern()) {
    if (activeChannels.get(channelName) == null) {
      QueueChannel activeChannel = createNewChannel(channelName);
      PollingConsumer activeConsumer = createConsumerWithWorker(inputMessage, activeChannel);
      activeConsumer.start();
    }
  }

  return channelName;
}

Creating a channel is the only place where synchronisation is required. We synchronise only on the channel name, which corresponds to the trade id, so contention is minimal. I also attach a Polling Consumer to the channel at the point the channel is created. Creating a channel and registering it with the Spring framework is quite straightforward, as shown in the snippet below:

private QueueChannel createNewChannel(String channelName) {
  QueueChannel activeChannel = new QueueChannel();
  activeChannels.put(channelName, activeChannel);
  applicationContext.getBeanFactory().registerSingleton(channelName, activeChannel);
  return activeChannel;
}
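
One thing to be aware of with the `synchronized (channelName.intern())` block in `dispatch`: interned strings are shared across the whole JVM, so any unrelated code that happens to lock on an equal string would contend for the same monitor. A possible alternative, which is not what the code in this post does, is to let `ConcurrentHashMap.computeIfAbsent` perform the check-then-create step atomically. The sketch below is plain JDK; `ChannelRegistry` and `channelFor` are hypothetical names, and the factory function stands in for the channel and consumer creation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;

/** Hypothetical registry: creates at most one channel object per channel name. */
class ChannelRegistry<C> {
    private final ConcurrentMap<String, C> activeChannels = new ConcurrentHashMap<>();
    // Stand-in for "create the channel, register it, attach a consumer".
    private final Function<String, C> channelFactory;

    ChannelRegistry(Function<String, C> channelFactory) {
        this.channelFactory = channelFactory;
    }

    /** Returns the existing channel, or atomically creates it if the name is new. */
    C channelFor(String channelName) {
        return activeChannels.computeIfAbsent(channelName, channelFactory);
    }

    int size() {
        return activeChannels.size();
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        ChannelRegistry<StringBuilder> registry = new ChannelRegistry<>(name -> {
            created.incrementAndGet();
            return new StringBuilder(name);
        });
        // Many concurrent lookups for the same trade's channel.
        IntStream.range(0, 1_000).parallel()
                 .forEach(i -> registry.channelFor("trade-42-channel"));
        System.out.println("channels created: " + created.get());
    }
}
```

The trade-off is that the factory runs while the map holds a lock on that key's bin, so it should stay short, much like the body of the original synchronized block.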

Although I attach a Polling Consumer to each channel, we don't need a thread per channel. We can use a Task Executor to run the polling consumers, which gives much better control over the number of concurrent threads in the system through a thread pool:

private void startConsumingFromChannel(final String consumerName, final PollingConsumer activeConsumer) {
  applicationContext.getBeanFactory().registerSingleton(consumerName, activeConsumer);
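
The idea of many queues sharing a few threads can also be illustrated without Spring. The sketch below is a plain-JDK analogue, not a description of how `PollingConsumer` works internally: each trade has its own queue plus a flag, so at most one pool thread drains a given queue at a time. `PooledWorkerQueues`, `submit` and `awaitIdleAndShutdown` are hypothetical names for this illustration.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical: per-trade queues drained by a fixed-size thread pool. */
class PooledWorkerQueues {
    private static final class TradeQueue {
        final Queue<Runnable> jobs = new ConcurrentLinkedQueue<>();
        // True while some pool thread owns the right to drain this queue.
        final AtomicBoolean draining = new AtomicBoolean(false);
    }

    private final Map<String, TradeQueue> queues = new ConcurrentHashMap<>();
    private final AtomicLong pending = new AtomicLong(); // jobs submitted but not yet finished
    private final ExecutorService pool;

    PooledWorkerQueues(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    void submit(String tradeId, Runnable job) {
        TradeQueue queue = queues.computeIfAbsent(tradeId, id -> new TradeQueue());
        pending.incrementAndGet();
        queue.jobs.add(job);
        scheduleDrain(queue);
    }

    private void scheduleDrain(TradeQueue queue) {
        // Only the caller that flips the flag schedules a drain, so drains never overlap.
        if (queue.draining.compareAndSet(false, true)) {
            pool.execute(() -> drain(queue));
        }
    }

    private void drain(TradeQueue queue) {
        for (Runnable job = queue.jobs.poll(); job != null; job = queue.jobs.poll()) {
            try {
                job.run();
            } catch (RuntimeException e) {
                // Sketch only: a real system would log or dead-letter the failed job.
            } finally {
                pending.decrementAndGet();
            }
        }
        queue.draining.set(false);
        // A job may have arrived after the last poll but before the flag was cleared.
        if (!queue.jobs.isEmpty()) {
            scheduleDrain(queue);
        }
    }

    /** Waits for all submitted jobs to finish, then stops the pool. Returns false on timeout. */
    boolean awaitIdleAndShutdown(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (pending.get() > 0) {
                if (System.nanoTime() > deadline) {
                    pool.shutdownNow();
                    return false;
                }
                Thread.sleep(1);
            }
            pool.shutdown();
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return false;
        }
    }
}
```

With, say, `new PooledWorkerQueues(4)`, thousands of trades can be active while only four threads ever run, and jobs for any single trade still execute one after another.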

Finally (not yet implemented), you can run a Reaper Thread that removes channels and consumers that have seen no activity for a specified period. You can also back the inbound channel with a Message Store so that the system can come back up in a consistent state after a failure.
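
Since the reaper is not implemented, the following is only a rough sketch of the bookkeeping it might need, assuming the dispatcher records a timestamp each time it routes a message. `IdleChannelReaper`, `touch` and `reapIdle` are hypothetical names, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical: tracks last activity per channel and reports channels that have gone idle. */
class IdleChannelReaper {
    private final Map<String, Long> lastActivityMillis = new ConcurrentHashMap<>();
    private final long idleThresholdMillis;

    IdleChannelReaper(long idleThresholdMillis) {
        this.idleThresholdMillis = idleThresholdMillis;
    }

    /** Called whenever a message is routed to the channel. */
    void touch(String channelName, long nowMillis) {
        lastActivityMillis.put(channelName, nowMillis);
    }

    /** Forgets every channel idle for at least the threshold and returns their names. */
    List<String> reapIdle(long nowMillis) {
        List<String> reaped = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastActivityMillis.entrySet()) {
            long lastSeen = entry.getValue();
            // remove(key, value) only succeeds if no newer touch() replaced the timestamp.
            if (nowMillis - lastSeen >= idleThresholdMillis
                    && lastActivityMillis.remove(entry.getKey(), lastSeen)) {
                reaped.add(entry.getKey());
            }
        }
        return reaped;
    }

    public static void main(String[] args) {
        IdleChannelReaper reaper = new IdleChannelReaper(500);
        reaper.touch("T1-channel", 0);
        reaper.touch("T2-channel", 900);
        System.out.println(reaper.reapIdle(1_000)); // only T1-channel is old enough here
    }
}
```

In a running system `reapIdle(System.currentTimeMillis())` could be called periodically, for example from a `ScheduledExecutorService`, and each returned name would still need its consumer stopped and its channel removed. A message arriving for a trade while its channel is being torn down is a race that this sketch does not address.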

The source code is on GitHub.

About the author

Mash is a pragmatic software craftsman, always looking to improve his software creation skills and to help others do the same. He firmly believes that a well-rounded software craftsman must have a keen interest in all aspects of software creation, including process, people, technology, user experience, development, operation, maintenance, and social impact. He relishes the daily challenges that Codurance brings him, stretching his existing knowledge and expertise and allowing him to grow constantly as a professional.

Mash is an advisor and a leader. During his diverse career, he has succeeded in invigorating large, ailing software projects as well as creating highly effective software teams and departments. His broad and deep technical knowledge, organisational skills, craft focus, and empathy for the people involved have been integral to his success. He has worked in many roles for charities, investment banks, consultancies, government, media and cloud providers. He prides himself on being a hands-on software developer and believes that software development skills are very hard to learn and that the best way to maintain them is to apply them.