Monday, April 30, 2012

F#, Clojure and Message Queues on Tibco EMS

It looks like I will be getting much more hands on with Tibco EMS. Since the Tibco EMS system in use will have connections from both .NET platforms and Java platforms, I wanted to write some scripts to run some engineering tests on Tibco EMS. I decided to simulate .NET side connections with F# and Java side connections with Clojure. Taking the sample code from Tibco installation, I created the following F# script that sends messages to a queue from the sample C# code:

#r @"C:\tibco\ems\6.3\bin\TIBCO.EMS.dll"

open System
open TIBCO.EMS

let serverUrl = "tcp://localhost:7222"
let producer = "producer"
let consumer = "consumer"
let password = "testpwd"
let queueName = "testQueue"


let getQueueTextMessages serverUrl  userid password queueName messageProcessor =
    async {
        let connection = (userid,password)
                         |> (new QueueConnectionFactory(serverUrl)).CreateQueueConnection
        let session = connection.CreateQueueSession(false,Session.AUTO_ACKNOWLEDGE)
        let queue = session.CreateQueue(queueName)
        let receiver =  session.CreateReceiver(queue)
        connection.Start()
        printf "Queue connection established!"
        while true do
            try
                receiver.Receive() |> messageProcessor
            with _ ->  ()
    }


let sendQueueTextMessages serverUrl  userid password queueName messages =
    let connection = (userid,password)
                     |> (new QueueConnectionFactory(serverUrl)).CreateQueueConnection
    let session = connection.CreateQueueSession(false,Session.AUTO_ACKNOWLEDGE)
    let queue = session.CreateQueue(queueName)
    let sender = session.CreateSender(queue)
    connection.Start()

    messages
    |> Seq.iter (fun item -> session.CreateTextMessage(Text=item)
                             |> sender.Send)
                             
    connection.Close()



// Just dump message to console for now
let myMessageProcessor (msg:Message) =
    msg.ToString() |> printf "%s\n"


let consumeMessageAsync = getQueueTextMessages "tcp://localhost:7222" "consumer" "testpwd"
let produceMessages queueName messages = sendQueueTextMessages "tcp://localhost:7222" "producer" "testpwd" queueName messages 

// Start message consumer asynchronously
Async.Start(consumeMessageAsync "testQueue" myMessageProcessor)


// Send messages to the Tibco EMS   
[ "Aslund"; "Barrayar"; "Beta Colony"; "Cetaganda"; "Escobar"; "Komarr"; "Marilac"; "Pol"; "Sergyar"; "Vervain"]
|> produceMessages "testQueue"

The queue consumer is implemented asynchronously so it won't block executing subsequent statements. To test Tibco JMS from Java, here is the equivalent Clojure code:

(import '(java.util Enumeration)
        '(com.tibco.tibjms TibjmsQueueConnectionFactory)
        '(javax.jms Message JMSException  Session
                    Queue QueueBrowser 
                    QueueConnection QueueReceiver 
                    QueueSession QueueSender))
                  
(def serverUrl "tcp://localhost:7222")
(def producer "producer")
(def consumer "consumer")
(def password "testpwd")
(def queueName "testQueue")

; Consume Queue Text messages asynchronously
(defn get-queue-text-messages [server-url user password queue-name process-message]
    (future
        (with-open [connection (-> (TibjmsQueueConnectionFactory. server-url)
                                   (.createQueueConnection user password))]
            (let [session (.createQueueSession connection false Session/AUTO_ACKNOWLEDGE)
                  queue (.createQueue session  queue-name)]
                (with-open [receiver (.createReceiver session queue)]              
                    (.start connection)
                    (loop []                       
                        (process-message (.receive receiver))
                        (recur)))))))
                   
; Send multiple Text messages
(defn send-queue-text-messages [server-url user password queue-name messages]
    (with-open [connection (-> (TibjmsQueueConnectionFactory. server-url)
                               (.createQueueConnection user password))]
        (let [session (.createQueueSession connection false Session/AUTO_ACKNOWLEDGE)
              queue (.createQueue session  queue-name)
              sender (.createSender session queue)]
            (.start connection)
            (doseq [item messages]
                (let [message (.createTextMessage session)]
                    (.setText message item)
                    (.send sender message))))))


; Create function aliases with connection information embedded                    
(defn consume-messages [queue-name message-processor]
    (get-queue-text-messages  serverUrl producer password queue-name message-processor))

(defn produce-messages [queue-name messages]
    (send-queue-text-messages  serverUrl producer password queue-name messages))

; Just dump messages to console for now
(defn my-message-processor [message]
    (println (.toString message)))

    
; Start consuming messages asynchronously
(consume-messages "testQueue" my-message-processor)                            

; Send messages to queue
(def my-messages '("alpha" "beta" "gamma" "delta"
                   "epsilon" "zeta" "eta" "theta"
                   "iota" "kappa" "lambda" "mu" "nu"
                   "xi", "omicron" "pi" "rho"
                   "signma" "tau" "upsilon" "phi",
                   "chi" "psi" "omega"))                    

(produce-messages  "testQueue"  my-messages)    

With these scripts, I can easily swap in different message generators and message processors as needed for any testing purposes. When I fired up both these scripts up to the part where queue consumers are running in both F# and Clojure version and then send the messages, I could see that Tibco EMS send half the messages to my F# script and the other half to my Clojure script. Since both of these scripts run in REPL environment, I can easily adjust my level of testing as I get results.

1 comment:

Anders said...

Interesting post!

Thanks for sharing this sample code.