
Integration Patterns

http://blogs.mulesoft.org/tag/integration-patterns/

How to build a Processing Grid (An Example)

In his “To ESB or not to ESB” series of posts, Ross Mason identified four common architectures for which Mule is a great fit: ESB, Hub’n'Spoke, API/Service Layer and Grid Processing. In this post we are going to detail an example of the latter architecture: we will use Mule to build a scalable image resizing service.
Here is the overall architecture of what we intend to build:
As you can see, we allow end users to upload images to a set of load-balanced Mule instances. Behind the scenes, we rely on Amazon S3 for storing images waiting to be resized and on Amazon SQS as the queuing mechanism for the pending resizing jobs. We integrate with any SMTP server to send the resized image back to the end user’s mailbox. As the number of users grows, we grow the processing grid simply by adding more Mule instances: each instance is configured exactly the same way, so the solution can easily be scaled horizontally.
Read on to discover how easy Mule makes the construction of such a processing grid…

[ The complete application is available on GitHub. In this post, we will detail and comment some aspects of it. ]
There are two main flows in the application we are building:
  • Job Creation – where an image can be uploaded and a resize request created,
  • Job Worker – where the image resizing is done and the result emailed back to the user.
There is another ancillary flow that kicks in before these two: it’s the flow that takes care of serving the image upload form. Let’s start by looking at it.

Submission Form

Mule is capable of serving static content over HTTP, using a message processor called the static resource handler.
With this flow, we’re able to serve a basic HTML form that can be used in a browser to upload an image and provide information about the desired size and the email address for receiving the result.
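The original post showed this flow as a Studio diagram. As a rough illustration only (namespace declarations are omitted, and the port, path and file names are made up for this sketch), the equivalent Mule 3 XML could look something like this:

<flow name="serveUploadFormFlow">
    <!-- listen for browser requests -->
    <http:inbound-endpoint exchange-pattern="request-response" host="0.0.0.0" port="8081" path="resizer"/>
    <!-- serve the static HTML form bundled with the application -->
    <http:static-resource-handler resourceBase="${app.home}/web" defaultFile="upload-form.html"/>
</flow>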
This form submits data to Mule using the multipart/form-data encoding type. Let’s now look at the flow that takes care of receiving these submissions.

Job Creation

The flow below shows what happens when a user uploads an image to Mule:
The flow consists of the following steps (a simplified configuration sketch follows the list):
  • The multipart/form-data encoded HTTP request is received by an HTTP inbound endpoint,
  • The image is persisted in S3,
  • The job meta-data (unique ID, target size, user email, image type) is stored in a JSON object and sent over SQS,
  • The success of the operation is logged (including the ID for traceability),
  • A simple text response is assembled and returned to the caller; it is simply displayed in the user’s browser.
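As a hedged sketch of how such a flow could be wired together (the S3 and SQS operation and attribute names are from memory of the Anypoint connectors and may differ by version; the property names and the jobId variable are purely illustrative):

<flow name="jobCreationFlow">
    <http:inbound-endpoint exchange-pattern="request-response" host="0.0.0.0" port="8081" path="jobs"/>
    <!-- give every job a unique ID for traceability -->
    <set-variable variableName="jobId" value="#[java.util.UUID.randomUUID().toString()]"/>
    <!-- persist the uploaded image in S3, keyed by the job ID -->
    <s3:create-object config-ref="amazonS3" bucketName="${s3.bucket}" key="#[flowVars.jobId]"/>
    <!-- build the job metadata and publish it as JSON on the pending-jobs queue -->
    <set-payload value="#[['jobId' : flowVars.jobId]]"/>  <!-- plus target size, email and image type in the real flow -->
    <json:object-to-json-transformer/>
    <sqs:send-message config-ref="amazonSQS"/>
    <logger level="INFO" message="Created resize job #[flowVars.jobId]"/>
    <set-payload value="Thanks! Your resized image will be emailed to you shortly."/>
</flow>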
When this flow is done, it is up to a worker to pick up the message from SQS and perform the actual image resizing.

Job Worker

The worker flow is the longest one:
In this flow, the following happens:
  • The job metadata is received from SQS, deserialized to a java.util.Map and stored in a flow variable for later access,
  • The source image is retrieved from S3,
  • It is then resized and the result is attached to the current message, which will automatically be used as an email attachment,
  • A simple confirmation text is set as the message payload, which will be used as the email content,
  • The email is sent over SMTP,
  • The success of the operation is recorded,
  • And finally the source image is deleted from S3.
We record success before deleting the image from S3 because, as far as the end user is concerned, at this point we are done and have successfully performed our task.
Note that SQS guarantees at-least-once message delivery. This means that the same job may be delivered more than once. In our particular case, this means we might try to perform the resize several times, potentially not finding the already-deleted image or, if we do find it, sending the resized image several times. This is not tragic, so we can live with it; but if it were a problem, we would simply add an idempotent filter right after the SQS message source, using the unique job ID as the unique message ID in the filter, as sketched below.
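A minimal sketch of that idempotency guard, assuming the job metadata has been deserialized to a Map and that the JSON field is called jobId (both the SQS source element and the field name are illustrative):

<flow name="jobWorkerFlow">
    <!-- message source: pending jobs from the queue -->
    <sqs:receive-messages config-ref="amazonSQS"/>
    <json:json-to-object-transformer returnClass="java.util.Map"/>
    <!-- drop duplicate deliveries of the same job -->
    <idempotent-message-filter idExpression="#[payload.jobId]"/>
    <!-- ... fetch from S3, resize, email, delete from S3, as described above ... -->
</flow>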

Conclusion

As detailed by Ross in the aforementioned series of posts, Mule’s reach extends way beyond pure integration. We’ve seen in this post that Mule provides a wide range of primitives that can be used to build processing grids.
Have you built such a Mule-powered processing grid? If yes, please share the details in the comment section of this post!

Batch processing performance in the cloud

Today I will introduce our performance test of the Batch Module, introduced in Mule’s December 2013 release. I will guide you through the test scenario and explain all the data collected.
But first, if you don’t know what batch is, please read the great Batch Blog from our star developer Mariano Gonzalez; for anything else, you also have the documentation.

Excited? Great! Now we can start with the details. This performance test was run on a CloudHub Double worker, using the default threading profile of 16 threads. We will compare on-premise vs cloud performance; henceforth we will talk about HD vs CQS performance. Why? On-premise and CloudHub users will, by default, use the hard disk (HD) for temporary storage and resilience. This is not very useful on CloudHub, because if for any reason the worker is restarted, the current job will lose all its messages; if Persistent Queues are enabled, however, the Batch module will automatically store all the data with CQS (Cloud Queue Storage) to achieve the expected resilience.
The app used for the test has a constant processing time for each “step” and “commit” phase, allowing us to decouple the time variations caused by using different services or record sizes. It was run varying two parameters:
  • Amount of records: how many records are loaded into the job, all of them with exactly the same size.
  • Record size: the size in bytes (a normal distribution) of the payload contained in the records sent to a job.
This allows us to check the performance variations when increasing either of these variables.
Now that everything is clear, let’s start by checking the HD performance on its own. It was run with jobs of different record sizes (on a logarithmic scale) and 100,000 records each, which we assumed would be a standard load for a job:

The image speaks for itself: the performance is pretty flat for most record sizes, increasing only for really big chunks of data due to the longer write times on the HD.
But what do the different lines mean?
  • Total time: the time from the moment the message enters the job until the job finishes.
  • Loading time: the time from the moment the message enters the job until a message is processed in the first step.
  • (Total) Processing time: the time from the moment a message is processed in the first step until the job finishes (all messages processed in all the steps, if applicable).
  • Steps processing time: the time it takes to process all the records in each step; as all the messages are processed in all the steps at a constant time each, processing all of them is constant too.
  • Batch processing time: the Processing time minus the Steps processing time, which leaves us with the batch overhead.

OK, but what about CQS? Let’s run the same test:

Wow, this doesn’t look so good, right? Here is why it happens:
The records sent to the job are divided into blocks to reduce the number of messages sent and received. Each block is sent to a queue with a top capacity of 160 KB per message; when this capacity is exceeded, more messages are used per block. If any record is bigger than 160 KB, it is stored using a much slower queue, causing the big loss of performance seen above.
Let’s zoom in a bit and check the details:

It looks much better: it only adds a small overhead if your average payloads are smaller than 8 KB. On the other hand, it starts growing linearly with the input size (don’t be confused by the logarithmic scale!).
At this point we have seen what happens when we increase the record sizes, but what happens when we have more or fewer records? Let’s check that behavior.
This test was run with different constant record sizes, increasing the amount of records. The HD case is shown only once, as we saw before that it only changes with really big payloads.

As expected, with bigger record sizes there is a bigger gap between HD and CQS. It is very important to highlight that increasing the amount of records increases the processing time linearly, with a different constant for each record size. This matters because it takes less time to process more data in one job than to divide it into smaller chunks.
Let’s zoom in on the smaller record size cases:

Thankfully, it shows that processing records of 10 KB or less adds a pretty small overhead on the cloud, with the welcome addition of resilience.
How did we do it? With the help and passion of our awesome SaaS developers: Mariano González at the ESB, who pushed the idea from its beginnings, and Fernando Federico, who made it work quickly on CloudHub.
I hope this information gives you a proper idea of what to expect from your batch application. Now go on and start batching in the cloud!

Mule How-to: Build a Batch Enabled Cloud Connector

When we announced the December 2013 release, an exciting new feature also saw daylight: the Batch Module.
If you haven’t read the post describing the feature’s highlights, you should, but today I’d like to focus on how the <batch:commit> block interacts with Anypoint™ Connectors and, more specifically, how you can leverage your own connectors to take advantage of the feature.


<batch:commit> overview

In a nutshell, you can use a Batch Commit block to collect a subset of records for bulk upsert to an external source or service. For example, rather than upserting each individual contact (i.e. record) to Google Contacts, you can configure a Batch Commit to collect, let’s say, 100 records, and then upsert all of them to Google Contacts in one chunk. Within a batch step – the only place you can apply it – you can use a Batch Commit to wrap an outbound message processor. See the example below:
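The example in the original post was shown as a Studio screenshot. A minimal sketch of the same idea in XML (the step name, commit size and the placeholder for the connector call are illustrative):

<batch:step name="upsertContactsStep">
    <batch:commit size="100">
        <!-- a connector bulk operation goes here, e.g. a Google Contacts or
             Salesforce bulk upsert; the exact element depends on the connector -->
    </batch:commit>
</batch:step>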

Mixing in

This is all great, but what do connectors have to do with this? Well, the only reason why the example above makes any sense at all is because the Google Contacts connector is capable of doing bulk operations. If the connector only supported updating records one at a time, then there would be no reason for <batch:commit> to exist.


But wait! The batch module was only released two months ago, yet connectors like Google Contacts, Salesforce, Netsuite, etc. have had bulk operations for years! True that. But what we didn’t have until Batch came along was a construct allowing us to do record-level error handling.


Suppose that you’re upserting 200 records in Salesforce. In the past, if 100 of them failed and the other 100 were successful, it was up to you to parse the connector response, pull the failed and the successful apart, and take appropriate action. If you wanted to do the same with Google Contacts, you again found yourself needing to do everything over, with the extra complexity that you couldn’t reuse your code, because the Google and Salesforce APIs use completely different representations to notify the operation’s result.


Our goal with the batch module is clear: make this stuff simple. We no longer want you struggling to figure out each API’s representation of a bulk result and handling each failed record independently – from now on, you can rely on <batch:commit> to do that for you automatically. One way of acting on those per-record failures is sketched below.
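A sketch of record-level error handling, under the assumption that you want to route failed records to their own step (the step names are made up; accept-policy is a standard attribute of <batch:step>):

<batch:step name="upsertStep">
    <batch:commit size="100">
        <!-- connector bulk upsert -->
    </batch:commit>
</batch:step>
<batch:step name="handleFailuresStep" accept-policy="ONLY_FAILURES">
    <!-- only records that failed in previous steps reach this step -->
    <logger level="WARN" message="Record failed: #[payload]"/>
</batch:step>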


It’s not magic

“A Kind of Magic” is one of my favorite songs from Queen, especially the live performance at Wembley Stadium in ’86. Although the magic described in that song doesn’t apply to the batch and connector mechanisms, there’s one phrase in it which accurately describes the problem here: “There can only be one”.


If we want the Batch module to understand all types of bulk operation results, we need to start by defining a canonical way of representing them. We did so in a class called BulkOperationResult, which defines the following contract:
/**
 * This class is used to provide item level information about a bulk operation. This
 * master entity represents the bulk operation as a whole, while the detail entity
 * {@link BulkItem} represents the operation status for each individual data piece.
 * The {@link #items} list defines a contract in which the ordering of those items
 * needs to match the ordering of the original objects. For example, if the bulk
 * operation consisted of 10 person objects in which number X corresponded to the
 * person 'John Doe', then the Xth item in the {@link #items} list must reference
 * the result of processing the same 'John Doe'
 */
public final class BulkOperationResult<T> implements Serializable
{
    /**
     * The operation id
     */
    public Serializable getId();

    /**
     * Whether or not the operation was successful. Should be <code>true</code> if
     * and only if all the child {@link BulkItem} entities were also successful
     */
    public boolean isSuccessful();

    /**
     * An ordered list of {@link BulkItem}, one per each item in the original
     * operation, no matter if the record was successful or not
     */
    public List<BulkItem<T>> getItems();

    /**
     * A custom property stored under the given key
     *
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}
Basically, the above class models a Master-Detail relationship in which:


  • BulkOperationResult represents the operation as a whole, playing the role of the master
  • BulkItem represents the result for each individual record, playing the role of the detail
  • Both classes are immutable
  • There’s an ordering relationship between the master and the detail.
    The first item in the BulkItem list has to correspond to the first
    record in the original bulk. The second has to correspond to the second
    one, and so forth.
In case you’re curious, this is what BulkItem’s contract looks like:
/**
 * This class represents an individual data piece in the context of a bulk operation
 */
public final class BulkItem<T> implements Serializable
{
    /**
     * The item id
     */
    public Serializable getId();

    /**
     * Whether or not it was successful. Notice that this should be <code>false</code>
     * if {@link #exception} is not <code>null</code>, however there might not be an
     * exception but the item could still not be successful for other reasons.
     */
    public boolean isSuccessful();

    /**
     * Message to add context on this item. Could be an error description, a warning
     * or simply some info related to the operation
     */
    public String getMessage();

    /**
     * An optional status code
     */
    public String getStatusCode();

    /**
     * An exception if the item failed
     */
    public Exception getException();

    /**
     * The actual data this entity represents
     */
    public T getPayload();

    /**
     * A custom property stored under the given key
     *
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}
So, that’s it? We just modify all connectors to return a BulkOperationResult object on all bulk operations and we’re done? Not quite. That would be the recommended practice for new connectors moving forward, but for existing connectors we would be breaking backwards compatibility with any application written before the release of the Batch module that manually handles the output of bulk operations.


What we did in those cases is have the connectors register a Transformer. Since it’s each connector’s responsibility to understand its API’s domain, it also makes sense to ask each connector to translate its own bulk operation representation into a BulkOperationResult object.


Let’s see an example. This is the signature for an operation in the Google Contacts connector which performs a bulk operation:

public List<BatchResult> batchContacts(String batchId, List<NestedProcessor> operations) throws Exception;
Let’s forget about the implementation of the method for now. The takeaway from the above snippet is that the operation returns a List of BatchResult objects. Let’s see how to register a transformer that goes from that to a BulkOperationResult:

@Start
public void init() {
    this.muleContext.getRegistry().registerTransformer(new BatchResultToBulkOperationTransformer());
}
And for the big finale, the code of the transformer itself:

public class BatchResultToBulkOperationTransformer extends AbstractDiscoverableTransformer {

    public BatchResultToBulkOperationTransformer() {
        this.registerSourceType(DataTypeFactory.create(List.class, BatchResult.class, null));
        this.setReturnDataType(DataTypeFactory.create(BulkOperationResult.class));
    }

    @Override
    protected Object doTransform(Object src, String enc) throws TransformerException {
        List<BatchResult> results = (List<BatchResult>) src;
        BulkOperationResultBuilder<BaseEntry<?>> builder = BulkOperationResult.<BaseEntry<?>>builder();

        if (results != null) {
            for (BatchResult result : results) {
                BatchStatus status = result.getStatus();
                int code = status.getCode();
                builder.addItem(BulkItem.<BaseEntry<?>>builder()
                        .setRecordId(result.getId())
                        .setPayload(result.getEntry())
                        .setMessage(status.getContent())
                        .setStatusCode(String.format("%d - %s", code, status.getReason()))
                        .setSuccessful(code == 200 || code == 201 || code == 204));
            }
        }

        return builder.build();
    }
}
Important things to notice about the above transformer:
  • It extends AbstractDiscoverableTransformer. This is so that the batch module can dynamically find it at runtime.
  • It defines the source and target data types in its constructor.
  • The doTransform() method does “the magic”.
  • Notice how the BulkOperationResult and BulkItem classes provide convenient Builder objects to decouple their inner representations from your connector’s code.
And that’s pretty much it! The last consideration is: what happens if I use a bulk operation in a <batch:commit> with a connector that doesn’t support reporting a BulkOperationResult? Well, in that case you have two options:
  • Write the transformer and register it yourself at the application level (a minimal sketch of this follows below), or
  • Just let it be; in case of an exception, batch will fail all the records alike.
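For the first option, a minimal sketch of the application-level registration (the package name is hypothetical; the class is the transformer shown earlier, declared as a custom transformer in your application's Mule config so it ends up in the registry):

<custom-transformer name="batchResultToBulkOperation"
                    class="com.acme.transformers.BatchResultToBulkOperationTransformer"/>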

Wrapping it up

In this article we discussed why it’s important for connectors to support bulk operations whenever possible (some APIs just can’t do it, and that’s not your fault). For new connectors, we advise always returning instances of the canonical BulkOperationResult class. For adding batch support to an existing connector without breaking backwards compatibility, we covered how to register discoverable transformers to do the trick.


As always, I hope you enjoyed the reading, and since we spoke about
magic so much, this time I’ll say farewell with some music. Enjoy!


Near Real Time Sync with Batch

Damian Sima on Wednesday, May 14, 2014

The idea of this blog post is to give you a short introduction on how to do near real time sync with Mule ESB. We’ll use several of the newest features that Mule has to offer – like the improved Poll component with watermarking and the Batch Module. Finally, we’ll use one of our Anypoint Templates as an example application to illustrate the concepts.

What is it?

Near Real Time sync is the term we’ll use throughout this blog post to refer to the following scenario:
“When you want to keep data flowing constantly from one system to another”
As you might imagine, the keyword in the scenario definition here is constantly. That means the application will periodically move whatever entity/data you care about from the source system to the destination system. The periodicity really depends on what the application is synchronizing. If you are moving purchase orders for a large retailer, you could allow the application a few minutes between each synchronization. Now, if we are dealing with banking transactions, you’ll probably want to change that to something in the order of a few seconds or even a hundred milliseconds, unless you really like dealing with very angry people.

The nice thing about the template we’ll use as an example is that such a change in the behaviour of the application is very simple: just a small change to one line of a properties file.

How does it work?

As I mentioned before, the objective of this template is to move data periodically from one system to another. In order to do that, there are a few things to solve. The first decision you have to make is to choose between these two approaches:
  • Push: your application is notified of the fact that there is new data to be synchronized.
  • Poll: your application is in charge of figuring out what new data there is that needs to be synchronized.

Poll & Watermark

For this Template we’ve chosen the Poll approach. The reason behind this is that not many systems out there have the capability to notify changes, and some of the ones that do offer this capability do so in an unreliable way. That means, for instance, that they will try to push a message without knowing whether your application is listening or not, or that they offer no ACK mechanism, so they don’t ensure that the destination system actually received the message successfully. In any event, the main problem here is message loss.
A reliable application cannot afford to lose a message signalling the creation or update of an entity.
Last but not least, several systems out there do offer ways to query for data, which is what we need to implement our Poll approach, so this solution applies to a broader range of source systems.
Now that we’ve decided to use the Poll approach, we can define the Mule components to use, and of course we’ve chosen the Poll component with its watermark feature.
The Poll component, in its simplest form, just runs at a fixed interval, executing the message processor inside it; this triggers a flow, and the payload of the message entering the flow is whatever that message processor returned.
As we are going to be polling information from a source system, we must tell it what we need, and here is where we use the watermark functionality. In short, the watermark functionality allows us to delegate to the Poll component the responsibility of:
  • Selecting the watermark value to be used in the current run,
  • Analysing the returned payload and choosing the next watermark to be used in the following run.
I could give you a detailed overview of how Poll and watermarking work, but there is already a cool blog post out there for that; you can find it here: Link. A configuration sketch of a watermarked poll follows.
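As a rough sketch of a watermarked poll feeding a batch job (the frequency, variable names, default value, query and batch job name are all illustrative; the Salesforce query is just one example of a source-system query filtered by the watermark, and its exact syntax depends on the connector version):

<flow name="pollAccountsFlow">
    <poll>
        <fixed-frequency-scheduler frequency="10000"/>
        <!-- the poll stores and updates lastSyncDateTime for us between runs -->
        <watermark variable="lastSyncDateTime"
                   default-expression="#['2014-01-01T00:00:00.000Z']"
                   selector="MAX"
                   selector-expression="#[payload['LastModifiedDate']]"/>
        <!-- example source-system query filtered by the watermark -->
        <sfdc:query config-ref="SalesforceA"
                    query="dsql:SELECT Id, Name, LastModifiedDate FROM Account WHERE LastModifiedDate &gt; #[flowVars['lastSyncDateTime']]"/>
    </poll>
    <batch:execute name="syncAccountsBatch"/>
</flow>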

Inside

The Batch Module is a really powerful tool and there is plenty of documentation explaining how it works; you could start by reading these two blog posts:
Nevertheless, I’d like to provide a really short introduction to it so you can get a hold of how it works.
The batch module divides its work in three stages:
  • Input Stage
  • Process Stage
  • On complete Stage

The Input Stage

The input stage is the first thing that runs when you execute a batch job. You can do several things inside this stage, but you need to ensure that the payload contained in the message after the execution of the last message processor is either an Iterable, an Iterator or an array. You can also leave the Input stage empty, in which case the previous consideration applies to the payload of the message just before the call to the Batch Execute message processor. The final task of the Input stage, which is implicit (i.e. built into the Batch Module), is to transform each item of the Iterable/Iterator/array payload into a Record. This is a very important fact to understand: from this point onward you can no longer access the original payload in its original form, you only deal with Records. It is worth noticing that this stage runs on the same thread that called the Batch Execute message processor, but once the stage is done consuming all the items in the payload that thread returns; that is, the Process stage runs on a different thread.

The Process Stage

The process stage is divided into Steps. Each step receives a Record to process.
This is the main concept that you may find different the first time you work with the Batch module: each step only deals with Records. If you recall, a Record is each item of the Iterable/Iterator/array payload processed during the Input stage. So please keep that in mind at all times: each step doesn’t deal with the whole original payload but rather with just one item at a time.
The idea behind this is that each Record will be processed in turn by each Step defined in the Batch Job.

The On complete Stage

This is the final stage of batch; it is executed once all the records have been processed. Again, this stage is executed on a different thread than those of the Input stage and the Process stage. A minimal skeleton showing the three stages follows.
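Putting the three stages together, a batch job skeleton looks roughly like this (the job and step names are illustrative):

<batch:job name="syncAccountsBatch">
    <batch:input>
        <!-- optional; whatever runs here must leave an Iterable, Iterator or array as the payload -->
    </batch:input>
    <batch:process-records>
        <batch:step name="firstStep">
            <!-- executed once per record -->
        </batch:step>
        <batch:step name="secondStep">
            <!-- executed once per record -->
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <logger level="INFO" message="Batch job finished"/>
    </batch:on-complete>
</batch:job>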

Template Description

Now that you have a basic knowledge of Poll and the Batch Module, let’s describe how this particular Template makes use of them:
  1. Fetch all the created/updated entities from the source system.
  2. Try to fetch the matching entity in the destination system for each particular entity and store it.
  3. Take the original entity and prepare it to be pushed to the destination system.
This is the first flow you’ll find in the application, and it is here that you modify the periodicity with which the application runs.
Poll Endpoint
Just change the frequency, as shown below, and you’ll be good to go:
Polling Frequency
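If the frequency is externalized to a properties file, as the template does, the scheduler line is the only thing that reads it (the property names here are illustrative):

<fixed-frequency-scheduler frequency="${poll.frequencyMillis}" startDelay="${poll.startDelayMillis}"/>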
This is the batch job, where you’ll find steps 2 and 3:

Accounts batch job
As you can see, the application is a fairly simple one. Now, you may have noticed that in the third step we make use of another Batch tool, a scope called Batch Commit. The Batch Commit scope allows you to aggregate records. In this example we aggregate the entities in order to make just one call with as many as 200 entities at a time, thus reducing the number of calls to the destination system and improving the overall performance of the application.
All the records of the original input will be processed; once they are all done, the On Complete stage is triggered. At this point the Batch Module offers the user a POJO called Batch Job Result with statistics regarding the final state of the batch process; it exposes data such as the number of records processed, the number of successful records, etc. You can find more information about the Batch Job Result on this page. You can use this data as you see fit; in this Template we’ve chosen just to log this information to the console, but you could send it through email, send a Tweet, or maybe upload a file to your favourite cloud-based storage.
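A minimal sketch of that logging, assuming the On Complete stage only contains a logger (the Batch Job Result property names used in the expressions are from memory and may differ slightly):

<batch:on-complete>
    <!-- payload here is the Batch Job Result POJO described above -->
    <logger level="INFO"
            message="Processed #[payload.processedRecords] records: #[payload.successfulRecords] successful, #[payload.failedRecords] failed"/>
</batch:on-complete>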

Why Batch?

The Batch Module is a powerful tool offered by Mule ESB. It offers you the capability to process large amounts of data in a reliable way. That is, it can recover from a crash and keep processing a batch job from where it left off, even if the power went down and came back up.
Now you may be asking: why use something like the Batch module for a scenario like this?
Well because the Batch module offers:
  • Support for large amounts of data
  • Reliability
  • Both at the same time
  • Management (which will be coming shortly in the next Mule ESB release)
I reckon that no one will question why reliability is important, but why do we care about large amounts of data? It’s fair enough to assume that if the application is running periodically then the amount of data to process shouldn’t be that much; then again, the reality is that you cannot assure that. Easily enough, a user can shut down the application for a day and then start it again; right there you have a day’s worth of data to process. Or you could set up your application’s very first run to bring in data that was updated during the last five years. Finally, there could be 10 other applications generating data in your source system at the same time. In any of these events your application should hold up and be able to deal with that amount of data.
Although you could do all this with just the original tools that Mule offers, reality shows that it’s far simpler to do so with the Batch module, mainly because you just don’t have to deal with all the specifics that the module solves out of the box.