
Friday, February 8, 2013

Statistical Methods for Performance Engineers

A common use case for performance engineers is to determine the effect of a system change relative to an established baseline. If the effect is large and obvious then our work here is done. More often than not, however, effects are not obvious and we are left wondering whether to endorse the change or not. Statistical methods were, in essence, developed to enable us to make statements regarding the behavior of a system (population) given a small glimpse (sample) of how it behaved at one time or another.
Consider 1,000 response time samples collected over an extended period from our web application. This constitutes our performance baseline. We make changes to our application and conduct a quick test, collecting 100 response time samples. The histograms below show how the response times are distributed in each case. The new configuration has some outliers at the higher end of the scale but a narrower distribution around the peak than the baseline. To compare the distributions more equitably our strategy is to repeatedly re-sample the baseline data, calculating the mean of each sample. By the Central Limit Theorem (CLT), this so-called sampling distribution of the sample mean will approach a Gaussian distribution as more and more samples are taken from the baseline data.
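As a rough sketch of this re-sampling step (not the notebook linked in the notes; the file name and bootstrap count here are assumptions), NumPy makes the bootstrap a few lines of Python:

import numpy as np

# Load the 1,000 baseline response times (file name assumed for illustration)
baseline = np.loadtxt("baseline.dat")

n = 100          # sample size, matched to the size of the test sample
n_boot = 10000   # number of bootstrap samples (assumed; more is better)

# Draw repeated samples of size n with replacement and record each sample mean
rng = np.random.default_rng()
sample_means = np.array([rng.choice(baseline, size=n, replace=True).mean()
                         for _ in range(n_boot)])

print("mean of sampling distribution:", sample_means.mean())
print("standard deviation of sampling distribution:", sample_means.std(ddof=1))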




As per the CLT, the resulting Gaussian will have the same mean as the original distribution (our baseline data) and a variance equal to the original variance divided by the sample size. In this case our sample size is limited by the size of the test sample, i.e. 100. The sequence of graphs below shows how the Gaussian distribution forms as the number of bootstrap samples increases, converging on a mean ($\mu$) of 0.767 and a standard deviation ($\sigma$) of 0.0146.
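In symbols: if the baseline data has mean $\mu$ and variance $\sigma^2$, the sampling distribution of the mean of samples of size $n$ satisfies

$$\bar{x} \sim \mathcal{N}\!\left(\mu,\ \frac{\sigma^2}{n}\right), \qquad \sigma_{\bar{x}} = \frac{\sigma}{\sqrt{n}},$$

so with $n = 100$ the standard deviation of 0.0146 quoted above corresponds to a baseline standard deviation of roughly 0.146.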




Gaussian in hand, we can leverage the properties of this well-known distribution to make statements regarding our system. The curve can be used to determine the likelihood of our baseline system generating samples that fall within a specific range. In this case the test mean is 0.793, higher than the baseline mean, so we want to determine the probability of a baseline sample mean being greater than 0.793. In the figure below, this corresponds to the shaded area under the right tail, which can be calculated using Z-tables or widgets like this. In our case this area amounts to 0.0367 (3.7%), which is interpreted as the probability that our baseline system would produce a sample like the test. This is very weak support for the notion that our changes have not regressed system performance.

Loosely speaking, we have conducted a one-sided hypothesis test, and the principle applies equally to test samples lower than the baseline mean. In such a test we adopt the null hypothesis that there has been no change to the system, and then determine the strength of evidence that our test results would occur by chance alone under that hypothesis. If this support falls below a selected threshold (5% is common), we reject the null hypothesis.
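For reference, the tail area can be computed directly rather than read off a Z-table; a minimal sketch using SciPy and the numbers above:

from scipy.stats import norm

mu, sigma = 0.767, 0.0146    # mean and std dev of the sampling distribution
test_mean = 0.793            # mean of the 100-sample test run

z = (test_mean - mu) / sigma     # z-score of the test mean
p = norm.sf(z)                   # one-sided tail area, P(baseline sample mean > test mean)

print("z = %.2f, p = %.4f" % (z, p))   # roughly z = 1.78, p = 0.037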



If we wanted to improve the accuracy of the estimate above, we would need to decrease the variance of the sampling distribution of the sample mean. In the Bayesian sense, this variance represents the uncertainty in our estimate of the actual mean given the data we have available. Since the variance of the sampling distribution is inversely proportional to the sample size, the most direct way to improve accuracy is to increase the sample size, which in our case means collecting more test data with the new configuration. Intuitively this should make sense.

Below we show how the variance of the sampling distribution of the sample mean decreases with sample size. These graphs illustrate location and shape only and probabilities (y-axis) should be ignored.





So there you have it: although we do not have a definitive answer regarding the effect of our changes to the system, we have made some steps towards quantifying our uncertainty. This might not seem like a lot to engineers accustomed to micrometers and 3-point precision but it's a fundamental step towards controlling a given process. Uncertainty is inevitable in any system we choose to measure and an important qualifier for any actionable metrics. Ask Nate Silver.

Notes:
In this example we used web server response time as a metric but this analysis applies equally to any system metric. Data representing our two systems was collected by pointing curl at two different websites:

for i in {1..1000}; do curl -o /dev/null -s -w "%{time_total}\n" http://mybaselineurl.com; done
for i in {1..100}; do curl -o /dev/null -s -w "%{time_total}\n" http://mynewurl.com; done
The Python code for this example is available as an IPython notebook here.

Friday, October 12, 2012

Where did my Tweet go? Tracking retweets through the social graph.


D3 visualization of a tweet propagation for @OReillyMedia


Everybody tweets these days, but not all tweets are created equal, and neither are tweeters for that matter. One of the truest measures of a tweet and, by proxy, the tweeter, is how far into the social graph the message propagates via retweeting. Retweeting is an explicit indication of engagement on the part of the retweeter and, in most cases, an endorsement of the message and the original author. With that said, it is an interesting exercise to see where in the social graph a particular tweet reached. The Twitter API makes it easy to tell how many times and by whom a message was retweeted, but it takes a bit more legwork to determine the path taken to the recipients.

A simple method to follow the propagation of a tweet is to do a breadth-first traversal of follower links, starting at the message author, until all retweeters have been accounted for. Obviously there are some assumptions wrapped up in this methodology, but for the most part the evidence supports the results. The Python script below performs this walk through the social graph. To economize on Twitter API calls, the script caches follower lists in a Redis server so that they can be re-used on subsequent runs. This scheme works best when examining tweets which are closely related and involve many of the same Twitter users.

For visualization purposes, the Python script outputs a JSON file for consumption by a D3 force-directed graph template. D3 expects nodes and links enumerated in separate lists, with the link elements referencing the node elements via node-list indices. A sample graph is shown above, visualizing the path of a tweet from @OReillyMedia. Twitter users are indicated by their avatars and a grey circle with radius proportional to the logarithm of their number of followers. The originator of the message is indicated with a red circle. The graph title gives the text of the tweet, the overall retweet count, and the number of users reached by the message (the sum of everyone's follower counts).

While the ability to gather broad insight with this method is limited by Twitter API rate controls, it could be used for a focused study of a specific Twitter user, looking for prominent social-graph pathways and individuals that warrant reciprocation. Failing that, the D3 transitions as the graph builds and stabilizes make fascinating viewing.


#!/usr/bin/python -u
#
# Usage: ./trace.py <tweetId>
#
import sys
import tweepy
import Queue
import time
import json
import redis

CONSUMER_KEY = 'xxx'
CONSUMER_SECRET = 'xxx'
ACCESS_KEY = 'xxx'
ACCESS_SECRET = 'xxx'

REDIS_FOLLOWERS_KEY = "followers:%s"

# Retweeters who have not yet been connected to the social graph
unconnected = {}

# Retweeters connected to the social graph...become seeds for deeper search
connected = Queue.Queue()

# Social graph
links = []
nodes = []


#----------------------------------------
def addUserToSocialGraph (parent, child):
    # parent: tweepy.models.User
    # child: tweepy.models.User
    #----------------------------------------
    global links
    if (child):
        nodes.append ({'id':child.id,
                       'screen_name':child.screen_name,
                       'followers_count':child.followers_count,
                       'profile_image_url':child.profile_image_url})
        # Find child and parent indices in nodes in order to create the link
        if (parent):
            print (nodes)
            print ("Adding to socialgraph: %s ==> %s" % (parent.screen_name, child.screen_name))
            links.append ({'source':getNodeIndex (parent),
                           'target':getNodeIndex (child)})


#----------------------------------------
def getNodeIndex (user):
    # user: tweepy.models.User
    #----------------------------------------
    global nodes
    for i in range(len(nodes)):
        if (user.id == nodes[i]["id"]):
            return i
    return -1


#----------------------------------------
def isFollower (parent, child):
    # parent: tweepy.models.User
    # child: tweepy.models.User
    #----------------------------------------
    global red

    # Fetch data from Twitter if we dont have it
    key = REDIS_FOLLOWERS_KEY % parent.screen_name
    if ( not red.exists (key) ):
        print ("No follower data for user %s" % parent.screen_name)
        crawlFollowers (parent)
    cache_count = red.hlen (key)
    if ( parent.followers_count > (cache_count*1.1) ):
        print ("Incomplete follower data for user %s. Have %d followers but should have %d (exceeds 10%% margin for error)."
               % (parent.screen_name, cache_count, parent.followers_count))
        crawlFollowers (parent)

    return red.hexists (key, child.screen_name)


#----------------------------------------
def crawlFollowers (user):
    # user: tweepy.models.User
    #----------------------------------------
    print ("Retrieving followers for %s (%d)" % (user.screen_name, user.followers_count))
    count = 0
    follower_cursors = tweepy.Cursor (api.followers, id = user.id)
    followers_iter = follower_cursors.items()
    follower = None
    while True:
        try:
            # We may have to retry a failed follower lookup
            if ( follower is None ):
                follower = followers_iter.next()
            # Add link to Redis
            red.hset ("followers:%s" % user.screen_name, follower.screen_name, follower.followers_count)
            follower = None
            count += 1
        except StopIteration:
            break
        except tweepy.error.TweepError as err:
            print ("Caught TweepError: %s" % (err))
            if (err.reason == "Not authorized" ):
                print ("Not authorized to see users followers. Skipping.")
                break
            limit = api.rate_limit_status()
            if (limit['remaining_hits'] == 0):
                seconds_until_reset = int (limit['reset_time_in_seconds'] - time.time())
                print ("API request limit reached. Sleeping for %s seconds" % seconds_until_reset)
                time.sleep (seconds_until_reset + 5)
            else:
                print ("Sleeping a few seconds and then retrying")
                time.sleep (5)
    print ("Added %d followers of user %s" % (count, user.screen_name))


#----------------------------------------
# Main
#----------------------------------------
tweetId = sys.argv[1]

# Connect to Redis
red = redis.Redis(unix_socket_path="/tmp/redis.sock")

# Connect to Twitter
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_KEY, ACCESS_SECRET)
api = tweepy.API(auth)
print (api.rate_limit_status())

# Get original Tweet details
status = api.get_status (tweetId)
connected.put(status.user)
addUserToSocialGraph (None, status.user)
retweets = api.retweets (status.id)
print ("Tweet %s, originally posted by %s, was retweeted by..." % (status.id, status.user.screen_name))
for retweet in retweets:
    print (retweet.user.screen_name)
    unconnected[retweet.user.screen_name] = retweet.user

# Pivot
while not (connected.empty() or len(unconnected)==0):
    # Get next user
    pivot = connected.get()
    # Check followers of this user against unconnected retweeters
    print ("Looking through followers of %s" % pivot.screen_name)
    for (screen_name, retweeter) in unconnected.items():
        if (isFollower(pivot, retweeter)):
            print ("%s <=== %s" % (pivot.screen_name, retweeter.screen_name))
            connected.put (retweeter)
            addUserToSocialGraph (pivot, retweeter)
            del unconnected[retweeter.screen_name]
        else:
            print ("%s <=X= %s" % (pivot.screen_name, retweeter.screen_name))

# Add unconnected nodes to social graph
for (screen_name, user) in unconnected.items():
    addUserToSocialGraph (None, user)

# Encode data as JSON
filename = "%s.json" % status.id
print ("\n\nWriting JSON to %s" % filename)
tweet = {'id':status.id,
         'retweet_count':status.retweet_count,
         'text':status.text,
         'author':status.user.id}
f = open (filename, 'w')
f.write (json.dumps({'tweet':tweet, 'nodes':nodes, 'links':links}, indent=2))
f.close()
sys.exit()

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script type="text/javascript" src="d3.js"></script>
<style>
.link {
  stroke: #ccc;
  stroke-width: 2;
}
.node text {
  pointer-events: none;
  font: 10px sans-serif;
}
.originator {
  fill: none;
  stroke: red;
  stroke-width: 2;
}
.followers {
  fill: none;
  stroke: #ccc;
  stroke-width: 2;
}
.tweet-text {
  pointer-events: none;
  font: 20px sans-serif;
  fill: black;
}
.tweet-retweetcount {
  pointer-events: none;
  font: 15px sans-serif;
  fill: #ccc;
}
.tweet-reach {
  pointer-events: none;
  font: 15px sans-serif;
  fill: #ccc;
}
</style>
</head>
<body>
<script>
function getUrlVars() {
  var vars = {};
  var parts = window.location.href.replace(/[?&]+([^=&]+)=([^&]*)/gi, function(m, key, value) {
    vars[key] = value;
  });
  return vars;
}

reach = null;
tweetId = getUrlVars()["id"];
console.log(tweetId);

d3.json("data/" + tweetId + ".json", function(json) {
  var width = 960,
      height = 960;

  var svg = d3.select('body')
    .append('svg')
    .attr('width', width)
    .attr('height', height);

  // draw the graph edges
  var link = svg.selectAll("link")
    .data(json.links)
    .enter()
    .append("line")
    .attr("class", "link");

  // draw the graph nodes
  var node = svg.selectAll("node")
    .data(json.nodes)
    .enter()
    .append("g")
    .attr("class", "node");

  // create the layout
  var force = d3.layout.force()
    .gravity(0.1)
    .linkDistance(200)
    .charge(-800)
    .size([width, height])
    .nodes(json.nodes)
    .links(json.links)
    .start();

  // define what to do on each tick of the animation
  force.on("tick", function() {
    link.attr("x1", function(d) { return d.source.x; })
        .attr("y1", function(d) { return d.source.y; })
        .attr("x2", function(d) { return d.target.x; })
        .attr("y2", function(d) { return d.target.y; });
    node.attr("transform", function(d) { return "translate(" + d.x + "," + d.y + ")"; });
  });

  // bind the drag interaction to the nodes
  node.call(force.drag);

  // Node images
  node.append("image")
    .attr("xlink:href", function(d) { return d.profile_image_url })
    .attr("x", -20)
    .attr("y", -20)
    .attr("width", 40)
    .attr("height", 40);

  // Label nodes
  node.append("text")
    .attr("dx", 20)
    .attr("dy", ".35em")
    .text(function(d) { return d.screen_name });

  // Follower count circles
  node.append("circle")
    .attr("class", "followers")
    .attr("r", function(d) { reach += d.followers_count; return 5*Math.log(d.followers_count+2); });

  // Mark originator node
  d3.select(".node").append("circle").attr("class", "originator").attr("r", 20);

  // Tweet details
  svg.append("text")
    .attr("class", "tweet-text")
    .attr("x", 10)
    .attr("y", 20)
    .text(json.tweet.text);
  svg.append("text")
    .attr("class", "tweet-retweetcount")
    .attr("x", 10)
    .attr("y", 40)
    .text("Retweet Count: " + json.tweet.retweet_count);
  svg.append("text")
    .attr("class", "tweet-reach")
    .attr("x", 10)
    .attr("y", 60)
    .text("Reach: " + reach);
});
</script>
</body>
</html>

Friday, September 28, 2012

Web Request Rate Analysis with R

During performance testing it's easy to find raw data relating to the response time of a web service or RPC. This information is typically recorded in log files like the server access log, or in some form or another in the load-generating client logs. Response time statistics are an important operational parameter but need to be qualified with the request rate to give context to the metric. I use a simple R script to generate request rate graphs from Apache JMeter output (.jtl files), but the concept can easily be extended to any format which includes timestamped entries, preferably labelled somehow to distinguish one service type from another.

The basis of the request rate calculation is computing and inverting the inter-arrival time of requests. If requests are generated independently then the arrival process is a Poisson process and inter-arrival times will follow an exponential distribution. Generally, requests directed at a web site will not be entirely independent, since a user is guided by the site's navigation constraints, but the approximation lends itself to modelling the service using queueing theory. One of the simplest such models is the M/M/1 queue, which models the queue length in a system with a single server, where arrivals are determined by a Poisson process and job service times have an exponential distribution.
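For reference, with Poisson arrivals at rate $\lambda$ and exponential service at rate $\mu$, the standard M/M/1 results for utilization, mean number of requests in the system, and mean time in the system are

$$\rho = \frac{\lambda}{\mu}, \qquad L = \frac{\rho}{1 - \rho}, \qquad W = \frac{1}{\mu - \lambda} \quad (\lambda < \mu),$$

with $L = \lambda W$ by Little's Law.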




Moving on to less esoteric endeavours: each request from the JMeter client is recorded as a separate XML element in a .jtl file. Request parameters are recorded as attributes on the element. The attributes we are interested in are ts (epoch timestamp) and lb (service label). The shell commands below turn the .jtl file into a format that R can read into a dataframe. This includes adding column headers, which saves us having to define these within the R script. These shell commands are the part you would change if importing data from another source.

# Write column headers to data file
echo "t lt ts s lb rc rm tn dt by" > DATAFILE
# Extract values in quotes
sed -n '/^<httpSample/{s/[^"]*\("[^"]*"\)[^"]*/\1 /gp}' JTLFILE >> $DATAFILE
# Data file looks something like this...
head DATAFILE
t lt ts s lb rc rm tn dt by
"319" "319" "1348598679755" "true" "service-M" "200" "OK" "Thread Group 1-1" "text" "2918"
"2167" "2159" "1348598680583" "true" "service-J" "200" "OK" "Thread Group 1-1" "text" "2853"
"22" "22" "1348598683251" "true" "service-A" "200" "OK" "Thread Group 1-1" "text" "6114"
"31" "31" "1348598685752" "true" "service-M" "200" "OK" "Thread Group 1-2" "text" "2918"
"192" "186" "1348598686284" "true" "service-J" "200" "OK" "Thread Group 1-2" "text" "2853"
"24" "24" "1348598686977" "true" "service-A" "200" "OK" "Thread Group 1-2" "text" "6114"
"4999" "4999" "1348598683774" "true" "service-E" "200" "OK" "Thread Group 1-1" "text" "10869"
"190" "190" "1348598689274" "true" "service-E" "200" "OK" "Thread Group 1-1" "text" "9893"
"9" "9" "1348598689965" "true" "service-B" "200" "OK" "Thread Group 1-1" "text" "6520"
Now we run the data file generated above through the R script below. The script begins by loading the data into a dataframe and sorting based on timestamp. We can't take for granted that log entries generated in a buffered multi-threaded environment will be chronologically interleaved. We then iterate over the dataframe, calculating the inter-arrival time between each request and its predecessor. This value is added as a new column v to the dataframe. We then invert and scale the inter-arrival times and plot over time to give us the overall request rate per minute. We then filter the data on service labels and perform the same exercise to get a service-specific request rate breakdown. We have used a locally weighted smoothing function, adjusting the span factor to suit our granularity requirements.

#!/usr/bin/env Rscript
#
# Usage: $0 <datafile>
#
# Expected data file format: t lt ts s lb rc rm tn dt by
#
myspan = 0.1
myargs <- commandArgs(TRUE)
file <- myargs[1]
d <- read.table (file, header=TRUE)
## Order by timestamp so we can get meaningful iats
d <- d[order(d$ts),]
# Inter-arrival time
v = numeric (length(d$ts))
v[1] = 1
for (i in 2:length(d$ts)) { v[i] = d$ts[i] - d$ts[i-1] + 1 }
d$v <- v
# Main Plot
image = paste(file, ".png", sep="")
png (image, width=960, height=480)
tsmin = min (d$ts)
ymax=1000*60*(1/mean(d$v)+1/sd(d$v))
palette <- rainbow (nlevels(d$lb)+1)
d.lo <- loess (d$v ~ d$ts, span=myspan)
# Total
plot ((d$ts-tsmin)/1000, 1000*60/predict(d.lo, d$ts), type="l", ylab="req/min", xlab="seconds", main="Request Rate", col=palette[1], ylim=c(0,ymax))
legend("topright", c("total", levels(d$lb)), col=palette, lty=1, cex=0.75)
# Labels
pindex = 2;
palette <- rainbow (nlevels(d$lb))
for (label in levels(d$lb))
{
print (label)
a <- subset(d, lb==label)
v = numeric (length(a$ts))
v[1] = 1
for (i in 2:length(a$ts)) { v[i] = a$ts[i] - a$ts[i-1] + 1 }
a$v <- v
a.lo <- loess (a$v ~ a$ts, span=myspan)
lines ((a$ts-tsmin)/1000, 1000*60/predict(a.lo, a$ts), col=palette[pindex])
rm(a)
rm(v)
pindex = pindex+1
}
grid()
dev.off()
print (paste("Request rate plot written to file", image))

A sample graph is shown below. For this test run we used a rate-controlled request generator with a small ramp, accounting for the initial rise and bounded oscillation for each service. Below that is a density plot of the inter-arrival time for this sample data, showing that it does indeed look exponential, as postulated by the Poisson process model.





Friday, September 21, 2012

Apache Web Stats from Access Logs

Apache access logs represent a wealth of information regarding who/what is hitting your web site/service and how you are handling the traffic. There are many tools available to digest this information and generate useful insight, but just in case you have an itch to do it yourself, here's a bit of scripting to get you on your way. I used venerable old sed(1) to pull out the fields we are interested in and R to generate bar graphs, showing the breakdown of hits by source IP address and agent headers.

The first step is to parse the access logs to make them suitable for import into R. The exact parsing pattern will depend on the Apache LogFormat definitions and which one is active for the access log. In this case we assume the combined format ...


apache2.conf: LogFormat "%h ....." combined

%...h:          Remote host
%...l:          Remote logname
%...u:          Remote user 
%...t:          Time, in common log format time format 
%...r:          First line of request
%...s:          Status
%...b:          Bytes sent
%...D:          Response time
%{VARNAME}e:    The contents of the environment variable VARNAME.
%{VARNAME}i:    The contents of VARNAME in request header. 
%{VARNAME}n:    The contents of note VARNAME from another module.



Using sed we extract the remote host address, the request path, and the first word of the
User-Agent header. The output format is space-separated columns: ip path agent

sed -n 's/^\(\S*\) - - \[[^]]*\] "\(GET\|HEAD\) \(\/\S*\) [^"]*" [^"]*"-" "\(\S*\) .*$/\1 \3 \4/p' access.log > access.dat


The next step is to parse and graph in R using the script below. The script imports the normalized log records into a dataframe and then aggregates by ip and agent to generate bar graphs of the hits, broken down by source address and user-agent header respectively. The aggregates are ordered by count and, for presentation purposes only, truncated after the top 40 classes. Because we do not take timestamps into account, multiple log files can be concatenated in any order and processed together. Below are sample output graphs. Note that the prominent representation of the Mozilla/5.0 user agent is somewhat misleading: for simplicity's sake, the sed expression only extracts the first word of the user-agent header, which has the effect of grouping together Googlebot, Yandexbot, TweetedTimes, Firefox, and Safari, among others.


#!/usr/bin/env Rscript
#
# Usage: $0 <apache_access_log>
#
# Generate hit statistics bar chart from Apache httpd access logs
#
d <- read.table ("access.dat")
colnames(d) <- c("ip", "path", "agent")
# Hits by IP address
# Aggregate to get counts
a <- (aggregate (d$ip, list(d$ip), length))
colnames (a) <- c('ip', 'count')
# Order by count
a <- a[with(a, order(a$count, decreasing=TRUE)),]
# Draw bar plot
png("hitsbyip.png", height=480, width=960)
# Widen bottom margin for labels
par(mar = c(10,4,4,4))
barplot(a$count[1:40], names.arg=a$ip[1:40], las=2, cex.lab=0.5, main="Hits by IP", col=c("green"))
dev.off()
# Hits by User-Agent
# Aggregate to get counts
b <- (aggregate (d$agent, list(d$agent), length))
colnames (b) <- c('agent', 'count')
# Order by count
b <- b[with(b, order(b$count, decreasing=TRUE)),]
# Draw bar plot
png("hitsbyagent.png", height=480, width=960)
# Widen bottom margin for labels
par(mar = c(10,4,4,4))
barplot(b$count[1:40], names.arg=b$agent[1:40], las=2, cex.lab=0.5, main="Hits by Agent", col=c("green"))
dev.off()



Friday, June 15, 2012

Bootstrapping an EC2 Hadoop Instance with a User-Data script

Amazon AWS is great for MapReduce jobs. You can take advantage of the AWS Elastic MapReduce service and load and transform data directly from/to S3 buckets, or you can roll your own and create a Hadoop image in EC2 and spin up instances as required.

There are two ways to go about this. The first is to customize a raw instance and, once you have it working the way you would like, take a snapshot of the volume. Further instances can then be created by cloning the snapshot. Obviously we need a way to differentiate instances into NameNodes, DataNodes, JobTrackers, and TaskTrackers. The easiest way to do this is probably to set up an initialization script in your image which reads the instance user data/tags and customizes the configuration accordingly. User data/tags can be set when the instance is created, either through the AWS console or the ec2-api command line tools.

Alternatively, a startup script can be supplied to a raw instance; it is executed when the instance initializes and configures the instance from scratch. User scripts can be injected via the user-data field in the AWS console or the ec2-api command line tools. How the script is handled depends on the operating system image selected. Official Ubuntu images from Canonical check the first two characters of the user-data payload for #!, in which case the user data is executed as a shell script. As in the case above, the initialization script can use user tags and other user-data fields to customize the instance into the specific type of Hadoop node required; a minimal sketch of such a role lookup is shown below.
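As an illustration only (the role= convention and the parsing below are assumptions, not part of the scripts in this post; the metadata endpoint itself is the standard EC2 instance metadata service), an image-resident init script could read the user data and pick which Hadoop daemons to start:

#!/usr/bin/env python3
# Hypothetical boot-time helper: read the EC2 user data and decide which Hadoop
# role this instance should take. The "role=..." line is an assumed convention.
from urllib.request import urlopen

USER_DATA_URL = "http://169.254.169.254/latest/user-data"

user_data = urlopen(USER_DATA_URL, timeout=5).read().decode()

# Expect a line like "role=namenode" somewhere in the user data
role = "datanode"
for line in user_data.splitlines():
    if line.startswith("role="):
        role = line.split("=", 1)[1].strip()

print("Configuring this instance as a Hadoop %s" % role)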

In addition to the facilities described above, Canonical Ubuntu images also include a package called cloud-init. Cloud-init is the Ubuntu package that handles early initialization of a cloud instance. It is installed in the UEC Images and also in the official Ubuntu images available on EC2. Cloud-init operates on user data received during initialization and accepts this data in multiple formats: gzipped compressed content, cloud-config scripts, shell scripts, and MIME multi part archive, among others. Shell scripts function pretty much the same as described above while cloud-config scripts use a custom syntax specifically for configuring a Ubuntu instance via the built-in administrative interfaces (mostly APT package manager). Gzip content is just a way to compress the user-data payload and MIME multi part archive is a method to combine cloud-init scripts and shell scripts into a single payload.

The cloud-init script below performs an APT update and upgrade as well as installing specific packages required for the Hadoop install, including OpenJDK. The Oracle/Sun JDK is a bit harder to install since the related packages are no longer distributed by Canonical, and setup requires configuring a third-party repository and pre-accepting the Oracle license, all tasks which can be performed using cloud-init functions, albeit with a bit more effort. The script also sets the server timezone and configures log sinks, which are very useful for debugging.

#cloud-config
# Provide debconf answers
debconf_selections: |
  debconf debconf/priority select low
  debconf debconf/frontend select readline
# Update apt database on first boot
apt_update: true
# Upgrade the instance on first boot
apt_upgrade: true
# Install additional packages on first boot
packages:
  - lzop
  - liblzo2-2
  - liblzo2-dev
  - git-core
  - openjdk-6-jre-headless
  - openjdk-6-jdk
  - ant
  - make
  - jflex
# timezone: set the timezone for this instance
# the value of 'timezone' must exist in /usr/share/zoneinfo
timezone: America/Halifax
# manage_etc_hosts:
# default: false
# Setting this config variable to 'true' will mean that on every
# boot, /etc/hosts will be re-written from /etc/cloud/templates/hosts.tmpl
# The strings 'hostname' and 'fqdn' are replaced in the template
# with the appropriate values
manage_etc_hosts: true
# Print message at the end of cloud-init job
final_message: "The system is finally up, after $UPTIME seconds"
# configure where output will go
# 'output' entry is a dict with 'init', 'config', 'final' or 'all'
# entries. Each one defines where
# cloud-init, cloud-config, cloud-config-final or all output will go
# each entry in the dict can be a string, list or dict.
# if it is a string, it refers to stdout and stderr
# if it is a list, entry 0 is stdout, entry 1 is stderr
# if it is a dict, it is expected to have 'output' and 'error' fields
# default is to write to console only
# the special entry "&1" for an error means "same location as stdout"
# (Note, that '&1' has meaning in yaml, so it must be quoted)
output:
  init: "> /tmp/cloud-init.out"
  config: [ "> /tmp/cloud-config.out", "> /tmp/cloud-config.err" ]
  final:
    output: "> /tmp/cloud-final.out"
    error: "&1"

In addition to the cloud-init script, we created a shell script to download the Hadoop source, compile, install and configure it. In this case we configured a single node as a pseudo-cluster and have not used user data/tags to customize settings as described above. The script also downloads and installs the LZO compression modules for HDFS and imports SSH keys to allow communication between the JobTracker and TaskTrackers, although this is not required in this single-server deployment. Finally, the script disables IPv6, formats the HDFS filesystem and starts the Hadoop daemons.

#!/bin/bash
#
# Install and configure Hadoop on this node
#
# This script will be run as root
#
CONFIGURED_USER=ubuntu
HADOOP_HOME=/usr/local/hadoop
HADOOP_VERSION=0.20.203.0
HADOOP_INSTALL=${HADOOP_HOME}/hadoop-${HADOOP_VERSION}
HADOOP_TMP_DIR=${HADOOP_INSTALL}/datastore
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64
export CFLAGS=-m32
export CXXFLAGS=-m32
export HADOOP_INSTALL=${HADOOP_HOME}/hadoop-${HADOOP_VERSION}
export PATH=${JAVA_HOME}/bin:${HADOOP_INSTALL}/bin:${PATH}
# Instance meta-data
# Add profile options
cat >>/etc/bash.bashrc <<EOF
# Added by cloud-init
export JAVA_HOME=${JAVA_HOME}
export LD_LIBRARY_PATH=${JAVA_HOME}/lib:\$LD_LIBRARY_PATH
export HADOOP_HOME=${HADOOP_HOME}
export HADOOP_INSTALL=${HADOOP_INSTALL}
export PATH=\$JAVA_HOME/bin:\$HADOOP_INSTALL/bin:\$PATH
EOF
# Create hadoop user
useradd -s /bin/bash -c "Hadoop user" -m -d ${HADOOP_HOME} hadoop
# Download hadoop distribution
wget http://mirror.csclub.uwaterloo.ca/apache//hadoop/common/hadoop-0.20.203.0/hadoop-${HADOOP_VERSION}rc1.tar.gz -O /mnt/hadoop-${HADOOP_VERSION}rc1.tar.gz
tar zxvf /mnt/hadoop-${HADOOP_VERSION}rc1.tar.gz -C ${HADOOP_HOME}
# Hadoop config: pseudo cluster
cat >>${HADOOP_INSTALL}/conf/hadoop-env.sh <<EOF
# Added by cloud-init
export JAVA_HOME=${JAVA_HOME}
EOF
cat >${HADOOP_INSTALL}/conf/core-site.xml <<\EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop/datastore/hadoop-${user.name}</value>
</property>
</configuration>
EOF
cat >${HADOOP_INSTALL}/conf/mapred-site.xml <<\EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
<property>
<name>mapreduce.jobtracker.staging.root.dir</name>
<value>/user</value>
</property>
</configuration>
EOF
cat >${HADOOP_INSTALL}/conf/hdfs-site.xml <<\EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
EOF
# Download hadoop-lzo distribution
echo "### Downloading hadoop-lzo distibution"
sleep 300
HADOOP_PLATFORM=$(hadoop org.apache.hadoop.util.PlatformName)
pushd /mnt
git clone https://github.com/kevinweil/hadoop-lzo.git
cd hadoop-lzo
ant compile-native tar
cp build/hadoop-lzo-*.jar ${HADOOP_INSTALL}/lib
cp -r build/native/${HADOOP_PLATFORM}/lib/libgplcompression.* ${HADOOP_INSTALL}/lib/native/${HADOOP_PLATFORM}
popd
# Change file ownership to hadoop
chown -R hadoop:hadoop ${HADOOP_INSTALL}
# Set up Hadoop tmp dir
mkdir ${HADOOP_TMP_DIR}
chown hadoop:hadoop ${HADOOP_TMP_DIR}
# Clean up
#echo "### Cleaning up hadoop tarball"
#rm -rf /mnt/hadoop-${HADOOP_VERSION}rc1.tar.gz
#rm -rf /mnt/hadoop-lzo
# Get mapreduce code
#su -l ${CONFIGURED_USER} -c 'git clone git://github.com/justinkamerman/fiddlenet.git'
# Set up passphraseless ssh
echo "### Setting up passphraseless ssh"
mkdir ${HADOOP_HOME}/.ssh
cat >${HADOOP_HOME}/.ssh/config <<EOF
Host *
StrictHostKeyChecking no
EOF
cat >${HADOOP_HOME}/.ssh/id_dsa <<EOF
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAqq3q4eQi9xqPRoTa+gsNIfnB1tIvnNIu6P4B/rejcAgmfxNa
TLILqSDBWYwI6xsL3MaH0hBY+c/SghcxrT3JL2z0TKfmexaUtC3+js8h++F+vPv9
fZaXojelNyXsOlgTBa+e4thIn7j/44ozMs/Uvy6PkGVukrPXRV57b9KEriAtEYJF
3DdW/Y6S89+gsV7Scwty3pOJ7eeNJEirTCdl0ox+/aNFNxlM29sOqqB43q1fKLZx
223CginVtd+fO8VLzU0+PdyxFZ9qCMVPuEP+bl3uSWCdlU9cfj+LY5cXch+kFdoN
Key7VqH2aTxXBDL8t1dwssUnlSkwtOt4Wg1qzQIDAQABAoIBAFpFXeNXa/7Rd1HO
1ppE2g9ML29U/4WrzM/B+IAl1DVeui2fqLTDvlMXVevsmpLuXRnJjvBVYRnPBwFz
Dv0Xnp6Mu7EHZGlZihC5+tbBSrITk5qUlH+l9FEBqUo/rm81QepR9nD3/4EqsXxB
Dc8kCNuM3rV6UD8bCxJPZG3CJBaLZId2xueaejWgAnx9L6bx4G8hTfBGgGpPeZ6t
sfCtukMZnAAADZRImjxfIUjDTNbdGVTCmYgWL9cg7j8VsFa7nISbPCmmyyTU6wQa
6nXEdeazejYmGcjZopPlojcQJ6V6uQMljIrI+3mx7UzQuEz/Fjqa4vvTV7H+t6fJ
qSSErgECgYEA3f092vTWF3yRRVV6MSyrvvw4DfPgFZs1IXIP72WxmK4SfWY1vwEe
/pQprSvctd6jW71G9K3SZsFoDQmuDrYmda/66YC61jDgeInbTxb9Cc9Wiy11D4v1
ddeqkKX2yVBUAhd80bhAXFJNDWhesd6nXzgB9QuPHKnmLYSr1WMwO8ECgYEAxNQ5
cJpLH0W2AG94cfRjYAfdxT421tsYX0W/MRtq3Rqj0L74UVeYgG+YOl30GJkg5H1i
D04EWk94hfNviLg6HfIDo5iFqIbFWuzXjI194JzGAMPaEPJi3w/PgJAVc1QiDjkq
DV7BQXgvz/InTQnWznOVIp8sU6SBLB2kUzpa4g0CgYEA3d7kWdlnuaW5FFEwhcGe
Do7L/7YF+9JasgjswFslu/IPbOIhSbx3G/8+AGTcfbH+GAz/xEGPD0CzHITWQMHx
gqLW51bQZpAHarJuTYgudAWU/Bn87AL43EUnptcZ52+v5z9Oc9XyDdP8SzBLpP9i
zZqO6joZWY6+DjSSAf7XEIECgYEAifFaGCpqP45xkTiOJv7prmGU8Sk68bU3DX4q
ElZuvGpxKFjOWuOTA2AyRaWW7q5SuQ+Oa793mXtcsjP7lMvYHyh/mGXKNmPNaH3Y
Sq7W61W0BtE7wOi+linUePuBrQPnoiQ57ojb0/BRQeEp3fnpS2MBv/Ph8vS1ep+D
jLi2/PkCgYAzqknNfAkItNFRljQ1VDRpq8vrCANvQ1OaRa22zuUe7g8Vi6qr8uiB
eAWfWirpwIB2YnvKcpKlgM6DASO5a8t6tyCC6l1iV3BMgXLcUeMwmbi5ocwpCVoy
8mlCSVDCFzkwPSPr7h2OFI+fYPery0y9L3Vs19m+pbcN==
-----END RSA PRIVATE KEY-----
EOF
cat >${HADOOP_HOME}/.ssh/authorized_keys <<EOF
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCqrerh5CL3Go9GhNr6Cw0h+cHW0i+c0i7o/gH+t6NwCCZ/E1pMsgupIMFZjAjrGwvcxofSEFj5z9KCFzGtPckvbPRMp+Z7FpS0Lf6OzyH74X68+/19lpeiN6U3Jew6WBMFr57i2EifuP/jijMyz9S/Lo+QZW6Ss9dFXntv0oSuIC0RgkXcN1b9jpLz36CxXtJzC3Lek4nt540kSKtMJ2XSjH79o0U3GUzb2w6qoHjerV8otnHbbcKCKdW13587xUvNTT493LEVn2oIxU+4Q/5uXe5JYJ2VT1x+P4tjlxdyH6QV2g0p7LtWofZpPFcEMvy3V3CyxSeVKT
EOF
chown -R hadoop:hadoop ${HADOOP_HOME}/.ssh
chmod 700 ${HADOOP_HOME}/.ssh
chmod 600 ${HADOOP_HOME}/.ssh/id_dsa
# Disable ipv6
echo "### Disabling ipv6 stack"
cat >>/etc/sysctl.conf <<EOF
# IPv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
EOF
sysctl -p
# Namenode
# Format hdfs filesystem
echo "### Formatting hdfs filesystem"
su hadoop -c "${HADOOP_INSTALL}/bin/hadoop namenode -format"
# Start hadoop daemons
echo "### Starting Hadoop daemons"
su hadoop -c "${HADOOP_INSTALL}/bin/start-all.sh"
# Create user directory
echo "### Creating hdfs directory for user ${CONFIGURED_USER}"
su hadoop -c "{HADOOP_INSTALL}/bin/hadoop fs -mkdir /user/{CONFIGURED_USER}"
# Change ownership of user directory
echo "### Changing ownership of hdfs directory for user ${CONFIGURED_USER}"
su hadoop -c "{HADOOP_INSTALL}/bin/hadoop fs -chown {CONFIGURED_USER} /user/${CONFIGURED_USER}"

The two scripts above are packaged as a MIME multi part message to be delivered to cloud-init via the EC2 user data payload. The cloud-init script write-mime-multipart is used to perform this function as shown in the simple Makefile below.

WRITE-MIME_MULTIPART=./bin/write-mime-multipart
.PHONY: clean
cloud-config.txt: ubuntu-config.txt hadoop-setup.sh
	$(WRITE-MIME_MULTIPART) --output=$@ $^
clean:
	$(RM) cloud-config.txt

Tuesday, March 27, 2012

Data-Intensive Text Processing with MapReduce

This book is compact and intense, but it is an insightful and powerful demonstration of how a problem may be decomposed to fit the MapReduce paradigm. Equally important, it describes the types of problem that are not suited to decomposition as MapReduce jobs. It covers in detail the use of MapReduce in text indexing, graph algorithms, and expectation maximization, but the techniques described could easily be applied to a wide range of applications. I was able to turn the pseudo-code snippets, together with Hadoop: The Definitive Guide, into working examples in a relatively short time.

For me, this book filled in the blanks with respect to how to apply MapReduce to my own algorithms and data.

Thursday, March 22, 2012

Text Indexing with Aho-Corasick

The Aho-Corasick string matching algorithm is a kind of dictionary matching search algorithm. It was originally proposed as an alternative to indexing as a means of speeding up bibliographic search. That was back in 1975, before the World Wide Web and the ensuing information explosion made indexing in some form or other necessary for practical real-time information retrieval. The Aho-Corasick algorithm, however, has some interesting properties which make it attractive for use as an indexing scanner.

The algorithm constructs a state machine from a collection of dictionary words. The state machine is, in effect, a reduced-grammar regular expression parser and can be used to scan text for the dictionary words in a single pass. The machine's state transitions (edges) trigger on encountering a specific letter in the input stream. Machine states (nodes) can emit one or more dictionary words if the path leading to the state encodes all of the letters of the dictionary word in order. Failure edges transition from a state for which no outgoing edge matches the next letter in the input stream to a state from which it may still be possible to match a dictionary word given the letters already encountered in the stream.

The time taken to construct the state machine is proportional to the sum of the lengths of all dictionary words. This cost, however, can be amortized over the life of the state machine, and a single state machine can be used to parse multiple texts concurrently if the implementation uses independent iterators to track state transitions through the machine. The number of state transitions required for an Aho-Corasick state machine to scan a document is independent of the size of the dictionary. This means that the Aho-Corasick method scales very well to large dictionaries, the limiting factor being the space required to hold the state machine in memory. A minimal sketch of the construction and scan is shown below.
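Our implementation is in Java (source linked at the end of the post); purely as an illustration of the construction (a trie with breadth-first failure links) and the single-pass scan, here is a minimal Python sketch:

from collections import deque

class AhoCorasick:
    """Minimal Aho-Corasick automaton: a trie plus failure links built by BFS."""

    def __init__(self, words):
        # Each state: outgoing edges (char -> state), failure link, words emitted here
        self.nodes = [{'next': {}, 'fail': 0, 'out': []}]
        for w in words:
            self._insert(w)
        self._build_failure_links()

    def _insert(self, word):
        state = 0
        for ch in word:
            if ch not in self.nodes[state]['next']:
                self.nodes.append({'next': {}, 'fail': 0, 'out': []})
                self.nodes[state]['next'][ch] = len(self.nodes) - 1
            state = self.nodes[state]['next'][ch]
        self.nodes[state]['out'].append(word)

    def _build_failure_links(self):
        queue = deque()
        for child in self.nodes[0]['next'].values():   # depth-1 states fail to the root
            queue.append(child)
        while queue:
            state = queue.popleft()
            for ch, child in self.nodes[state]['next'].items():
                queue.append(child)
                # Walk failure links until a state with an outgoing edge on ch is found
                fail = self.nodes[state]['fail']
                while fail and ch not in self.nodes[fail]['next']:
                    fail = self.nodes[fail]['fail']
                self.nodes[child]['fail'] = self.nodes[fail]['next'].get(ch, 0)
                # A state also emits everything its failure state emits
                self.nodes[child]['out'] += self.nodes[self.nodes[child]['fail']]['out']

    def scan(self, text):
        """Single pass over text, yielding (end_position, word) for every dictionary hit."""
        state = 0
        for i, ch in enumerate(text):
            while state and ch not in self.nodes[state]['next']:
                state = self.nodes[state]['fail']      # follow failure edges
            state = self.nodes[state]['next'].get(ch, 0)
            for word in self.nodes[state]['out']:
                yield (i, word)

# The classic example from the 1975 paper, using the dictionary from Figure 1
ac = AhoCorasick(["he", "she", "his", "hers"])
print(list(ac.scan("ushers")))   # [(3, 'she'), (3, 'he'), (5, 'hers')]

Scanning "ushers" reports she and he ending at position 3 and hers at position 5, matching the behaviour of the state machine shown in Figure 1.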

As a proof of concept, we implemented the Aho-Corasick algorithm in Java and ran some benchmark tests. For debugging purposes we implemented a method to dump a state machine to Graphviz DOT format. The visualization of a state machine constructed with the dictionary [he, she, his, hers] is shown in Figure 1. The background image for this blog title is the visualization of a state machine constructed with a 100-word dictionary - not very practical to follow but it makes an interesting graphic.

Figure 1: Aho-Corasick state machine for dictionary [he, she, his, hers]

Figure 2 shows how time taken to construct the state machine varies with the number of dictionary words. Only 3 data points were taken but the relationship is clearly linear.

Figure 2: Aho-Corasick state machine construction time

Figure 3 shows how the performance of the Aho-Corasick implementation varies as the size of the corpus increases. The relationship appears linear and, for the most part, insensitive to the dictionary size. Deviations are likely attributable to poor sampling and high variance between test runs.




Source code for this implementation is available here.