Thursday, 12 June 2014
How to build a Processing Grid (An Example)
Here is the overall architecture of what we intend to build:
Read on to discover how easy Mule makes the construction of such a processing grid…
[ The complete application is available on GitHub. In this post, we will detail and comment on some aspects of it. ]
There are two main flows in the application we are building:
- Job Creation – where an image can be uploaded and a resize request created,
- Job Worker – where the image resizing is done and the result emailed back to the user.
Submission Form
Mule is capable of serving static content over HTTP, using a message processor called the static resource handler. The submission form it serves posts the image using the multipart/form-data encoding type. Let's now look at the flow that takes care of receiving these submissions.
Job Creation
The flow below shows what happens when a user uploads an image to Mule:
- The multipart/form-data encoded HTTP request is received by an HTTP inbound endpoint,
- The image is persisted in S3,
- The job meta-data (unique ID, target size, user email, image type) is stored in a JSON object and sent over SQS (a sketch of such a message follows this list),
- The success of the operation is logged (including the ID, for traceability),
- A simple text response is assembled and returned to the caller, to be displayed in the user's browser.
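For illustration only, the job message pushed onto SQS could be a small JSON document carrying exactly the fields listed above. The sketch below assembles it with nothing but the JDK; the field names and the JobMessageFactory class are assumptions for this example, not the application's actual schema.

import java.util.UUID;

// Illustrative sketch: assembling the job meta-data as a JSON string before
// sending it over SQS. Field names are placeholders, not the app's real schema.
public class JobMessageFactory {

    public static String newJobMessage(int targetSize, String userEmail, String imageType) {
        String jobId = UUID.randomUUID().toString(); // unique ID, later used for traceability
        return String.format(
            "{\"jobId\":\"%s\",\"targetSize\":%d,\"email\":\"%s\",\"imageType\":\"%s\"}",
            jobId, targetSize, userEmail, imageType);
    }
}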
Job Worker
The worker flow is the longest one:
- The job meta-data is received from SQS, deserialized to a java.util.Map and stored in a flow variable for later access,
- The source image is retrieved from S3,
- It is then resized and the result is attached to the current message, which will automatically be used as an email attachment (a resize sketch follows this list),
- A simple confirmation text is set as the message payload, which will be used as the email content,
- The email is sent over SMTP,
- The success of the operation is logged,
- And finally the source image is deleted from S3.
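The resize step itself needs nothing beyond the JDK. Here is a minimal sketch of the kind of scaling the worker performs; the real flow reads the source image from S3 and attaches the result to the outgoing email rather than touching the local file system, and the class name is purely illustrative.

import java.awt.Graphics2D;
import java.awt.Image;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import javax.imageio.ImageIO;

// Minimal image-resize sketch using only the JDK.
public class ImageResizer {

    public static void resize(File source, File target, int width, int height) throws IOException {
        BufferedImage original = ImageIO.read(source);

        // Scale the original to the requested dimensions and draw it onto a new canvas.
        BufferedImage resized = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
        Graphics2D g = resized.createGraphics();
        g.drawImage(original.getScaledInstance(width, height, Image.SCALE_SMOOTH), 0, 0, null);
        g.dispose();

        ImageIO.write(resized, "png", target);
    }
}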
Note that SQS guarantees at-least-once message delivery. This means that the same job can be delivered more than once. In our particular case, we would then try to perform the resize several times, potentially not finding the already deleted image or, if it is still there, sending the resized image several times. This is not tragic, so we can live with it; if it were a problem, we would simply add an idempotent filter right after the SQS message source, using the unique job ID as the unique message ID in the filter.
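The idempotent filter is a Mule construct, but the idea behind it is simple enough to sketch in plain Java, assuming the unique job ID travels with every message. The class below is illustrative only; a real deployment would keep the seen IDs in a shared, persistent store rather than in memory.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of idempotent handling keyed on the job ID: the first
// delivery of a given job is processed, any redelivery is silently dropped.
public class JobDeduplicator {

    private final Set<String> seenJobIds = ConcurrentHashMap.newKeySet();

    /** Returns true only the first time a given job ID is seen. */
    public boolean shouldProcess(String jobId) {
        return seenJobIds.add(jobId);
    }
}

The worker would call shouldProcess(jobId) right after reading the SQS message and skip the resize whenever it returns false.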
Conclusion
As detailed by Ross in the aforementioned series of posts, Mule's reach extends way beyond pure integration. We've seen in this post that Mule provides a wide range of primitives that can be used to build processing grids. Have you built such a Mule-powered processing grid? If yes, please share the details in the comment section of this post!
Batch processing performance in the cloud
But first, if you don't know what batch is, please read the great Batch Blog from our star developer Mariano Gonzalez, and for anything else there is also the documentation.
Excited? Great! Now we can get into the details. This performance test was run on a CloudHub Double worker, using the default threading profile of 16 threads. We will compare on-premise vs. cloud performance; from here on we will talk about HD vs. CQS performance. Why? By default, both on-premise and CloudHub users rely on the hard disk (HD) for temporary storage and resilience. This is not very useful on CloudHub, because if the worker is restarted for any reason, the current job loses all its messages. When Persistent Queues are enabled, however, the Batch module automatically stores all the data with CQS (Cloud Queue Storage) to achieve the expected resilience.
The app used for the test has a constant processing time for each "step" and "commit" phase, allowing us to decouple the time variations caused by using different services or record sizes. It was run varying two different parameters:
- Amount of records: how many records are loaded into the job, all of them with the exact same size.
- Record size: the size in bytes of the records sent to a job (drawn from a normal distribution around the stated value).
Now that everything is clear, let's start by checking out the HD performance on its own. It was run with jobs of different record sizes (on a logarithmic scale) and 100,000 records each, which we assumed would be a standard load for a job:
The image speaks for itself: performance is pretty flat for most of the record sizes, increasing only with really big chunks of data, due to the longer writing time on the HD.
But what do those different lines mean?
- Total time: the time taken from the moment the message enters the job until the job finishes.
- Loading time: the time taken from the moment the message enters the job until a record is processed in the first step.
- (Total) Processing time: the time taken from the moment a record is processed in the first step until the job finishes (all records processed in all the steps, where applicable).
- Steps processing time: the time it takes to process all the records in each step; since every record is processed in every step at a constant time each, processing all of them is constant too.
- Batch processing time: the Processing time minus the Steps processing time, which leaves us with the batch overhead.
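Putting those definitions together, the metrics relate as follows:

\[ \text{Total time} = \text{Loading time} + \text{Processing time} \]
\[ \text{Batch overhead} = \text{Processing time} - \text{Steps processing time} \]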
OK, but what about CQS? Let's run the same test:
Wow, this doesn't look so good, right? This is why it happens:
The records sent to the job are divided into blocks to reduce the number of messages sent and received. Each block is sent to a queue with a top capacity of 160KB per message; when this capacity is exceeded, more messages are used per block. If any single record is bigger than 160KB, it is stored using a much slower queue, causing the big loss of performance seen above: a 200KB record, for example, already exceeds the limit and has to go through the slower queue.
Let's zoom in a bit and check the details:
It looks much better: it only adds a small overhead if your average payloads are less than 8KB. On the other hand, it starts growing linearly with the input size (don't get confused by the logarithmic scale!).
At this point we have seen what happens when we increase the record sizes, but what happens when we have more or fewer records? Let's check the behavior.
This test was run with different constant record sizes, increasing the number of records. There is just one HD case, since we saw before that HD performance only changes with really big payloads.
As expected, with bigger record sizes there is a bigger gap between HD and CQS. It is very important to highlight that increasing the number of records increases the processing time linearly, with a different constant for each record size. This matters because it takes less time to process more data in one job than to divide it into smaller chunks.
Let's zoom in on the smaller record size cases:
Thankfully, it shows that processing records of 10KB or less adds only a pretty small overhead in the cloud, with the welcome addition of resilience.
How did we do it? With the help and passion of our awesome SaaS developers: Mariano González on the ESB side, who pushed the idea from its beginnings, and Fernando Federico, who made it work quickly on CloudHub.
I hope this information gives you a proper idea of what to expect from your batch application. Now go on and start batching in the cloud!
Mule How-to: Build a Batch Enabled Cloud Connector
When we announced the December 2013 release, an exciting new feature also saw daylight: The Batch Module.
If you haven’t read the post describing the feature’s highlights, you
should, but today I'd like to focus on how the <batch:commit> block
interacts with Anypoint™ Connectors and more specifically, how you can leverage your own connectors to take advantage of the feature.
<batch:commit> overview
In a nutshell, you can use a Batch Commit block to collect a subset of records for bulk upsert to an external source or service. For example, rather than upserting each individual contact (i.e. record) to Google Contacts, you can configure a Batch Commit to collect, let's say, 100 records and then upsert all of them to Google Contacts in one chunk. Within a batch step – the only place you can apply it – you can use a Batch Commit to wrap an outbound message processor. See the example below:
Mixing in Connectors
This is all great, but what do connectors have to do with this? Well, the only reason why the example above makes any sense at all is because the Google Connector is capable of doing bulk operations. If the connector only supported updating records one at a time, then there would be no reason for <batch:commit> to exist.
But wait! The batch module was only released two months ago, yet connectors like Google Contacts, Salesforce, NetSuite, etc. have had bulk operations for years! True that. But what we didn't have until Batch came along was a construct allowing us to do record-level error handling.
Suppose that you're upserting 200 records in Salesforce. In the past, if 100 of them failed and the other 100 were successful, it was up to you to parse the connector response, separate the failed records from the successful ones and take appropriate action. If you wanted to do the same with Google Contacts, you found yourself having to do it all over again, with the extra complexity that you couldn't reuse your code because the Google and Salesforce APIs use completely different representations to report an operation's result.
Our goal with the batch module is clear: make this stuff simple. We no longer want you struggling to figure out each API’s
representation for a bulk result and handling each failed record
independently – from now on, you can rely on <batch:commit> to do
that for you automatically.
It’s not magic
“A kind of magic” is one of my favorite songs from Queen, especially the live performance at Wembley Stadium in ’86. Although the magic described in that song doesn't apply to the batch and connector mechanisms, there's one phrase in it which accurately describes the problem here: “There can only be one”.
If we want the Batch module to understand all types of bulk operation results, we need to start by defining a canonical way of representing them. We did so in a class called BulkOperationResult, which defines the following contract:
[ Code listing: the BulkOperationResult contract ]
Basically, the above class is a Master-Detail relationship in which:
- BulkOperationResult represents the operation as a whole, playing the role of the master
- BulkItem represents the result for each individual record, playing the role of the detail
- Both classes are immutable
- There’s an ordering relationship between the master and the detail.
The first item in the BulkItem list has to correspond to the first
record in the original bulk. The second has to correspond to the second
one, and so forth.
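To make the master-detail idea concrete, here is a simplified, self-contained Java sketch of such a contract. The class, method and builder names below are illustrative only; they are not the actual Mule API.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: a simplified stand-in for the canonical bulk result
// contract, not the real Mule BulkOperationResult/BulkItem classes.
final class BulkItemSketch<T> {
    private final T payload;           // the record that was part of the bulk
    private final boolean successful;  // whether this particular record succeeded
    private final String message;      // optional per-record status or error message

    BulkItemSketch(T payload, boolean successful, String message) {
        this.payload = payload;
        this.successful = successful;
        this.message = message;
    }

    T getPayload()         { return payload; }
    boolean isSuccessful() { return successful; }
    String getMessage()    { return message; }
}

// The "master": the bulk operation as a whole, holding one detail entry per
// record, in the same order as the original bulk, and immutable once built.
final class BulkOperationResultSketch<T> {
    private final boolean successful;
    private final List<BulkItemSketch<T>> items;

    private BulkOperationResultSketch(boolean successful, List<BulkItemSketch<T>> items) {
        this.successful = successful;
        this.items = Collections.unmodifiableList(new ArrayList<BulkItemSketch<T>>(items));
    }

    boolean isSuccessful()             { return successful; }
    List<BulkItemSketch<T>> getItems() { return items; }

    static <T> Builder<T> builder() { return new Builder<T>(); }

    // Results are assembled item by item, preserving the original record order.
    static final class Builder<T> {
        private final List<BulkItemSketch<T>> items = new ArrayList<BulkItemSketch<T>>();
        private boolean successful = true;

        Builder<T> addItem(T payload, boolean ok, String message) {
            items.add(new BulkItemSketch<T>(payload, ok, message));
            successful = successful && ok;
            return this;
        }

        BulkOperationResultSketch<T> build() {
            return new BulkOperationResultSketch<T>(successful, items);
        }
    }
}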
In case you're curious, this is what BulkItem's contract looks like:
[ Code listing: the BulkItem contract ]
So, that's it? We just modify all connectors to return a BulkOperationResult object on all bulk operations and we're done? Not quite. That would be the recommended practice for new connectors moving forward, but for existing connectors we would be breaking backwards compatibility with any existing Mule application written before the release of the Batch module that manually handles the output of bulk operations.
What we did in these cases is have those connectors register a Transformer. Since it's each connector's responsibility to understand its API's domain, it also makes sense to ask each connector to translate its own bulk operation representation into a BulkOperationResult object.
Let’s see an example. This is the signature for an operation in the Google Contacts connector which performs a bulk operation:
[ Code listing: the Google Contacts bulk operation signature ]
Let's forget about the implementation of the method for now. The takeaway from the above snippet is that the operation returns a List of BatchResult objects. Let's see how to register a transformer that goes from that to a BulkOperationResult:
[ Code listing: registering the transformer ]
And for the big finale, the code of the transformer itself:
[ Code listing: the transformer implementation ]
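As a rough reconstruction of the idea (not the connector's actual code), the sketch below assumes Mule 3's AbstractDiscoverableTransformer base class, reuses the illustrative result/builder sketch shown earlier, and represents BatchResult with a hypothetical stand-in interface whose accessors are made up for the example.

import java.util.List;

import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractDiscoverableTransformer;
import org.mule.transformer.types.DataTypeFactory;

// Rough sketch only: translates a connector-specific list of per-entry results
// into the canonical bulk representation so <batch:commit> can handle failed
// records individually.
public class BatchResultsToBulkOperationResult extends AbstractDiscoverableTransformer {

    /** Hypothetical stand-in for the per-entry result returned by the Google client. */
    public interface BatchResult {
        boolean isSuccess();
        String getStatusMessage();
    }

    public BatchResultsToBulkOperationResult() {
        // Declare what this transformer consumes and produces so it can be
        // discovered at runtime by data type.
        registerSourceType(DataTypeFactory.create(List.class));
        setReturnDataType(DataTypeFactory.create(BulkOperationResultSketch.class));
    }

    @Override
    protected Object doTransform(Object src, String enc) throws TransformerException {
        @SuppressWarnings("unchecked")
        List<BatchResult> results = (List<BatchResult>) src;

        // Walk the connector-specific results in order and translate each one
        // into a detail item of the canonical, master-detail representation.
        BulkOperationResultSketch.Builder<BatchResult> builder = BulkOperationResultSketch.builder();
        for (BatchResult result : results) {
            builder.addItem(result, result.isSuccess(), result.getStatusMessage());
        }
        return builder.build();
    }
}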
Important things to notice about the above transformer:
- It extends AbstractDiscoverableTransformer. This is so that the batch module can dynamically find it at runtime.
- It defines the source and target data types in its constructor.
- The doTransform() method does “the magic”.
- Notice how the BulkOperationResult and BulkItem classes provide convenient Builder objects to decouple their inner representation from your connector's code.
And that's pretty much it! The last consideration is: what happens if I use a bulk operation in a <batch:commit> with a connector that doesn't support reporting a BulkOperationResult? Well, in that case you have two options:
- Write the transformer and register it yourself at an application level
- Just let it be; in case of an exception, batch will fail all the records alike
Wrapping it up
In this article we discussed why it's important for connectors to support bulk operations whenever possible (some APIs just can't do it; that's not your fault). For new connectors, we advise always returning instances of the canonical BulkOperationResult class. If you want to add batch support to an existing connector without breaking backwards compatibility, we covered how to register discoverable transformers to do the trick.
As always, I hope you enjoyed the reading, and since we spoke about
magic so much, this time I’ll say farewell with some music. Enjoy!
Near Real Time Sync with Batch
What is it?
Near Real Time Sync is the term we'll use throughout this blog post to refer to the following scenario:
“When you want to keep data flowing constantly from one system to another”
As you might imagine, the keyword in the scenario definition is constantly. That means the application will periodically move whatever entity/data you care about from the source system to the destination system. The periodicity really depends on what the application is synchronizing. If you are moving purchase orders for a large retailer, you could allow the application a few minutes between each synchronization. Now, if we are dealing with banking transactions, you'll probably want to change that to something in the order of a few seconds or even a hundred milliseconds, unless you really like dealing with very angry people. The nice thing about the template we'll use as an example is that such a change in the behaviour of the application is very simple: just a small change to one line in a properties file.
How does it work?
As I mentioned before, the objective of this template is to move data periodically from one system to another. In order to do that there are a few things to solve. The first decision you have to make is to choose between these two approaches:
- Push: your application is notified of the fact that there is new data to be synchronized
- Poll: your application is in charge of figuring out what new data needs to be synchronized
Poll & Watermark
For this Template we've chosen the Poll approach. The reason behind this is that not many systems out there have the capability to notify changes, and some of the ones that do offer this capability do so in an unreliable way. That means, for instance, that they will try to push a message without knowing whether your application is listening or not, or that they offer no ACK mechanism, so they can't ensure that the destination system actually received the message successfully. In any event, the main problem here is message loss.
A reliable application cannot afford to lose a message signalling the creation or update of an entity.
Last but not least, many systems out there do offer ways to query for data, which is what we need to implement our Poll approach, so this solution applies to a broader number of source systems. Now that we've decided to use the Poll approach, we can define the Mule components to use, and of course we've chosen the Poll component with its watermark feature.
In its simplest form, the Poll component just runs every certain period of time, executing the message processor inside of it and thus triggering a flow; the payload of the message entering the flow will be whatever the result of executing that message processor was.
As we are going to be polling information from a source system, we must tell it what we need, and this is where the watermark functionality comes in. In short, watermarking allows us to delegate to the Poll component the responsibility of (a conceptual sketch follows this list):
- Selecting the watermark value to be used in the current run
- Analysing the returned payload and choosing the next watermark to be used in the following run
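Conceptually, this is all the watermark bookkeeping amounts to. The Java sketch below is not Mule code; fetchModifiedSince and SourceRecord are stand-ins for the real query and entity of whatever source system the template talks to.

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of what Poll + watermark does for us: query only what
// changed since the last run, then advance the watermark for the next run.
public class WatermarkPollSketch {

    private Instant watermark = Instant.EPOCH; // persisted between runs by the Poll component

    public List<SourceRecord> pollOnce() {
        // 1. Use the current watermark to select only new/updated entities.
        List<SourceRecord> changed = fetchModifiedSince(watermark);

        // 2. Choose the next watermark: the latest modification date seen,
        //    so the following run only picks up newer changes.
        for (SourceRecord r : changed) {
            if (r.lastModified.isAfter(watermark)) {
                watermark = r.lastModified;
            }
        }
        return changed;
    }

    public static class SourceRecord {
        final String id;
        final Instant lastModified;

        SourceRecord(String id, Instant lastModified) {
            this.id = id;
            this.lastModified = lastModified;
        }
    }

    private List<SourceRecord> fetchModifiedSince(Instant since) {
        // Stand-in: the real template issues a query such as
        // "lastModifiedDate > :since" against the source system.
        return new ArrayList<SourceRecord>();
    }
}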
Inside Batch
The Batch module is a really powerful tool and there is plenty of documentation to learn how it works; you could start by reading these two blogs. Nevertheless, I'd like to provide a really short introduction to it so you can get a hold of how it works.
The batch module divides its work in three stages:
- Input Stage
- Process Stage
- On complete Stage
The Input Stage
The input stage is the first thing that runs when you execute a batch job. You can do several things inside this stage, but you need to ensure that the payload contained in the message after the execution of the last message processor is either an Iterable, an Iterator or an array. You can also leave the Input stage empty, in which case the previous consideration applies to the payload of the message just before the call to the Batch Execute message processor. The final task of the Input stage, which is implicit (i.e. built into the Batch module), is to transform each item in the Iterable/Iterator/array payload into a Record. This is a very important fact to understand: from this point onward you can no longer access the original payload in its original form, you only deal with Records. It is worth noticing that this stage runs on the same thread that called the Batch Execute message processor, but once the stage is done consuming all the items in the payload this thread returns; that is, the Process stage runs in a different thread.
The Process Stage
The Process stage is divided into Steps. Each step receives a Record to process. This is the main concept that you may find different the first time you work with the Batch module: each step only deals with records. If you recall, a Record is each item of the Iterable/Iterator/array payload processed during the Input stage. So please keep that in mind at all times: each step doesn't deal with the whole original payload, but rather with just one item at a time.
The idea behind this is that each Record will be processed in turn by each Step defined in the Batch Job.
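To picture the record/step model, here is a small, self-contained Java sketch; it is a conceptual illustration only, not how the Batch module is implemented.

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Conceptual sketch of the record/step model: the Input stage yields records,
// and the Process stage pushes each record through every step, one at a time.
public class BatchStagesSketch {

    public static void main(String[] args) {
        // What the Input stage produces: one record per item of the original payload.
        List<String> records = Arrays.asList("record-1", "record-2", "record-3");

        // Two steps; each one only ever sees a single record, never the whole payload.
        List<Consumer<String>> steps = Arrays.asList(
                record -> System.out.println("step 1 handled " + record),
                record -> System.out.println("step 2 handled " + record));

        for (String record : records) {
            for (Consumer<String> step : steps) {
                step.accept(record);
            }
        }

        // Once every record has gone through every step, the On Complete stage runs.
        System.out.println("On Complete stage: " + records.size() + " records processed");
    }
}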
The On complete Stage
This is the final stage of batch: it is executed once all the records have been processed. Again, this stage is executed in a different thread than those of the Input stage and the Process stage.
Template Description
Now that you have a basic knowledge of Poll and the Batch module, let's describe how this particular Template makes use of them:
- Fetch all the created/updated entities from the source system
- Try to fetch the matching entity in the destination system for each particular entity and store it
- Take the original entity and prepare it to be pushed to the destination system
Just change the frequency, as shown below, and you'll be good to go:
All the records of the original input will be processed; once they are all done, the On Complete stage is triggered. At this point the Batch module offers the user a POJO called Batch Job Result, with statistics regarding the final state of the batch process: it exposes data such as the number of records processed, the number of successful records, etc. You can find more information about the Batch Job Result on this page. You can use this data as you see fit; in this Template we've chosen just to log it to the console, but you could send it by email, send a tweet, or maybe upload a file to your favourite cloud-based storage.
Why Batch?
The Batch module is a powerful tool offered by Mule ESB. It gives you the capability to process large amounts of data in a reliable way. That is, it can recover from a crash and keep processing a batch job from where it left off, even if the power went down and came back up. Now you may be asking: why use something like the Batch module for something like this?
Well because the Batch module offers:
- Support for large amounts of data
- Reliability
- Both at the same time
- Management (which will be coming shortly in the next Mule ESB release)
Although you could do all this with just the original tools that Mule offers, reality shows that it's far simpler to do it with the Batch module, mainly because you just don't have to deal with all the specifics that the module solves out of the box.