Tuesday, July 21, 2015

Portfolio risk diversification with Spark & ProActive Workflows

Authors: Michael Benguigui and Iyad Alshabani

In order to design a financial tool for risk diversification, we describe the specific Spark jobs orchestrated by the ProActive Scheduler. Since we need to process huge amounts of asset prices and run streaming k-means to keep models up to date, Spark (http://spark.apache.org/) is definitely the right technology. Firstly, MLlib (Spark’s scalable machine learning library https://spark.apache.org/mllib/) offers functionality for streaming data mining. Secondly, Spark lets us apply advanced map/reduce operations to RDDs (Resilient Distributed Datasets), i.e. partitioned collections of elements that can be operated on in parallel. All these jobs require a powerful framework to be deployed, orchestrated and monitored, as provided by ProActive.


Spark streaming jobs 


Spark Cluster

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program). Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers to allocate resources across applications: Spark’s own standalone cluster manager, Mesos (http://mesos.apache.org/), or YARN (http://fr.hortonworks.com/hadoop/yarn/). Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks for the executors to run.
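
To make this concrete, here is a minimal PySpark sketch of a driver program connecting to a standalone cluster manager (the master URL spark://master:7077, the application name and the sample data are illustrative assumptions, not part of our actual deployment):

from pyspark import SparkConf, SparkContext

# Hypothetical standalone master URL; a Mesos or YARN master would be set the same way
conf = SparkConf().setAppName("risk-diversification").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)

# A trivial RDD operation: the driver ships this computation to the acquired executors
prices = sc.parallelize([101.2, 99.8, 100.5, 102.1])
print(prices.mean())

sc.stop()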

ProActive Workflow, Scheduler & Resource Manager

Spark jobs are created and deployed using ProActive Workflows, which offer task templates to simplify workflow design. Indeed, ProActive Workflows & Scheduling is an integrated solution for deploying, executing and managing complex workflows, both in the cloud and on premises. All jobs are orchestrated by the ProActive Scheduler, while the ProActive Resource Manager virtualizes and monitors resources.
The ProActive Scheduler and Resource Manager are deployed so that they monitor each Spark cluster, which in turn monitors its own applications. The ProActive Scheduler is used as a meta-scheduler over the set of Spark clusters; indeed, each Spark cluster has its own Spark scheduler. ProActive then orchestrates the set of clusters and jobs as shown in the next figure.

Architecture of our application dedicated to risk diversification

Spark Streaming jobs

We focus on the analysis of correlations between CAC40 stocks over short periods. Our Spark streaming jobs require a batch duration parameter to be set: it controls the time window, in seconds, during which a job streams input data before processing it. The main Spark jobs of our financial tool are described as follows:
  • The first streaming job queries data from Yahoo! or Google web services (user choice), cleans it for the following jobs, and writes it to a shared directory. The batch duration is set by the user, and only successive distinct quotes are stored. For instance, with a 10s batch duration, quotes are queried for 10s before being processed and written to the shared directory.
  • A second Spark streaming job keeps the correlation matrix coefficients up to date (“Spearman” correlations in MLlib, rather than the commonly used “Pearson” correlations, which are not applicable if we consider that price processes are lognormally distributed, as the Black & Scholes model states) by streaming the processed stock quotes from the previous job; a sketch of such a job is given after the figure below.
  • A specific Scala job is in charge of drawing a heatmap from a matrix. Applied to the estimated correlations, it depicts the constantly updated correlation coefficients with associated colors.

CAC40 correlation heatmap
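
As a rough illustration, a streaming correlation job of this kind could look like the following PySpark sketch (the shared directory path, the comma-separated line format and the 10s batch duration are assumptions for the example, not our actual implementation):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.stat import Statistics

sc = SparkContext(appName="streaming-correlations")
ssc = StreamingContext(sc, 10)  # 10s batch duration

# Hypothetical shared directory fed by the quote-fetching job;
# each line is assumed to hold the 40 CAC40 quotes of one observation, comma-separated
quotes = ssc.textFileStream("hdfs:///shared/quotes")

def update_correlations(rdd):
    if not rdd.isEmpty():
        vectors = rdd.map(lambda line: [float(x) for x in line.split(",")])
        # Spearman rank correlation matrix over the current batch
        print(Statistics.corr(vectors, method="spearman"))

quotes.foreachRDD(update_correlations)
ssc.start()
ssc.awaitTermination()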

  • Relying on the KMeans algorithm (MLlib), another Spark streaming job clusters the correlations to build a well-diversified portfolio. KMeans clustering aims to partition a set of observations into k clusters in which each observation belongs to the cluster with the nearest mean; a sketch of such a streaming job is given after the figure below.
  • Here, the drawing job is used to represent the clustered correlations: companies with similar correlations (considering as many features/correlations as stocks, i.e. 40) are assigned to the same cluster.

CAC40 clustered correlation heatmap
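
A comparable sketch for the streaming clustering job, using MLlib's StreamingKMeans from PySpark (the input directory, k=4, the decay factor and the random seed are illustrative choices, not the parameters we actually use):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans

sc = SparkContext(appName="streaming-kmeans")
ssc = StreamingContext(sc, 10)  # 10s batch duration, as before

# Hypothetical stream: one line per company, holding its 40 correlation coefficients
correlations = ssc.textFileStream("hdfs:///shared/correlations") \
                  .map(lambda line: [float(x) for x in line.split(",")])

# k clusters over 40 features (one correlation per CAC40 stock)
model = StreamingKMeans(k=4, decayFactor=1.0).setRandomCenters(dim=40, weight=0.0, seed=42)
model.trainOn(correlations)

# Cluster index assigned to each company, printed at every batch
model.predictOn(correlations).pprint()

ssc.start()
ssc.awaitTermination()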

We could use the resulting correlations to simulate the underlying stocks of an option and, through Monte Carlo simulations, estimate the delta Greek to delta-hedge it (i.e. find market positions that reduce the exposure to variations of the underlying stocks). Of course, all embarrassingly parallel jobs can benefit from the map/reduce paradigm offered by Spark. In the automated trading context, it would be exciting to let ProActive dynamically orchestrate different trading strategies: some streaming jobs would implement specific trading strategies, while others would be in charge of market data analysis. ProActive Workflows & Scheduling affords such dynamic orchestration.
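
To give an idea of the Monte Carlo part, here is a minimal single-asset sketch of a finite-difference delta estimate under the Black & Scholes model (numpy-based, with purely illustrative parameters; the multi-asset case would additionally draw correlated paths using the estimated correlation matrix):

import numpy as np

def mc_delta_call(s0, k, r, sigma, t, n_paths=100000, bump=0.01, seed=0):
    # Monte Carlo delta of a European call via central finite differences,
    # reusing the same random draws for both bumped prices (common random numbers)
    z = np.random.default_rng(seed).standard_normal(n_paths)

    def price(s):
        st = s * np.exp((r - 0.5 * sigma ** 2) * t + sigma * np.sqrt(t) * z)
        return np.exp(-r * t) * np.maximum(st - k, 0.0).mean()

    return (price(s0 * (1 + bump)) - price(s0 * (1 - bump))) / (2 * s0 * bump)

print(mc_delta_call(s0=100.0, k=100.0, r=0.01, sigma=0.2, t=1.0))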

Thursday, June 18, 2015

Execute your parallel workflows through a simple REST API

Coming versions of ProActive Workflows & Scheduling will offer a feature that makes the integration between its API and your business software even simpler, especially if you only care about your business and its parallelization.
We observed that some of our clients need to maintain a clear separation between the workflow development process and the workflow execution process. Users who deal with the core of the business often care (almost exclusively) about the workflow execution and its results. They usually submit the same workflow again and again, simply changing their inputs.
Our scheduler has been adapted for such use cases. Now you can create the workflow once (see how easy it is with our ProActive Studio after creating a free account here), and execute it with parameters through an intuitive API. No need to have the workflow at hand, no need to even know there is a workflow that parallelizes the execution.

How to use such an API?

So simple… Use our REST API as follows to execute workflows available in the Studio templates.
Using bash and curl, first you need to log in:
# Log in and keep the session ID somewhere
$ sessionid=`curl -k -d "username=admin&password=admin" http://localhost:8080/rest/scheduler/login`
Using the session ID, you can now list all the available templates on the server (same can be done to obtain the list of private workflows, but using /rest/studio/workflows instead of /rest/studio/templates):
# List all available templates (the ones created with the ProActive Studio will be available too)
$ curl -X GET -H "sessionid:$sessionid" http://localhost:8080/rest/studio/templates/
       "id": 2,
       "metadata": "...",
       "name": "Variable Propagation",
       "xml": "..."
       "id": 1,
       "metadata": "...",
       "name": "Pre-Post-Clean Scripts",
       "xml": "..."
Now, let’s say we are interested in submitting template 1. You can get more details about it by doing the following:
# Choose one template and get information about it
curl -X GET -H "sessionid:$sessionid" http://localhost:8080/rest/studio/templates/1
Or even get its XML content:
# Choose one template and get its xml content
curl -X GET -H "sessionid:$sessionid" http://localhost:8080/rest/studio/templates/1/xml
Now you can execute the workflow from the scheduler, providing the workflow URL we have been using through the “Link” header:
# Submit the workflow to the scheduler
curl -X POST -H "Link: http://localhost:8080/rest/studio/workflows/1/content" -H "sessionid:$sessionid" http://localhost:8080/rest/scheduler/jobs/
And that is all!!!
I know, it is just so simple.
Hope you use it!

Wednesday, June 3, 2015

Using ProActive Scheduler from a Python application

For a collaborative project we are involved in, we had to integrate with a Python application. Fortunately this is made easy by the REST API of ProActive Scheduler.
We hope this piece of code will prove useful in integrating your Python application with ProActive!

In Python, it turns out the requests library is very easy to use for building HTTP requests. For instance, here is how to retrieve the version of the Scheduler:

import requests
r = requests.get("http://try.activeeon.com/rest/scheduler/version")
print(r.text)

which outputs { "scheduler" : "6.1.0", "rest" : "6.1.0"}.
It is also very easy to manipulate JSON data with r.json() that returns a dictionary.

We put together a small project that should be considered more as an example rather than a full client for the Scheduler. It is available on Github:
  • example.py shows various interactions with the Scheduler API: logging in, submitting a job and waiting for it to finish (a condensed sketch of this flow follows below).
  • scheduler.py contains the code that queries the REST API.
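
As a condensed sketch of that flow with requests, reusing the login and Link-header submission endpoints shown in the REST API post above (the server URL, credentials and workflow content URL are placeholders, not working values):

import requests

base = "http://try.activeeon.com/rest"

# Log in and keep the session id
session = requests.post(base + "/scheduler/login",
                        data={"username": "user", "password": "pwd"}).text

# Submit a workflow by passing its content URL through the "Link" header
job = requests.post(base + "/scheduler/jobs/",
                    headers={"sessionid": session,
                             "Link": base + "/studio/workflows/1/content"})
print(job.text)

# Waiting for the job to finish is done by polling the scheduler with the same
# session id; see scheduler.py in the example project for one way to implement it.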

You can also take a look at our REST API documentation to see how it would be possible to implement more features in this small client.

Tuesday, May 19, 2015

Integrating services with Apache Camel

One of our clients came to us not long ago with a problem: a complex system with lots of information exchanged between remote services… We simply did not know how to handle such a situation in a neat and simple manner. We had to consider the different potential APIs and their protocols, what would be easier to integrate into existing blocks, client implementations and their development status, maintenance… Worse, probably nobody would ever understand the resulting architecture or be able to maintain it easily. But after some head-scratching we found a lovely theoretical solution: messaging patterns, or, more precisely, Enterprise Integration Patterns (EIPs for short).

Why are EIPs useful?

Enterprise environments are usually made of a bunch of services that must interact with each other to provide another, integrated service. Thinking through this integration used to take lots of time and effort from engineers. But someone came up with an idea.
Yep, a great idea: well thought-out patterns to address these exact integration problems. EIPs were born.
EIPs help avoid re-inventing the wheel: by sticking to existing standards you produce better software in a simpler manner. Software that is more readable, more intuitive, more flexible, better documented.

More about EIP?

In EIP, information exchanged between services takes the shape of messages. If there are messages in the picture, there must also be producers and consumers. On their way to the consumers, messages pass through different elements: queues, processors, routers, among others. These transform messages, route them, create additional messages, etc. For instance, this is what a route looks like.

This is a very simple example. There is a producer of messages, which go through a translator element and then a router that sends them to either Queue A or Queue B.
EIP establishes very intuitive patterns that allow the interconnection of different services, from message producers to message consumers. Some well-known patterns are publish-subscribe and pipes-and-filters (or pipeline). There are other lesser-known patterns, for instance the very helpful Dead Letter Channel, which specifies how to treat messages that for some reason failed to be delivered.
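
To make the picture less abstract, here is a tiny plain-Python sketch of the route above, illustrating the pattern itself rather than Camel code (all names are invented for the example): a producer sends messages through a translator and a content-based router that dispatches them to Queue A or Queue B.

from queue import Queue

queue_a, queue_b = Queue(), Queue()

def translator(message):
    # Message translator: normalize the payload before routing
    return {"type": message["type"].lower(), "body": message["body"].strip()}

def router(message):
    # Content-based router: choose the destination queue from the message content
    (queue_a if message["type"] == "order" else queue_b).put(message)

def producer(raw_messages):
    for raw in raw_messages:
        router(translator(raw))

producer([{"type": "ORDER", "body": " buy 10 "},
          {"type": "INVOICE", "body": " #42 "}])
print(queue_a.qsize(), "message(s) in Queue A,", queue_b.qsize(), "in Queue B")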

Too theoretical, how do I proceed with my integration?

If you like making your own way through the jungle, you just need to read Enterprise Integration Patterns by Gregor Hohpe (2004) and implement whatever EIPs you need. Sticking to existing patterns is always a great idea for many reasons.
However, for those who like highways, there is a very good framework I would like you to befriend. Camel: the reader. The reader: Camel.
How does Camel fit with your existing software? How could you use all this if your system already relies on existing technologies? Adapting Camel to your scenario might not be as painful as you think.

What is really Camel?

Camel is an open source integration framework based on EIPs. Apart from providing the possibility to implement lots of EIP patterns, you can benefit from its more than one hundred connector components, which makes it really easy for developers to integrate it into existing software.

Technologies supported by Camel?

More concretely, a component allows you to connect your EIP to specific endpoint technologies. For instance, you can use the local-storage File component to generate one message per file in a given directory of your filesystem. Most components work both ways, which means that with the same File component you can also dump messages into files on your filesystem.
Similarly, there are components for remote storage: FTP, FTPS, Dropbox and even Amazon S3 (Simple Storage Service)!
If you plan to use queueing technologies, you are in luck: Camel supports JMS and AMQP queues, as well as AWS-SQS (Amazon Simple Queue Service).
Messages can also be generated from, or dumped to, items in different databases: SQL, Cassandra, Elasticsearch and CouchDB.
Some management connectors include JMX and SSH. There are also some more exotic connectors you might want to take a look at: Facebook (to access the Facebook API), Docker (to communicate with Docker), Exec (to execute system commands).

What else?

Camel is an open source Apache project, developed in Java on top of the Spring framework. Since its first commits in 2007, it has been actively improved in this repository by a growing community (now including me!). Don’t miss the chance to give it a try!

Happy integration!

Wednesday, April 29, 2015

ProActive and Docker: schedule and orchestrate Docker with ProActive and Docker Compose + variable substitution

ProActive workflows and orchestration

The ProActive Scheduler is open source software for orchestrating tasks in your hybrid cloud infrastructure. With an easy-to-use GUI and an easy setup on master and nodes (just execute one shell script, or even run it in containers directly), it fits very well into the Docker ecosystem, which I find very user friendly.

Getting Docker Compose support into ProActive

The ProActive software uses the concept of JSR 223 script engines. I used that to implement a JSR 223 compatible script engine that executes Docker Compose, which now drives the Docker Compose support. To add Docker Compose support, simply add a jar file to your ProActive installation. You can find the Docker Compose script engine here, but don't worry, it is already inside the containers we will use in this walk-through.

Running ProActive

You need to have Docker 1.6 and Docker Compose installed; if not, follow the installation guides for Docker and Docker Compose.

Run ProActive with Docker Compose! I prepared a yaml file (download it here):
proactiveScheduler:
  image: tobwiens/proactive-scheduler
  ports:
    - "8080:8080"
    - "64738"
proactiveNode:
  image: tobwiens/proactive-node
  command: -r pnp://proactiveScheduler:64738
  links:
    - proactiveScheduler
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

The ‘proactiveScheduler’ service starts the ProActive master with a web interface that will be accessible on port 8080. Additionally, it exposes port 64738, to which the ProActive node will connect. The scheduler manages the nodes and keeps track of the tasks.
A node executes and monitors tasks, so we need at least one node to execute our Docker Compose tasks.
The ‘proactiveNode’ service is linked to the proactiveScheduler and connects with the command ‘-r pnp://proactiveScheduler:64738’; check out the node's and the scheduler's Dockerfiles.
The docker.sock is mounted inside the container so that containers can be executed on your local Docker daemon.
Now: Execute the yaml file!

Executing a Docker Compose script with the ProActive GUI

After executing the yaml file, wait until the web interface has started, go to it on port 8080 and log in with the standard account (admin/admin). Click on create to create a new workflow:

After creating your first workflow click on open (green button):

Now that the workflow is open: import the Docker Compose task by clicking import; download it here.
Note: The Docker Compose script engine is not selectable in the GUI yet (this will change soon), so for now the xml files have to be edited in a text editor and imported.
More details: the GUI checks whether the script engine is valid as soon as the script is edited, which would remove the language="docker-compose" attribute from the xml file.

After importing you will see a task which contains a Docker Compose script: 
ubuntuEcho:
  image: dockerfile/ubuntu
  command: /bin/bash -c 'sleep 5 && echo It worked!!!!'
It executes the following command (in the dockerfile/ubuntu image), which will print "It worked!!!!" on the screen:
sleep 5 && echo It worked!!!!
Note: I added the sleep 5 because the echo command returned so fast that Docker Compose tried to attach to a container which had already stopped, which caused it to wait forever.

After clicking execute, go to the Scheduler web interface to see the progress of the Docker Compose script. Select the submitted job and you will see:

After the job is finished you can access the output by clicking the ‘Fetch output’ button in the Output tab:
Not glamorous, but we can see the ‘It worked!!!’ in the line ‘[36mubuntuEcho_1 | [0mIt worked!!!’

Variables in Docker Compose workflows 

The new yaml file inside the task is:
ubuntuEcho:
  image: dockerfile/ubuntu
  command: /bin/bash -c 'sleep 5 && echo "$output_variable" '
The output of the echo has been replaced with a variable: $output_variable.

  1. Create a new workflow.
  2. Import the xml file (download it here).
You should see a variable called output_variable in the 'Job Variables' tab:

Execute the workflow and check the output in the interface. The variable has been replaced, and the echo will now output the content of the variable.