Thursday, September 29, 2016

How to accelerate your R algorithm with ProActive?

When analyzing data sets, getting “instant” insight is often critical for business decisions, which is why you may want to dedicate all of your resources to the tasks that deliver that analysis to stakeholders. Here is an example of a Twitter feed analysis using R (a statistics-oriented language) and ProActive. It gathers messages, then performs distributed analysis on distinct computing nodes.

How it works

In selected cities, this workflow searches Twitter feeds for a chosen set of keywords and returns a chosen number of tweets. (Due to restrictions from Twitter, the number of cities cannot be greater than 15: “Rate limits are divided into 15 minute intervals [...] 15 calls every 15 minutes.”, https://dev.twitter.com/rest/public/rate-limiting)

ProActive first creates as many tasks as there are cities, then runs them in parallel on ProActive nodes. In these R-based tasks, each node will:

  • connect to Twitter,
  • perform the desired search,
  • analyze the information,
  • store the result in the ProActive dataspace (a shared storage environment adapted to distributed systems).
A final task then analyzes all the data before storing the result in the above-mentioned dataspace.


Let's code

The workflow can be found here but requires some pre-configuration (see the initial requirements below); otherwise, please follow the step-by-step tutorial.

Setup

The initial requirements for this demo are:

  • a Twitter application with a valid authentication key and token,
  • ProActive fully installed and deployed (scheduler + nodes),
  • the required R packages installed on every node (see the sketch right after this list).
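For the last requirement, the packages can be installed from the command line on each node. This is a minimal sketch: the package list matches the library() calls used later in this tutorial, and the CRAN mirror is just an example.

# Install the R packages used by this workflow (run once per node)
Rscript -e 'install.packages(c("twitteR", "stringr", "tm", "wordcloud", "textreg"), repos = "https://cloud.r-project.org")'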
First, drag and drop a “replicate” control, then set the tasks’ language to R.

Create the workflow variables “KeyWord”, “NumberofSearch”, and “NumberofTweets” using the “Variables” tab at the workflow level.

Init

Add the following code to the “Split” task, where the cities are set:

lats <- c(38.9, 40.7, 37.8, 39, 37.4, 28, 30, 42.4, 48, 36, 32.3, 33.5, 34.7, 33.8, 37.2)
lons <- c(-77, -74, -122, -105.5, -122, -82.5, -98, -71, -122, -115, -86.3, -112, -92.3, -84.4, -93.3)
cities <- c("DC", "New York", "San Francisco", "Colorado", "Mountain View", "Tampa", "Austin", "Boston", "Seattle", "Vegas", "Montgomery", "Phoenix", "Little Rock", "Atlanta", "Springfield")
result <- list(lats, lons, cities)

Other cities can be used, as long as their latitudes and longitudes are known.

In the replicate control, write the JavaScript code below (or the equivalent in your favorite language). A replicate control flow action defines how many parallel runs will be executed by setting the runs variable.

runs = variables.get("NumberofSearch")

Replicated task: actual analysis

Then, in the “Process” task, use the commented code below. It starts by reading the list from “Split” into a data frame: that list was passed as the “result” variable, so it is available to the tasks that follow (here, all the “Process” tasks) through results.

Locations <- data.frame( lats=unlist(results[[1]][1]), lons=unlist(results[[1]][2]), cities=unlist(results[[1]][3]) )

Import the necessary packages.

Sys.setlocale("LC_NUMERIC", "C") #Without this line, searchTwitter will fail with code 404.
library(twitteR)
library(stringr)
library(tm)
library(wordcloud)
library(textreg)

Connect to Twitter; you need to replace the XXXs with your own key and token.

consumer_key_api <- "XXXXXXXXXX"
consumer_secret_api <- "XXXXXXXXXX"
token <- "XXXXXXXXXX"
token_secret <- "XXXXXXXXXX"
options(httr_oauth_cache=F) #Without this line, R will stop here.
setup_twitter_oauth(consumer_key_api, consumer_secret_api, token , token_secret)

Search using the coordinates of a city and a radius of 300 km. The radius can easily be changed.

Tweets <- searchTwitter(toString(variables[["KeyWord"]]),
                        n = variables[["NumberofTweets"]],
                        lang = 'en',
                        geocode = paste(Locations$lats[variables[["PA_TASK_REPLICATION"]] + 1],
                                        Locations$lons[variables[["PA_TASK_REPLICATION"]] + 1],
                                        paste0(300, "km"), sep = ","))

Take only the text from the tweets.

Tweetstext <- sapply(Tweets, function(x) x$getText())
Tweetstext <- unlist(Tweetstext)

Prepare the text to be usable.

corpus <- Corpus(VectorSource(Tweetstext))
#To get rid of all special characters.
corpus <- tm_map(corpus, str_replace_all,"[^[:alnum:]]", " ")
corpus <- tm_map(corpus, removePunctuation)
corpus <- tm_map(corpus, removeWords, stopwords("english"))
corpus <- tm_map(corpus, removeWords, c("http", "https", "amp"))
corpus <- tm_map(corpus, PlainTextDocument)

Plot a wordcloud from processed tweets.

col <- brewer.pal(6,"Dark2")
layout(matrix(c(1, 2), nrow=2), heights=c(1, 4))
par(mar=rep(0, 4))
plot.new()
text(x=0.5, y=0.5, Locations$cities[variables[["PA_TASK_REPLICATION"]]+1], col= "blue")
wordcloud(corpus, scale=c(5,1),rot.per = 0.25, random.color=T, max.word=45, random.order=F, colors=col, main="Title")

Save the file in the dataspace.

dev.copy(png,paste(Locations$cities[variables[["PA_TASK_REPLICATION"]]+1], '.png', sep=""))
dev.off()

Send processed tweets to final task for global analysis.

corpus <- convert.tm.to.character(corpus)
result <- corpus

You might want to add a “Sys.sleep()” call here so you have time to look at the wordclouds (they are displayed on their host).

The “Data Management” tab of the task should include *.png using the “transferToUserSpace” Access Mode.

Last task: to wrap it up

In order to analyze the whole data set, use this code in the “Merge” task:

library(wordcloud)
library(tm)

#Merge all the results.
corpus <- c()
for (i in 1:variables[["NumberofSearch"]]) {
  corpus <- c(corpus, results[[i]])
}

corpus <- Corpus(VectorSource(corpus))

#Plot with Title.
col <- brewer.pal(6,"Dark2")
layout(matrix(c(1, 2), nrow=2), heights=c(1, 4))
par(mar=rep(0, 4))
plot.new()
text(x=0.5, y=0.5, "Main", col= "red")
wordcloud(corpus, scale=c(5,2),rot.per = 0.25, random.color=T, max.word=45, random.order=F, colors=col, main="Title")

#Save to file in dataspace.
dev.copy(png, "Main.png")
dev.off()

Set the “Data Management” tab the same way as the Process task’s tab and you’re done.

The wordclouds can be found in PROACTIVE_HOME/data/defaultuser/USER (or wherever you set your user space).

Monday, August 29, 2016

Deploy a ProActive cluster using Docker



The goal of this blog post is to give an overview of how easy it is to deploy a ProActive cluster with Docker, and of the benefits you can gain from it.


Docker containers are a great way to deploy and re-deploy a pre-configured ProActive cluster quickly.





As you can see in the figure above, a ProActive cluster is composed of three different components:
  • the ProActive Scheduler Server
  • the database
  • the ProActive Nodes
All these components will be set up in Docker containers thanks to a container orchestrator.





First of all, we assume that you have several hosts for Docker containers and that you use an orchestrator for them, for instance Swarm, Mesos, or Kubernetes. You therefore have a way to abstract the network in your cluster: a Docker overlay network if you use Swarm, or the networking provided by Mesos or Kubernetes.
By default, the protocol used for communication between the ProActive Nodes and the ProActive Scheduler Server is PNP, but several other protocols are available, such as PNPS and PAMR (ProActive Message Routing).
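As an illustration, with Docker Swarm such a network can be created in one command. This is only a sketch: the network name proactive-net is a placeholder, and --attachable requires a reasonably recent Docker Engine.

# Create an attachable overlay network that spans the Swarm cluster
docker network create --driver overlay --attachable proactive-net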


Once you are sure that you have a network that allows your containers to reach each other across hosts, you can first run your database container. You can persist its data with Docker volumes and configure users for the Scheduler Server at runtime.
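Here is a hedged sketch of this first step, assuming a MariaDB backend (the image tag, credentials and volume name are placeholders; ProActive can be configured for other databases as well):

# Run the database container on the shared network, with a named volume for persistence
docker run -d --name proactive-db --network proactive-net \
  -e MYSQL_ROOT_PASSWORD=changeme \
  -e MYSQL_DATABASE=proactive \
  -v proactive-db-data:/var/lib/mysql \
  mariadb:10.1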


The second step is to launch the Scheduler Server container and link it to the database container. If you access the web UI on the Docker host, thanks to the port redirection, you will notice that no Node is running in the Resource Manager portal. This is the normal behaviour: our goal is to have the Nodes running in other containers.
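A sketch of this second step (the image name and the DB_HOST variable are hypothetical placeholders, not an official Activeeon image; adapt them to the image you actually build or pull):

# Launch the Scheduler Server on the same network and expose the web portals on the host
docker run -d --name proactive-scheduler --network proactive-net \
  -p 8080:8080 \
  -e DB_HOST=proactive-db \
  my-registry/proactive-scheduler:latest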


Finally, the last step is the deployment of the Nodes. You just have to configure them to connect to the Resource Manager and choose how many workers you want per Node. You can launch as many Nodes as you want, on as many hosts as you want.
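And a matching sketch for the Nodes (again, the image name and the RM_URL/NB_WORKERS variables are placeholders; 64738 is assumed here to be the default PNP port of the Resource Manager, so adjust it to your configuration):

# Start a Node container pointing at the Resource Manager; run this on any host, as many times as needed
docker run -d --network proactive-net \
  -e RM_URL=pnp://proactive-scheduler:64738 \
  -e NB_WORKERS=4 \
  my-registry/proactive-node:latest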


You can also keep the data and logs of the Nodes and of the Scheduler Server in Docker volumes.


Once everything is running, you can write some Workflows, execute them, and see on which Nodes they are executed.


And there you are: you now have an entire cluster ready to execute jobs, and you can enjoy all the benefits of our generic Scheduler, which allows you to run Spark jobs, MapReduce jobs, ETL processes…

Friday, August 26, 2016

Submitting ProActive Workflows with Linux cURL


Using cURL on a Linux command line (bash) is a very convenient and resource-efficient way to submit workflows.
We first need to log in and store the current session ID with the command:

sessionid=$(curl --data 'username=admin&password=admin' http://localhost:8080/rest/scheduler/login)


We log in with curl by sending the username and password as form data and store the result in the sessionid variable; in the following calls, this session ID is transmitted as a header parameter with -H (or --header). The session ID can be displayed with echo $sessionid.


Workflows can be submitted with cURL:

curl --header "sessionid:$sessionid" \
 --form \ "file=@filename.xml;type=application/xml" \
 http://trydev.activeeon.com:8080/rest/scheduler/submit




The session ID variable is inserted into the header, and the @ notation lets curl send the file directly to the server.


Advanced: Workflow Submission with Variables



Workflow variables can be sent in the submission URL as matrix parameters; their values will replace the corresponding variables in the workflow.

curl --header "sessionid:$sessionid" \
 --form "file=@file.xml;type=application/xml" \
 "http://trydev.activeeon.com:8080/rest/scheduler/submit;variable1=test1;variable2=test2"

Important: the URL is now wrapped in double quotes (""); only then are the matrix parameters transferred properly. Variables are separated by semicolons (;).
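If you also want to check on the job from the same shell, the submit call returns the id of the created job, which can then be passed to the jobs endpoint. The sketch below assumes the response is a small JSON document containing the job id and that /rest/scheduler/jobs/{id} is available on your server version; check the REST documentation of your installation.

# Capture the id of the submitted job and query its status afterwards
jobid=$(curl --header "sessionid:$sessionid" \
 --form "file=@file.xml;type=application/xml" \
 "http://trydev.activeeon.com:8080/rest/scheduler/submit;variable1=test1;variable2=test2" \
 | grep -oE '[0-9]+' | head -1)

curl --header "sessionid:$sessionid" \
 "http://trydev.activeeon.com:8080/rest/scheduler/jobs/$jobid"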

Wednesday, November 25, 2015

ProActive in Docker containers

ProActive Workflows & Scheduling 7.0.0 is available on the Docker hub, now!

How to run ProActive Workflows & Scheduling inside Docker

1) Have Docker and Docker Compose installed
2) Get the ProActive Docker Compose file here
3) Save the ProActive Docker Compose file as docker-compose.yml
4) Run "docker-compose up" in the same directory as the ProActive Docker Compose file
5) Wait for the "*** Get started at http://[IP]:8080 ***" message
6) Browse to http://[IP]:8080
7) Login with default login: admin:admin
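
For convenience, here are steps 2 to 5 as a short shell session (a sketch only: the compose file URL is a placeholder for the link in step 2):

# Download the ProActive Docker Compose file (replace the URL with the link from step 2)
curl -o docker-compose.yml https://example.com/proactive/docker-compose.yml
# Start the cluster and wait for the "Get started" message in the logs
docker-compose up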


Get 10 days of free support by registering through activeeon.com

Be part of our open source community on GitHub






Tuesday, November 24, 2015

Add Docker to your legacy systems with ProActive

ProActive Workflows & Scheduling 7.0 is out! It comes with an exciting new feature: Docker containers are now supported inside Workflows.

tl;dr:
Docker container support is available now in ProActive Workflows & Scheduling 7.0. It combines the strength of supporting, keeping and combining legacy systems with the newly added Docker container support, enabling everyone to combine legacy systems with new, cutting-edge technology.
Get access through:
containers.activeeon.com

What is ProActive Workflows & Scheduling?

Quick introduction to our:

  • Workflow based Scheduler
  • Resource Manager
  • Web interfaces

Quick introduction

Studio
Our Studio allows you to compose bigger tasks out of smaller components by creating Workflows. Workflow tasks can be written in many languages and can include services and batch tasks, so they provide a lot of freedom for solving problems, collecting and analyzing data, and much more. The best part: execution is automatically distributed, but more about that in the Scheduler section.


Scheduler
The Scheduler knows all about jobs and tasks: a ProActive Workflow is represented as a job, and the Workflow's building blocks are called tasks. It communicates with the Resource Manager to distribute all the work. Resources are maximally utilized because the Scheduler knows what needs to go first for efficient resource utilization, without breaking the constraints of the Workflows. Your workload is thus automatically distributed, and the advanced fault tolerance ensures recovery after hardware and software failures.


Resource Manager
The Resource Manager knows all the resources and everything about them. Resources can be added and removed at runtime, and ProActive’s fault tolerance ensures the proper execution of Workflows even if nodes are removed or fail. Best of all, it is very easy to scale: a single command adds or removes resources.


Many more features

ProActive Workflows & Scheduling has many more features than covered in this quick introduction. If you are interested in something in particular, contact us or visit our website.


Legacy systems? - ProActive has you covered

ProActive Workflows & Scheduling is a perfect fit for your legacy systems. We at Activeeon have many years of experience bringing legacy systems together and adding new functionality to the mix. Legacy systems can be re-written inside the ProActive Studio or accessed from within Workflows without touching them. Thanks to the flexibility of Workflows, new systems can be added just as easily.

Working with customers for many years has brought wide language support to our Studio, including:

  • Python
  • Bash
  • R
  • Java
  • Ruby
  • And more....

Our ProActive Workflows allow you to combine all these languages in one Workflow, so you reach your goal faster without changing your working legacy code.

If you are familiar with High Performance Computing, you might know:

  • SLURM
  • IBM Platform LSF
  • PBS Pro

We connect to all of them and combine access in a single web interface.
If you have worked with legacy systems, you know that it is better to keep them running as long as they work, because changes can be expensive.
But Docker has shaken up the IT world and made many things easier and faster than before, and now, with ProActive Workflows & Scheduling, you can use the newest technologies alongside your legacy systems.

Docker support in Workflows & Scheduling

Finally the day arrived to announce Docker support inside ProActive Workflows & Scheduling.
It gives you many advantages:

  • Advanced ProActive Workflows & Scheduling fault tolerance for your Docker containers
  • Run Docker and legacy systems without updating existing code
  • Improve your resource utilization significantly

ProActive Workflows & Scheduling brings advanced fault tolerance features to Docker containers:

  • Resilient to hardware failures
  • Resilient to software errors

Re-execute your Docker containers automatically on different hardware if one or more fail.
If your software experiences errors, re-execute X times in Y different environments.


Run Docker containers alongside legacy systems:

  • Integrate Docker containers in your legacy system mix
  • Don't touch any legacy system
  • ProActive Workflows & Scheduling combines legacy and new systems

Improve your resource utilization significantly
Docker containers, in a distributed environment, need efficient resource management, otherwise machines will be over- or under-utilized.

ProActive Workflows & Scheduling has effective resource management that reaches high efficiency and scales your infrastructure according to the current demand.

Want to have a try?

Open source and free!
No installation required: use it as a web app or download it at:
containers.activeeon.com






Tuesday, July 21, 2015

Portfolio risk diversification with Spark & ProActive Workflows


Authors: Michael Benguigui and Iyad Alshabani


In order to design a financial tool that diversifies risk, we describe the following Spark jobs, orchestrated by the ProActive Scheduler. Since we need to process huge amounts of asset prices and run streaming k-means to keep models up to date, Spark (http://spark.apache.org/) is definitely the right technology. Firstly, MLlib (Spark’s scalable machine learning library, https://spark.apache.org/mllib/) offers functionality for streaming data mining. Secondly, Spark lets us work with advanced map/reduce operations on RDDs (Resilient Distributed Datasets), i.e. partitioned collections of elements that can be operated on in parallel. All these jobs require a powerful framework to be deployed, orchestrated and monitored, which is what ProActive provides.

 

Spark streaming jobs 


 

Spark Cluster

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program). To run on a cluster, the SparkContext can connect to several types of cluster managers, which allocate resources across applications: Spark’s own standalone cluster manager, Mesos (http://mesos.apache.org/), or YARN (http://fr.hortonworks.com/hadoop/yarn/). Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to the SparkContext) to the executors. Finally, the SparkContext sends tasks for the executors to run.
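As an illustration, such a streaming job is typically packaged as a JAR and handed to the chosen cluster manager with spark-submit. This is a minimal sketch: the class name, master URL, JAR name and batch duration argument are placeholders.

# Submit a streaming job to a standalone Spark master; the last argument is the batch duration in seconds
spark-submit \
  --class com.example.QuotesStreamingJob \
  --master spark://spark-master:7077 \
  --executor-memory 2G \
  quotes-streaming-assembly.jar 10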


ProActive Workflow, Scheduler & Resource Manager

Spark jobs are created and deployed using ProActive Workflows, which offer task templatization to facilitate workflow elaboration. ProActive Workflows & Scheduling is an integrated solution for deploying, executing and managing complex workflows, in the cloud as well as on premise. All jobs are orchestrated by the ProActive Scheduler, while the ProActive Resource Manager virtualizes and monitors the resources.
The ProActive Scheduler and Resource Manager are deployed so as to monitor each Spark cluster, which in turn monitors its own applications. The ProActive Scheduler is used as a meta-scheduler over the set of Spark clusters: each Spark cluster has its own Spark scheduler, and ProActive orchestrates the set of clusters and jobs, as shown in the next figure.

 
 
Architecture of our application, dedicated to risk diversification
 
 

Spark Streaming jobs

We focus on analyzing the correlations between CAC40 stocks over short periods. Our Spark streaming jobs require a batch duration parameter, which controls the time period, in seconds, during which a job streams input data before processing it. The main Spark jobs of our financial tool are the following:
  • The first streaming job queries data from the Yahoo! or Google web services (the user’s choice), cleans the data for the following jobs, and writes it to a shared directory. The batch duration is set by the user, and only successive distinct quotes are stored. For instance, with a 10 s batch duration, quotes are queried for 10 s before being processed and written to the shared directory.
  • A second Spark streaming job keeps the correlation matrix coefficients up to date (“Spearman” correlations in MLlib, rather than the commonly used “Pearson” correlations, which are not applicable if we consider the price processes to be lognormally distributed, as the Black & Scholes model states) by streaming the processed stock quotes from the previous job.
  • A specific Scala job is in charge of drawing a heatmap from a matrix. Applied to the estimated correlations, it depicts the constantly updated correlation coefficients with associated colors.


CAC40 correlation heatmap


  • Relying on the KMeans algorithm (MLlib), another Spark streaming job clusters the correlations to build a well-diversified portfolio. KMeans clustering aims to partition a set of observations into k clusters in which each observation belongs to the cluster with the nearest mean.
  • Here, the drawing job is used to represent the clustered correlations: companies with similar correlation profiles (considering as many features/correlations as there are stocks, i.e. 40) are assigned to the same cluster.


CAC40 clustered correlation heatmap


Perspectives
We could use the resulting correlations to simulate the underlying stocks of an option and, through Monte Carlo simulations, estimate its delta in order to delta-hedge it (i.e. find positions on the market that reduce the exposure to variations of the underlying stocks). Of course, all embarrassingly parallel jobs can benefit from the map/reduce paradigm offered by Spark. In the context of automated trading, it would be interesting to let ProActive automatically deploy such an application, involving specialized Spark streaming jobs, and dynamically orchestrate them: some streaming jobs would be specific to trading strategies, while others would be in charge of market data analysis. ProActive Workflows & Scheduling affords such dynamic deployment and orchestration in a fully scalable way.

Thursday, June 18, 2015

Execute your parallel workflows through a simple REST API

Coming versions of ProActive Workflows & Scheduling will offer a feature that makes the integration between its API and your business software even simpler, especially if you only care about your business and its parallelization.
We observed from some of our clients the need to maintain a clear separation between the workflow development process and the workflow execution process. Users who deal with the core of the business often care (almost exclusively) about the workflow execution and its results. They usually submit the same workflow again and again, simply changing their inputs.
Our scheduler has been adapted for such use cases. Now you can create the workflow once (see how easy it is with our ProActive Studio after you create a free account here), and execute it with parameters through an intuitive API. No need to have the workflow at hand, no need to even know there is a workflow that parallelizes the execution.

How to use this API?

So simple… Use our REST API as follows to execute workflows available in the Studio templates.
Using bash and curl, first you need to log in:
# Log in and keep the session ID somewhere
$ sessionid=`curl -k -d "username=admin&password=admin" http://localhost:8080/rest/scheduler/login`
Using the session ID, you can now list all the available templates on the server (same can be done to obtain the list of private workflows, but using /rest/studio/workflows instead of /rest/studio/templates):
# List all available templates (the ones created with the ProActive Studio will be available too)
$ curl -X GET -H "sessionid:$sessionid" http://localhost:8080/rest/studio/templates/
RESPONSE:
[
   {
       "id": 2,
       "metadata": "...",
       "name": "Variable Propagation",
       "xml": "..."
   },
   {
       "id": 1,
       "metadata": "...",
       "name": "Pre-Post-Clean Scripts",
       "xml": "..."
   }
]
Now, let’s say we are interested in submitting template 1; you can get more details about it by doing the following:
# Choose one template and get information about it
curl -X GET -H "sessionid:$sessionid" http://localhost:8080/rest/studio/templates/1
Or even get its XML content:
# Choose one template and get its xml content
curl -X GET -H "sessionid:$sessionid" http://localhost:8080/rest/studio/templates/1/xml
Now you can execute the workflow from the scheduler by providing, through the “Link” header, the workflow URL we have been using:
# Submit the workflow to the scheduler
curl -X POST -H "Link: http://localhost:8080/rest/studio/workflows/1/content" -H "sessionid:$sessionid" http://localhost:8080/rest/scheduler/jobs/
And that is all!!!
I know, it is just so simple.
Hope you use it!