Publish Messages to Topic
The ITopic interface lets you create subscriptions and publish messages against a topic. It also provides event registration for message delivery failure, message receipt, and topic deletion.
It provides the Publish method, which publishes a message to a specific topic in the cache. The delivery behavior is specified through the DeliveryOption enum, which has two values:
- All (1): Delivers the message to all the registered subscribers.
- Any (0): Delivers the message to any one of the registered subscribers.
Moreover, the publisher can be notified when a message fails to deliver for any reason by setting the NotifyDeliveryFailure flag.
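To make the difference between the two delivery options concrete, the following is a minimal, self-contained sketch that does not use NCache at all. `DeliveryMode` and `SelectRecipients` are hypothetical stand-ins introduced only for illustration, and picking the first subscriber for `Any` is an assumption; the text above only states that one of the registered subscribers receives the message.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the DeliveryOption enum; values mirror the list above.
public enum DeliveryMode { Any = 0, All = 1 }

public static class DeliverySketch
{
    // Returns the subscribers that would receive one message under the given mode.
    public static IReadOnlyList<string> SelectRecipients(
        IReadOnlyList<string> subscribers, DeliveryMode mode)
    {
        if (subscribers.Count == 0)
            return Array.Empty<string>();

        // All: every subscriber receives the message.
        // Any: exactly one subscriber receives it (the first one here, as an assumption).
        return mode == DeliveryMode.All
            ? subscribers.ToList()
            : new List<string> { subscribers[0] };
    }

    public static void Main()
    {
        var subscribers = new[] { "billing", "shipping", "audit" };
        Console.WriteLine(SelectRecipients(subscribers, DeliveryMode.All).Count);
        Console.WriteLine(SelectRecipients(subscribers, DeliveryMode.Any).Count);
    }
}
```

With three subscribers, `All` selects all three and `Any` selects a single one, which is why `Any` suits work-queue style processing while `All` suits broadcast notifications.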
Pre-Requisites
- Include the following namespaces in your application:
  - Alachisoft.NCache.Client
  - Alachisoft.NCache.Runtime
  - Alachisoft.NCache.Runtime.Exceptions
- The application must be connected to cache before performing the operation.
- Cache must be running.
- Make sure that the data being added is serializable.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
The following code sample performs these steps:
- Get the dedicated topic for order-related messages.
- Register the MessageDeliveryFailure event for the topic.
- Register the OnTopicDeleted event for the topic.
- Create a message for the topic and set its expiration.
- Publish the message with a delivery option.
try
{
    // Pre-Condition: Cache is already connected
    // Topic "orderTopic" exists in cache
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create the object to be sent in message
        Order order = FetchOrderFromDB(10248);

        // Create the new message with the object order
        var orderMessage = new Message(order);

        // Set the expiration time of the message
        orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);

        // Register message delivery failure
        orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;

        // Register topic deletion notification
        orderTopic.OnTopicDeleted = TopicDeleted;

        // Publish the order with delivery option set as all
        // and register message delivery failure
        orderTopic.Publish(orderMessage, DeliveryOption.All, true);
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    // Use an else-if chain so that only one branch runs per error code
    if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
    {
        // Message ID already exists, specify a new ID
    }
    else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
    {
        // Message publishing on pattern based topic is not allowed
        // Get non-pattern based topic
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
Publish Asynchronously
Note
This feature is only available from NCache 5.0 SP2 onwards.
PublishAsync publishes messages on the topic asynchronously, so the application does not wait for the operation to complete before starting the next one. Control returns to the caller immediately for further processing.
The following example lets you publish a message asynchronously.
try
{
    // Pre-Condition: Cache is already connected
    // Topic "orderTopic" exists in cache
    // Task requires the System.Threading.Tasks namespace
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create the object to be sent in message
        Order order = FetchOrderFromDB(10248);

        // Create the new message with the object order
        var orderMessage = new Message(order);

        // Set the expiration time of the message
        orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);

        // Register message delivery failure
        orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;

        // Register topic deletion notification
        orderTopic.OnTopicDeleted = TopicDeleted;

        // Publish the order with delivery option set as all
        // and register message delivery failure
        Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);

        // IsFaulted is meaningful only after the task has completed,
        // so check it in a continuation rather than immediately
        task.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                // Task failed; details are in t.Exception
            }
        });
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    // Use an else-if chain so that only one branch runs per error code
    if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
    {
        // Message ID already exists, specify a new ID
    }
    else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
    {
        // Message publishing on pattern based topic is not allowed
        // Get non-pattern based topic
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
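Because an asynchronous publish returns a Task, its outcome is known only once that task has completed. The sketch below shows one way to observe a failure by awaiting the task. It is self-contained and does not call NCache: `FakePublishAsync` is a hypothetical stand-in for the real publish call, and the exception type it throws is chosen only for the demonstration.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncPublishSketch
{
    // Hypothetical stand-in for an asynchronous publish call.
    public static Task FakePublishAsync(bool fail)
    {
        return Task.Run(() =>
        {
            if (fail)
                throw new InvalidOperationException("simulated publish failure");
        });
    }

    // Awaits the publish task and reports whether it completed successfully.
    public static async Task<bool> TryPublishAsync(bool fail)
    {
        try
        {
            // await rethrows the exception stored in a faulted task
            await FakePublishAsync(fail);
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await TryPublishAsync(false));
        Console.WriteLine(await TryPublishAsync(true));
    }
}
```

Awaiting keeps the calling thread free while still surfacing the failure at a well-defined point. An exception raised inside the task does not reach a try/catch that wraps only the call that started it.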
Publish Bulk Messages
Note
This feature is only available from NCache 5.0 SP2 onwards.
NCache lets you publish multiple messages in a single call using the PublishBulk method. This improves performance and memory usage because the messages are combined and published together.
The code below takes an instance of an already created topic orderTopic and shows the bulk publishing of messages on the topic.
try
{
    // Pre-Condition: Cache is already connected
    // Topic "orderTopic" exists in cache
    // List and Tuple require System and System.Collections.Generic
    ITopic topic = cache.MessagingService.GetTopic("orderTopic");

    if (topic != null)
    {
        // Create a list for storing the bulk of messages
        List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();

        Order[] orders = FetchOrdersFromDB();

        // Iterate over the fetched orders only, to avoid indexing past the array
        for (int i = 0; i < orders.Length; i++)
        {
            Message message = new Message(orders[i]);
            message.ExpirationTime = TimeSpan.FromSeconds(10000);
            messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
        }

        // Register message delivery failure
        topic.MessageDeliveryFailure += OnFailureMessageReceived;

        // Register topic deletion notification
        topic.OnTopicDeleted = TopicDeleted;

        // Publish the orders with delivery option set as all
        // and register message delivery failure
        // In case of failed publishing of messages, exceptions
        // will be returned
        IDictionary<Message, Exception> failedMessages = topic.PublishBulk(messageList, true);
    }
}
catch (OperationFailedException ex)
{
    // Use an else-if chain so that only one branch runs per error code
    if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
    {
        // Message ID already exists, specify a new ID
    }
    else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
    {
        // Message publishing on pattern based topic is not allowed
        // Get non-pattern based topic
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
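The bulk call above returns a dictionary that pairs each failed message with its exception. One plausible way to act on that result is to collect the failed messages for a later retry. The sketch below is self-contained and uses strings in place of real messages; `CollectFailed` is a hypothetical helper, not part of NCache.

```csharp
using System;
using System.Collections.Generic;

public static class BulkResultSketch
{
    // Logs each failure and returns the messages that should be retried.
    public static List<TMessage> CollectFailed<TMessage>(
        IDictionary<TMessage, Exception> results)
    {
        var failed = new List<TMessage>();
        foreach (KeyValuePair<TMessage, Exception> entry in results)
        {
            // Skip entries without an exception, in case any are present
            if (entry.Value != null)
            {
                Console.WriteLine($"Failed: {entry.Key} ({entry.Value.Message})");
                failed.Add(entry.Key);
            }
        }
        return failed;
    }

    public static void Main()
    {
        // Simulated result of a bulk publish in which two messages failed
        var results = new Dictionary<string, Exception>
        {
            ["order-10248"] = new TimeoutException("simulated timeout"),
            ["order-10249"] = new InvalidOperationException("simulated failure")
        };

        List<string> retry = CollectFailed(results);
        Console.WriteLine(retry.Count);
    }
}
```

An empty result dictionary means no failures were reported, so the returned retry list is empty as well.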
Register Callbacks
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
    // The failure reason is available from args.MessgeFailureReason
}

private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
    // The name of the deleted topic is available from args.TopicName
}
Recommendation: To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Additional Resources
NCache provides a sample application for Pub/Sub at:
- GitHub
- Shipped with NCache: %NCHOME%\samples\dotnet\PubSub
See Also
Event Notifications in Cache
Pub/Sub Topics
Pub/Sub Messages
Subscribe for Topic Messages
Continuous Query