Real Multithreaded parallel execution in activiti

Here is a real scenario:

We are going to implement a  web service using activiti. So we should be synchronous and quick. In this scenario call comes from Camel endoint, but it is not relevant for this discussion and we can ignore it safely without loosing generality.

To conclude the service, we need to call 3 other external web services. External web services take random amount of time to return back the result. After all 3 web services are returned, we need to aggregate their returned results and return the result of aggregation as the return value of our web service.

This diagram shows the outline:

parallelSampleSerial

 

This works fine. As each step is synchronous, activiti executes the next step only when the previous one is returned. The returned values is stored as a process variable. At aggregation we have all the returned values and we can aggregate.

But if the web service calls are independent, we can run them in parallel and save time and resources. And here is where all the problems begin.

BPMN has already a parallel gateway available. The first temptation is  using that feature. something like this:

parallelSampleParallel

 

It looks nice on the paper, but we know that at activiti uses a single thread to run all the three paths. This means that at the end of the day, they are sequenced. There is no differences to the previous one, except that we cannot be sure about the order of the execution of web service calls. Not very promising !

What if we make the parallel services asynchronous? Ok, that is not easy as well. Here is a nice post from Tijs about how it could be done. It might be a little outdated though, thank to the new Async Job Executor starting from version 5.17.0.

With this approach, aggregate part is done properly. Activiti makes sure, the aggregate service is called only when all 3 web service calls are finished. good !

But the problem is that we need to return aggregated result as the return of web service. As the parallel tasks are asynchronous, the control returns to the caller immediately, without waiting for the external web service calls to return. So we don’t have the result yet. We need to wait and synchronize for the result to come. Usual way to do it is by locks and monitors in Java. Not exactly feasible here, as the the very same objects are not passed to the process. They are serialized and deserialized  via database.

To solve that problem, we can maintain a global static array of objects, and only pass the reference to the proper lock object to the activiti. In aggregation service, the static object is found and signaled with the help of passed index. Signaling causes the waiting thread to release and result to return back to web service.

Just as a rough proof of concept there I have created a  sample, available  in   parallelMultiThreaded in github.

In the sample MyUnitTest test class simulates the body of newly created web service call. Service1, Service2, Service3 simulate external web service calls, simply by sleeping. The executed model is the second one above with parallel gateway.

The static lock and condition arrays are defined as:

 public static Condition[] conditions = new Condition[1000];
 public static Lock[] locks = new Lock[1000];

 

index, is given a random number (0, not much random I confess) and passed to the activiti.
After starting the instance, main code goes to an unlimited wait loop, waiting for the corresponding object to be unlocked:

condition.await();

After all the external web services are executed, activiti executes aggregation task, which in turn signals the lock:

condition.signalAll();

The log files clearly show that everything goes fine:

02:35:16,817 [pool-1-thread-1] INFO  org.activiti.Service1  - Service 1 started, time = 70
02:35:16,817 [pool-1-thread-2] INFO  org.activiti.Service2  - Service 2 started, time = 70
02:35:16,817 [pool-1-thread-3] INFO  org.activiti.Service3  - Service 3 started, time = 70
02:35:17,318 [pool-1-thread-3] INFO  org.activiti.Service3  - Service 3 finished, time = 571
02:35:17,519 [pool-1-thread-2] INFO  org.activiti.Service2  - Service 2 finished, time = 772
02:35:17,819 [pool-1-thread-1] INFO  org.activiti.Service1  - Service 1 finished, time = 1072
02:35:17,859 [pool-1-thread-1] INFO  org.activiti.AggregateService  - All external ws calls returned. Try to unlock the lock. time = 1112
02:35:17,859 [main] INFO  org.activiti.MyUnitTest  - wait loop exited. Program finished. total taime spent = 1112

It is clear that tasks are running simultaneously  and aggregation and monitoring are done properly. Also there is no considerable delay for unlocking and signaling.

 

Ok, I am cheating. The time for sleeping are intentionally set to different values with proper distance. Otherwise contention will cause Asynchronous job executor, to retry. This adds several seconds to the whole process. But that is not the subject of this post.

