Thursday, September 29, 2016

How to accelerate your R algorithm with ProActive?

When analyzing sets of data, getting “instant” insight is often critical for business decisions, which is why you may want to dedicate all of your resources to a set of tasks delivering analysis for stakeholders. Here is an example of Twitter feed analysis using R (a statistics-oriented language) and ProActive. It gathers messages, then performs distributed analysis on distinct computing nodes.

How it works

In selected cities, this workflow searches Twitter feeds for a chosen set of keywords and returns a chosen number of tweets. (Due to Twitter's restrictions, the number of towns cannot be greater than 15: “Rate limits are divided into 15 minute intervals [...] 15 calls every 15 minutes.”)

ProActive first creates as many tasks as there are cities, then runs them in parallel on ProActive nodes. In these R-based tasks, each node will:

  • connect to Twitter,
  • perform the desired search,
  • analyze the information,
  • store the result in the ProActive dataspace (a shared storage environment adapted to distributed systems).
A last task analyzes all the data before storing it in the above-mentioned dataspace.
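This split/process/merge pattern can be sketched in plain R before wiring it into ProActive. The sketch below is a hypothetical local analogue (the function names are illustrative, and `lapply` stands in for the parallel ProActive nodes):

```r
# Hypothetical local sketch of the split -> replicated process -> merge pattern.
# In ProActive, each process_task() call would run on its own node.
split_task <- function() {
  list(cities = c("DC", "New York", "Boston"))  # the "result" passed downstream
}

process_task <- function(city) {
  paste("tweets analyzed for", city)            # one replicated R task per city
}

merge_task <- function(partials) {
  unlist(partials)                              # final task gathers all results
}

parts <- split_task()
partials <- lapply(parts$cities, process_task)  # distributed in the real workflow
merged <- merge_task(partials)
```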

Let's code

The workflow can be found here but requires some pre-configuration (see the initial requirements below); otherwise, follow the step-by-step tutorial.


The initial requirements for this demo are:

  • a Twitter application with a valid authentication key and token,
  • ProActive fully installed and deployed (scheduler + nodes),
  • the required R packages on every node.
First, drag and drop a “replicate” control, then set the tasks’ language to R.

Create the workflow variables called “KeyWord”, “NumberofSearch”, and “NumberofTweets”, using the “Variables” tab at workflow level.


Add the following code to the “Split” task, where the cities are set:

lats <- c(38.9, 40.7, 37.8, 39, 37.4, 28, 30, 42.4, 48, 36, 32.3, 33.5, 34.7, 33.8, 37.2)
lons <- c(-77, -74, -122, -105.5, -122, -82.5, -98, -71, -122, -115, -86.3, -112, -92.3, -84.4, -93.3)
cities <- c("DC", "New York", "San Francisco", "Colorado", "Mountain View", "Tampa", "Austin", "Boston", "Seattle", "Vegas", "Montgomery", "Phoenix", "Little Rock", "Atlanta", "Springfield")
result <- list(lats, lons, cities)

Other cities can be used as long as latitudes and longitudes are known.
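To add a town, append matching entries to all three vectors. A quick length check (a suggested safeguard, not part of the original workflow) catches mismatches early:

```r
lats   <- c(38.9, 40.7)
lons   <- c(-77, -74)
cities <- c("DC", "New York")

# Append Chicago (approximate coordinates).
lats   <- c(lats, 41.9)
lons   <- c(lons, -87.6)
cities <- c(cities, "Chicago")

# The three vectors must stay aligned: one entry per town.
stopifnot(length(lats) == length(lons), length(lons) == length(cities))

result <- list(lats, lons, cities)
```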

In the replicate control, write the JavaScript code below (or the same function in your favorite language). A replicate control flow action defines how many parallel runs will be executed by setting the variable runs.

runs = variables.get("NumberofSearch")

Replicated task: actual analysis

Then, in the “Process” task, use the commented code below. It first takes the list from “Split” as a data frame: that list was passed as the “result” variable, and is therefore available to the tasks right after (here, all the “Process” tasks) as results.

Locations <- data.frame(lats = unlist(results[[1]][1]),
                        lons = unlist(results[[1]][2]),
                        cities = unlist(results[[1]][3]))
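Each replicated task later selects its own row using the ProActive variable `PA_TASK_REPLICATION`, a 0-based replication index, so `+ 1` maps it to R's 1-based rows. A minimal standalone illustration (simulating the `variables` map as a plain list):

```r
Locations <- data.frame(lats = c(38.9, 40.7), lons = c(-77, -74),
                        cities = c("DC", "New York"))

# Simulated ProActive variables map: this is the second replicated task (index 1).
variables <- list(PA_TASK_REPLICATION = 1)

row  <- variables[["PA_TASK_REPLICATION"]] + 1  # 0-based index -> 1-based row
city <- as.character(Locations$cities[row])
```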

Import the necessary packages and set the numeric locale.

library(twitteR)      # searchTwitter, setup_twitter_oauth
library(tm)           # Corpus, tm_map
library(stringr)      # str_replace_all
library(wordcloud)    # wordcloud
library(RColorBrewer) # brewer.pal

Sys.setlocale("LC_NUMERIC", "C") #Without this line, searchTwitter will fail with code 404.

Connect to Twitter; you need to replace the XXXs with your own key and token.

consumer_key_api <- "XXXXXXXXXX"
consumer_secret_api <- "XXXXXXXXXX"
token <- "XXXXXXXXXX"
token_secret <- "XXXXXXXXXX"
options(httr_oauth_cache=F) #Without this line, R will stop here.
setup_twitter_oauth(consumer_key_api, consumer_secret_api, token , token_secret)

Search, using the coordinates of a town and a radius of 300 km. The radius can be changed easily.

idx <- variables[["PA_TASK_REPLICATION"]] + 1
Tweets <- searchTwitter(toString(variables[["KeyWord"]]),
                        n = variables[["NumberofTweets"]],
                        lang = 'en',
                        geocode = paste(Locations$lats[idx],
                                        Locations$lons[idx],
                                        paste0(300, "km"), sep = ","))
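The `geocode` argument is a single "lat,lon,radius" string, which the `paste` calls assemble. For example:

```r
lat <- 38.9
lon <- -77

# Build the "lat,lon,radius" string expected by searchTwitter's geocode argument.
geocode <- paste(lat, lon, paste0(300, "km"), sep = ",")
# geocode is now "38.9,-77,300km"
```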

Take only the text from the tweets.

Tweetstext <- sapply(Tweets, function(x) x$getText())
Tweetstext <- unlist(Tweetstext)
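`searchTwitter` returns a list of status objects, and `sapply` with `getText()` keeps only the message text. A hypothetical stand-in, mocking each status as a list holding a `getText` closure, behaves the same way:

```r
# Mock a twitteR status object: only the getText() method is needed here.
mock_status <- function(txt) list(getText = function() txt)

Tweets <- list(mock_status("hello world"), mock_status("second tweet"))

Tweetstext <- sapply(Tweets, function(x) x$getText())
Tweetstext <- unlist(Tweetstext)
```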

Prepare the text to be usable.

corpus <- Corpus(VectorSource(Tweetstext))
#To get rid of all special characters.
corpus <- tm_map(corpus, str_replace_all,"[^[:alnum:]]", " ")
corpus <- tm_map(corpus, removePunctuation)
corpus <- tm_map(corpus, removeWords, stopwords("english"))
corpus <- tm_map(corpus, removeWords, c("http", "https", "amp"))
corpus <- tm_map(corpus, PlainTextDocument)
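The `tm` pipeline above strips special characters, punctuation, and stopwords. The same cleaning can be sketched in base R (a rough equivalent for illustration, not the `tm` implementation, with a custom stopword list):

```r
txt <- "Check https://t.co/abc #RStats &amp; ProActive!!"

txt   <- gsub("[^[:alnum:]]", " ", txt)        # drop special characters
txt   <- tolower(txt)
words <- strsplit(txt, "[[:space:]]+")[[1]]
words <- words[words != ""]
words <- words[!words %in% c("http", "https", "amp")]  # URL/entity leftovers
```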

Plot a wordcloud from processed tweets.

col <- brewer.pal(6, "Dark2")
layout(matrix(c(1, 2), nrow = 2), heights = c(1, 4))
par(mar = rep(0, 4))
plot.new() #text() needs an open plot for the title panel.
text(x = 0.5, y = 0.5, Locations$cities[variables[["PA_TASK_REPLICATION"]] + 1], col = "blue")
wordcloud(corpus, scale = c(5, 1), rot.per = 0.25, random.color = TRUE,
          max.words = 45, random.order = FALSE, colors = col)

Save the file in dataspace.

dev.copy(png, paste(Locations$cities[variables[["PA_TASK_REPLICATION"]] + 1], '.png', sep = ""))
dev.off() #Close the png device so the file is actually written.

Send processed tweets to final task for global analysis.

result <- corpus

You might want to add a “Sys.sleep” here to read the wordclouds (they are displayed on their host).

The “Data Management” tab of the task should include *.png using the “transferToUserSpace” Access Mode.

Last task: to wrap it up

In order to analyze the whole set of data, use this code in the “Merge” task:


#Merge all the results.
corpus <- c()
for (i in 1:variables[["NumberofSearch"]]) {
  corpus <- c(corpus, results[[i]])
}

corpus <- Corpus(VectorSource(corpus))
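In the merge task, `results` is a list with one entry per replicated task. The concatenation loop can be checked standalone (mocking `results` and the `variables` map):

```r
# Mock: two replicated tasks each returned a character vector of cleaned words.
results   <- list(c("cloud", "compute"), c("data", "cloud"))
variables <- list(NumberofSearch = 2)

# Flatten all partial results into a single vector, as in the merge task.
corpus <- c()
for (i in 1:variables[["NumberofSearch"]]) {
  corpus <- c(corpus, results[[i]])
}
```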

#Plot with Title.
col <- brewer.pal(6, "Dark2")
layout(matrix(c(1, 2), nrow = 2), heights = c(1, 4))
par(mar = rep(0, 4))
plot.new() #text() needs an open plot for the title panel.
text(x = 0.5, y = 0.5, "Main", col = "red")
wordcloud(corpus, scale = c(5, 2), rot.per = 0.25, random.color = TRUE,
          max.words = 45, random.order = FALSE, colors = col)

#Save to file in dataspace.
dev.copy(png, "Main.png")
dev.off() #Close the png device so the file is actually written.

Set the “Data Management” tab the same way as the Process task’s tab and you’re done.

The wordclouds can be found in PROACTIVE_HOME/data/defaultuser/USER (or where you set your user space).
