Subscribe for Topic Messages
The ITopic interface facilitates creating non-durable subscription and publishing of messages against the topic. The IDurableTopicSubscription interface facilitates creating durable subscription and publishing of messages against the topic. These interfaces also provide event registrations for message delivery failure, receiving messages and deleting topics.
Pre-Requisites
- Include the following namespace in your application:
Alachisoft.NCache.Client
Alachisoft.NCache.Runtime
Alachisoft.NCache.Runtime.Exceptions
- The application must be connected to cache before performing the operation.
- Cache must be running.
- Make sure that the data being added is serializable.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Non-Durable Subscriptions
The CreateSubscription method registers against the topic if the topic exists. It allows the subscriber to register MessageReceivedCallback
against the topic so that it can receive the published messages.
The following code sample does the following:
- Get existing topic of interest –
orderTopic
. - Create subscription for each topic.
- Register events for subscribers to receive messages once published to the topic.
- Unsubscribe subscribers registered to
orderTopic
.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create and register subscribers for Order topic
// MessageReceived callback is specified
ITopicSubscription ordSubscriber =
orderTopic.CreateSubscription(MessageReceived);
// Topics can also be unsubscribed
// ordSubscriber.UnSubscribe();
}
else
{
// No topic exists
}
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Messaage failed to receive
}
}
Durable Subscriptions
The CreateDurableSubscription method registers against the topic if the topic exists. It allows the subscriber to register MessageReceivedCallback against the topic so that it can receive the published messages.
The following code sample does the following:
- Get existing topic of interest –
orderTopic
. - Create durable subscription with shared policy for each topic with a timespan for 20 minutes.
- Register events for subscribers to receive messages once published to the topic.
- Unsubscribe subscribers registered to
orderTopic
.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Create and register subscribers for Order topic
// MessageReceived callback is specified below
// The subscription policy is shared which means that
// the subscription can have more than one subscribers
IDurableTopicSubscription ordSubscriber =
orderTopic.CreateDurableSubscription(subscriptionName,
SubscriptionPolicy.Shared,
MessageReceived, TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
// ordSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
// -------- MessageReceivedCallback ------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Messaage failed to receive
}
}
Recommendation: To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Pattern Based Subscriptions
NCache lets you provide with a specific pattern and then search the topics which match the pattern provided. Pattern based subscriptions provided by NCache support the following wildcards as already explained in the Pub/Sub Subscriptions section:
*
: zero or many characters?
: any one character[]
: specify a range
The following example provides a pattern based on which the topic is subscribed matching the pattern.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
// Only ? * [] wildcards supported
string topicName = "order*";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService
.GetTopic(topicName, TopicSearchOptions.ByPattern);
// Create and register subscribers for Order topic
ITopicSubscription ordSubscriber =
orderTopic.CreateSubscription(MessageReceived);
// Topics can also be unsubscribed
// ordSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
{
// Active subscription with this name already exists
// Specific to Exclusive subscription
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
// -------- MessageReceivedCallback ---------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Messaage failed to receive
}
}
Pattern Based Durable Subscription
Durable Pattern-based Subscription combines the characteristics of Pattern-based Subscriptions and Durable Subscriptions. In this mode of subscription, the client node can receive pattern-based messages. In case this client gets disconnected and rejoins the network, it will receive any messages that it may have missed during disconnection based on the pattern of the topic it subscribed to.
The following example provides a pattern based on which the topic is subscribed matching the pattern and the subscription made is durable subscription.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
// Only ? * [] wildcards supported
string topicName = "order*";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService
.GetTopic(topicName, TopicSearchOptions.ByPattern);
// Create and register subscribers for Order topic
// MessageReceived callback is specified below
// The subscription policy is exclusive which means that
// one subscription can have only one subscriber at a time
IDurableTopicSubscription ordSubscriber =
orderTopic.CreateDurableSubscription(subscriptionName,
SubscriptionPolicy.Exclusive,
MessageReceived, TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
// ordSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
{
// Active subscription with this name already exists
// Specific to Exclusive subscription
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
// -------- MessageReceivedCallback -------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Messaage failed to receive
}
}
Pattern Based Subscription with Failure Notification
The following example subscribes for topics which fulfil the pattern with callback registered for message delivery failure.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
// Only ? * [] wildcards supported
string topicName = "*Topic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
if (orderTopic != null)
{
// Register message delivery failure
// Since the instances of pattern based topics are get and registered for message failure
// Message failure will be notified for these topics
orderTopic.MessageDeliveryFailure += OnFailureMessageRecieved;
}
else
{
// No topic exists
}
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
{
// Active subscription with this name already exists
// Specific to Exclusive subscription
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
Recommendation: To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Additional Resources
NCache provides sample application for Pub/Sub at:
Shipped with NCache: %NCHOME%\samples\dotnet\PubSub
See Also
Event Notifications in Cache
Pub/Sub Topics
Pub/Sub Messages
Publish Messages to Topic
Continuous Query