Monday, June 03, 2013

Riak MapReduce with F#

Lately, I have been reading the book The Signal and the Noise by Nate Silver. His book references an IBM web page that claims the world is creating 2.5 quintillion (10^18) bytes of data a day and that 90% of the data that currently exists in the world was created in the past two years. Combined with the rise of multicore processors, this would explain the surging interest in functional programming, NoSQL, and MapReduce frameworks. There is just too much data to be handled by a single machine alone.

I am continuing my journey through the book Seven Databases in Seven Weeks, this time exploring MapReduce. The MapReduce framework was popularized by Google's seminal paper, but its genesis lies in ideas from functional programming languages. After all, if functional programming is meant to help one run code on multiple processors in a single server, it's a logical step to extend that computing from multiple processors on a single server to multiple servers. However, I believe it's the recent technology trends and the data tsunami that encourage people to look at functional programming and NoSQL databases.

What I would like to see in the future is MapReduce capability integrated with functional programming, so that calling a map/reduce function on a typed collection backed by a NoSQL database would be recognized as such, and the function would be seamlessly shipped to the NoSQL stores and executed in a MapReduce fashion.

I want to do something like the following:

Seq.map myfunc myRiakCollection

and the compiler/runtime would automatically convert myfunc into an appropriate form to run on all the data nodes and return the results. But since we're not there yet, we need to do a little more work.
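For contrast, here is what that one-liner amounts to today when the collection simply lives in memory. This is a minimal sketch, not Riak code: SampleRoom is a hypothetical record modeled on the Room type used later in this post, and the sample values mirror the room results shown below. The map step emits (style, capacity) pairs and the reduce step totals capacity per style, all on a single machine.

```fsharp
// Hypothetical in-memory stand-in for a bucket of room objects
type SampleRoom = { style: string; capacity: int }

let sampleRooms =
    [ { style = "suite"; capacity = 3 }
      { style = "king"; capacity = 1 }
      { style = "double"; capacity = 4 }
      { style = "suite"; capacity = 7 } ]

// Map step: same idea as the JavaScript map function used below, emitting (style, capacity)
let mapCapacity (room: SampleRoom) = room.style, room.capacity

// Reduce step: total capacity per room style
let totals =
    sampleRooms
    |> Seq.map mapCapacity
    |> Seq.groupBy fst
    |> Seq.map (fun (style, pairs) -> style, pairs |> Seq.sumBy snd)
    |> Map.ofSeq

printfn "%A" totals
```

The whole point of the wish above is that the same Seq.map call would instead be shipped to the data nodes when the collection is backed by Riak.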

One additional item to note: the Riak Handbook warns that:

MapReduce in Riak is not exactly the same as Hadoop, where you can practically analyze an infinite amount of data. In Riak, data is pulled out of the system to be analyzed. That alone sets the boundaries of what's possible.

This is further reinforced by the following mailing list response from Sean Cribbs, a developer advocate at Basho:

...Riak's MapReduce is designed for low-latency queries, but it's also designed for "targeted" queries where you don't slurp the entire contents of a bucket from disk. Structure your data so that you can know -- or deterministically guess -- what keys should be involved in your query. Then you won't be trying to fetch and filter all of the data all the time.

I wanted to point this out as this goes against my mental model of how MapReduce should work.
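One way to read Sean's advice: choose keys you can compute, so that a query lists its inputs instead of scanning a whole bucket. The sketch below assumes a hypothetical "readings" bucket with one object per day, keyed by date; the resulting (bucket, key) pairs have the same shape as the input list consumed by the setinputs helper below.

```fsharp
open System
open System.Globalization

// Hypothetical key scheme: one object per day, keyed "yyyy-MM-dd".
// Because the keys are predictable, the query inputs can be generated
// rather than discovered by reading the entire bucket.
let dailyInputs (bucket: string) (start: DateTime) (days: int) =
    [ for i in 0 .. days - 1 do
          let day = start.AddDays(float i)
          yield (bucket, day.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)) ]

let readingInputs = dailyInputs "readings" (DateTime(2013, 6, 1)) 3
printfn "%A" readingInputs
```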


MapReduce with CorrugatedIron

In this section, I will use the JavaScript example from the book. First, we'll define some helper functions that provide some syntactic sugar for working with CorrugatedIron's fluent interface:

// Helper function for CorrugatedIron's job inputs for bucket/key pairs only.
let setinputs data (q:RiakMapReduceQuery) = 
    let inputs = new RiakBucketKeyInput()

    data
    |> List.iter( fun (bucket,key) -> inputs.AddBucketKey(bucket,key)) 

    q.Inputs(inputs)

//  Helper function mapping Javascript functions
let mapjs  src keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.Source(src).Keep(keep) |> ignore)

// Helper function for mapping stored functions
let mapsf bucket func keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.BucketKey(bucket,func).Keep(keep) |> ignore)

// Helper function for mapping built-in functions
let mapbuiltins func keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.Name(func).Keep(keep) |> ignore)

// Getting results from CorrugatedIron seems convoluted to me.
// This helper extracts the data into a simpler form: a flat sequence of JSON strings.
let getResults (results:RiakResult<RiakMapReduceResult>) =
    // Riak's MapReduce results are JSON, so decode the raw bytes as UTF-8
    let getText raw = raw |> System.Text.Encoding.UTF8.GetString
    results.Value.PhaseResults
    |> Seq.map (fun phase -> phase.Values |> Seq.map getText)
    |> Seq.concat
 
// Helper function for Reduce operations in Javascript
let reducejs src keep (q:RiakMapReduceQuery) =
    q.ReduceJs(fun m -> m.Source(src).Keep(keep) |> ignore)

// Helper function for link walking with mapreduce
let linkphase bucket keep (q:RiakMapReduceQuery) =
    q.Link(fun l -> l.Keep(keep).Bucket(bucket) |> ignore)
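These helpers all take the query as their last argument, which is what lets them be chained with |>. To show the pattern without a Riak server, the sketch below substitutes a hypothetical MockQuery class for RiakMapReduceQuery; its methods only record phase descriptions, and withInputs, withMapPhase, and withReducePhase are illustrative names, not part of CorrugatedIron.

```fsharp
// Hypothetical stand-in for a fluent builder such as RiakMapReduceQuery:
// each method records a phase description and returns the same builder.
type MockQuery() =
    let phases = ResizeArray<string>()
    member this.Inputs(bucket: string) =
        phases.Add(sprintf "inputs:%s" bucket)
        this
    member this.MapPhase(source: string, keep: bool) =
        phases.Add(sprintf "map:%s:%b" source keep)
        this
    member this.ReducePhase(source: string, keep: bool) =
        phases.Add(sprintf "reduce:%s:%b" source keep)
        this
    member this.Phases = List.ofSeq phases

// Curried wrappers that take the builder last, so they compose with |>
let withInputs bucket (q: MockQuery) = q.Inputs(bucket)
let withMapPhase source keep (q: MockQuery) = q.MapPhase(source, keep)
let withReducePhase source keep (q: MockQuery) = q.ReducePhase(source, keep)

let sketchQuery =
    MockQuery()
    |> withInputs "rooms"
    |> withMapPhase "mapFn" false
    |> withReducePhase "reduceFn" true

printfn "%A" sketchQuery.Phases
```

Putting the builder last is the usual F# convention for pipeline-friendly functions; the partially applied helper (for example, mapjs src true) is then a function from query to query.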
 

Here's the script to perform the MapReduce in CorrugatedIron. In the following snippet, ciClient refers to CorrugatedIron's RiakClient object; in my previous blog posts, it was simply named client:

// Mapreduce example from the book
let src ="function(v) {
     /* From the Riak object, pull data and parse it as JSON */
     var parsed_data = JSON.parse(v.values[0].data);
     var data = {};

     /* Key capacity number by room style string */
     data[parsed_data.style] = parsed_data.capacity;
     return [data];
   }"

let rooms =  [("rooms","101");("rooms","102");("rooms","103");]

let query = new RiakMapReduceQuery()
   |> setinputs rooms
   |> mapjs src true

ciClient.MapReduce(query)
|> getResults

Results:

seq ["[{"suite":3}]"; "[{"king":1}]"; "[{"double":4}]"]

MapReduce with REST API

It's not all that hard to do MapReduce with the REST API in ASP.NET MVC, although your JSON script becomes more complex. In the following example, I had to escape the double quotes (") because, alas, I'm still using F# 2.0. If you're using F# 3.0, you can use the new triple-quoted string syntax to wrap the MapReduce script.
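For reference, here is a sketch of the same script in the F# 3.0 triple-quoted syntax, assuming F# 3.0 or later. tripleQuotedScript is a hypothetical name. The JavaScript source is kept on one line because strict JSON does not allow unescaped line breaks inside strings; the result could be passed to the mapred helper below in place of the escaped version.

```fsharp
// Requires F# 3.0 or later: triple-quoted strings may contain unescaped double quotes.
// Same job as the escaped script below, with the JavaScript kept on a single line
// so that the whole payload is strict JSON.
let tripleQuotedScript = """{
    "inputs": [["rooms","101"], ["rooms","102"], ["rooms","103"]],
    "query": [
        {"map": {
            "language": "javascript",
            "source": "function(v) { var parsed_data = JSON.parse(v.values[0].data); var data = {}; data[parsed_data.style] = parsed_data.capacity; return [data]; }"
        }}
    ]
}"""

printfn "%s" tripleQuotedScript
```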

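// This helper function will be used in all the later examples with the REST API.
// riakurl and restClient (an HttpClient) are defined outside this post.
// It returns a Task<string>, which is why FSI prints the task; its Result holds the JSON.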
let mapred script =
    let post_url = sprintf "%s/mapred" riakurl 
    let content = new StringContent(script)
    content.Headers.ContentType.MediaType <- "application/json"
    let response = restClient.PostAsync(post_url,content)
    response.Wait()
    let results = response.Result.Content.ReadAsStringAsync()
    results

let mapreduceScript =  "{
    \"inputs\":[
        [\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"]
    ],
    \"query\":[
        {\"map\":{
            \"language\":\"javascript\",
            \"source\":
            \"function(v) {
                /* From the Riak object, pull data and parse it as JSON */
                var parsed_data = JSON.parse(v.values[0].data);
                var data = {};
                /* Key capacity number by room style string */
                data[parsed_data.style] = parsed_data.capacity;
                return [data];
            }\"
        }}
      ]
    }"

mapred mapreduceScript 

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 2;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"king":1},{"suite":3},{"double":4}]";
     Status = RanToCompletion;}


Stored Functions with CorrugatedIron

I can store custom JavaScript MapReduce functions under a specific bucket/key and call them by reference:

let myfunc = "
 function(v) {
  var parsed_data = JSON.parse(v.values[0].data);
  var data = {};
  data[parsed_data.style] = parsed_data.capacity;
  return [data];
 }"

// Store the map function in a bucket value
let putResults = new RiakObject("my_functions","map_capacity", myfunc) 
                 |> ciClient.Put

// Querying using stored map function
let rooms = [("rooms","101");("rooms","102");("rooms","103");("rooms","104");]

let query = new RiakMapReduceQuery()
   |> setinputs rooms
   |> mapsf "my_functions" "map_capacity" true

ciClient.MapReduce(query)
|> getResults

Results:

seq ["[{"suite":3}]"; "[{"king":1}]"; "[{"double":4}]"; "[{"suite":7}]"]

Stored Functions with REST API

With the REST API, reusing the mapred function defined previously:

let storedFuncScript = "
{
    \"inputs\":[
        [\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"],[\"rooms\",\"104\"]
    ],
    \"query\":[
        {\"map\":{
        \"language\":\"javascript\",
        \"bucket\":\"my_functions\",
        \"key\":\"map_capacity\"
    }}
    ]
}
"

let savesf bucket key myfunc =
    let post_url = sprintf "%s/riak/%s/%s" riakurl bucket key
    let content = new StringContent(myfunc)
    content.Headers.ContentType.MediaType <- "application/json"
    restClient.PostAsync(post_url,content)

// Store the map function in a bucket value.
// savesf returns a pending Task; call .Wait() on it first if you run this as one script.
savesf "my_functions" "map_capacity" myfunc


mapred storedFuncScript

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 5;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"suite":3},{"double":4},{"king":1},{"suite":7}]";
     Status = RanToCompletion;}

Built-in Functions with CorrugatedIron

Riak has some built-in MapReduce JavaScript functions. I couldn't find any reference documentation for them; the closest I came was the actual JavaScript code on GitHub: . Here is the list of built-in JavaScript functions from that source code:

  • Riak.getClassName(obj)
  • Riak.filterNotFound(values)
  • Riak.mapValues(value,keyData,arg)
  • Riak.mapValuesJson(value,keyData,arg)
  • Riak.mapByFields(value,keyData,fields)
  • Riak.reduceSum(values,arg)
  • Riak.reduceMin(values,arg)
  • Riak.reduceMax(values,arg)
  • Riak.reduceSort(value,arg)
  • Riak.reduceNumericSort(value, arg)
  • Riak.reduceLimit(value, arg)
  • Riak.reduceSlice(value, arg)

In the following example, we're only going to use Riak.mapValuesJson():

let rooms = [("rooms","101");("rooms","102");("rooms","103");("rooms","104");]


let query = new RiakMapReduceQuery()
   |> setinputs rooms
   |> mapbuiltins "Riak.mapValuesJson" true

let results = ciClient.MapReduce(query)

// Collect the results and flatten the nested collections
results.Value.PhaseResults
|> Seq.map (fun x -> x.GetObjects<IEnumerable<Room>>())
|> Seq.concat
|> Seq.concat

Results:

val it : seq<Room> =
  seq [{style = "double"; capacity = 4;};
       {style = "suite";  capacity = 3;};
       {style = "suite";  capacity = 7;};
       {style = "king";   capacity = 1;}]

Built-in Functions with REST API

Calling built-in functions with REST API:

let builtinFunc = "
{
 \"inputs\":[
  [\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"]
 ],
 \"query\":[
  {\"map\":{
  \"language\":\"javascript\",
  \"name\":\"Riak.mapValuesJson\"
 }}
 ]
}
"

mapred builtinFunc

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 6;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"style":"double","capacity":4},
                {"style":"king","capacity":1},
                {"style":"suite","capacity":3}]";
     Status = RanToCompletion;}

Reducing with CorrugatedIron

Reduce operations with CorrugatedIron:

let reduceScript = "
function(v) {
  var totals = {};
  for (var i in v) {
    for (var style in v[i]) {
      if (totals[style]) totals[style] += v[i][style];
      else               totals[style] = v[i][style];
    }
  }
  return [totals];
}"


let query = (new RiakMapReduceQuery()).Inputs("rooms")
   |> mapsf "my_functions" "map_capacity" false
   |> reducejs reduceScript true


ciClient.MapReduce(query)
|> getResults

Results:

seq["[{"queen":9051,"king":9189,"single":8811,"double":8629,"suite":9231}]"]
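A note on why the reduce function is shaped this way: Riak may call a reduce function more than once, feeding earlier reduce output back in alongside new inputs (a re-reduce), so the function's output must be valid input to itself. The JavaScript above satisfies this because it returns a one-element list holding a style-to-capacity object, the same shape the map phase emits. The sketch below restates that logic in F#, with F# Map values standing in for JavaScript objects (a substitution for illustration, not CorrugatedIron code), and checks that reducing in two batches and then re-reducing gives the same totals as a single pass. The inputs are the four map outputs shown earlier for rooms 101 to 104.

```fsharp
// Sums capacities per style; output has the same shape as input (a list of maps),
// so the result can be fed back in for a re-reduce.
let reduceCapacity (values: Map<string, int> list) : Map<string, int> list =
    let totals =
        values
        |> List.collect Map.toList
        |> List.fold
            (fun (acc: Map<string, int>) (style, capacity) ->
                match Map.tryFind style acc with
                | Some total -> Map.add style (total + capacity) acc
                | None -> Map.add style capacity acc)
            Map.empty
    [ totals ]

// Map outputs for rooms 101 to 104, as returned by the stored map function earlier
let mapped =
    [ Map.ofList [ ("suite", 3) ]
      Map.ofList [ ("king", 1) ]
      Map.ofList [ ("double", 4) ]
      Map.ofList [ ("suite", 7) ] ]

// One pass over everything
let once = reduceCapacity mapped

// Two partial reduces, then a re-reduce over their combined outputs
let rereduced =
    reduceCapacity (reduceCapacity (List.take 2 mapped) @ reduceCapacity (List.skip 2 mapped))

printfn "%A" once
printfn "same result after re-reduce: %b" (once = rereduced)
```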

Reducing with REST API

Reduce operations with the REST API:

let myReduceScript = "
{
    \"inputs\":\"rooms\",
    \"query\":[
    {\"map\":{
        \"language\":\"javascript\",
        \"bucket\":\"my_functions\",
        \"key\":\"map_capacity\"
    }},
    {\"reduce\":{
        \"language\":\"javascript\",
        \"source\":
            \"function(v) {
                var totals = {};
                for (var i in v) {
                    for(var style in v[i]) {
                        if( totals[style] ) totals[style] += v[i][style];
                        else totals[style] = v[i][style];
                    }
                }
                return [totals];
            }\"
        }}
    ]
}"

mapred myReduceScript

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 7;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"suite":9231,"king":9189,"double":8629,"queen":9051,"single":8811}]";
     Status = RanToCompletion;}

Key Filters with CorrugatedIron

Key filters are used to reduce the input set before the map/reduce phases run:
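Conceptually, the filter chain used below (["string_to_int"] followed by ["less_than", 1000]) converts each key to an integer and keeps only the keys below 1000, so only those objects reach the map phase. A local sketch of that logic, with hypothetical helper names and sample keys; dropping keys that are not integers is a choice made for this sketch, not a statement of Riak's behavior:

```fsharp
open System

// Sketch of "string_to_int": parse the key, or None if it is not an integer
let stringToInt (key: string) =
    match Int32.TryParse key with
    | true, n -> Some n
    | _ -> None

// Sketch of "string_to_int" followed by "less_than": keep keys whose value is below the limit.
// Non-integer keys are dropped here (an assumption of this sketch).
let filterKeys (limit: int) (keys: string list) =
    keys
    |> List.filter (fun key ->
        match stringToInt key with
        | Some n -> n < limit
        | None -> false)

let kept = filterKeys 1000 [ "101"; "999"; "1000"; "2044"; "abc" ]
printfn "%A" kept
```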

let reduceScript = "
function(v) {
  var totals = {};
  for (var i in v) {
    for (var style in v[i]) {
      if (totals[style]) totals[style] += v[i][style];
      else               totals[style] = v[i][style];
    }
  }
  return [totals];
}"

let keyFilter = new Action<RiakFluentKeyFilter>(fun x -> x.StringToInt().LessThan(1000) |> ignore)

let query = (new RiakMapReduceQuery()).Inputs("rooms").Filter(keyFilter)
   |> mapsf "my_functions" "map_capacity" false
   |> reducejs reduceScript true


ciClient.MapReduce(query)
|> getResults

Results:

val it : seq<string> =
  seq ["[{"queen":780,"suite":968,"king":838,"single":791,"double":758}]"]

Key Filters with REST API

Adding key filters with REST API:

let keyFilterScript = "
{
    \"inputs\":{
        \"bucket\":\"rooms\",
        \"key_filters\":[[\"string_to_int\"], [\"less_than\", 1000]]},    
    \"query\":[
    {\"map\":{
        \"language\":\"javascript\",
        \"bucket\":\"my_functions\",
        \"key\":\"map_capacity\"
    }},
    {\"reduce\":{
        \"language\":\"javascript\",
        \"source\":
            \"function(v) {
                var totals = {};
                for (var i in v) {
                    for(var style in v[i]) {
                        if( totals[style] ) totals[style] += v[i][style];
                        else totals[style] = v[i][style];
                    }
                }
                return [totals];
            }\"
        }}
    ]
}"

mapred keyFilterScript

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 10;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"queen":780,"king":838,"suite":968,"single":791,"double":758}]";
     Status = RanToCompletion;}

MapReduce Link Walking with CorrugatedIron

In my previous blog post, I mentioned that I couldn't figure out how to specify the keep flag in the link walking examples. With MapReduce link walking in CorrugatedIron, I can now specify the keep flag for each of the link, map, and reduce phases:
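For readers new to link phases: a link phase reads the links stored on each input object and passes the matching targets on to the next phase as bucket/key inputs. The query below matches on bucket only, so every link into animals is followed. A minimal local sketch, assuming a hypothetical RiakLinkSketch record; the animals/ace link mirrors the result shown below, while the tag values and the second link are invented to show the filtering:

```fsharp
// Hypothetical model of a Riak link: target bucket, target key, and tag
type RiakLinkSketch = { Bucket: string; Key: string; Tag: string }

// Link phase matching on bucket only (no tag): every link that points into
// the given bucket becomes a (bucket, key) input for the next phase.
let followLinks (bucket: string) (links: RiakLinkSketch list) =
    links
    |> List.filter (fun link -> link.Bucket = bucket)
    |> List.map (fun link -> link.Bucket, link.Key)

// Hypothetical links stored on cages/2
let cage2Links =
    [ { Bucket = "animals"; Key = "ace"; Tag = "contains" }
      { Bucket = "zookeepers"; Key = "bob"; Tag = "cared_for_by" } ]

let nextInputs = followLinks "animals" cage2Links
printfn "%A" nextInputs
```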


let keyFilter = new Action<RiakFluentKeyFilter>(fun x -> x.Equal("2") |> ignore)

let query = (new RiakMapReduceQuery()).Inputs("cages").Filter(keyFilter)
   |> linkphase "animals" false
   |> mapjs "function(v) { return [v]; }" true

ciClient.MapReduce(query)
|> getResults

Results:


val it : seq<seq<string>> =
  seq
    [seq [];
     seq ["[{
       "bucket":"animals",
       "key":"ace",
       "vclock":"a85hYGBgymDKBVIcrFrhrwI5O5wzmBKZ8lgZlj/hPM0HlVLm/PczkDP1FlRqHUgqCwA=",
       "values":
         [{"metadata":
             {"X-Riak-VTag":"4XZxrmgvKzan8X4FKz5hti",
              "content-type":"application/json",
              "index":[],
              "X-Riak-Last-Modified":"Thu, 16 May 2013 23:15:58 GMT"},
           "data":"{\"nickname\":\"The Wonder Dog\",\"breed\":\"German Shepherd\"}"}]}]"]]

MapReduce Link Walking with REST API

With REST API:

// Code for Link Walking with MapReduce
let mrLinkWalk = "
{
 \"inputs\":{
  \"bucket\":\"cages\",
  \"key_filters\":[[\"eq\", \"2\"]]
 },
 \"query\":[
  {\"link\":{
   \"bucket\":\"animals\",
   \"keep\":false
  }},
  {\"map\":{
   \"language\":\"javascript\",
   \"source\":
    \"function(v) { return [v]; }\"
  }}
 ]
}
"

mapred mrLinkWalk

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 13;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{
       "bucket":"animals",
       "key":"ace",
       "vclock":"a85hYGBgymDKBVIcrFrhrwI5O5wzmBKZ8lgZlj/hPM0HlVLm/PczkDP1FlRqHUgqCwA=",
       "values":[{
         "metadata":{
           "X-Riak-VTag":"4XZxrmgvKzan8X4FKz5hti",
           "content-type": "application/json",
           "index":[],
           "X-Riak-Last-Modified":"Thu, 16 May 2013 23:15:58 GMT"},
         "data":"{\"nickname\":\"The Wonder Dog\",\"breed\":\"German Shepherd\"}"}]}]";
     Status = RanToCompletion;}
