It looks like I will be getting much more hands on with Tibco EMS. Since the Tibco EMS system in use will have connections from both .NET platforms and Java platforms, I wanted to write some scripts to run some engineering tests on Tibco EMS. I decided to simulate .NET side connections with F# and Java side connections with Clojure. Taking the sample code from Tibco installation, I created the following F# script that sends messages to a queue from the sample C# code:
#r @"C:\tibco\ems\6.3\bin\TIBCO.EMS.dll" open System open TIBCO.EMS let serverUrl = "tcp://localhost:7222" let producer = "producer" let consumer = "consumer" let password = "testpwd" let queueName = "testQueue" let getQueueTextMessages serverUrl userid password queueName messageProcessor = async { let connection = (userid,password) |> (new QueueConnectionFactory(serverUrl)).CreateQueueConnection let session = connection.CreateQueueSession(false,Session.AUTO_ACKNOWLEDGE) let queue = session.CreateQueue(queueName) let receiver = session.CreateReceiver(queue) connection.Start() printf "Queue connection established!" while true do try receiver.Receive() |> messageProcessor with _ -> () } let sendQueueTextMessages serverUrl userid password queueName messages = let connection = (userid,password) |> (new QueueConnectionFactory(serverUrl)).CreateQueueConnection let session = connection.CreateQueueSession(false,Session.AUTO_ACKNOWLEDGE) let queue = session.CreateQueue(queueName) let sender = session.CreateSender(queue) connection.Start() messages |> Seq.iter (fun item -> session.CreateTextMessage(Text=item) |> sender.Send) connection.Close() // Just dump message to console for now let myMessageProcessor (msg:Message) = msg.ToString() |> printf "%s\n" let consumeMessageAsync = getQueueTextMessages "tcp://localhost:7222" "consumer" "testpwd" let produceMessages queueName messages = sendQueueTextMessages "tcp://localhost:7222" "producer" "testpwd" queueName messages // Start message consumer asynchronously Async.Start(consumeMessageAsync "testQueue" myMessageProcessor) // Send messages to the Tibco EMS [ "Aslund"; "Barrayar"; "Beta Colony"; "Cetaganda"; "Escobar"; "Komarr"; "Marilac"; "Pol"; "Sergyar"; "Vervain"] |> produceMessages "testQueue"
The queue consumer is implemented asynchronously so it won't block executing subsequent statements. To test Tibco JMS from Java, here is the equivalent Clojure code:
(import '(java.util Enumeration) '(com.tibco.tibjms TibjmsQueueConnectionFactory) '(javax.jms Message JMSException Session Queue QueueBrowser QueueConnection QueueReceiver QueueSession QueueSender)) (def serverUrl "tcp://localhost:7222") (def producer "producer") (def consumer "consumer") (def password "testpwd") (def queueName "testQueue") ; Consume Queue Text messages asynchronously (defn get-queue-text-messages [server-url user password queue-name process-message] (future (with-open [connection (-> (TibjmsQueueConnectionFactory. server-url) (.createQueueConnection user password))] (let [session (.createQueueSession connection false Session/AUTO_ACKNOWLEDGE) queue (.createQueue session queue-name)] (with-open [receiver (.createReceiver session queue)] (.start connection) (loop [] (process-message (.receive receiver)) (recur))))))) ; Send multiple Text messages (defn send-queue-text-messages [server-url user password queue-name messages] (with-open [connection (-> (TibjmsQueueConnectionFactory. server-url) (.createQueueConnection user password))] (let [session (.createQueueSession connection false Session/AUTO_ACKNOWLEDGE) queue (.createQueue session queue-name) sender (.createSender session queue)] (.start connection) (doseq [item messages] (let [message (.createTextMessage session)] (.setText message item) (.send sender message)))))) ; Create function aliases with connection information embedded (defn consume-messages [queue-name message-processor] (get-queue-text-messages serverUrl producer password queue-name message-processor)) (defn produce-messages [queue-name messages] (send-queue-text-messages serverUrl producer password queue-name messages)) ; Just dump messages to console for now (defn my-message-processor [message] (println (.toString message))) ; Start consuming messages asynchronously (consume-messages "testQueue" my-message-processor) ; Send messages to queue (def my-messages '("alpha" "beta" "gamma" "delta" "epsilon" "zeta" "eta" "theta" "iota" "kappa" "lambda" "mu" "nu" "xi", "omicron" "pi" "rho" "signma" "tau" "upsilon" "phi", "chi" "psi" "omega")) (produce-messages "testQueue" my-messages)
With these scripts, I can easily swap in different message generators and message processors as needed for any testing purposes. When I fired up both these scripts up to the part where queue consumers are running in both F# and Clojure version and then send the messages, I could see that Tibco EMS send half the messages to my F# script and the other half to my Clojure script. Since both of these scripts run in REPL environment, I can easily adjust my level of testing as I get results.
1 comment:
Interesting post!
Thanks for sharing this sample code.
Post a Comment