Publish Messages to Topic
The ITopic/Topic interface facilitates creating subscriptions and publishing messages against a topic. It also provides event registrations for message delivery failure, message receipt, and topic deletion.
It provides the Publish method, which publishes a message to a specific topic in the cache. The user specifies the message delivery option through the DeliveryOption enum, which has two values:
- All (1): Delivers the message to all registered subscribers.
- Any (0): Delivers the message to any one of the registered subscribers.
Moreover, the publisher can be notified when a message fails to deliver for any reason by setting the NotifyDeliveryFailure flag.
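To make the difference between the two delivery options concrete, here is a minimal sketch that simulates which subscribers would receive a single message. It does not call NCache: SketchDeliveryOption, DeliverySketch, and SelectRecipients are hypothetical names introduced only for illustration, and picking the first subscriber for Any is an assumption, since the description above only says that any one subscriber receives the message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration only; this is NOT the NCache DeliveryOption type.
enum SketchDeliveryOption { Any = 0, All = 1 }

static class DeliverySketch
{
    // Returns the subscribers that would receive one published message.
    public static List<string> SelectRecipients(
        IReadOnlyList<string> subscribers, SketchDeliveryOption option)
    {
        var recipients = new List<string>();
        if (subscribers.Count == 0)
        {
            return recipients; // nobody is registered, so nobody receives it
        }

        if (option == SketchDeliveryOption.All)
        {
            // All: every registered subscriber receives the message
            recipients.AddRange(subscribers);
        }
        else
        {
            // Any: exactly one subscriber receives the message.
            // Assumption: which one is chosen is unspecified; this sketch takes the first.
            recipients.Add(subscribers[0]);
        }
        return recipients;
    }
}
```

With three registered subscribers, the sketch returns all three for All and a single one for Any, which is the behavior to keep in mind when choosing the option passed to Publish.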
Note
- To use Maven packages for NCache Professional Edition, change the <artifactId> as shown below: <artifactId>ncache-professional-client</artifactId>
- To use the Node.js API in NCache Professional Edition, install and include the ncache-professional-client npm package in your Node.js application.
Pre-Requisites
- Install the following NuGet packages in your application:
- Include the following namespaces in your application:
Alachisoft.NCache.Client
Alachisoft.NCache.Runtime
Alachisoft.NCache.Runtime.Exceptions
- The application must be connected to the cache before performing the operation.
- The cache must be running.
- Make sure that the data being added is serializable.
- For API details, refer to: ICache, CacheItem, ITopic, Publish, GetTopic.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unforeseen exceptions, refer to the Troubleshooting section.
The following code sample does the following:
- Creates dedicated topics for order-related messages.
- Registers the MessageDeliveryFailure event for the topic.
- Registers the OnTopicDeleted event for the topic.
- Creates messages for each topic, enabling expiration and delivery options.
- Publishes the messages.
try
{
    // Pre-Condition: Cache is already connected
    // Topic "orderTopic" exists in cache
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create the object to be sent in message
        Order order = FetchOrderFromDB(10248);

        // Create the new message with the object order
        var orderMessage = new Message(order);

        // Set the expiration time of the message
        orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);

        // Register message delivery failure
        orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;

        // Register topic deletion notification
        orderTopic.OnTopicDeleted = TopicDeleted;

        // Publish the order with delivery option set as all
        // and request notification on delivery failure
        orderTopic.Publish(orderMessage, DeliveryOption.All, true);
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    // Use else-if so that the final else runs only for unlisted error codes
    if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
    {
        // Message ID already exists, specify a new ID
    }
    else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
    {
        // Message publishing on pattern based topic is not allowed
        // Get non-pattern based topic
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
Publish Asynchronously
Note
This feature is only available from NCache 5.0 SP2 onwards.
PublishAsync publishes messages to the topic asynchronously, so the application does not wait for the operation to complete before starting the next one. Control returns to the caller immediately for further processing.
The following example publishes a message asynchronously.
try
{
    // Pre-Condition: Cache is already connected
    // Topic "orderTopic" exists in cache
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create the object to be sent in message
        Order order = FetchOrderFromDB(10248);

        // Create the new message with the object order
        var orderMessage = new Message(order);

        // Set the expiration time of the message
        orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);

        // Register message delivery failure
        orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;

        // Register topic deletion notification
        orderTopic.OnTopicDeleted = TopicDeleted;

        // Publish the order with delivery option set as all
        // and request notification on delivery failure
        Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);

        // IsFaulted reflects the task's state at this moment only
        if (task.IsFaulted)
        {
            // Task failed
        }
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    // Use else-if so that the final else runs only for unlisted error codes
    if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
    {
        // Message ID already exists, specify a new ID
    }
    else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
    {
        // Message publishing on pattern based topic is not allowed
        // Get non-pattern based topic
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
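Because the asynchronous publish returns a Task, inspecting IsFaulted immediately after the call only shows the task's state at that instant; a failure that happens later would go unnoticed. One possible way to observe the final outcome is to await the task, as in the minimal sketch below. AsyncPublishSketch and TryPublishAsync are hypothetical helper names, and the publish delegate is a stand-in for a call such as orderTopic.PublishAsync(...); nothing here is part of the NCache API.

```csharp
using System;
using System.Threading.Tasks;

static class AsyncPublishSketch
{
    // Awaits a publish operation and reports whether it completed successfully.
    // 'publish' is a hypothetical stand-in for a call that returns a Task,
    // for example () => orderTopic.PublishAsync(message, DeliveryOption.All, true).
    public static async Task<bool> TryPublishAsync(Func<Task> publish)
    {
        try
        {
            // Awaiting rethrows the exception stored in a faulted task
            await publish();
            return true;
        }
        catch (Exception ex)
        {
            // The failure surfaces here once the task has actually finished
            Console.WriteLine("Publish failed: " + ex.Message);
            return false;
        }
    }
}
```

The caller still gets control back immediately if it does not await the returned Task<bool> right away; it can continue with other work and await the result only when the outcome matters.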
Publish Bulk Messages
Note
This feature is only available from NCache 5.0 SP2 onwards.
NCache lets you publish multiple messages in a single call using the PublishBulk method. This improves performance and memory usage because the messages are combined and published together in one call.
The code below takes an instance of an already created topic, orderTopic, and shows bulk publishing of messages on the topic.
try
{
    // Pre-Condition: Cache is already connected
    // Topic "orderTopic" exists in cache
    ITopic topic = cache.MessagingService.GetTopic("orderTopic");

    if (topic != null)
    {
        // Create a list for storing the bulk of messages
        List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();

        Order[] orders = FetchOrdersFromDB();

        // Iterate over the fetched orders only, to avoid indexing past the array
        for (int i = 0; i < orders.Length; i++)
        {
            Message message = new Message(orders[i]);
            message.ExpirationTime = TimeSpan.FromSeconds(10000);
            messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
        }

        // Register message delivery failure
        topic.MessageDeliveryFailure += OnFailureMessageReceived;

        // Register topic deletion notification
        topic.OnTopicDeleted = TopicDeleted;

        // Publish the orders with delivery option set as all
        // and request notification on delivery failure
        // In case of failed publishing of messages, exceptions
        // will be returned
        IDictionary<Message, Exception> failedMessages = topic.PublishBulk(messageList, true);
    }
}
catch (OperationFailedException ex)
{
    // Use else-if so that the final else runs only for unlisted error codes
    if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
    {
        // Message ID already exists, specify a new ID
    }
    else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
    {
        // Message publishing on pattern based topic is not allowed
        // Get non-pattern based topic
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
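The bulk sample above receives a dictionary that maps each failed message to its exception but never inspects it. A minimal sketch of how such a result could be examined follows. BulkResultSketch and ReportFailures are hypothetical names, the method is generic so that it compiles without NCache, and treating a null or empty dictionary as "no failures" is an assumption rather than documented behavior.

```csharp
using System;
using System.Collections.Generic;

static class BulkResultSketch
{
    // Logs every failed entry and returns the number of failures.
    // TMessage stands in for the message type used as the dictionary key.
    public static int ReportFailures<TMessage>(IDictionary<TMessage, Exception> failures)
    {
        // Assumption: a null or empty dictionary means every message was published
        if (failures == null || failures.Count == 0)
        {
            return 0;
        }

        foreach (KeyValuePair<TMessage, Exception> entry in failures)
        {
            // entry.Key is the message that failed; entry.Value explains why
            Console.WriteLine("Failed to publish message: " + entry.Value.Message);
        }
        return failures.Count;
    }
}
```

A caller could pass the dictionary returned by the bulk publish call to this helper and decide, based on the count, whether to retry the failed messages.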
Publish Ordered Messages
Note
This feature is only available in NCache 5.1 and onward.
Messages can be published with a sequence name so that they are published in a specific order. To order messages, a string sequence name is attached to each message in the chain, which ensures that all messages belonging to a given sequence name are published on the same server node.
In the example given below, a sequence name is attached to the messages, which are then published using the Publish method.
try
{
    // Pre-condition: Cache is already connected
    // Specify the topic name that already exists
    string topicName = "orderTopic";

    // Get the topic with the specified name
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    // Check the retrieved topic, not the name, which is never null here
    if (orderTopic != null)
    {
        for (int i = 0; i < 30; i++)
        {
            // Create the object to be sent in message
            Order order = FetchOrderFromDB(10248);

            // Create the new message with the object order
            var orderMessage = new Message(order);

            // Specify the sequence name shared by the messages
            string sequenceName = "OrderMessages";

            // Set the expiration time of the message
            orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);

            // Publish message with the sequence name
            orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
        }
    }
    else
    {
        // No topic found
    }
}
catch (OperationFailedException ex)
{
    // Exception can occur due to:
    // Connection Failures
    // Operation performed during state transfer
    // Operation Timeout
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
}
Register Callbacks
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
    // The failure reason can be obtained from args.MessageFailureReason
}

private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
    // The deleted topic is args.TopicName
}
Additional Resources
NCache provides a sample application for Pub/Sub on GitHub.
See Also
Event Notifications in Cache
Pub/Sub Topics
Pub/Sub Messages
Subscribe for Topic Messages
Continuous Query