Friday, April 26, 2013

Exploring Riak with F# and CorrugatedIron

Many thanks to David and OJ for recommending the CorrugatedIron .NET Riak client library. This blog entry documents my experiments with Riak CRUD operations using CorrugatedIron.

Pinging Riak

I tested CorrugatedIron from an F# script. Here is the setup code, along with a test of the ping capability:

// Needed to load the following libraries to get F# script to work
#r @"c:\dev\FsRiak\packages\CorrugatedIron.1.3.0\lib\net40\CorrugatedIron.dll"
#r @"c:\dev\FsRiak\packages\protobuf-net.2.0.0.621\lib\net40\protobuf-net.dll"
#r @"c:\dev\FsRiak\packages\Newtonsoft.Json.4.5.11\lib\net40\Newtonsoft.Json.dll"

open CorrugatedIron
open CorrugatedIron.Models
open Newtonsoft.Json
open System

// Setup connections
let cluster = RiakCluster.FromConfig("riakConfig", @"c:\dev\FsRiak\App.config")
let client = cluster.CreateClient()

// Ping the Riak Cluster
client.Ping()

Here is my App.config file used by CorrugatedIron:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="riakConfig" type="CorrugatedIron.Config.RiakClusterConfiguration, CorrugatedIron" />
  </configSections>
  <riakConfig nodePollTime="5000" defaultRetryWaitTime="200" defaultRetryCount="3">
    <nodes>
      <node name="dev1"  hostAddress="mydevhost-a" pbcPort="8087" restScheme="http" restPort="8098" poolSize="10" />
      <node name="dev2" hostAddress="mydevhost-b" pbcPort="8087" restScheme="http" restPort="8098" poolSize="10" />
      <node name="dev3" hostAddress="mydevhost-c" pbcPort="8087" restScheme="http" restPort="8098" poolSize="10" />
    </nodes>
  </riakConfig>
</configuration>

Here's the result from running ping:

val it : RiakResult = CorrugatedIron.RiakResult {ErrorMessage = null;
                                                 IsSuccess = true;
                                                 ResultCode = Success;}

Get List of Buckets

The following method call gets you the list of buckets along with metadata for the call status:

client.ListBuckets()

This method returns the following RiakResult object:

val it : RiakResult<seq<string>> =
  CorrugatedIron.RiakResult`1[System.Collections.Generic.IEnumerable`1[System.String]]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = seq ["photos"; "favs"; "animals"; "cages"; ...];}

Get Bucket Keys

Getting a list of keys for a bucket is also simple:

client.ListKeys("animals")

Helpfully for the novice, the library warns you not to do this in production environments:

*** [CI] -> ListKeys is an expensive operation and should not be used in Production scenarios. ***
val it : RiakResult<seq<string>> =
  CorrugatedIron.RiakResult`1[System.Collections.Generic.IEnumerable`1[System.String]]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = seq ["ace"; "polly"];}

Retrieve Content from Riak

Getting a value from Riak is pretty easy with this library:

client.Get("animals","ace")

A dump of the return object shows the actual data plus metadata about the Get operation:

val it : RiakResult<Models.RiakObject> =
  CorrugatedIron.RiakResult`1[CorrugatedIron.Models.RiakObject]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = CorrugatedIron.Models.RiakObject;}

A deeper dive into the Value field of the RiakResult object gives the following:

val it : Models.RiakObject =
  CorrugatedIron.Models.RiakObject
    {BinIndexes = dict [];
     Bucket = "animals";
     CharSet = null;
     ContentEncoding = null;
     ContentType = "application/json";
     HasChanged = false;
     IntIndexes = dict [];
     Key = "ace";
     LastModified = 1359744019u;
     LastModifiedUsec = 788127u;
     Links = seq [];
     Siblings = seq [];
     UserMetaData = dict [];
     VTag = "7aPFusRQHlQ36ZP6G6GSyE";
     VTags = seq ["7aPFusRQHlQ36ZP6G6GSyE"];
     Value = [|123uy; 32uy; 34uy; 110uy; 105uy; 99uy; 107uy; 110uy; 97uy;
               109uy; 101uy; 34uy; 32uy; 58uy; 32uy; 34uy; 84uy; 104uy; 101uy;
               32uy; 87uy; 111uy; 110uy; 100uy; 101uy; 114uy; 32uy; 68uy;
               111uy; 103uy; 34uy; 32uy; 44uy; 32uy; 34uy; 98uy; 114uy; 101uy;
               101uy; 100uy; 34uy; 32uy; 58uy; 32uy; 34uy; 71uy; 101uy; 114uy;
               109uy; 97uy; 110uy; 32uy; 83uy; 104uy; 101uy; 112uy; 104uy;
               101uy; 114uy; 100uy; 34uy; 32uy; 125uy|];
     VectorClock = [|107uy; 206uy; 97uy; 96uy; 96uy; 96uy; 204uy; 96uy; 202uy;
                     5uy; 82uy; 28uy; 172uy; 90uy; 225uy; 175uy; 2uy; 57uy;
                     59uy; 156uy; 51uy; 152uy; 18uy; 25uy; 243uy; 88uy; 25uy;
                     132uy; 59uy; 26uy; 78uy; 241uy; 101uy; 1uy; 0uy|];}

The data I really wanted is embedded in another Value field, where it is represented as an array of bytes, which is not the final format I want. I wanted to get back the JSON representation of the data I put in. In my previous blog post, I had rolled my own JSON serializer/deserializer, but now I wanted to leverage the other libraries bundled with CorrugatedIron, namely Json.NET. To do so, I define the Animal type and use the Json.NET serializer/deserializer to convert between objects and their corresponding JSON representation:

type Animal =
    { nickname: string; breed: string}

// Getting ace from animals bucket 
client.Get("animals","ace").Value.GetObject<Animal>() 
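If you only want to eyeball the stored JSON text, the byte array can also be decoded directly. The following is a minimal sketch, assuming the value was stored as UTF-8 text; the bytes are copied from the Value field of the "ace" dump above, and the names aceBytes and aceJson are mine, not part of CorrugatedIron:

```fsharp
open System.Text

// Bytes copied from the Value field of the "ace" RiakObject dump above
let aceBytes =
    [| 123uy; 32uy; 34uy; 110uy; 105uy; 99uy; 107uy; 110uy; 97uy;
       109uy; 101uy; 34uy; 32uy; 58uy; 32uy; 34uy; 84uy; 104uy; 101uy;
       32uy; 87uy; 111uy; 110uy; 100uy; 101uy; 114uy; 32uy; 68uy;
       111uy; 103uy; 34uy; 32uy; 44uy; 32uy; 34uy; 98uy; 114uy; 101uy;
       101uy; 100uy; 34uy; 32uy; 58uy; 32uy; 34uy; 71uy; 101uy; 114uy;
       109uy; 97uy; 110uy; 32uy; 83uy; 104uy; 101uy; 112uy; 104uy;
       101uy; 114uy; 100uy; 34uy; 32uy; 125uy |]

// Assumption: the payload is UTF-8 encoded text
let aceJson = Encoding.UTF8.GetString aceBytes

printfn "%s" aceJson
// prints: { "nickname" : "The Wonder Dog" , "breed" : "German Shepherd" }
```

Against a live cluster, the same decoding should apply to the inner Value property shown in the dump, e.g. Encoding.UTF8.GetString(client.Get("animals","ace").Value.Value).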

Adding Content to Riak

Adding content to Riak is also pretty easy once you have defined a type whose fields Json.NET can serialize:

new RiakObject("animals","delta",{nickname="Snoopy"; breed="Beagle"}) 
|> client.Put

If you dump the RiakResult object and then its Value field, you get the following:

val it : RiakResult<RiakObject> =
  CorrugatedIron.RiakResult`1[CorrugatedIron.Models.RiakObject]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = CorrugatedIron.Models.RiakObject;}
  
val it : RiakObject =
  CorrugatedIron.Models.RiakObject
    {BinIndexes = dict [];
     Bucket = "animals";
     CharSet = null;
     ContentEncoding = null;
     ContentType = "application/json";
     HasChanged = false;
     IntIndexes = dict [];
     Key = "delta";
     LastModified = 1366930771u;
     LastModifiedUsec = 271921u;
     Links = seq [];
     Siblings = seq [];
     UserMetaData = dict [];
     VTag = "OvZlH7bsYKdO8zL76QdDY";
     VTags = seq ["OvZlH7bsYKdO8zL76QdDY"];
     Value = [|123uy; 34uy; 110uy; 105uy; 99uy; 107uy; 110uy; 97uy; 109uy;
               101uy; 34uy; 58uy; 34uy; 83uy; 110uy; 111uy; 111uy; 112uy;
               121uy; 34uy; 44uy; 34uy; 98uy; 114uy; 101uy; 101uy; 100uy; 34uy;
               58uy; 34uy; 66uy; 101uy; 97uy; 103uy; 108uy; 101uy; 34uy; 125uy|];
     VectorClock = [|107uy; 206uy; 97uy; 96uy; 96uy; 96uy; 204uy; 96uy; 202uy;
                     5uy; 82uy; 28uy; 169uy; 111uy; 239uy; 241uy; 7uy; 114uy;
                     230uy; 184uy; 103uy; 48uy; 37uy; 50uy; 230uy; 177uy; 50uy;
                     4uy; 27uy; 190uy; 59uy; 197uy; 151uy; 5uy; 0uy|];}

To verify interoperability, I checked the newly added data with curl:

$ curl -X GET http://192.168.56.1:8098/riak/animals/delta
{"nickname":"Snoopy","breed":"Beagle"}

Delete Riak Contents

Delete content by calling the intuitively named Delete method:

client.Delete("animals","delta")

One thing I wasn't sure about was how CorrugatedIron talks to the Riak cluster. In my simple REST API example, I knew exactly which host I was talking to, since I explicitly specified the URL. With CorrugatedIron, I configure a pool of connections in the App.config file, so I wasn't sure which of the nodes it was talking to. I fired up Wireshark and noticed that I was connected to the Riak cluster via port 8087, which is the Protocol Buffers Client (PBC) interface. That explains the need to load the protobuf-net library. Protocol Buffers were new to me, and at first the bundled protobuf-net library seemed to be code developed by Google (many thanks to OJ for correcting me on this: it is NOT a Google library but a library written by Marc Gravell). This, in turn, led me to look at Google Protocol Buffers. This little side jaunt into the CorrugatedIron library led to the discovery (for me) of a whole new set of network communication protocols. In any case, the Wireshark output shows that the requests are spread across the different nodes rather than restricted to a single node. I'm guessing that CorrugatedIron has some built-in load-balancing code that sends my requests to different nodes in the Riak cluster.

For the basic CRUD operations on Riak, CorrugatedIron has made it easier to work with Riak than having to come up with my own helper functions. This has been a good start, and I hope to work through more of the Riak examples from the book Seven Databases in Seven Weeks with CorrugatedIron in future blog posts.

Monday, April 15, 2013

Exploring Riak with F#

I have embraced polyglot programming for quite a while already. This year, I wanted to tackle polyglot persistence. The RDBMS has dominated my view of the persistence layer, with everything else treated as a second-class citizen. I thought it was time to expand that view, especially with the burgeoning popularity of the NoSQL movement. To begin my exploration, I picked up the book Seven Databases in Seven Weeks by Eric Redmond and Jim Wilson and started with the first NoSQL persistence layer in the book, which was Riak. According to the book:

Riak is a distributed key-value database where values can be anything-from plain text, JSON, or XML to images or video clips-all accessible through a simple HTTP interface.

Setting up Riak

I deployed Riak on 3 servers for testing purposes. While setting up the Riak cluster, I ran into the following error:

10:42:57.339 [error] gen_server riak_core_capability terminated with reason: no function clause matching orddict:fetch('riak@192.168.56.1', [
{'riak@127.0.0.1',[{{riak_core,staged_joins},[true,false]},{{riak_core,vnode_routing},[proxy,legacy]},...]}]) line 72
/users/domains/riak/lib/os_mon-2.2.9/priv/bin/memsup: Erlang has closed.

A quick Google search brought up this link: http://blog.alwayshere.info/2012/11/riak-error-genserver-riakcorecapability.html I had originally started Riak with the 127.0.0.1 address. Then I modified the configuration as documented in http://docs.basho.com/riak/latest/cookbooks/Basic-Cluster-Setup/ while trying to set up my 3 Riak servers as a cluster. To fix the error, I had to go to the ./data/ring folder and delete everything in there, after which everything worked as expected.

Most of the examples in the book leverage curl. However, I learn best when I work the examples in another way. I tried the examples in Clojure, using ClojureWerkz's Welle. I liked Welle's wrappers for Riak and would probably use it if I had to develop on the Java platform. I also wanted to work with Riak from the .NET platform, hence I'm using F# to explore Riak. The following examples were done in Visual Studio 2010 with ASP.NET MVC 4 installed. This also gives me a chance to take the REST API in ASP.NET MVC 4 for a spin.

Pinging Riak

The very first example in the book is to ping the Riak cluster. Here's how I implemented it in F#:

open System.Net.Http
open System.Threading.Tasks

// My 3 instances of Riak
let riak1_url = "http://192.168.56.1:8098"
let riak2_url = "http://192.168.56.2:8098"
let riak3_url = "http://192.168.56.3:8098"

// Pick one to work with
let riakurl = riak1_url

let client = new HttpClient()

let ping () = client.GetAsync(sprintf "%s/ping" riakurl)

ping()

Running the ping, I get the following response from F# Interactive:

val it : Task<HttpResponseMessage> =
  System.Threading.Tasks.Task`1[System.Net.Http.HttpResponseMessage]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 4;
     IsCanceled = false;
     IsCompleted = false;
     IsFaulted = false;
     Result = StatusCode: 200, ReasonPhrase: 'OK', Version: 1.1, Content: System.Net.Http.StreamContent, Headers:
{
  Vary: Accept-Encoding
  Date: Wed, 30 Jan 2013 22:20:11 GMT
  Server: MochiWeb/1.1
  Server: WebMachine/1.9.0
  Server: (someone had painted it blue)
  Content-Length: 422
  Content-Type: application/json
};
     Status = RanToCompletion;}
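The dump above is the whole Task object, which is more than I usually need. A minimal sketch of waiting for the task and keeping only the HTTP status code follows; the helper name statusOf is hypothetical, and blocking with Async.RunSynchronously is an assumption that suits a script but not necessarily a real application:

```fsharp
open System.Net.Http
open System.Threading.Tasks

// Hypothetical helper: run a request, wait for it to finish,
// and return only the numeric HTTP status code.
let statusOf (request: unit -> Task<HttpResponseMessage>) =
    async {
        // Bridge the .NET Task into an F# async workflow
        let! response = request () |> Async.AwaitTask
        return int response.StatusCode
    }
    |> Async.RunSynchronously
```

With the ping function defined earlier, statusOf ping should give 200 when the cluster is reachable.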

Adding Content to Riak

Let's start by putting some content into Riak with the following snippet of code:

// PUT arbitrary HTTP content to /riak/<bucket>/<key>
let put bucket key content =
    let put_url = sprintf "%s/riak/%s/%s" riakurl bucket key
    client.PutAsync(put_url, content)

// PUT an HTML string, overriding the default text/plain media type
let put_html bucket key html =
    let content = new StringContent(html)
    content.Headers.ContentType.MediaType <- "text/html"
    put bucket key content

"<html><body><h1>My latest favorite DB is RIAK</h1></body></html>"
|> put_html "favs" "db"

Running the above script gets the following response:

val it : Task<HttpResponseMessage> =
  System.Threading.Tasks.Task`1[System.Net.Http.HttpResponseMessage]
    {AsyncState = null;
     CreationOptions = None;
     Exception = null;
     Id = 5;
     IsCanceled = false;
     IsCompleted = false;
     IsFaulted = false;
     Result = StatusCode: 204, ReasonPhrase: 'No Content', Version: 1.1, Content: System.Net.Http.StreamContent, Headers:
{
  Vary: Accept-Encoding
  Date: Fri, 01 Feb 2013 18:07:24 GMT
  Server: MochiWeb/1.1
  Server: WebMachine/1.9.0
  Server: (someone had painted it blue)
  Content-Length: 0
  Content-Type: text/html; charset=utf-8
};
     Status = RanToCompletion;}

We got the 204 code, as explained in the book. To test that Riak has stored this new content, simply point a browser at any of the Riak instances (e.g. http://192.168.56.3:8098/riak/favs/db) and you should see the web page that was put into the first Riak server.

Here's the sample code to put JSON data into Riak:

// Simple utility to generate JSON - should really use a real JSON library
let tojson data = 
    data |> Seq.map (fun (k,v) -> sprintf "\"%s\" : \"%s\"" k v)
         |> Seq.reduce (sprintf "%s , %s")
         |> sprintf "{ %s }" 


let put_json bucket key jsondata =
    let content = new StringContent(tojson jsondata)
    content.Headers.ContentType.MediaType <- "application/json"
    put bucket key content

[("nickname","The Wonder Dog"); ("breed","German Shepherd")]
|> put_json "animals" "ace"

Again, you can check that it's stored in Riak by pointing the browser to: http://192.168.56.3:8098/riak/animals/ace and you should get back:

{ "nickname" : "The Wonder Dog" , "breed" : "German Shepherd" }
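One caveat with the tojson helper above: a double quote or backslash inside a key or value would produce invalid JSON, and an empty list would make Seq.reduce throw. The following is a minimal sketch of a slightly safer variant; the escape helper is hypothetical, and it still ignores control characters and non-string values, so a real JSON library remains the better choice:

```fsharp
// Hypothetical helper: escape backslashes and double quotes only
let escape (s: string) =
    s.Replace("\\", "\\\\").Replace("\"", "\\\"")

// Same output shape as the original tojson, but with escaping,
// and String.concat copes with an empty sequence
let tojson (data: seq<string * string>) =
    data
    |> Seq.map (fun (k, v) -> sprintf "\"%s\" : \"%s\"" (escape k) (escape v))
    |> String.concat " , "
    |> sprintf "{ %s }"

printfn "%s" (tojson [("nickname", "The \"Wonder\" Dog"); ("breed", "German Shepherd")])
```

For the original "ace" data, this variant produces exactly the same text as before, since nothing in it needs escaping.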

Removing Content from Riak

Here's a snippet of script to remove content from Riak:

let delete bucket key =
    let delete_url= sprintf "%s/riak/%s/%s" riakurl bucket key 
    client.DeleteAsync(delete_url)

delete "animals" "ace"

Getting Bucket Keys

To get all keys in a bucket:

let get_keys bucket =
    let get_url = sprintf "%s/riak/%s?keys=true" riakurl bucket
    get_url |> client.GetStringAsync

let results = get_keys "animals"
printfn "%s" results.Result

The above script returns the following (reformatted for legibility):

{"props":{"name":"animals",
          "allow_mult":false,
          "basic_quorum":false,
          "big_vclock":50,
          "chash_keyfun":{"mod":"riak_core_util",
                          "fun":"chash_std_keyfun"},
          "dw":"quorum",
          "last_write_wins":false,
          "linkfun":{"mod":"riak_kv_wm_link_walker",
                     "fun":"mapreduce_linkfun"},
          "n_val":3,
          "notfound_ok":true,
          "old_vclock":86400,
          "postcommit":[],
          "pr":0,
          "precommit":[],
          "pw":0,
          "r":"quorum",
          "rw":"quorum",
          "small_vclock":50,
          "w":"quorum",
          "young_vclock":20},
 "keys":["ace","polly"]}

Retrieving Content from Riak

To retrieve content:

let get bucket key = 
    let get_url = sprintf "%s/riak/%s/%s" riakurl bucket key
    get_url |> client.GetStringAsync

let results = get "animals" "ace"
printfn "%s" results.Result
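All of the helpers above splice the bucket and key straight into the URL, which works for simple names like "animals" and "ace" but not for names containing spaces or slashes. A minimal sketch of building the object URL with escaping follows; the helper name object_url is hypothetical and not something from the book or from Riak:

```fsharp
open System

// Hypothetical helper: build /riak/<bucket>/<key> with both parts URL-escaped
let object_url (baseurl: string) (bucket: string) (key: string) =
    sprintf "%s/riak/%s/%s" baseurl (Uri.EscapeDataString bucket) (Uri.EscapeDataString key)

printfn "%s" (object_url "http://192.168.56.1:8098" "animals" "wonder dog")
// prints: http://192.168.56.1:8098/riak/animals/wonder%20dog
```

The put, delete and get functions could then call object_url riakurl bucket key instead of repeating the sprintf.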