Monday, June 24, 2013

Riak CAP Tuning and F#

Riak provides the ability to tune CAP. CAP, which stands for Consistency, Availability, and Partition tolerance, does not sound like something that can be tuned. These terms evoke images of binary choices: either you have it or you don't. The CAP terms by themselves are ambiguous in their definitions, and I'm not the only one who feels that way, as can be seen in Daniel Abadi's blog post. I found it more helpful to think of the tradeoffs as consistency latency (time needed to achieve eventual consistency), performance (read/write latency), and node failure tolerance (how many nodes can fail while the cluster keeps working).

Riak exposes its CAP tuning controls through the named variables N, R, and W, which are defined as follows:

N
Number of nodes that a piece of data is replicated to
R
Number of nodes that must respond to a read for it to be considered successful (read fault tolerance)
W
Number of nodes that must accept a write for it to be considered complete (write fault tolerance)
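The interplay of these three values comes down to simple arithmetic. A common rule of thumb for Dynamo-style stores is that when R + W > N, every set of replicas answering a read overlaps every set that accepted a write, so a successful read touches at least one replica holding the latest successful write. N - W and N - R give how many replicas can be unreachable while writes and reads still succeed. The sketch below assumes strict quorums over N replicas; Riak's sloppy quorums and fallback nodes (which PR and PW address) weaken the overlap guarantee. `QuorumProfile` and `analyze` are hypothetical names for illustration.

```fsharp
/// What a given N/R/W combination buys you (hypothetical helper type).
type QuorumProfile =
    { ReadsSeeLatestWrite: bool      // R + W > N: every read set overlaps every write set
      WriteFailuresTolerated: int    // replicas that can be down while writes still succeed
      ReadFailuresTolerated: int }   // replicas that can be down while reads still succeed

/// Analyze an N/R/W combination, assuming strict quorums over N replicas
/// (Riak's sloppy quorums and fallback nodes are not modeled here).
let analyze (n: int) (r: int) (w: int) =
    if n < 1 || r < 1 || w < 1 || r > n || w > n then
        invalidArg "n" (sprintf "invalid combination N=%d R=%d W=%d" n r w)
    { ReadsSeeLatestWrite = r + w > n
      WriteFailuresTolerated = n - w
      ReadFailuresTolerated = n - r }

// N=3, R=2, W=2: quorums overlap, and one replica may be down for reads or writes
printfn "%A" (analyze 3 2 2)
// N=3, R=1, W=2: faster reads, but a read may hit a replica the write hasn't reached
printfn "%A" (analyze 3 1 2)
```

For the bucket settings used in the CorrugatedIron example below (N=3, W=2, R=1), R + W equals N rather than exceeding it, so a read can return a value that is momentarily stale.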

Riak also exposes the following tuning controls:

PR
Number of primary, non-fallback nodes that must return results for a successful read
PW
Number of primary, non-fallback nodes that must accept a write
DW
Number of nodes whose storage backend must acknowledge the write (durable write)
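Because every one of these controls counts replicas, none of them can be satisfied if it exceeds N. A small sanity check like the following can catch impossible combinations before they reach the cluster. `CapSettings` and `validate` are hypothetical names, not part of Riak or CorrugatedIron, and treating 0 as "no requirement" for PR, PW, and DW is an assumption based on PR and PW defaulting to 0.

```fsharp
/// All six tuning controls for a bucket (hypothetical record, not a CorrugatedIron type).
type CapSettings = { N: int; R: int; W: int; PR: int; PW: int; DW: int }

/// List every setting that can never be satisfied; an empty list means all are usable.
/// 0 is accepted for PR, PW, and DW as "no requirement" (an assumption).
let validate (s: CapSettings) =
    [ if s.N < 1 then
          yield "N must be at least 1"
      if s.R < 1 || s.R > s.N then
          yield sprintf "R=%d must be between 1 and N=%d" s.R s.N
      if s.W < 1 || s.W > s.N then
          yield sprintf "W=%d must be between 1 and N=%d" s.W s.N
      for (name, value) in [ ("PR", s.PR); ("PW", s.PW); ("DW", s.DW) ] do
          if value < 0 || value > s.N then
              yield sprintf "%s=%d must be between 0 and N=%d" name value s.N ]

// The values used in the bucket example below pass the check
printfn "%A" (validate { N = 3; R = 1; W = 2; PR = 1; PW = 1; DW = 1 })
// A read quorum larger than the replica count can never succeed
printfn "%A" (validate { N = 3; R = 4; W = 2; PR = 0; PW = 0; DW = 1 })
```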

Bucket Level CAP Controls in Riak

Here's an example of how to set bucket-level CAP settings in Riak with CorrugatedIron:


// Get existing bucket properties
let properties = ciClient.GetBucketProperties("animals",true).Value

// Set # of nodes a write must ultimately replicate to
// This should be set at the creation of the bucket
properties.SetNVal(3u)

// Set # of nodes that must successfully write the value before a successful write response
properties.SetWVal(2u)

// Set # of nodes required to read a value successfully
properties.SetRVal(1u)

// Set primary read value
properties.SetPrVal(1u)

// Set primary write value
properties.SetPwVal(1u)

// Set durable write value
properties.SetDwVal(1u)

// Change bucket properties with these new CAP control values
ciClient.SetBucketProperties("animals",properties)

Per Request CAP Controls in Riak

Riak also allows you to tune CAP controls on a per-request level:

// Setting W & DW on puts
let options = new RiakPutOptions()
options.SetW(3u).SetDw(1u) |> ignore
let data = new RiakObject("animals","toto",{nickname="Toto"; breed="Cairn Terrier"; score=5})
ciClient.Put(data,options)

// Get item with R value set to 2
ciClient.Get("animals","toto",2u).Value.GetObject<Animal>()

// Specify quorum
let getOptions = new RiakGetOptions()
getOptions.SetR("quorum") |> ignore

// Need to downcast IRiakClient to RiakClient in order to pass RiakGetOptions
let client = ciClient :?> RiakClient
client.Get("animals","toto",getOptions).Value.GetObject<Animal>()
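The string "quorum" above is one of Riak's symbolic values; Riak also accepts "one", "all", and "default" (which defers to the bucket's setting). A quorum is a strict majority of the N replicas. The sketch below models that resolution locally; `QuorumValue`, `parseQuorum`, and `resolve` are hypothetical names, "default" is left out, and the arithmetic (N/2 + 1 with integer division) is the usual majority definition rather than something taken from CorrugatedIron's source.

```fsharp
/// Quorum values a request can specify (hypothetical modeling type).
type QuorumValue =
    | One
    | Quorum
    | All
    | Count of int

/// Parse "one", "quorum", "all", or a number such as "2".
/// Anything else that isn't a number throws a FormatException.
let parseQuorum (s: string) =
    match s.ToLowerInvariant() with
    | "one" -> One
    | "quorum" -> Quorum
    | "all" -> All
    | other -> Count (int other)

/// Resolve a quorum value to a node count for a bucket with the given N.
/// "quorum" is a strict majority of the replicas: N/2 + 1 with integer division.
let resolve (n: int) (q: QuorumValue) =
    match q with
    | One -> 1
    | Quorum -> n / 2 + 1
    | All -> n
    | Count k -> k

printfn "%d" (resolve 3 (parseQuorum "quorum"))   // 2
printfn "%d" (resolve 5 (parseQuorum "quorum"))   // 3
printfn "%d" (resolve 3 (parseQuorum "all"))      // 3
```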

Monday, June 03, 2013

Riak MapReduce with F#

Lately, I have been reading The Signal and the Noise by Nate Silver. The book references an IBM webpage claiming that the world creates 2.5 quintillion (10^18) bytes of data a day and that 90% of the data that currently exists in the world was created in the past two years. Combined with the rise of multicore processors, this would explain the surging interest in functional programming, NoSQL, and MapReduce frameworks. There is just too much data to be handled by a single machine alone.

I'm continuing my journey through the book Seven Databases in Seven Weeks, this time exploring MapReduce. The MapReduce framework was popularized by Google's seminal paper, but its genesis lies in ideas from functional programming languages. After all, if functional programming helps one run code on multiple processors in a single server, it's a logical step to extend that computing from multiple processors on a single server to multiple servers. However, I believe it's recent technology trends and the data tsunami that are encouraging people to look at functional programming and NoSQL databases.

What I would like to see in the future is MapReduce integrated with functional programming, so that calling a map/reduce function on a typed collection backed by a NoSQL database is recognized as such and the function is seamlessly shipped out to the NoSQL stores to run in a MapReduce fashion.

I want to do something like the following:

Seq.map myfunc myRiakCollection

and have the compiler/runtime automatically convert myfunc into an appropriate form, run it on all the data nodes, and return the results. Since we're not there yet, we need to do a little more work.
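Until then, the shape of the idea can be sketched locally in plain F#: partitions stand in for data nodes, the map function runs on each partition concurrently, and a reduce function combines the results. This is a hypothetical in-process sketch (`mapReduce`, `mapCapacity`, and `sumByStyle` are illustrative names); it is not a Riak or CorrugatedIron feature, and nothing is shipped to a database. The room styles and capacities are the ones returned by the examples later in this post.

```fsharp
/// Room record, matching the JSON values used in this post.
type Room = { style: string; capacity: int }

/// Run `mapper` over each partition concurrently (partitions stand in for data
/// nodes), gather every mapped output, and hand the combined list to `reducer`.
let mapReduce (partitions: 'T list list) (mapper: 'T -> 'U list) (reducer: 'U list -> 'V) =
    partitions
    |> List.map (fun part -> async { return List.collect mapper part })
    |> Async.Parallel
    |> Async.RunSynchronously   // results come back in partition order
    |> List.concat
    |> reducer

/// Map phase: emit a (style, capacity) pair per room.
let mapCapacity (room: Room) = [ (room.style, room.capacity) ]

/// Reduce phase: total the capacity per room style.
let sumByStyle (pairs: (string * int) list) =
    pairs
    |> List.groupBy fst
    |> List.map (fun (style, group) -> (style, List.sumBy snd group))

// Four rooms split across two pretend "nodes"
let node1 = [ { style = "suite"; capacity = 3 }; { style = "king"; capacity = 1 } ]
let node2 = [ { style = "double"; capacity = 4 }; { style = "suite"; capacity = 7 } ]

printfn "%A" (mapReduce [ node1; node2 ] mapCapacity sumByStyle)
```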

One more thing to note: the Riak Handbook warns that:

MapReduce in Riak is not exactly the same as Hadoop, where you can practically analyze an infinite amount of data. In Riak, data is pulled out of the system to be analyzed. That alone sets the boundaries of what's possible.

This is further reinforced by the following mailing list response from Sean Cribbs, developer advocate at Basho:

...Riak's MapReduce is designed for low-latency queries, but it's also designed for "targeted" queries where you don't slurp the entire contents of a bucket from disk. Structure your data so that you can know -- or deterministically guess -- what keys should be involved in your query. Then you won't be trying to fetch and filter all of the data all the time.

I wanted to point this out because it goes against my mental model of how MapReduce should work.
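Following that advice, if keys follow a predictable scheme, the inputs for a targeted query can be computed instead of discovered by scanning a bucket. The sketch below assumes a hypothetical scheme in which a room key is the floor number followed by a two-digit room number, matching keys like "101"; `roomKeys` is an illustrative name, and its output has the same (bucket, key) shape that the setinputs helper later in this post consumes.

```fsharp
/// Build (bucket, key) inputs for the given floors, assuming (hypothetically) that
/// room keys are the floor number followed by a two-digit room number, e.g. "103".
let roomKeys (floors: int list) (roomsPerFloor: int) =
    [ for floor in floors do
          for room in 1 .. roomsPerFloor do
              yield ("rooms", sprintf "%d%02d" floor room) ]

// The inputs used in the examples below: rooms 101 through 104
printfn "%A" (roomKeys [ 1 ] 4)
```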


MapReduce with CorrugatedIron

In this particular example, I will use the JavaScript example from the book. First, we'll define some helper functions that provide some syntactic sugar for working with CorrugatedIron's fluent interface:

// Helper function for CorrugatedIron's job inputs for bucket/key pairs only.
let setinputs data (q:RiakMapReduceQuery) = 
    let inputs = new RiakBucketKeyInput()

    data
    |> List.iter( fun (bucket,key) -> inputs.AddBucketKey(bucket,key)) 

    q.Inputs(inputs)

//  Helper function mapping Javascript functions
let mapjs  src keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.Source(src).Keep(keep) |> ignore)

// Helper function for mapping stored functions
let mapsf bucket func keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.BucketKey(bucket,func).Keep(keep) |> ignore)

// Helper function for mapping builtin functions
let mapbuiltins func keep (q:RiakMapReduceQuery) =
    q.MapJs(fun m -> m.Name(func).Keep(keep) |> ignore)

// Getting results from CorrugatedIron seems convoluted to me
// Helper function is used to extract the data to a simpler form
let getResults (results:RiakResult<RiakMapReduceResult>) =
    // Riak returns JSON, so decode the raw bytes as UTF-8
    let getText (raw:byte[]) = System.Text.Encoding.UTF8.GetString(raw)
    results.Value.PhaseResults
    |> Seq.map (fun phase -> phase.Values |> Seq.map getText)
    |> Seq.concat
 
// Helper function for Reduce operations in Javascript
let reducejs src keep (q:RiakMapReduceQuery) =
    q.ReduceJs(fun m -> m.Source(src).Keep(keep) |> ignore)

// Helper function for link walking with mapreduce
let linkphase bucket keep (q:RiakMapReduceQuery) =
    q.Link(fun l -> l.Keep(keep).Bucket(bucket) |> ignore)
 

Here's the script to perform the MapReduce with CorrugatedIron. In the following snippet, ciClient refers to CorrugatedIron's RiakClient object; in my previous blog posts, it was just called client:

// Mapreduce example from the book
let src ="function(v) {
     /* From the Riak object, pull data and parse it as JSON */
     var parsed_data = JSON.parse(v.values[0].data);
     var data = {};

     /* Key capacity number by room style string */
     data[parsed_data.style] = parsed_data.capacity;
     return [data];
   }"

let rooms =  [("rooms","101");("rooms","102");("rooms","103");]

let query =
    new RiakMapReduceQuery()
    |> setinputs rooms
    |> mapjs src true

ciClient.MapReduce(query)
|> getResults

Results:

seq ["[{"suite":3}]"; "[{"king":1}]"; "[{"double":4}]"]

MapReduce with REST API

It's not all that hard to do MapReduce with the REST API through HttpClient, although the JSON script becomes more complex. In the following example, I had to escape the double quotes (") because, alas, I'm still using F# 2.0. If you're using F# 3.0, you can use the new triple-quoted string syntax to wrap the MapReduce script.

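// This helper function will be used in all the later examples with REST API.
// It POSTs the job to Riak's /mapred endpoint and returns a Task<string>
// holding the response body (read .Result to get the JSON string).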
let mapred script =
    let post_url = sprintf "%s/mapred" riakurl 
    let content = new StringContent(script)
    content.Headers.ContentType.MediaType <- "application/json"
    let response = restClient.PostAsync(post_url,content)
    response.Wait()
    let results = response.Result.Content.ReadAsStringAsync()
    results

let mapreduceScript =  "{
    \"inputs\":[
        [\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"]
    ],
    \"query\":[
        {\"map\":{
            \"language\":\"javascript\",
            \"source\":
            \"function(v) {
                /* From the Riak object, pull data and parse it as JSON */
                var parsed_data = JSON.parse(v.values[0].data);
                var data = {};
                /* Key capacity number by room style string */
                data[parsed_data.style] = parsed_data.capacity;
                return [data];
            }\"
        }}
      ]
    }"

mapred mapreduceScript 

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 2;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"king":1},{"suite":3},{"double":4}]";
     Status = RanToCompletion;}
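The escaped script above could be written more readably with the triple-quoted strings introduced in F# 3.0. The following is a sketch of the same job, assuming an F# 3.0 or later compiler; `mapreduceScript3` is an illustrative name. Keeping the JavaScript on a single line also keeps the payload strictly valid JSON, since JSON strings may not contain raw newlines (Riak evidently accepted the multi-line version, so this is a precaution rather than a fix).

```fsharp
// Requires F# 3.0 or later: inside """...""" double quotes need no escaping.
// The JavaScript stays on one line because JSON strings may not contain raw newlines.
let mapreduceScript3 = """{
    "inputs":[
        ["rooms","101"],["rooms","102"],["rooms","103"]
    ],
    "query":[
        {"map":{
            "language":"javascript",
            "source":"function(v) { var parsed_data = JSON.parse(v.values[0].data); var data = {}; data[parsed_data.style] = parsed_data.capacity; return [data]; }"
        }}
    ]
}"""

// With the mapred helper and a running Riak node: mapred mapreduceScript3
printfn "%s" mapreduceScript3
```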


Stored Functions with CorrugatedIron

I can store custom JavaScript MapReduce functions under a specific bucket/key and reference them in a query:

let myfunc = "
 function(v) {
  var parsed_data = JSON.parse(v.values[0].data);
  var data = {};
  data[parsed_data.style] = parsed_data.capacity;
  return [data];
 }"

// Store the map function in a bucket value
let putResults = new RiakObject("my_functions","map_capacity", myfunc) 
                 |> ciClient.Put

// Querying using stored map function
let rooms = [("rooms","101");("rooms","102");("rooms","103");("rooms","104");]

let query =
    new RiakMapReduceQuery()
    |> setinputs rooms
    |> mapsf "my_functions" "map_capacity" true

ciClient.MapReduce(query)
|> getResults

Results:

seq ["[{"suite":3}]"; "[{"king":1}]"; "[{"double":4}]"; "[{"suite":7}]"]

Stored Functions with REST API

With the REST API, reusing the mapred function defined previously:

let storedFuncScript = "
{
    \"inputs\":[
        [\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"],[\"rooms\",\"104\"]
    ],
    \"query\":[
        {\"map\":{
        \"language\":\"javascript\",
        \"bucket\":\"my_functions\",
        \"key\":\"map_capacity\"
    }}
    ]
}
"

let savesf bucket key myfunc =
    let post_url = sprintf "%s/riak/%s/%s" riakurl bucket key
    let content = new StringContent(myfunc)
    content.Headers.ContentType.MediaType <- "application/json"
    restClient.PostAsync(post_url,content)

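// Store the map function in a bucket value, and wait for the POST to finish
// so the function exists before the MapReduce job references it
(savesf "my_functions" "map_capacity" myfunc).Wait()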


mapred storedFuncScript

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 5;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"suite":3},{"double":4},{"king":1},{"suite":7}]";
     Status = RanToCompletion;}

Built-in Functions with CorrugatedIron

Riak has some built-in MapReduce JavaScript functions. I couldn't find any reference documentation for them; the closest I came was the actual JavaScript source code on GitHub. Here is the list of built-in JavaScript functions from that source code:

  • Riak.getClassName(obj)
  • Riak.filterNotFound(values)
  • Riak.mapValues(value,keyData,arg)
  • Riak.mapValuesJson(value,keyData,arg)
  • Riak.mapByFields(value,keyData,fields)
  • Riak.reduceSum(values,arg)
  • Riak.reduceMin(values,arg)
  • Riak.reduceMax(values,arg)
  • Riak.reduceSort(value,arg)
  • Riak.reduceNumericSort(value, arg)
  • Riak.reduceLimit(value, arg)
  • Riak.reduceSlice(value, arg)
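The list contains both Riak.reduceSort and Riak.reduceNumericSort. The likely reason is that JavaScript's default Array sort compares elements as strings, so numbers end up in lexicographic order. The F# sketch below reproduces both orderings locally; it doesn't call Riak, and the claim that reduceNumericSort applies a numeric comparison is inferred from its name rather than from documentation.

```fsharp
/// Mimics JavaScript's default Array.prototype.sort(), which converts elements to
/// strings and compares them; F#'s compare on strings is ordinal, as in JavaScript.
let jsDefaultSort (xs: int list) =
    xs |> List.sortWith (fun a b -> compare (string a) (string b))

/// Numeric ordering, the behavior the name Riak.reduceNumericSort suggests.
let numericSort (xs: int list) = List.sort xs

let capacities = [ 10; 9; 1; 100 ]
printfn "%A" (jsDefaultSort capacities)   // [1; 10; 100; 9]
printfn "%A" (numericSort capacities)     // [1; 9; 10; 100]
```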

In the following example, we're only going to use Riak.mapValuesJson():

let rooms = [("rooms","101");("rooms","102");("rooms","103");("rooms","104");]


let query =
    new RiakMapReduceQuery()
    |> setinputs rooms
    |> mapbuiltins "Riak.mapValuesJson" true

let results = ciClient.MapReduce(query)

// Collect the results and flatten the nested collections
results.Value.PhaseResults
|> Seq.map (fun x -> x.GetObjects<IEnumerable<Room>>())
|> Seq.concat
|> Seq.concat

Results:

val it : seq<Room> =
  seq [{style = "double"; capacity = 4;};
       {style = "suite";  capacity = 3;};
       {style = "suite";  capacity = 7;};
       {style = "king";   capacity = 1;}]

Built-in Functions with REST API

Calling built-in functions with the REST API:

let builtinFunc = "
{
 \"inputs\":[
  [\"rooms\",\"101\"],[\"rooms\",\"102\"],[\"rooms\",\"103\"]
 ],
 \"query\":[
  {\"map\":{
  \"language\":\"javascript\",
  \"name\":\"Riak.mapValuesJson\"
 }}
 ]
}
"

mapred builtinFunc

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 6;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"style":"double","capacity":4},
                {"style":"king","capacity":1},
                {"style":"suite","capacity":3}]";
     Status = RanToCompletion;}

Reducing with CorrugatedIron

Reduce operations with CorrugatedIron:

let reduceScript = "
function(v) {
  var totals = {};
  for (var i in v) {
    for (var style in v[i]) {
      if (totals[style]) totals[style] += v[i][style];
      else               totals[style] = v[i][style];
    }
  }
  return [totals];
}"


let query =
    (new RiakMapReduceQuery()).Inputs("rooms")
    |> mapsf "my_functions" "map_capacity" false
    |> reducejs reduceScript true


ciClient.MapReduce(query)
|> getResults

Results:

seq["[{"queen":9051,"king":9189,"single":8811,"double":8629,"suite":9231}]"]
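Riak may call a reduce function more than once, passing the output of an earlier reduce call back in alongside further map results, so a reduce function should return the same shape it receives. The script above does this by returning [totals], an array holding one style-to-capacity object. The sketch below mirrors that script in F#, using Map<string, int> in place of JavaScript objects (`addInto` and `reduceTotals` are illustrative names), and checks that reducing in two stages gives the same totals as reducing all at once.

```fsharp
/// Add one style->capacity map into a running total.
let addInto (acc: Map<string, int>) (m: Map<string, int>) =
    Map.fold (fun (total: Map<string, int>) style capacity ->
        let current = defaultArg (total.TryFind style) 0
        total.Add(style, current + capacity)) acc m

/// F# analogue of reduceScript: merge style->capacity maps into one total and
/// return it as a one-element list, the same shape the map phase produces.
let reduceTotals (values: Map<string, int> list) =
    [ List.fold addInto Map.empty values ]

// Map-phase outputs like those returned earlier in this post
let mapped =
    [ Map.ofList [ ("suite", 3) ]; Map.ofList [ ("king", 1) ]
      Map.ofList [ ("double", 4) ]; Map.ofList [ ("suite", 7) ] ]

// Reduce everything at once, or reduce the first half and feed that result back in
let allAtOnce = reduceTotals mapped
let inStages = reduceTotals (reduceTotals (List.take 2 mapped) @ List.skip 2 mapped)
printfn "%b" (allAtOnce = inStages)   // true
```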

Reduce operations with REST API

MapReduce with REST API:

let myReduceScript = "
{
    \"inputs\":\"rooms\",
    \"query\":[
    {\"map\":{
        \"language\":\"javascript\",
        \"bucket\":\"my_functions\",
        \"key\":\"map_capacity\"
    }},
    {\"reduce\":{
        \"language\":\"javascript\",
        \"source\":
            \"function(v) {
                var totals = {};
                for (var i in v) {
                    for(var style in v[i]) {
                        if( totals[style] ) totals[style] += v[i][style];
                        else totals[style] = v[i][style];
                    }
                }
                return [totals];
            }\"
        }}
    ]
}"

mapred myReduceScript

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 7;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"suite":9231,"king":9189,"double":8629,"queen":9051,"single":8811}]";
     Status = RanToCompletion;}

Key Filters with CorrugatedIron

Key filters are used to reduce the input set before the map/reduce operations run:

let reduceScript = "
function(v) {
  var totals = {};
  for (var i in v) {
    for (var style in v[i]) {
      if (totals[style]) totals[style] += v[i][style];
      else               totals[style] = v[i][style];
    }
  }
  return [totals];
}"

let keyFilter = new Action<RiakFluentKeyFilter>(fun x -> x.StringToInt().LessThan(1000) |> ignore)

let query =
    (new RiakMapReduceQuery()).Inputs("rooms").Filter(keyFilter)
    |> mapsf "my_functions" "map_capacity" false
    |> reducejs reduceScript true


ciClient.MapReduce(query)
|> getResults

Results:

val it : seq<string> =
  seq ["[{"queen":780,"suite":968,"king":838,"single":791,"double":758}]"]
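Conceptually, a key filter is a transform (such as string_to_int) followed by a predicate (such as less_than), applied to each key before any object is loaded. The F# sketch below models only the two filters used here; `stringToInt`, `lessThan`, and `filterKeys` are hypothetical names, and this is not how Riak implements key filters.

```fsharp
/// string_to_int transform: parse the key as an integer. Keys that don't parse
/// are simply dropped here, which is an assumption, not documented Riak behavior.
let stringToInt (key: string) =
    match System.Int32.TryParse(key) with
    | true, n -> Some n
    | _ -> None

/// less_than predicate: keep values strictly below the bound.
let lessThan (bound: int) (n: int) = n < bound

/// Apply the filter chain [["string_to_int"], ["less_than", 1000]] to a key list.
let filterKeys (keys: string list) =
    keys
    |> List.filter (fun key ->
        match stringToInt key with
        | Some n -> lessThan 1000 n
        | None -> false)

printfn "%A" (filterKeys [ "101"; "999"; "1000"; "2510"; "lobby" ])   // ["101"; "999"]
```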

Key Filters with REST API

Adding key filters with REST API:

let keyFilterScript = "
{
    \"inputs\":{
        \"bucket\":\"rooms\",
        \"key_filters\":[[\"string_to_int\"], [\"less_than\", 1000]]},    
    \"query\":[
    {\"map\":{
        \"language\":\"javascript\",
        \"bucket\":\"my_functions\",
        \"key\":\"map_capacity\"
    }},
    {\"reduce\":{
        \"language\":\"javascript\",
        \"source\":
            \"function(v) {
                var totals = {};
                for (var i in v) {
                    for(var style in v[i]) {
                        if( totals[style] ) totals[style] += v[i][style];
                        else totals[style] = v[i][style];
                    }
                }
                return [totals];
            }\"
        }}
    ]
}"

mapred keyFilterScript

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 10;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{"queen":780,"king":838,"suite":968,"single":791,"double":758}]";
     Status = RanToCompletion;}

MapReduce Link Walking with CorrugatedIron

In my previous blog post, I mentioned that I couldn't figure out how to specify the keep flag in the link walking examples. With MapReduce link walking in CorrugatedIron, I can now specify the keep flag for each of the link, map, and reduce phases:


let keyFilter = new Action<RiakFluentKeyFilter>(fun x -> x.Equal("2") |> ignore)

let query =
    (new RiakMapReduceQuery()).Inputs("cages").Filter(keyFilter)
    |> linkphase "animals" false
    |> mapjs "function(v) { return [v]; }" true

ciClient.MapReduce(query)
|> getResults

Results:


val it : seq<seq<string>> =
  seq
    [seq [];
     seq ["[{
            "bucket":"animals",
            "key":"ace",
            "vclock":"a85hYGBgymDKBVIcrFrhrwI5O5wzmBKZ8lgZlj/hPM0HlVLm/PczkDP1FlRqHUgqCwA=",
            "values":
              [{"metadata":
                  {"X-Riak-VTag":"4XZxrmgvKzan8X4FKz5hti",
                   "content-type":"application/json",
                   "index":[],
                   "X-Riak-Last-Modified":"Thu, 16 May 2013 23:15:58 GMT"},
                "data":"{\"nickname\":\"The Wonder Dog\",\"breed\":\"German Shepherd\"}"}]}]"]]

MapReduce Link Walking with REST API

With REST API:

// Code for Link Walking with MapReduce
let mrLinkWalk = "
{
 \"inputs\":{
  \"bucket\":\"cages\",
  \"key_filters\":[[\"eq\", \"2\"]]
 },
 \"query\":[
  {\"link\":{
   \"bucket\":\"animals\",
   \"keep\":false
  }},
  {\"map\":{
   \"language\":\"javascript\",
   \"source\":
    \"function(v) { return [v]; }\"
  }}
 ]
}
"

mapred mrLinkWalk

Results:

val it : Task<string> =
  System.Threading.Tasks.Task`1[System.String]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 13;
     IsCanceled = false;
     IsCompleted = true;
     IsFaulted = false;
     Result = "[{
        "bucket":"animals",
        "key":"ace",
        "vclock":"a85hYGBgymDKBVIcrFrhrwI5O5wzmBKZ8lgZlj/hPM0HlVLm/PczkDP1FlRqHUgqCwA=",
        "values":[{
          "metadata":{
            "X-Riak-VTag":"4XZxrmgvKzan8X4FKz5hti",
            "content-type": "application/json",
            "index":[],
            "X-Riak-Last-Modified":"Thu, 16 May 2013 23:15:58 GMT"},
          "data":"{\"nickname\":\"The Wonder Dog\",\"breed\":\"German Shepherd\"}"}]}]";
     Status = RanToCompletion;}
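In both link-walking versions, the link phase follows the links stored on each input object, keeps only those pointing into the requested bucket (a tag can restrict them further), and hands the targets to the next phase as bucket/key inputs. The local F# model below illustrates that step. The `Link` type and `linkPhase` function are hypothetical; only the cages/2 to animals/ace link is taken from the results above, while the tag and the second link are made up to show the bucket filter at work.

```fsharp
/// A stored link: target bucket, target key, and a tag (hypothetical model type).
type Link = { Bucket: string; Key: string; Tag: string }

// cages/2 -> animals/ace matches the results above; the tag and the
// keepers link are made up for illustration.
let aceLink = { Bucket = "animals"; Key = "ace"; Tag = "contains" }
let keeperLink = { Bucket = "keepers"; Key = "hypothetical-keeper"; Tag = "cared_for_by" }
let storedLinks = Map.ofList [ (("cages", "2"), [ aceLink; keeperLink ]) ]

/// Link phase: follow the links on each input, keep only those into `bucket`,
/// and emit (bucket, key) pairs as inputs for the next phase.
let linkPhase (bucket: string) (inputs: (string * string) list) =
    inputs
    |> List.collect (fun input -> defaultArg (storedLinks.TryFind input) [])
    |> List.filter (fun link -> link.Bucket = bucket)
    |> List.map (fun link -> (link.Bucket, link.Key))

printfn "%A" (linkPhase "animals" [ ("cages", "2") ])   // [("animals", "ace")]
```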