It looks like I will be getting much more hands on with Tibco EMS. Since the Tibco EMS system in use will have connections from both .NET platforms and Java platforms, I wanted to write some scripts to run some engineering tests on Tibco EMS. I decided to simulate .NET side connections with F# and Java side connections with Clojure. Taking the sample code from Tibco installation, I created the following F# script that sends messages to a queue from the sample C# code:
#r @"C:\tibco\ems\6.3\bin\TIBCO.EMS.dll"
open System
open TIBCO.EMS
let serverUrl = "tcp://localhost:7222"
let producer = "producer"
let consumer = "consumer"
let password = "testpwd"
let queueName = "testQueue"
let getQueueTextMessages serverUrl userid password queueName messageProcessor =
async {
let connection = (userid,password)
|> (new QueueConnectionFactory(serverUrl)).CreateQueueConnection
let session = connection.CreateQueueSession(false,Session.AUTO_ACKNOWLEDGE)
let queue = session.CreateQueue(queueName)
let receiver = session.CreateReceiver(queue)
connection.Start()
printf "Queue connection established!"
while true do
try
receiver.Receive() |> messageProcessor
with _ -> ()
}
let sendQueueTextMessages serverUrl userid password queueName messages =
let connection = (userid,password)
|> (new QueueConnectionFactory(serverUrl)).CreateQueueConnection
let session = connection.CreateQueueSession(false,Session.AUTO_ACKNOWLEDGE)
let queue = session.CreateQueue(queueName)
let sender = session.CreateSender(queue)
connection.Start()
messages
|> Seq.iter (fun item -> session.CreateTextMessage(Text=item)
|> sender.Send)
connection.Close()
// Just dump message to console for now
let myMessageProcessor (msg:Message) =
msg.ToString() |> printf "%s\n"
let consumeMessageAsync = getQueueTextMessages "tcp://localhost:7222" "consumer" "testpwd"
let produceMessages queueName messages = sendQueueTextMessages "tcp://localhost:7222" "producer" "testpwd" queueName messages
// Start message consumer asynchronously
Async.Start(consumeMessageAsync "testQueue" myMessageProcessor)
// Send messages to the Tibco EMS
[ "Aslund"; "Barrayar"; "Beta Colony"; "Cetaganda"; "Escobar"; "Komarr"; "Marilac"; "Pol"; "Sergyar"; "Vervain"]
|> produceMessages "testQueue"
The queue consumer is implemented asynchronously so it won't block executing subsequent statements. To test Tibco JMS from Java, here is the equivalent Clojure code:
(import '(java.util Enumeration)
'(com.tibco.tibjms TibjmsQueueConnectionFactory)
'(javax.jms Message JMSException Session
Queue QueueBrowser
QueueConnection QueueReceiver
QueueSession QueueSender))
(def serverUrl "tcp://localhost:7222")
(def producer "producer")
(def consumer "consumer")
(def password "testpwd")
(def queueName "testQueue")
; Consume Queue Text messages asynchronously
(defn get-queue-text-messages [server-url user password queue-name process-message]
(future
(with-open [connection (-> (TibjmsQueueConnectionFactory. server-url)
(.createQueueConnection user password))]
(let [session (.createQueueSession connection false Session/AUTO_ACKNOWLEDGE)
queue (.createQueue session queue-name)]
(with-open [receiver (.createReceiver session queue)]
(.start connection)
(loop []
(process-message (.receive receiver))
(recur)))))))
; Send multiple Text messages
(defn send-queue-text-messages [server-url user password queue-name messages]
(with-open [connection (-> (TibjmsQueueConnectionFactory. server-url)
(.createQueueConnection user password))]
(let [session (.createQueueSession connection false Session/AUTO_ACKNOWLEDGE)
queue (.createQueue session queue-name)
sender (.createSender session queue)]
(.start connection)
(doseq [item messages]
(let [message (.createTextMessage session)]
(.setText message item)
(.send sender message))))))
; Create function aliases with connection information embedded
(defn consume-messages [queue-name message-processor]
(get-queue-text-messages serverUrl producer password queue-name message-processor))
(defn produce-messages [queue-name messages]
(send-queue-text-messages serverUrl producer password queue-name messages))
; Just dump messages to console for now
(defn my-message-processor [message]
(println (.toString message)))
; Start consuming messages asynchronously
(consume-messages "testQueue" my-message-processor)
; Send messages to queue
(def my-messages '("alpha" "beta" "gamma" "delta"
"epsilon" "zeta" "eta" "theta"
"iota" "kappa" "lambda" "mu" "nu"
"xi", "omicron" "pi" "rho"
"signma" "tau" "upsilon" "phi",
"chi" "psi" "omega"))
(produce-messages "testQueue" my-messages)
With these scripts, I can easily swap in different message generators and message processors as needed for any testing purposes. When I fired up both these scripts up to the part where queue consumers are running in both F# and Clojure version and then send the messages, I could see that Tibco EMS send half the messages to my F# script and the other half to my Clojure script. Since both of these scripts run in REPL environment, I can easily adjust my level of testing as I get results.


