Subscribe for Topic Messages
The ITopic interface facilitates creating non-durable subscriptions and publishing messages against the topic. The IDurableTopicSubscription interface facilitates creating durable subscriptions and publishing messages against the topic. These interfaces also provide event registration for message delivery failure, message receipt, and topic deletion. When creating a subscription, you can also specify a delivery mode, which is either synchronous or asynchronous. Use synchronous mode for ordered messages; if you are not using ordered messages, asynchronous mode improves performance. The default delivery mode is synchronous.
Note
- To use Maven packages for NCache Professional Edition, change the <artifactId> as shown below: <artifactId>ncache-professional-client</artifactId>
- To use the Node.js API in NCache Professional Edition, install and include the ncache-professional-client npm package in your Node.js application.
Pre-Requisites
- Install the following NuGet packages in your application:
- Include the following namespaces in your application:
Alachisoft.NCache.Client
Alachisoft.NCache.Runtime
Alachisoft.NCache.Runtime.Exceptions
- The application must be connected to cache before performing the operation.
- Cache must be running.
- Make sure that the data being added is serializable.
- For API details, refer to: ICache, CacheItem, ITopic, IDurableTopicSubscription, CreateSubscription, CreateDurableSubscription.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unforeseen exceptions, refer to the Troubleshooting section.
Non-Durable Subscriptions
The CreateSubscription method registers against the topic if the topic exists. It allows the subscriber to register a MessageReceivedCallback against the topic so that it can receive the published messages.
The following code sample does the following:
- Get the existing topic of interest, orderTopic.
- Create a subscription on the topic.
- Register events for subscribers to receive messages once published to the topic.
- Unsubscribe the subscribers registered to orderTopic.
try
{
    // Pre-condition: Cache is already connected
    // orderTopic is the name of the topic created beforehand
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create a subscription on the Order topic and register
        // the MessageReceived callback for incoming messages
        ITopicSubscription orderSubscriber =
            orderTopic.CreateSubscription(MessageReceived);

        // Subscriptions can also be removed from the topic
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection failures
        // Operation timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Check that the payload is of the expected type
    if (args.Message.Payload is Order ord)
    {
        // Perform operations on the received order
    }
    else
    {
        // Payload is not an Order; handle or ignore the message
    }
}
Create Subscriptions with Delivery Mode
Note
This feature is only available in NCache 5.1 and onward.
A delivery mode can be specified while creating a subscription and can be either sync or async. The following example creates a subscription using the CreateSubscription method with async delivery mode. It is recommended to use sync mode for ordered messages and async mode otherwise to achieve higher performance.
try
{
    // Pre-condition: Cache is already connected
    // orderTopic is the name of the topic created beforehand
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create a subscription on the Order topic and register
        // the MessageReceived callback for incoming messages
        // DeliveryMode is set to async
        ITopicSubscription orderSubscriber =
            orderTopic.CreateSubscription(MessageReceived, DeliveryMode.Async);

        // Subscriptions can also be removed from the topic
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection failures
        // Operation timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Check that the payload is of the expected type
    if (args.Message.Payload is Order ord)
    {
        // Perform operations on the received order
    }
    else
    {
        // Payload is not an Order; handle or ignore the message
    }
}
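To make the trade-off between the two delivery modes concrete, the following is a minimal, self-contained sketch that does not use NCache at all. DeliveryModeSketch and its two methods are hypothetical names introduced only for illustration. The sketch assumes that sync delivery behaves like invoking the callback for one message at a time, and that async delivery behaves like running callbacks concurrently; how NCache implements this internally is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical illustration only; none of these types belong to the NCache API.
public static class DeliveryModeSketch
{
    // Sync-style delivery: each callback finishes before the next message
    // is handed over, so messages are observed in publish order.
    public static List<int> DeliverSync(IEnumerable<int> messages, Action<int> callback)
    {
        var received = new List<int>();
        foreach (int message in messages)
        {
            callback(message);
            received.Add(message);
        }
        return received;
    }

    // Async-style delivery: callbacks run concurrently on the thread pool,
    // so every message arrives but the completion order is not guaranteed.
    public static List<int> DeliverAsync(IEnumerable<int> messages, Action<int> callback)
    {
        var received = new List<int>();
        var gate = new object();
        Task[] tasks = messages.Select(message => Task.Run(() =>
        {
            callback(message);
            lock (gate) { received.Add(message); }
        })).ToArray();
        Task.WaitAll(tasks);
        return received;
    }
}
```

Calling DeliverSync with the messages 1 to 5 always yields them in that order. DeliverAsync yields the same five messages, but possibly in a different order on each run, which is why async mode is only suggested when message order does not matter.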
Durable Subscriptions
The CreateDurableSubscription method registers against the topic if the topic exists. It allows the subscriber to register a MessageReceivedCallback against the topic so that it can receive the published messages.
The following code sample does the following:
- Get the existing topic of interest, orderTopic.
- Create a durable subscription on the topic with the shared policy and a timespan of 20 minutes.
- Register events for subscribers to receive messages once published to the topic.
- Unsubscribe the subscribers registered to orderTopic.
try
{
    // Pre-condition: Cache is already connected
    // orderTopic is the name of the topic created beforehand
    string topicName = "orderTopic";
    string subscriptionName = "orderTopicName";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create a durable subscription on the Order topic
        // MessageReceived callback is specified below
        // The subscription policy is shared, which means that
        // the subscription can have more than one subscriber
        IDurableTopicSubscription orderSubscriber =
            orderTopic.CreateDurableSubscription(subscriptionName,
                SubscriptionPolicy.Shared,
                MessageReceived, TimeSpan.FromMinutes(20));

        // Subscriptions can also be removed from the topic
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection failures
        // Operation timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

// -------- MessageReceivedCallback ------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Check that the payload is of the expected type
    if (args.Message.Payload is Order ord)
    {
        // Perform operations on the received order
    }
    else
    {
        // Payload is not an Order; handle or ignore the message
    }
}
Pattern Based Subscriptions
NCache lets you provide a pattern and then search for the topics whose names match it. Pattern based subscriptions support the following wildcards, as already explained in the Pub/Sub Subscriptions section:
- * : zero or more characters
- ? : any one character
- [] : specify a range
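To get a feel for which topic names a given pattern would select, the wildcards above can be approximated locally with a regular expression. The sketch below is self-contained and does not call NCache; TopicPatternSketch, ToRegex and Matches are hypothetical names, and the translation (in particular, treating [] like a regular expression character class) is an assumption about the semantics rather than a description of how NCache evaluates patterns on the server.

```csharp
using System;
using System.Text;
using System.Text.RegularExpressions;

// Hypothetical helper for experimenting with patterns; not part of the NCache API.
public static class TopicPatternSketch
{
    // Translates *, ? and [] into an anchored regular expression.
    public static Regex ToRegex(string pattern)
    {
        var sb = new StringBuilder("^");
        for (int i = 0; i < pattern.Length; i++)
        {
            char c = pattern[i];
            if (c == '*')
            {
                sb.Append(".*");          // zero or more characters
            }
            else if (c == '?')
            {
                sb.Append('.');           // exactly one character
            }
            else if (c == '[')
            {
                int close = pattern.IndexOf(']', i + 1);
                if (close <= i + 1)
                {
                    // Unclosed or empty bracket: treat '[' as a literal character
                    sb.Append(@"\[");
                    continue;
                }
                // Copy the range, e.g. [1-3], as a character class (assumed semantics)
                sb.Append('[').Append(pattern, i + 1, close - i - 1).Append(']');
                i = close;
            }
            else
            {
                sb.Append(Regex.Escape(c.ToString()));
            }
        }
        sb.Append('$');
        return new Regex(sb.ToString());
    }

    public static bool Matches(string pattern, string topicName)
    {
        return ToRegex(pattern).IsMatch(topicName);
    }
}
```

Under these assumptions, Matches("order*", "orderTopic") and Matches("order[1-3]", "order2") are true, while Matches("order?", "order") and Matches("order*", "paymentTopic") are false. The sketch does not escape special characters inside brackets, so it is only suitable for simple ranges.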
The following example subscribes to the topics whose names match the provided pattern.
try
{
    // Pre-condition: Cache is already connected
    // Topics matching the pattern were created beforehand
    // Only ? * [] wildcards supported
    string topicName = "order*";

    // Get the topics that match the pattern
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);

    if (orderTopic != null)
    {
        // Create a subscription and register the MessageReceived callback
        ITopicSubscription orderSubscriber = orderTopic.CreateSubscription(MessageReceived);

        // Subscriptions can also be removed from the topic
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
    {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection failures
        // Operation timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

// -------- MessageReceivedCallback ---------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Check that the payload is of the expected type
    if (args.Message.Payload is Order ord)
    {
        // Perform operations on the received order
    }
    else
    {
        // Payload is not an Order; handle or ignore the message
    }
}
Pattern Based Durable Subscription
Durable Pattern-based Subscription combines the characteristics of Pattern-based Subscriptions and Durable Subscriptions. In this mode of subscription, the client node can receive pattern-based messages. In case this client gets disconnected and rejoins the network, it will receive any messages that it may have missed during disconnection based on the pattern of the topic it subscribed to.
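The difference a durable subscription makes on reconnect can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the NCache implementation: SketchTopic and SketchSubscription are hypothetical types, and expiration, subscription policies, and pattern matching are deliberately left out. It only models one behavior described above, namely that messages published while a durable subscriber is disconnected are retained and delivered when it returns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model; it does not use or reproduce the NCache API.
public sealed class SketchSubscription
{
    private readonly Queue<string> _pending = new Queue<string>();

    public bool IsDurable { get; }
    public bool IsConnected { get; private set; } = true;
    public List<string> Received { get; } = new List<string>();

    public SketchSubscription(bool isDurable)
    {
        IsDurable = isDurable;
    }

    public void Disconnect()
    {
        IsConnected = false;
    }

    // On reconnect, deliver whatever was retained during the disconnection.
    public void Reconnect()
    {
        IsConnected = true;
        while (_pending.Count > 0)
        {
            Received.Add(_pending.Dequeue());
        }
    }

    // Called by the topic for every published message.
    internal void Offer(string message)
    {
        if (IsConnected)
        {
            Received.Add(message);
        }
        else if (IsDurable)
        {
            _pending.Enqueue(message); // retained until the subscriber returns
        }
        // Non-durable and disconnected: the message is missed.
    }
}

public sealed class SketchTopic
{
    private readonly List<SketchSubscription> _subscriptions = new List<SketchSubscription>();

    public SketchSubscription Subscribe(bool durable)
    {
        var subscription = new SketchSubscription(durable);
        _subscriptions.Add(subscription);
        return subscription;
    }

    public void Publish(string message)
    {
        foreach (SketchSubscription subscription in _subscriptions)
        {
            subscription.Offer(message);
        }
    }
}
```

If one durable and one non-durable subscriber both disconnect between the publication of "a" and "c", and "b" is published while they are away, the durable subscriber ends up with a, b, c after reconnecting, whereas the non-durable one only has a and c.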
The following example creates a durable subscription on the topics whose names match the provided pattern.
try
{
    // Pre-condition: Cache is already connected
    // Topics matching the pattern were created beforehand
    // Only ? * [] wildcards supported
    string topicName = "order*";
    string subscriptionName = "orderTopicName";

    // Get the topics that match the pattern
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);

    if (orderTopic != null)
    {
        // Create a durable subscription
        // MessageReceived callback is specified below
        // The subscription policy is exclusive, which means that
        // one subscription can have only one subscriber at a time
        IDurableTopicSubscription orderSubscriber =
            orderTopic.CreateDurableSubscription(subscriptionName,
                SubscriptionPolicy.Exclusive,
                MessageReceived, TimeSpan.FromMinutes(20));

        // Subscriptions can also be removed from the topic
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // get user-defined topics instead
    }
    else if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
    {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    }
    else
    {
        // Exception can occur due to:
        // Connection failures
        // Operation timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

// -------- MessageReceivedCallback -------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Check that the payload is of the expected type
    if (args.Message.Payload is Order ord)
    {
        // Perform operations on the received order
    }
    else
    {
        // Payload is not an Order; handle or ignore the message
    }
}
Pattern Based Subscription with Failure Notification
The following example gets the topics that match the pattern and registers a callback for message delivery failure on them.
try
{
    // Pre-condition: Cache is already connected
    // Topics matching the pattern were created beforehand
    // Only ? * [] wildcards supported
    string topicName = "*Topic";

    // Get the topics that match the pattern
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);

    if (orderTopic != null)
    {
        // Register for message delivery failure notifications
        // Because the topic instance was obtained by pattern, failures
        // are notified for all topics that match the pattern
        orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // get user-defined topics instead
    }
    else if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
    {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    }
    else
    {
        // Exception can occur due to:
        // Connection failures
        // Operation timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
Additional Resources
NCache provides a sample application for Pub/Sub on GitHub.
See Also
Event Notifications in Cache
Pub/Sub Topics
Pub/Sub Messages
Publish Messages to Topic
Continuous Query