Monday, May 14, 2012

Working with Tibco EMS Message Topics using F# and Clojure

In my previous blog post, I showed how to connect to a Tibco EMS queue from F# and Clojure to demonstrate integration interoperability between the .NET and Java platforms. Message queues are one way to implement the Request-Reply pattern, which is one of the many enterprise integration patterns described in the book Enterprise Integration Patterns. Another basic enterprise integration pattern is the Publish-Subscribe pattern, which can be implemented with JMS topics. This blog post shows how to connect to a JMS topic from .NET and Java using Tibco EMS as the JMS provider.

Here is the F# version:

#r @"C:\tibco\ems\6.3\bin\TIBCO.EMS.dll"

open System
open TIBCO.EMS

let serverUrl = "tcp://localhost:7222"
let producer = "producer"
let consumer = "consumer"
let password = "testpwd"
let topicName = "testTopic"


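// Subscribe to a topic and pass every received message to messageProcessor
let subscribeToTopic serverUrl userid password topicName messageProcessor =
    async {
        let connection = (userid,password)
                         |> (new TopicConnectionFactory(serverUrl)).CreateTopicConnection
        try
            let session = connection.CreateTopicSession(false,Session.AUTO_ACKNOWLEDGE)
            let topic = session.CreateTopic(topicName)
            let subscriber = session.CreateSubscriber(topic)
            connection.Start()
            printfn "Subscriber connected!"
            while true do
                try
                    subscriber.Receive() |> messageProcessor
                with ex ->
                    // Report the failure instead of silently swallowing it
                    eprintfn "Failed to process message: %s" ex.Message
        finally
            // Replaces the original trailing Close(), which sat after an
            // infinite loop and could never run
            connection.Close()
    }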

// Publish each string in messages to the topic as a text message
let publishTopicMessages serverUrl userid password topicName messages =
    let connection = (userid,password)
                     |> (new TopicConnectionFactory(serverUrl)).CreateTopicConnection
    try
        let session = connection.CreateTopicSession(false,Session.AUTO_ACKNOWLEDGE)
        let topic = session.CreateTopic(topicName)
        let publisher = session.CreatePublisher(topic)
        connection.Start()

        messages
        |> Seq.iter (fun item -> session.CreateTextMessage(Text=item)
                                 |> publisher.Send)
    finally
        // Close the connection even if publishing fails part way through
        connection.Close()

// Just dump message to console for now
let myMessageProcessor (msg:Message) =
    msg.ToString() |> printfn "%s"

// Partially apply the connection settings defined at the top of the script
let consumeMessageAsync = subscribeToTopic serverUrl consumer password

let produceMessages topicName messages = publishTopicMessages serverUrl producer password topicName messages

// Asynchronously start the topic subscriber.
// A non-durable subscriber only receives messages published after it has
// subscribed, so anything sent before "Subscriber connected!" appears is missed.
Async.Start(consumeMessageAsync topicName myMessageProcessor)

// Publish messages to the Tibco EMS topic
[ "Aslund"; "Barrayar"; "Beta Colony"; "Cetaganda"; "Escobar"; "Komarr"; "Marilac"; "Pol"; "Sergyar"; "Vervain"]
|> produceMessages topicName

printfn "Done!"

One thing to point out is that Tibco, unfortunately, did not implement IDisposable for its Connection objects, perhaps in its bid to stay faithful to the Java API. That design choice seems unfortunate to me because I can no longer leverage C#'s using keyword or F#'s use keyword to automatically close the connection. I suppose it is fairly trivial to subclass the QueueConnection and TopicConnection classes and add the IDisposable interface, but I feel that Tibco should have done this and developed the Tibco .NET API using idioms that are .NET specific.
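In the meantime, one possible workaround is a small wrapper rather than a subclass: an object expression that turns any close action into an IDisposable, so that use can call it. The sketch below is only an illustration and does not touch the Tibco API. closeOnDispose, FakeConnection and demo are hypothetical names, and FakeConnection merely stands in for a connection type that exposes Close() but not Dispose().

```fsharp
open System

/// Wrap an arbitrary close action in an IDisposable (hypothetical helper).
let closeOnDispose (close: unit -> unit) =
    { new IDisposable with
        member this.Dispose() = close () }

/// Stand-in for a connection that only offers Close() (not a Tibco type).
type FakeConnection() =
    let mutable closed = false
    member this.IsClosed = closed
    member this.Close() = closed <- true

/// Uses the fake connection inside a `use` scope and reports whether
/// Close() had been called by the time the scope ended.
let demo () =
    let connection = FakeConnection()
    let work () =
        // Dispose runs when `guard` goes out of scope, even on an exception
        use guard = closeOnDispose (fun () -> connection.Close())
        printfn "working with the connection"
    work ()
    connection.IsClosed
```

With a real TopicConnection, the same helper would presumably be called as closeOnDispose (fun () -> connection.Close()) right after the connection is created, which would take the place of an explicit Close() call at the end of the function.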

Putting my rants aside, here is the equivalent Clojure code to connect to Tibco Topics:

(import '(java.util Enumeration)
        '(com.tibco.tibjms TibjmsTopicConnectionFactory)
        '(javax.jms Message JMSException  Session
                    Topic TopicConnectionFactory
                    TopicConnection TopicSession
                    TopicSubscriber))
                  
(def serverUrl "tcp://localhost:7222")
(def producer "producer")
(def consumer "consumer")
(def password "testpwd")
(def topicName "testTopic")

;------------------------------------------------------------------------------
; Subscribe to Topic asynchronously
;------------------------------------------------------------------------------
(defn subscribe-topic [server-url user password topic-name process-message]
    (future
        (with-open [connection (-> (TibjmsTopicConnectionFactory. server-url)
                                   (.createTopicConnection user password))]
            (let [session (.createTopicSession connection false Session/AUTO_ACKNOWLEDGE)
                  topic (.createTopic session  topic-name)]
                (with-open [subscriber (.createSubscriber session topic)]
                    (.start connection)
                    (loop []                       
                        (process-message (.receive subscriber))
                        (recur)))))))

;------------------------------------------------------------------------------
; Publishing to a Topic
;------------------------------------------------------------------------------
(defn publish-to-topic [server-url user password topic-name messages]
    (with-open [connection (-> (TibjmsTopicConnectionFactory. server-url)
                               (.createTopicConnection user password))]
        (let [session (.createTopicSession connection false Session/AUTO_ACKNOWLEDGE)
              topic (.createTopic session  topic-name)
              publisher (.createPublisher session topic)]
            (.start connection)
            (doseq [item messages]
                (let [message (.createTextMessage session)]
                    (.setText message item)
                    (.publish publisher message))))))
                    
                      
; Create function aliases with the connection information defined above embedded
(defn produce-messages [topic-name messages]
    (publish-to-topic serverUrl producer password topic-name messages))

(defn consume-messages [topic-name message-processor]
    (subscribe-topic serverUrl consumer password topic-name message-processor))

; Just dump messages to console for now
(defn my-message-processor [message]
    (println (str message)))

; Start subscribing messages asynchronously
(consume-messages topicName my-message-processor)
    
; Publish to topic
(def my-messages '("alpha" "beta" "gamma" "delta"
                   "epsilon" "zeta" "eta" "theta"
                   "iota" "kappa" "lambda" "mu" "nu"
                   "xi" "omicron" "pi" "rho"
                   "sigma" "tau" "upsilon" "phi"
                   "chi" "psi" "omega"))

(produce-messages topicName my-messages)

When I fire up both scripts, the messages published to the topic are received by both the .NET and Java clients. With these scripts, I can easily swap out the message generators or message processors as needed for any future testing scenarios.
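As a rough illustration of swapping in a different message processor, the sketch below builds one that collects messages for later inspection instead of printing them. makeCollectingProcessor and its describe parameter are hypothetical names introduced here, and plain strings stand in for EMS messages so the snippet runs without the Tibco assembly.

```fsharp
open System.Collections.Concurrent

/// Build a processor that stores a text form of every message, plus a
/// function that returns what has been collected so far (hypothetical helper).
let makeCollectingProcessor (describe: 'T -> string) =
    // Thread-safe, since the subscriber runs on a background thread
    let received = ConcurrentQueue<string>()
    let processor (msg: 'T) = received.Enqueue(describe msg)
    let snapshot () = List.ofSeq received
    processor, snapshot

// Demo with plain strings standing in for messages
let processor, snapshot =
    makeCollectingProcessor (fun (s: string) -> s.ToUpperInvariant())

[ "Komarr"; "Sergyar" ] |> List.iter processor

printfn "%A" (snapshot ())
```

For the F# subscriber in this post, describe would presumably be something like fun (m: Message) -> m.ToString(), and the resulting processor would be passed where myMessageProcessor is used.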

2 comments:

Kannan said...

Hi
Can you please provide me with complete details on how to test this script for a Tibco EMS performance test? I am getting some errors and need your assistance using this script under Clojure.
bash-3.00$ ./clojure
Clojure 1.6.0
user=> (def msg64 (RandomStringUtils/randomAlphanumeric 64))
CompilerException java.lang.RuntimeException: No such namespace: RandomStringUtils, compiling:(NO_SOURCE_PATH:1:12)

John Liao said...

You will need to add the Apache Commons Lang library to your classpath.