The sample code is not considering optimistic locking problem as is described in the post above, neither it handles timeout in waiting for lock to be release,  but well shows the concept. So with this workaround, it is possible to use activiti parallel gateway for executing external web service really multi-threaded and respond the result synchronously.

But one may suggest to overlook activiti parallel gateways, and do the thread synchronization in a single Java delegate. That way it would be much  conciser and easier to implement. It is much faster, as it does need communicating via database. The cons is that you can not see the real flow in the diagram.

 

Obviously this only works when everything is running in a single jvm. It is possible to implement it in a cluster, but even the current implementation is not much practical to the long delay caused by asynchronous job picking mechanism

PS: Thank to Joram’s hint, I replaced the low level wait and notify to higher level ReenterantLock.

 

 

 

12 thoughts on “Real Multithreaded parallel execution in activiti

      1. علی اسمعیلی

        سلام .
        مثلا
        p-shahidshakeri
        یکی از مشتریای منه میخواد از بلاگفا انتقال بده .
        وقتی میزنم برای انتقال و ایمیل رو وارد میکنم
        یه پیج ارور میاد .
        اگه میشه لطف کنین درستش کنین با تشکر .

        Reply
  1. admin Post author

    سلام
    میشه یک کم دقیقتر توضیح بدید. آدرس اون مشتریتون چیه ؟ چه پیغام خطایی می گیرید ؟

    Reply
    1. علی اسمعیلی

      اقا سعید
      http://blogfa2wordpress.saeidmirzaei.com/
      این آدرس ادرس گرفتن خروجی از مطالب هست دیگه درست ؟
      خب من میرم توش
      p-shahidshakeri
      رو میزنم جای اکانت بلاگفا
      p-shahidshakeri.blogfa.com
      آدرس سایتشم بالا هست.
      مثلا ایمیلشم میزنم
      info@p-shahidshakeri.ir

      قبلش بگم که تمامی کارایی که توی آموزش گفته کردم برای چند تا وب سایت دیگه هم انجام دادم.
      ولی از دیروز که میخوام این وب سایت رو انتقال بدم مشکل دارم.
      بعدش که میزنم تبدیل
      خطا دریافتی به شرح زیر است:

      IndexError at /

      list index out of range

      Request Method: POST
      Request URL: http://blogfa2wordpress.saeidmirzaei.com/
      Django Version: 1.5.1
      Exception Type: IndexError
      Exception Value:

      list index out of range

      Exception Location: /var/www/apps/blgf2wrdprs/exporter/subjectParser.py in handle_starttag, line 35
      Python Executable:
      Python Version: 2.7.3
      Python Path:

      [‘/usr/local/lib/python2.7/site-packages/pip-1.3.1-py2.7.egg’,
      ‘/usr/local/lib/python2.7/site-packages/distribute-0.6.48-py2.7.egg’,
      ‘/usr/local/lib/python27.zip’,
      ‘/usr/local/lib/python2.7’,
      ‘/usr/local/lib/python2.7/plat-linux2’,
      ‘/usr/local/lib/python2.7/lib-tk’,
      ‘/usr/local/lib/python2.7/lib-old’,
      ‘/usr/local/lib/python2.7/lib-dynload’,
      ‘/usr/local/lib/python2.7/site-packages’,
      ‘/var/www/apps’,
      ‘/var/www/apps’]

      Server time: چهارشنبه, 25 فوریه 2015 06:46:38 -0600

      Reply
  2. علی اسمعیلی

    من وب سایت خودمم با این انتقال دادم خدایی دستتون درد نکنه هم خیلی زود کارمو دارید راه میندازید هم خیلی خوب هست.
    http://ooxoo.ir
    این آدرسه 5 ماهی میشه که انتقالش دادم خیلی آسون انتقالش دادم از سایتم یکی از بهترین سایت های تکنولوژی هست اگه یه وقتی مصاحبه کردن ازم میگم یکی از مهمترین راز های موفقیتمون انتقال بود که اقا سعید میرزایی با اپلیکیشن خوبش کمک کرد 😀

    Reply
    1. admin Post author

      ایشالا. یه نسخه از مصاحبه تون رو هم برای من بفرستید. خوشحالم که به دردتون خورده.

      Reply

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload CAPTCHA.