Here is a real scenario:
We are going to implement a web service using Activiti, so it must be synchronous and quick. In this scenario the call comes from a Camel endpoint, but that is not relevant to this discussion and we can safely ignore it without losing generality.
To complete the service, we need to call three other external web services. Each external web service takes a random amount of time to return its result. After all three have returned, we need to aggregate their results and return the aggregate as the return value of our web service.
This diagram shows the outline:
This works fine. As each step is synchronous, Activiti executes the next step only when the previous one has returned. Each returned value is stored as a process variable. At the aggregation step we have all the returned values and can aggregate them.
But if the web service calls are independent, we can run them in parallel and save time and resources. And here is where all the problems begin.
BPMN already has a parallel gateway available. The first temptation is to use that feature, something like this:
It looks nice on paper, but we know that Activiti uses a single thread to run all three paths. This means that, at the end of the day, they are executed in sequence. There is no difference from the previous model, except that we cannot be sure about the order in which the web service calls are executed. Not very promising!
What if we make the parallel services asynchronous? OK, that is not easy either. Here is a nice post from Tijs about how it could be done. It might be a little outdated though, thanks to the new Async Job Executor available from version 5.17.0.
With this approach, the aggregation part is done properly. Activiti makes sure the aggregate service is called only when all three web service calls have finished. Good!
But the problem is that we need to return the aggregated result as the return value of the web service. As the parallel tasks are asynchronous, control returns to the caller immediately, without waiting for the external web service calls to return. So we don’t have the result yet. We need to wait and synchronize until the result arrives. The usual way to do this in Java is with locks and monitors. That is not exactly feasible here, as the very same objects are not passed to the process: process variables are serialized and deserialized via the database.
To solve that problem, we can maintain a global static array of lock objects and pass only the index of the proper lock object to Activiti. In the aggregation service, the static object is looked up with the passed index and signaled. Signaling releases the waiting thread, which then returns the result to the web service caller.
Just as a rough proof of concept, I have created a sample, available in parallelMultiThreaded on GitHub.
In the sample, the MyUnitTest test class simulates the body of the newly created web service. Service1, Service2 and Service3 simulate the external web service calls simply by sleeping. The executed model is the second one above, with the parallel gateway.
The static lock and condition arrays are defined as:
public static Condition[] conditions = new Condition[SIZE]; // SIZE: array length (value not given here)
public static Lock[] locks = new Lock[SIZE];
The index is given a random number (0, not very random, I confess) and passed to Activiti as a process variable.
After starting the instance, the main code enters an unbounded wait loop, waiting for the corresponding lock object to be signaled.
After all the external web services have been executed, Activiti executes the aggregation task, which in turn signals the lock.
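A minimal sketch of both sides, the waiting caller and the signaling aggregation step, might look as follows. It is not the code from the sample: the class name `LockRegistry`, the slot count, the `done` flag and the `String` result are assumptions made for illustration, and the Activiti wiring (starting the instance, reading the index from a process variable) is left out so that the block runs on its own.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry: names and the slot count are assumptions, not taken from the sample.
class LockRegistry {
    static final int SLOTS = 16;
    static final Lock[] locks = new Lock[SLOTS];
    static final Condition[] conditions = new Condition[SLOTS];
    static final boolean[] done = new boolean[SLOTS];
    static final String[] results = new String[SLOTS];

    // Caller side: set up the slot before the process instance is started.
    static void prepare(int index) {
        locks[index] = new ReentrantLock();
        conditions[index] = locks[index].newCondition();
        done[index] = false;
        results[index] = null;
    }

    // Caller side: block until the aggregation step has signaled this slot.
    static String awaitResult(int index) {
        locks[index].lock();
        try {
            while (!done[index]) {            // the loop guards against spurious wakeups
                conditions[index].await();
            }
            return results[index];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            locks[index].unlock();
        }
    }

    // Aggregation side: only the int index has to travel through the process variables.
    static void signalResult(int index, String aggregated) {
        locks[index].lock();
        try {
            results[index] = aggregated;
            done[index] = true;
            conditions[index].signalAll();
        } finally {
            locks[index].unlock();
        }
    }
}
```

The `done` flag matters: if the aggregation step signals before the caller starts waiting, the caller sees the flag and returns immediately instead of blocking forever.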
The log files clearly show that everything goes fine:
02:35:16,817 [pool-1-thread-1] INFO org.activiti.Service1 - Service 1 started, time = 70
02:35:16,817 [pool-1-thread-2] INFO org.activiti.Service2 - Service 2 started, time = 70
02:35:16,817 [pool-1-thread-3] INFO org.activiti.Service3 - Service 3 started, time = 70
02:35:17,318 [pool-1-thread-3] INFO org.activiti.Service3 - Service 3 finished, time = 571
02:35:17,519 [pool-1-thread-2] INFO org.activiti.Service2 - Service 2 finished, time = 772
02:35:17,819 [pool-1-thread-1] INFO org.activiti.Service1 - Service 1 finished, time = 1072
02:35:17,859 [pool-1-thread-1] INFO org.activiti.AggregateService - All external ws calls returned. Try to unlock the lock. time = 1112
02:35:17,859 [main] INFO org.activiti.MyUnitTest - wait loop exited. Program finished. total taime spent = 1112
It is clear that the tasks run simultaneously and that aggregation and signaling are done properly. There is also no considerable delay for unlocking and signaling.
OK, I am cheating. The sleep times are intentionally set to different values, far enough apart. Otherwise contention would cause the asynchronous job executor to retry, which adds several seconds to the whole process. But that is not the subject of this post.
The sample code does not handle the optimistic locking problem described in the post above, nor does it handle a timeout while waiting for the lock to be released, but it shows the concept well. So with this workaround, it is possible to use the Activiti parallel gateway to execute external web service calls truly multi-threaded and return the result synchronously.
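For the missing timeout, one option is the standard `Condition.awaitNanos` idiom. The sketch below is an assumption about how that could be added, not part of the sample; the class `TimedSlot` and its method names are hypothetical, and returning `null` on timeout is just one possible convention.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-slot variant of the lock/condition pair, with a bounded wait.
class TimedSlot {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition finished = lock.newCondition();
    private boolean done;
    private String result;

    // Returns the result, or null if timeoutMillis elapses (or the thread is interrupted) first.
    String awaitResult(long timeoutMillis) {
        long nanosLeft = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!done) {
                if (nanosLeft <= 0L) {
                    return null;                          // timed out
                }
                nanosLeft = finished.awaitNanos(nanosLeft); // returns the remaining time
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();           // preserve the interrupt status
            return null;
        } finally {
            lock.unlock();
        }
    }

    // Called by the aggregation step once all results are available.
    void complete(String value) {
        lock.lock();
        try {
            result = value;
            done = true;
            finished.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

With a bounded wait, the web service can return an error to its caller when an external call hangs, instead of blocking the request thread indefinitely.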
One may suggest bypassing Activiti parallel gateways altogether and doing the thread synchronization in a single Java delegate. That would be much more concise and easier to implement. It is also much faster, as it does not need to communicate via the database. The downside is that you cannot see the real flow in the diagram.
Obviously this only works when everything runs in a single JVM. It is possible to implement it in a cluster, but even the current implementation is not very practical, due to the long delay caused by the asynchronous job picking mechanism.
PS: Thanks to Joram’s hint, I replaced the low-level wait and notify with the higher-level ReentrantLock.
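As a rough sketch of that single-delegate alternative, the three calls can be submitted to a thread pool and joined in one place. The class `ParallelCalls` and its method are hypothetical; in Activiti, this logic would presumably sit inside the `execute` method of a `JavaDelegate`, with the aggregated value stored as a process variable, which is omitted here to keep the block self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: runs independent calls concurrently inside one synchronous step.
class ParallelCalls {
    // Returns the results in submission order once every call has finished.
    static List<String> callAll(List<Callable<String>> calls) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, calls.size()));
        try {
            List<String> results = new ArrayList<>();
            // invokeAll blocks until all submitted calls have completed.
            for (Future<String> future : pool.invokeAll(calls)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while calling services", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a service call failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

No database round trip and no job executor are involved, so the total time is roughly that of the slowest call; the price, as noted above, is that the parallelism is invisible in the BPMN diagram.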