Lately, I have been reading the book The Signal and the Noise by Nate Silver. His book references an IBM webpage that claims the world is creating 2.5 quintillion (10^18) bytes of data a day and that 90% of the data that currently exists in the world was created in the past two years. Combined with the rise of multicore processors, this would explain the surging interest in functional programming, NoSQL, and MapReduce frameworks. There is just too much data to be handled by a single machine alone.
Continuing my journey through the book Seven Databases in Seven Weeks, I am now exploring MapReduce. The MapReduce framework was popularized by Google's seminal paper, but its genesis lies in ideas from functional programming languages. After all, if functional programming is intended to help one run code on multiple processors on a single server, it's a logical step to extend that computing from multiple processors on a single server to multiple servers. However, I believe it's the recent technology factors and the data tsunami that encourage people to look at functional programming and NoSQL databases.
What I would like to see in the future is MapReduce integrated with functional programming, so that calling a map/reduce function on a typed collection backed by a NoSQL database would recognize that backing store and seamlessly ship the function to the NoSQL data nodes in MapReduce fashion.
I want to do something like the following:
Seq.map myfunc myRiakCollection
and the compiler/runtime would automatically convert myfunc into an appropriate form to run on all the data nodes and return the results. But since we're not there yet, we need to do a little more work.
One more thing to note: the Riak Handbook warns that:
MapReduce in Riak is not exactly the same as Hadoop, where you can practically analyze
an infinite amount of data. In Riak, data is pulled out of the system to be
analyzed. That alone sets the boundaries of what's possible.
This is further reinforced by the following mailing list response from Sean Cribbs, developer advocate at Basho:
...Riak's MapReduce is designed for low-latency queries, but it's also designed for "targeted" queries where you don't slurp the entire contents of a bucket from disk. Structure your data so that you can know -- or deterministically guess -- what keys should be involved in your query. Then you won't be trying to fetch and filter all of the data all the time.
I wanted to point this out because it goes against my mental model of how MapReduce should work.
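To make the "targeted query" advice a little more concrete, here is a small JavaScript sketch (JavaScript because that is what Riak's map/reduce phases run) that computes the exact inputs of a query from a keying scheme instead of scanning a whole bucket. The scheme is an assumption for illustration only: keys are room numbers formed as floor * 100 + room, so floor 1 holds "101", "102", and so on. floorInputs is a hypothetical helper, not part of any Riak client.

```javascript
// Hypothetical keying scheme: key = floor * 100 + room, e.g. "101" is floor 1, room 1
// (assumes fewer than 100 rooms per floor).
// Build the exact [bucket, key] inputs for one floor, so the query never has
// to touch the rest of the bucket.
function floorInputs(bucket, floor, roomsPerFloor) {
  const inputs = [];
  for (let room = 1; room <= roomsPerFloor; room++) {
    inputs.push([bucket, String(floor * 100 + room)]);
  }
  return inputs;
}

console.log(JSON.stringify(floorInputs("rooms", 1, 3)));
// [["rooms","101"],["rooms","102"],["rooms","103"]]
```

The resulting list has the same [bucket, key] shape as the "inputs" arrays in the REST examples below, so it could be dropped straight into a job.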
MapReduce with CorrugatedIron
In this particular example, I will use the JavaScript example from the book. First, we'll define some helper functions that provide some syntactic sugar for working with CorrugatedIron's fluent interface:
open CorrugatedIron
open CorrugatedIron.Models.MapReduce
open CorrugatedIron.Models.MapReduce.Inputs

// Helper function for CorrugatedIron's job inputs for bucket/key pairs only.
let setinputs data (q:RiakMapReduceQuery) =
    let inputs = new RiakBucketKeyInput()
    data
    |> List.iter (fun (bucket,key) -> inputs.AddBucketKey(bucket,key) |> ignore)
    q.Inputs(inputs)

// Helper function for mapping JavaScript source
let mapjs src keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.Source(src).Keep(keep) |> ignore)

// Helper function for mapping stored functions (bucket/key of the stored source)
let mapsf bucket func keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.BucketKey(bucket,func).Keep(keep) |> ignore)

// Helper function for mapping built-in functions (now honors the keep flag)
let mapbuiltins func keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.Name(func).Keep(keep) |> ignore)

// Getting results from CorrugatedIron seems convoluted to me,
// so this helper extracts the data into a simpler form
let getResults (results:RiakResult<RiakMapReduceResult>) =
    // Decode each raw value as UTF-8 text (the JSON that Riak returns)
    let getText (raw:byte[]) = System.Text.Encoding.UTF8.GetString(raw)
    results.Value.PhaseResults
    |> Seq.map (fun phase -> phase.Values |> Seq.map getText)
    |> Seq.concat

// Helper function for reduce operations in JavaScript
let reducejs src keep (q:RiakMapReduceQuery) =
    q.ReduceJs(fun m -> m.Source(src).Keep(keep) |> ignore)

// Helper function for link walking with MapReduce
let linkphase bucket keep (q:RiakMapReduceQuery) =
    q.Link(fun l -> l.Keep(keep).Bucket(bucket) |> ignore)
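For comparison, the same phases can be written as plain job objects. The JavaScript sketch below is a hypothetical mirror of the F# helpers above (mapJs, mapStored, mapBuiltin, reduceJs, linkPhase, and makeJob are my own names, not CorrugatedIron's); each returns one phase in the JSON shape that the REST examples later in this post send to /mapred, with keep set per phase.

```javascript
// Hypothetical mirrors of the F# helpers: each builds one phase of a Riak
// MapReduce job in the JSON shape posted to the /mapred endpoint.
const mapJs = (source, keep) => ({ map: { language: "javascript", source, keep } });
const mapStored = (bucket, key, keep) => ({ map: { language: "javascript", bucket, key, keep } });
const mapBuiltin = (name, keep) => ({ map: { language: "javascript", name, keep } });
const reduceJs = (source, keep) => ({ reduce: { language: "javascript", source, keep } });
const linkPhase = (bucket, keep) => ({ link: { bucket, keep } });

// Assemble a job from [bucket, key] inputs and any number of phases, in order.
const makeJob = (inputs, ...phases) => ({ inputs, query: phases });

const job = makeJob(
  [["rooms", "101"], ["rooms", "102"]],
  mapStored("my_functions", "map_capacity", false),
  reduceJs("function(v) { return v; }", true)
);

console.log(JSON.stringify(job));
```

Seen this way, the fluent calls are mostly a typed way of filling in the same "inputs" and "query" fields.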
Here's the script to perform the MapReduce in CorrugatedIron. In the following snippet, ciClient refers to CorrugatedIron's RiakClient object; in the previous blogs, ciClient was just client:
// MapReduce example from the book
let src = "function(v) {
    /* From the Riak object, pull data and parse it as JSON */
    var parsed_data = JSON.parse(v.values[0].data);
    var data = {};
    /* Key capacity number by room style string */
    data[parsed_data.style] = parsed_data.capacity;
    return [data];
}"
let rooms = [("rooms","101");("rooms","102");("rooms","103")]
let query =
    new RiakMapReduceQuery()
    |> setinputs rooms
    |> mapjs src true
ciClient.MapReduce(query)
|> getResults
Results:
seq ["[{"suite":3}]"; "[{"king":1}]"; "[{"double":4}]"]
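To see what the map function receives and returns, the sketch below runs the book's function locally against mock Riak objects. The mock objects are an assumption: they carry only the values[0].data field the function reads, and the styles and capacities are made up. Riak calls the map function once per input object and concatenates the returned lists; Array.prototype.flatMap stands in for that here.

```javascript
// The book's map function, as it runs inside Riak.
const mapCapacity = function (v) {
  /* From the Riak object, pull data and parse it as JSON */
  var parsed_data = JSON.parse(v.values[0].data);
  var data = {};
  /* Key capacity number by room style string */
  data[parsed_data.style] = parsed_data.capacity;
  return [data];
};

// Mock Riak objects (made-up data) with only the fields the function reads.
const objects = [
  { bucket: "rooms", key: "101", values: [{ data: '{"style":"king","capacity":1}' }] },
  { bucket: "rooms", key: "102", values: [{ data: '{"style":"suite","capacity":3}' }] }
];

// One map call per object; the returned lists are concatenated.
const phaseResult = objects.flatMap((obj) => mapCapacity(obj));
console.log(JSON.stringify(phaseResult)); // [{"king":1},{"suite":3}]
```

The REST result further down shows this concatenated form directly, while CorrugatedIron hands back each returned list as a separate string.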
MapReduce with REST API
It's not all that hard to do MapReduce through Riak's REST API with HttpClient, although the JSON script becomes more complex. In the following example, I had to escape the double quotes (") because, alas, I'm still using F# 2.0. If you're using F# 3.0, you can use the new triple-quoted string syntax to wrap the MapReduce script.
// This helper function will be used in all the later examples with REST API.
// riakurl and restClient (an HttpClient) are defined earlier and not shown here.
open System.Net.Http

let mapred script =
    let post_url = sprintf "%s/mapred" riakurl
    let content = new StringContent(script)
    content.Headers.ContentType.MediaType <- "application/json"
    let response = restClient.PostAsync(post_url,content)
    response.Wait()
    // Returns a Task<string>; its Result holds the JSON response body
    let results = response.Result.Content.ReadAsStringAsync()
    results
let mapreduceScript = "{
\"inputs\":[
[\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"]
],
\"query\":[
{\"map\":{
\"language\":\"javascript\",
\"source\":
\"function(v) {
/* From the Riak object, pull data and parse it as JSON */
var parsed_data = JSON.parse(v.values[0].data);
var data = {};
/* Key capacity number by room style string */
data[parsed_data.style] = parsed_data.capacity;
return [data];
}\"
}}
]
}"
mapred mapreduceScript
Results:
val it : Task<string> =
System.Threading.Tasks.Task`1[System.String]
{AsyncState = null;
CreationOptions = None;
Exception = null;
Id = 2;
IsCanceled = false;
IsCompleted = true;
IsFaulted = false;
Result = "[{"king":1},{"suite":3},{"double":4}]";
Status = RanToCompletion;}
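The escaping can also be avoided by building the job as an object and letting a JSON serializer deal with quotes and newlines. Here is a JavaScript sketch of the idea; in F# the equivalent would be a JSON library or F# 3.0's triple-quoted strings. mapCapacity is just the book's map function assigned to a variable, and Function.prototype.toString recovers its source text for the "source" field.

```javascript
// The book's map function as an ordinary function value instead of an escaped string.
const mapCapacity = function (v) {
  var parsed_data = JSON.parse(v.values[0].data);
  var data = {};
  data[parsed_data.style] = parsed_data.capacity;
  return [data];
};

const job = {
  inputs: [["rooms", "101"], ["rooms", "102"], ["rooms", "103"]],
  // toString() returns the function's source text, which goes in "source".
  query: [{ map: { language: "javascript", source: mapCapacity.toString() } }]
};

// JSON.stringify escapes any quotes and newlines in the source automatically.
const body = JSON.stringify(job);
console.log(body);
```

A side benefit is that the serialized body contains no raw newlines inside string values, which strict JSON does not allow.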
Stored Functions with CorrugatedIron
I can store custom JavaScript MapReduce functions in a specific bucket/key and then call them from a query:
let myfunc = "
function(v) {
var parsed_data = JSON.parse(v.values[0].data);
var data = {};
data[parsed_data.style] = parsed_data.capacity;
return [data];
}"
// Store the map function in a bucket value
let putResults =
    new RiakObject("my_functions", "map_capacity", myfunc)
    |> ciClient.Put
// Querying using stored map function
let rooms = [("rooms","101");("rooms","102");("rooms","103");("rooms","104")]
let query =
    new RiakMapReduceQuery()
    |> setinputs rooms
    |> mapsf "my_functions" "map_capacity" true
ciClient.MapReduce(query)
|> getResults
Results:
seq ["[{"suite":3}]"; "[{"king":1}]"; "[{"double":4}]"; "[{"suite":7}]"]
Stored Functions with REST API
With the REST API, reusing the mapred function defined previously:
let storedFuncScript = "
{
\"inputs\":[
[\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"],[\"rooms\",\"104\"]
],
\"query\":[
{\"map\":{
\"language\":\"javascript\",
\"bucket\":\"my_functions\",
\"key\":\"map_capacity\"
}}
]
}
"
let savesf bucket key myfunc =
    let post_url = sprintf "%s/riak/%s/%s" riakurl bucket key
    let content = new StringContent(myfunc)
    content.Headers.ContentType.MediaType <- "application/json"
    restClient.PostAsync(post_url,content)
// Store the map function in a bucket value, and wait for the write
// to finish before the query below tries to use it
(savesf "my_functions" "map_capacity" myfunc).Wait()
mapred storedFuncScript
Results:
val it : Task<string> =
System.Threading.Tasks.Task`1[System.String]
{AsyncState = null;
CreationOptions = None;
Exception = null;
Id = 5;
IsCanceled = false;
IsCompleted = true;
IsFaulted = false;
Result = "[{"suite":3},{"double":4},{"king":1},{"suite":7}]";
Status = RanToCompletion;}
Built-in Functions with CorrugatedIron
Riak has some built-in MapReduce JavaScript functions. I couldn't find any reference documentation on them; the closest I came was the actual JavaScript source code on GitHub. Here is the list of built-in JavaScript functions from that source code:
Riak.getClassName(obj)
Riak.filterNotFound(values)
Riak.mapValues(value,keyData,arg)
Riak.mapValuesJson(value,keyData,arg)
Riak.mapByFields(value,keyData,fields)
Riak.reduceSum(values,arg)
Riak.reduceMin(values,arg)
Riak.reduceMax(values,arg)
Riak.reduceSort(value,arg)
Riak.reduceNumericSort(value, arg)
Riak.reduceLimit(value, arg)
Riak.reduceSlice(value, arg)
In the following example, we're only going to use Riak.mapValuesJson():
open System.Collections.Generic

// Room is the record type (style and capacity fields) defined outside this snippet
let rooms = [("rooms","101");("rooms","102");("rooms","103");("rooms","104")]
let query =
    new RiakMapReduceQuery()
    |> setinputs rooms
    |> mapbuiltins "Riak.mapValuesJson" true
let results = ciClient.MapReduce(query)
// Collect the results and flatten the nested collections
results.Value.PhaseResults
|> Seq.map (fun x -> x.GetObjects<IEnumerable<Room>>())
|> Seq.concat
|> Seq.concat
Results:
val it : seq<Room> =
seq [{style = "double"; capacity = 4;};
{style = "suite"; capacity = 3;};
{style = "suite"; capacity = 7;};
{style = "king"; capacity = 1;}]
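The Room records come back because Riak.mapValuesJson parses each stored value as JSON. The sketch below is a simplified stand-in for that behavior, not Riak's actual implementation: mapValuesJsonSketch is a hypothetical name, it ignores keyData and arg, and it does no not-found handling. The mock object is made-up data.

```javascript
// Simplified stand-in for Riak.mapValuesJson (hypothetical, not Riak's code):
// parse every stored value of the object as JSON and return the results.
function mapValuesJsonSketch(value, keyData, arg) {
  return value.values.map(function (v) { return JSON.parse(v.data); });
}

// Mock Riak object with made-up data, shaped like the rooms used above.
const room = { bucket: "rooms", key: "101", values: [{ data: '{"style":"double","capacity":4}' }] };

console.log(mapValuesJsonSketch(room)); // [ { style: 'double', capacity: 4 } ]
```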
Built-in Functions with REST API
Calling built-in functions with the REST API:
let builtinFunc = "
{
\"inputs\":[
[\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"]
],
\"query\":[
{\"map\":{
\"language\":\"javascript\",
\"name\":\"Riak.mapValuesJson\"
}}
]
}
"
mapred builtinFunc
Results:
val it : Task<string> =
System.Threading.Tasks.Task`1[System.String]
{AsyncState = null;
CreationOptions = None;
Exception = null;
Id = 6;
IsCanceled = false;
IsCompleted = true;
IsFaulted = false;
Result = "[{"style":"double","capacity":4},
{"style":"king","capacity":1},
{"style":"suite","capacity":3}]";
Status = RanToCompletion;}
Reducing with CorrugatedIron
Reduce operations with CorrugatedIron:
let reduceScript = "
function(v) {
var totals = {};
for (var i in v) {
for(var style in v[i]) {
if (totals[style]) totals[style] += v[i][style];
else totals[style] = v[i][style];
}
}
return [totals];
}"
let query =
    (new RiakMapReduceQuery()).Inputs("rooms")
    |> mapsf "my_functions" "map_capacity" false
    |> reducejs reduceScript true
ciClient.MapReduce(query)
|> getResults
Results:
seq["[{"queen":9051,"king":9189,"single":8811,"double":8629,"suite":9231}]"]
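Riak may run a reduce function more than once, feeding earlier reduce output back in as input (re-reduce), so a reduce function has to give the same answer no matter how its inputs are batched. The book's function holds up because its output, a list of style-to-total maps, has the same shape as its input. The sketch below checks that locally; the map-phase output is made up.

```javascript
// The book's reduce function: sums capacities per room style.
const reduceCapacity = function (v) {
  var totals = {};
  for (var i in v) {
    for (var style in v[i]) {
      if (totals[style]) totals[style] += v[i][style];
      else totals[style] = v[i][style];
    }
  }
  return [totals];
};

// Map-phase output for a few rooms (made-up numbers).
const mapped = [{ king: 1 }, { suite: 3 }, { king: 2 }, { double: 4 }];

// Reduce everything in one call...
const once = reduceCapacity(mapped);
// ...or reduce two batches separately, then reduce the concatenated partial results.
const rereduced = reduceCapacity(
  reduceCapacity(mapped.slice(0, 2)).concat(reduceCapacity(mapped.slice(2)))
);

console.log(JSON.stringify(once));      // [{"king":3,"suite":3,"double":4}]
console.log(JSON.stringify(rereduced)); // [{"king":3,"suite":3,"double":4}]
```

One caveat visible in the code: `if (totals[style])` treats a running total of 0 as missing, which is harmless for positive capacities but would matter for data containing zeros.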
Reduce operations with REST API
Reduce operations with the REST API:
let myReduceScript = "
{
\"inputs\":\"rooms\",
\"query\":[
{\"map\":{
\"language\":\"javascript\",
\"bucket\":\"my_functions\",
\"key\":\"map_capacity\"
}},
{\"reduce\":{
\"language\":\"javascript\",
\"source\":
\"function(v) {
var totals = {};
for (var i in v) {
for(var style in v[i]) {
if( totals[style] ) totals[style] += v[i][style];
else totals[style] = v[i][style];
}
}
return [totals];
}\"
}}
]
}"
mapred myReduceScript
Results:
val it : Task<string> =
System.Threading.Tasks.Task`1[System.String]
{AsyncState = null;
CreationOptions = None;
Exception = null;
Id = 7;
IsCanceled = false;
IsCompleted = true;
IsFaulted = false;
Result = "[{"suite":9231,"king":9189,"double":8629,"queen":9051,"single":8811}]";
Status = RanToCompletion;}
Key Filters with CorrugatedIron
Key filters are used to reduce the input set before the map/reduce operations:
let reduceScript = "
function(v) {
var totals = {};
for (var i in v) {
for(var style in v[i]) {
if (totals[style]) totals[style] += v[i][style];
else totals[style] = v[i][style];
}
}
return [totals];
}"
open System

// Keep only the keys that parse to integers below 1000
let keyFilter = new Action<RiakFluentKeyFilter>(fun x -> x.StringToInt().LessThan(1000) |> ignore)
let query =
    (new RiakMapReduceQuery()).Inputs("rooms").Filter(keyFilter)
    |> mapsf "my_functions" "map_capacity" false
    |> reducejs reduceScript true
ciClient.MapReduce(query)
|> getResults
Results:
val it : seq<string> =
seq ["[{"queen":780,"suite":968,"king":838,"single":791,"double":758}]"]
Key Filters with REST API
Adding key filters with REST API:
let keyFilterScript = "
{
\"inputs\":{
\"bucket\":\"rooms\",
\"key_filters\":[[\"string_to_int\"], [\"less_than\", 1000]]},
\"query\":[
{\"map\":{
\"language\":\"javascript\",
\"bucket\":\"my_functions\",
\"key\":\"map_capacity\"
}},
{\"reduce\":{
\"language\":\"javascript\",
\"source\":
\"function(v) {
var totals = {};
for (var i in v) {
for(var style in v[i]) {
if( totals[style] ) totals[style] += v[i][style];
else totals[style] = v[i][style];
}
}
return [totals];
}\"
}}
]
}"
mapred keyFilterScript
Results:
val it : Task<string> =
System.Threading.Tasks.Task`1[System.String]
{AsyncState = null;
CreationOptions = None;
Exception = null;
Id = 10;
IsCanceled = false;
IsCompleted = true;
IsFaulted = false;
Result = "[{"queen":780,"king":838,"suite":968,"single":791,"double":758}]";
Status = RanToCompletion;}
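Each entry in the key_filters array is either a transform (such as string_to_int) or a predicate (such as less_than or eq), applied to the key in order, and only keys that pass every predicate are fed to the map phase. The local evaluator below is a hypothetical illustration that covers just those three filters; it is not Riak's implementation. It shows why "999" survives [["string_to_int"], ["less_than", 1000]] while "1000" does not.

```javascript
// Hypothetical local evaluator for a tiny subset of Riak key filters.
// Transforms change the key; predicates decide whether it is kept.
const transforms = {
  string_to_int: (key) => parseInt(key, 10)
};
const predicates = {
  less_than: (key, n) => key < n,
  eq: (key, value) => key === value
};

function passesFilters(key, filters) {
  let current = key;
  for (const [name, ...args] of filters) {
    if (name in transforms) {
      current = transforms[name](current, ...args);
    } else if (name in predicates) {
      if (!predicates[name](current, ...args)) return false;
    } else {
      throw new Error("unsupported filter in this sketch: " + name);
    }
  }
  return true;
}

const keys = ["101", "999", "1000", "2503"];
const filters = [["string_to_int"], ["less_than", 1000]];
console.log(keys.filter((k) => passesFilters(k, filters))); // [ '101', '999' ]
```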
MapReduce Link Walking with CorrugatedIron
In my previous blog, I mentioned that I couldn't figure out how to specify the keep flag in the link walking examples. With MapReduce link walking in CorrugatedIron, I can now specify the keep flag for each of the link/map/reduce phases:
// Only the cage with key "2"
let keyFilter = new Action<RiakFluentKeyFilter>(fun x -> x.Equal("2") |> ignore)
let query =
    (new RiakMapReduceQuery()).Inputs("cages").Filter(keyFilter)
    |> linkphase "animals" false
    |> mapjs "function(v) { return [v]; }" true
ciClient.MapReduce(query)
|> getResults
Results:
val it : seq<seq<string>> =
seq
[seq [];
seq ["[{
"bucket":"animals",
"key":"ace",
"vclock":"a85hYGBgymDKBVIcrFrhrwI5O5wzmBKZ8lgZlj/hPM0HlVLm/PczkDP1FlRqHUgqCwA=",
"values":
[{"metadata":
{"X-Riak-VTag":"4XZxrmgvKzan8X4FKz5hti",
"content-type":"application/json",
"index":[],
"X-Riak-Last-Modified":"Thu, 16 May 2013 23:15:58 GMT"},
"data":"{\"nickname\":\"The Wonder Dog\",\"breed\":\"German Shepherd\"}"}]}]"]]
MapReduce Link Walking with REST API
With REST API:
// Code for Link Walking with MapReduce
let mrLinkWalk = "
{
\"inputs\":{
\"bucket\":\"cages\",
\"key_filters\":[[\"eq\", \"2\"]]
},
\"query\":[
{\"link\":{
\"bucket\":\"animals\",
\"keep\":false
}},
{\"map\":{
\"language\":\"javascript\",
\"source\":
\"function(v) { return [v]; }\"
}}
]
}
"
mapred mrLinkWalk
Results:
val it : Task<string> =
System.Threading.Tasks.Task`1[System.String]
{AsyncState = null;
CreationOptions = None;
Exception = null;
Id = 13;
IsCanceled = false;
IsCompleted = true;
IsFaulted = false;
Result = "[{
"bucket":"animals",
"key":"ace",
"vclock":"a85hYGBgymDKBVIcrFrhrwI5O5wzmBKZ8lgZlj/hPM0HlVLm/PczkDP1FlRqHUgqCwA=",
"values":[{
"metadata":{
"X-Riak-VTag":"4XZxrmgvKzan8X4FKz5hti",
"content-type": "application/json",
"index":[],
"X-Riak-Last-Modified":"Thu, 16 May 2013 23:15:58 GMT"},
"data":"{\"nickname\":\"The Wonder Dog\",\"breed\":\"German Shepherd\"}"}]}]";
Status = RanToCompletion;}
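Here is a local sketch of the link-then-map pipeline used in both link-walking examples, which also suggests where the empty first sequence in the CorrugatedIron result likely comes from: the link phase runs with keep set to false, so presumably its output is not returned, and only the map phase's [v] results are. The store contents, the link tag, and the linkPhase/mapPhase helpers are all made up for this sketch, and links are simplified to [bucket, key, tag] triples rather than Riak's real link metadata.

```javascript
// Mock store (made-up data): cage "2" links to the dog "ace".
// Links are simplified to [bucket, key, tag] triples.
const store = {
  "cages/2": { bucket: "cages", key: "2", links: [["animals", "ace", "contains"]] },
  "animals/ace": {
    bucket: "animals",
    key: "ace",
    values: [{ data: '{"nickname":"The Wonder Dog","breed":"German Shepherd"}' }]
  }
};

// Link phase: follow each input's links, keeping targets in the given bucket.
function linkPhase(inputs, bucket) {
  return inputs.flatMap(([b, k]) =>
    (store[b + "/" + k].links || [])
      .filter(([linkBucket]) => linkBucket === bucket)
      .map(([linkBucket, linkKey]) => [linkBucket, linkKey])
  );
}

// Map phase: call the map function on each linked object and concatenate.
function mapPhase(inputs, mapFn) {
  return inputs.flatMap(([b, k]) => mapFn(store[b + "/" + k]));
}

const linked = linkPhase([["cages", "2"]], "animals");          // keep: false in the query
const results = mapPhase(linked, function (v) { return [v]; }); // keep: true

console.log(JSON.stringify(linked)); // [["animals","ace"]]
console.log(results[0].key);         // ace
```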