In my previous blog post, I showed how to connect to a Tibco EMS Queue with F# and Clojure to represent integration interoperability from .NET and Java platforms. Message queues are a way to implement the Request-Reply pattern, which is one of the many enterprise integration patterns described in the book Enterprise Integration Patterns. Another basic enterprise integration pattern is the Publish-Subscribe pattern, which can be implement via JMS topics. This blog post shows how to connect to JMS topic from .NET and Java using Tibco EMS as the JMS provider.
Here is the F# version:
#r @"C:\tibco\ems\6.3\bin\TIBCO.EMS.dll" open System open TIBCO.EMS let serverUrl = "tcp://localhost:7222" let producer = "producer" let consumer = "consumer" let password = "testpwd" let topicName = "testTopic" let subscribeToTopic serverUrl userid password topicName messageProcessor = async { let connection = (userid,password) |> (new TopicConnectionFactory(serverUrl)).CreateTopicConnection let session = connection.CreateTopicSession(false,Session.AUTO_ACKNOWLEDGE) let topic = session.CreateTopic(topicName) let subscriber = session.CreateSubscriber(topic) connection.Start() printf "Subscriber connected!\n" while true do try subscriber.Receive() |> messageProcessor with _ -> () connection.Close() } let publishTopicMessages serverUrl userid password topicName messages = let connection = (userid,password) |> (new TopicConnectionFactory(serverUrl)).CreateTopicConnection let session = connection.CreateTopicSession(false,Session.AUTO_ACKNOWLEDGE) let topic = session.CreateTopic(topicName) let publisher = session.CreatePublisher(topic) connection.Start() messages |> Seq.iter (fun item -> session.CreateTextMessage(Text=item) |> publisher.Send) connection.Close() // Just dump message to console for now let myMessageProcessor (msg:Message) = msg.ToString() |> printf "%s\n" let consumeMessageAsync = subscribeToTopic "tcp://localhost:7222" "consumer" "testpwd" let produceMessages topicName messages = publishTopicMessages "tcp://localhost:7222" "producer" "testpwd" topicName messages // Asynchronously start the topic subscriber Async.Start(consumeMessageAsync "testTopic" myMessageProcessor) // Publish messages to the Tibco EMS topic [ "Aslund"; "Barrayar"; "Beta Colony"; "Cetaganda"; "Escobar"; "Komarr"; "Marilac"; "Pol"; "Sergyar"; "Vervain"] |> produceMessages "testTopic" printf "Done!"
One thing to point out is that Tibco, unfortunately, did not implement IDisposable for it's Connection objects;
perhaps in it's bid to stay faithful to the Java API. That design choice seems unfortunate to me in the sense that I no longer
can leverage C#'s using
keyword or F#'s use
keyword to automatically close connection.
I suppose it is fairly trivial to subclass the QueueConnection
and TopicConnection
class
and add the IDisposable interface, but I feel that
Tibco should have done this and developed the Tibco .NET API using idioms that are .NET specific.
Putting my rants aside, here is the equivalent Clojure code to connect to Tibco Topics:
(import '(java.util Enumeration) '(com.tibco.tibjms TibjmsTopicConnectionFactory) '(javax.jms Message JMSException Session Topic TopicConnectionFactory TopicConnection TopicSession TopicSubscriber)) (def serverUrl "tcp://localhost:7222") (def producer "producer") (def consumer "consumer") (def password "testpwd") (def topicName "testTopic") ;------------------------------------------------------------------------------ ; Subscribe to Topic asynchronously ;------------------------------------------------------------------------------ (defn subscribe-topic [server-url user password topic-name process-message] (future (with-open [connection (-> (TibjmsTopicConnectionFactory. server-url) (.createTopicConnection user password))] (let [session (.createTopicSession connection false Session/AUTO_ACKNOWLEDGE) topic (.createTopic session topic-name)] (with-open [subscriber (.createSubscriber session topic)] (.start connection) (loop [] (process-message (.receive subscriber)) (recur))))))) ;------------------------------------------------------------------------------ ; Publishing to a Topic ;------------------------------------------------------------------------------ (defn publish-to-topic [server-url user password topic-name messages] (with-open [connection (-> (TibjmsTopicConnectionFactory. server-url) (.createTopicConnection user password))] (let [session (.createTopicSession connection false Session/AUTO_ACKNOWLEDGE) topic (.createTopic session topic-name) publisher (.createPublisher session topic)] (.start connection) (doseq [item messages] (let [message (.createTextMessage session)] (.setText message item) (.publish publisher message)))))) ; Create function aliases with connection information embedded (defn produce-messages [topic-name messages] (publish-to-topic "tcp://localhost:7222" "producer" "testpwd" topic-name messages)) (defn consume-messages [topic-name message-processor] (subscribe-topic "tcp://localhost:7222" "consumer" "testpwd" topic-name message-processor)) ; Just dump messages to console for now (defn my-message-processor [message] (println (.toString message))) ; Start subscribing messages asynchronously (consume-messages "testTopic" my-message-processor) ; Publish to topic (def my-messages '("alpha" "beta" "gamma" "delta" "epsilon" "zeta" "eta" "theta" "iota" "kappa" "lambda" "mu" "nu" "xi", "omicron" "pi" "rho" "signma" "tau" "upsilon" "phi", "chi" "psi" "omega")) (produce-messages "testTopic" my-messages)
When I fire up both scripts, the messages published to the topic would be received by both the .NET and Java clients. With these scripts, I can easily swap out the message generators or message processors as needed for any future testing scenarios.
2 comments:
Hi
Can you plesae provide me with complete details as how to test this script for Tibco EMS performance test? I am getting some errors and need your assistance using this script under clojure.
bash-3.00$ ./clojure
Clojure 1.6.0
user=> (def msg64 (RandomStringUtils/randomAlphanumeric 64))
CompilerException java.lang.RuntimeException: No such namespace: RandomStringUtils, compiling:(NO_SOURCE_PATH:1:12)
You will need to add Apache Commons Lang library to your classpath.
Post a Comment