Pub/Sub Topics Behavior and Properties
A topic is an entity stored in the cache that contains the messages themselves, along with subscriber and publisher information. The topic contains a message store, which holds the data objects published by publishers in a queue. It also internally maintains a list of all of its subscribers.
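As a rough mental model of this structure, the following self-contained sketch keeps a queue of published objects and a list of subscribers. It is not NCache's implementation: `InMemoryTopic`, `Subscribe`, `Publish`, and `PendingCount` are hypothetical names, and delivering every message to every subscriber is only one possible delivery option.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a topic; NOT NCache's implementation.
public sealed class InMemoryTopic
{
    // Message store: published data objects, kept in publication order.
    private readonly Queue<object> _messageStore = new Queue<object>();
    // Internal list of everyone subscribed to this topic.
    private readonly List<Action<object>> _subscribers = new List<Action<object>>();

    public InMemoryTopic(string name) { Name = name; }

    public string Name { get; }
    public int PendingCount => _messageStore.Count;

    public void Subscribe(Action<object> onMessage)
    {
        _subscribers.Add(onMessage);
        Relay(); // hand over anything queued before this subscriber arrived
    }

    public void Publish(object message)
    {
        _messageStore.Enqueue(message);
        Relay();
    }

    // Drains the queue to all subscribers; messages stay in the store
    // while nobody is subscribed.
    private void Relay()
    {
        if (_subscribers.Count == 0) return;
        while (_messageStore.Count > 0)
        {
            object message = _messageStore.Dequeue();
            foreach (Action<object> subscriber in _subscribers)
                subscriber(message);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var topic = new InMemoryTopic("orderTopic");
        topic.Publish("order-1");                 // no subscribers yet
        Console.WriteLine(topic.PendingCount);    // 1
        topic.Subscribe(m => Console.WriteLine($"received {m}"));
        topic.Publish("order-2");
        Console.WriteLine(topic.PendingCount);    // 0
    }
}
```

In this model the topic is the only thing publishers and subscribers share: neither side holds a reference to the other.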
Once a message is published to the topic, the topic fires an event and relays the message to its subscribers according to the message delivery option, so that they receive it as needed. The following figure illustrates the role of a topic as an intermediary channel for publishers and subscribers:
Topic Behavior
Temporary Client Disconnection
If a client application temporarily disconnects and then automatically reconnects, all topic-related information, such as client subscriptions and failure event notifications, is re-registered on the topic without any interruption on the client application's end.
Topic Deletion Notification
Deleting a topic is a forceful operation: it removes all messages and related meta-information from the cache. Subscribers and publishers must therefore be notified of the deletion, for the following reasons:
- A subscriber might be waiting for incoming messages from the registered topic. Once the topic no longer exists, the event notification lets the subscriber adjust its execution and avoid waiting indefinitely.
- A publisher can stop sending messages to a non-existent topic and handle any pending messages and future execution accordingly.
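Both reasons can be illustrated with a small, self-contained sketch. It does not use the NCache API: `SketchTopic`, its `Deleted` event, and `WaitForNext` are hypothetical stand-ins, and a `CancellationTokenSource` plays the role of the deletion notification so that a blocked subscriber wakes up instead of waiting forever.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a topic; NOT the NCache ITopic interface.
public sealed class SketchTopic
{
    private readonly BlockingCollection<string> _messages = new BlockingCollection<string>();
    private readonly CancellationTokenSource _deleted = new CancellationTokenSource();

    // Raised once when the topic is deleted.
    public event Action Deleted;

    public void Publish(string message)
    {
        // A publisher must not send to a topic that no longer exists.
        if (_deleted.IsCancellationRequested)
            throw new InvalidOperationException("Topic no longer exists.");
        _messages.Add(message);
    }

    // Blocks until a message arrives; returns null if the topic is deleted first.
    public string WaitForNext()
    {
        try { return _messages.Take(_deleted.Token); }
        catch (OperationCanceledException) { return null; }
    }

    public void Delete()
    {
        _deleted.Cancel();   // wakes every blocked subscriber
        Deleted?.Invoke();   // notifies interested publishers and subscribers
    }
}

public static class Program
{
    public static void Main()
    {
        var topic = new SketchTopic();
        topic.Deleted += () => Console.WriteLine("topic deleted");

        // The subscriber blocks on a message that will never arrive.
        Task<string> subscriber = Task.Run(() => topic.WaitForNext());

        topic.Delete();
        Console.WriteLine(subscriber.Result == null
            ? "subscriber stopped waiting"
            : "subscriber got a message");
    }
}
```

Without the cancellation signal, the call to `Take` in this sketch would block for as long as no message arrives, which is the infinite waiting state the notification is meant to prevent.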
Managing Topics
Pre-Requisites for Managing Topics
- Include the following namespaces in your application:
Alachisoft.NCache.Client
Alachisoft.NCache.Runtime
Alachisoft.NCache.Runtime.Caching
Alachisoft.NCache.Runtime.Exceptions
- The application must be connected to the cache before performing the operation.
- The cache must be running.
- Make sure that the data being added is serializable.
- To make the operation fail-safe, handle any potential exceptions within your application, as explained in Handling Failures.
Create Topic
CreateTopic creates a topic in the cache with the specified name. If the topic already exists, an instance of the existing topic is returned as ITopic. Whenever a message is published on a topic, it is delivered to the subscribers registered on that topic according to the message delivery preference.
The following example creates a topic named orderTopic.
try
{
    // Pre-condition: Cache is already connected
    // Specify the name of the topic
    string topicName = "orderTopic";
    // Create the topic
    ITopic topic = cache.MessagingService.CreateTopic(topicName);
}
catch (OperationFailedException ex)
{
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
Get Topic
The GetTopic method fetches an instance of the specified topic from the cache. If the topic exists, it is returned; otherwise, null is returned.
The following example gets an existing topic orderTopic from the cache.
try
{
    // Pre-condition: Cache is already connected
    // Specify the name of the topic
    string topicName = "orderTopic";
    // Get the topic from the cache
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
    // Verify successful topic retrieval
    if (orderTopic != null)
    {
        // orderTopic will be used for receiving and/or publishing messages
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Specified topic does not exist
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
Delete Topic
DeleteTopic unregisters the topic from the cache and removes all messages associated with it. If a topic deletion callback OnTopicDeleted is registered, it is triggered by this method call.
The following code sample deletes the existing topic orderTopic from the cache.
try
{
    // Pre-condition: Cache is already connected
    // Define the topic to be deleted
    string topicName = "orderTopic";
    // Delete the topic "orderTopic"
    cache.MessagingService.DeleteTopic(topicName);
    // Callback will be triggered if registered
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Topic does not exist
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
Note
This feature is only available from NCache 5.0 SP2 onwards.
You can also delete a topic asynchronously using the DeleteTopicAsync method. When a topic is deleted asynchronously, a Task is returned so that the caller can continue with other work without waiting for the deletion to complete.
The following code shows the asynchronous deletion of the topic orderTopic.
try
{
    // Pre-condition: Cache is already connected
    // Define the topic to be deleted
    string topicName = "orderTopic";
    // Start deleting the topic "orderTopic"; the call returns a Task
    Task task = cache.MessagingService.DeleteTopicAsync(topicName);
    // task.Wait(2000);
    // Use task to perform further operations according to business logic
    // Failures that occur while the task runs surface when it is waited on or awaited
    // Callback will be triggered if registered
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Topic does not exist
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
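The commented-out `task.Wait(2000)` in the sample above touches on a general .NET point: a `try`/`catch` around a call that only starts a `Task` does not observe failures that happen while the task is running. The sketch below shows two common ways to observe the outcome. `DeleteTopicStandInAsync` is a hypothetical stand-in that simulates a deletion with `Task.Delay`; it is not an NCache call, and the exception type it throws is chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for an asynchronous delete; fails on request.
    public static async Task DeleteTopicStandInAsync(string topicName, bool fail)
    {
        await Task.Delay(50); // simulate work done elsewhere
        if (fail)
            throw new InvalidOperationException($"Could not delete '{topicName}'.");
    }

    public static async Task Main()
    {
        // Option 1: block with a timeout. Wait returns false if the timeout
        // elapses first, and throws AggregateException if the task faulted.
        Task task = DeleteTopicStandInAsync("orderTopic", fail: false);
        bool finished = task.Wait(2000);
        Console.WriteLine(finished ? "deleted within timeout" : "still running");

        // Option 2: await the task. A failure is rethrown at the await,
        // so an ordinary catch block can handle it.
        try
        {
            await DeleteTopicStandInAsync("orderTopic", fail: true);
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine($"deletion failed: {ex.Message}");
        }
    }
}
```

Which option fits depends on the caller: awaiting keeps the thread free, while a bounded `Wait` suits code that cannot be made asynchronous.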
Properties of ITopic Interface
Members | Type | Description |
---|---|---|
MessageDeliveryFailure | MessageDeliveryFailureCallback | Event on the topic through which the publisher receives all failed messages, that is, messages that were not delivered to any subscriber or that expired or were evicted before delivery. |
Name | string | Name of the topic, as specified during topic creation. |
OnTopicDeleted | TopicDeletedCallback | Event that lets publishers and subscribers handle topic deletion. |
ExpirationTime | TimeSpan | Topic-level expiration. If no message-level expiration is provided, messages in the topic expire according to this value. The default is TimeSpan.MaxValue. |
IsClosed | bool | Indicates whether the topic has been disposed; check it before performing any operation. |
Dispose | IDisposable | Removes the registered topic’s subscription from the cache server. |
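The fallback rule in the ExpirationTime row can be written down as a one-line function. This is a minimal sketch of the rule as described in the table, not NCache code: `EffectiveExpiration` and the `ExpirationSketch` class are hypothetical, and a `null` message-level value stands for "not provided".

```csharp
using System;

public static class ExpirationSketch
{
    // Message-level expiration wins; otherwise the topic-level value applies.
    public static TimeSpan EffectiveExpiration(TimeSpan? messageExpiration, TimeSpan topicExpiration)
        => messageExpiration ?? topicExpiration;

    public static void Main()
    {
        TimeSpan topicDefault = TimeSpan.MaxValue;        // default stated in the table
        TimeSpan topicLevel = TimeSpan.FromMinutes(10);   // illustrative value

        Console.WriteLine(EffectiveExpiration(TimeSpan.FromSeconds(30), topicLevel));    // 00:00:30
        Console.WriteLine(EffectiveExpiration(null, topicLevel));                        // 00:10:00
        Console.WriteLine(EffectiveExpiration(null, topicDefault) == TimeSpan.MaxValue); // True
    }
}
```

With the default of `TimeSpan.MaxValue`, a message without its own expiration effectively never expires unless a topic-level value is set.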
Additional Resources
NCache provides a sample application for Pub/Sub at:
- GitHub
- Shipped with NCache: %NCHOME%\samples\dotnet\PubSub
See Also
Event Notifications in Cache
Pub/Sub Messages
Publish Messages to Topic
Subscribe for Topic Messages
Continuous Query