Publish Messages to a Topic in a Pub/Sub Model
The ITopic/Topic interface in the Pub/Sub model facilitates publishing messages to a Topic. It also provides event registrations for message delivery failure, receiving messages, and Topic deletion.
Note
This feature is also available in NCache Professional.
It provides the Publish method, which publishes the message to a specific Topic in the cache. While publishing messages to a Topic, the publisher can set the delivery option for messages, message expiration, message delivery failure notification, and Topic deletion notification.
Here, we describe how to publish messages, publish messages asynchronously, publish bulk messages, and publish ordered messages.
Prerequisites
- To learn about the standard prerequisites required to work with all NCache client-side features including Pub/Sub model, please refer to the given page on Client-Side API Prerequisites.
- For API details, refer to: ICache, CacheItem, ITopic, Publish, GetTopic, PublishAsync, ExpirationTime, MessageDeliveryFailure, OnTopicDeleted, DeliveryOption, Message, PublishBulk, MessageFailedEventArgs, TopicDeleteEventArgs.
Publish Messages
The following code sample performs these steps:
- Get the dedicated Topic for Order-related messages.
- Create a message for the Topic and set its expiration.
- Register the MessageDeliveryFailure event for the Topic.
- Register the OnTopicDeleted event for the Topic.
- Publish the message with a delivery option.
// Precondition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
    // Create the object to be sent in message
    Order order = FetchOrderFromDB(10248);
    // Create the new message with the object order
    var orderMessage = new Message(order);
    // Set the expiration time of the message
    orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
    // Register message delivery failure
    orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
    // Register topic deletion notification
    orderTopic.OnTopicDeleted = TopicDeleted;
    // Publish the order with delivery option set as all
    // and register message delivery failure
    orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
    // No topic exists
}
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Publish Asynchronously
Messages can be published to a Topic asynchronously using PublishAsync, so the application does not wait for the operation to complete before performing the next one. Control returns to the caller immediately for further processing. The following example publishes a message asynchronously.
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
    // Create the object to be sent in message
    Order order = FetchOrderFromDB(10248);
    // Create the new message with the object order
    var orderMessage = new Message(order);
    // Set the expiration time of the message
    orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
    // Register message delivery failure
    orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
    // Register topic deletion notification
    orderTopic.OnTopicDeleted = TopicDeleted;
    // Publish the order with delivery option set as all
    // and register message delivery failure
    Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
    // The task may still be running at this point, so check the outcome
    // in a continuation instead of reading IsFaulted immediately
    task.ContinueWith(t =>
    {
        if (t.IsFaulted)
        {
            // Task failed; details are available in t.Exception
        }
    });
}
else
{
    // No topic exists
}
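Because PublishAsync returns a Task, its outcome is only known once that task completes. In an async method, one option is to await the task inside a try/catch block. The sketch below illustrates this pattern with a hypothetical helper, TryPublishAsync, which is not part of the NCache API; it takes the publish call as a delegate so that the example compiles without the NCache assemblies.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncPublishHelper
{
    // Hypothetical helper (an assumption for illustration, not an NCache API).
    // Awaits the task produced by 'publish' and returns true on success.
    // Any exception raised by the publish call is passed to 'onError'.
    public static async Task<bool> TryPublishAsync(Func<Task> publish, Action<Exception> onError)
    {
        try
        {
            await publish();
            return true;
        }
        catch (Exception ex)
        {
            onError?.Invoke(ex);
            return false;
        }
    }
}

// Possible usage inside an async method, with orderTopic and orderMessage
// prepared as in the sample above:
//
// bool published = await AsyncPublishHelper.TryPublishAsync(
//     () => orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true),
//     ex => Console.WriteLine(ex.Message));
```

Awaiting keeps the calling thread free while the publish is in flight, and a failure surfaces as an ordinary exception rather than a faulted task that has to be inspected separately.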
Publish Bulk Messages
Multiple messages can be published in a single call using the PublishBulk method. This improves performance and memory usage because the messages are combined and published together. The code below gets the existing Topic orderTopic and publishes messages to it in bulk.
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
    // Create a list of message and delivery option pairs for the bulk call
    List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
    Order[] orders = FetchOrdersFromDB();
    // Iterate over the fetched orders only, to avoid reading past the array
    for (int i = 0; i < orders.Length; i++)
    {
        Message message = new Message(orders[i]);
        message.ExpirationTime = TimeSpan.FromSeconds(10000);
        messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
    }
    // Register message delivery failure
    topic.MessageDeliveryFailure += OnFailureMessageReceived;
    // Register topic deletion notification
    topic.OnTopicDeleted = TopicDeleted;
    // Publish the messages with delivery option set as all
    // and register message delivery failure
    // In case of failed publishing of messages, exceptions
    // will be returned
    IDictionary<Message, Exception> failedMessages = topic.PublishBulk(messageList, true);
}
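The dictionary returned by PublishBulk can be inspected to find out which messages could not be published. The following is a minimal sketch, assuming the dictionary holds one entry per failed message. DescribeFailures is a hypothetical helper, not part of the NCache API, and the generic parameter TMessage stands in for the Message type so that the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;

public static class BulkPublishReport
{
    // Hypothetical helper (an assumption for illustration, not an NCache API).
    // Builds one description line per entry in the failure dictionary.
    public static List<string> DescribeFailures<TMessage>(IDictionary<TMessage, Exception> failures)
    {
        var lines = new List<string>();
        if (failures == null)
        {
            return lines;
        }

        foreach (KeyValuePair<TMessage, Exception> entry in failures)
        {
            // entry.Key is the message that failed, entry.Value is the reason
            lines.Add($"{entry.Value.GetType().Name}: {entry.Value.Message}");
        }
        return lines;
    }
}

// Possible usage with the dictionary returned by PublishBulk in the sample above:
//
// foreach (string line in BulkPublishReport.DescribeFailures(topic.PublishBulk(messageList, true)))
// {
//     Console.WriteLine(line);
// }
```

An application could use the keys of the same dictionary to retry or log the affected messages.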
Publish Ordered Messages
Note
This feature is only available in NCache 5.2 and onwards.
Messages can be published with a sequence name so that they are published in a specific order. The sequence name is a string attached to the messages, which ensures that all messages belonging to the same sequence name are published on the same server node. In the example below, a sequence name is passed along with each message to the Publish method.
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Check the returned topic, not the name, before publishing
if (orderTopic != null)
{
    for (int i = 0; i < 30; i++)
    {
        // Create the object to be sent in message
        Order order = FetchOrderFromDB(10248);
        // Create the new message with the object order
        var orderMessage = new Message(order);
        // Specify a unique sequence name for the messages
        string sequenceName = "OrderMessages";
        // Set the expiration time of the message
        orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
        // Publish message with the sequence name
        orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
    }
}
else
{
    // No topic found
}
Register Callbacks
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
    // The failure reason can be obtained from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
    // The deleted topic is args.TopicName
}
Additional Resources
NCache provides a sample application for Pub/Sub on GitHub.
See Also
.NET: Alachisoft.NCache.Runtime.Caching namespace.
Java: com.alachisoft.ncache.runtime.caching namespace.
Python: ncache.client.services class.
Node.js: Topic class.