NCache has lots of features: ASP.NET Cache provider and Full-Text searching, among others. But, this time, let’s see another NCache feature: Publisher/Subscriber (Pub/Sub) messaging. Let’s learn how to implement the Pub/Sub messaging pattern with NCache.
What’s Pub/Sub messaging?
Pub/Sub is a messaging pattern where senders (or publishers) share messages with multiple receivers (or subscribers) through a channel, avoiding coupling between senders and receivers.
With Pub/Sub messaging, we can share messages between applications and offload time-consuming operations to background processors. For example, in a Reservation Management System, we can resize room images in a background processor by “listening” to an event triggered by a Web application.
NCache provides an in-memory Pub/Sub messaging to enable real-time information sharing between .NET applications.
Thanks to its distributed architecture, NCache offers a scalable, highly available, and storage-efficient Pub/Sub messaging mechanism.
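To make the pattern itself concrete before turning to NCache, here's a minimal in-memory sketch. The InMemoryBroker class and the room-images topic are hypothetical names made up for this illustration; this is not how NCache works internally, only the bare idea of publishers and subscribers meeting at a topic name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory broker for illustration only; this is not NCache's API.
public sealed class InMemoryBroker
{
    // Maps each topic name to the callbacks subscribed to it.
    private readonly Dictionary<string, List<Action<string>>> _subscribers = new();

    public void Subscribe(string topic, Action<string> onMessage)
    {
        if (!_subscribers.TryGetValue(topic, out var callbacks))
        {
            callbacks = new List<Action<string>>();
            _subscribers[topic] = callbacks;
        }
        callbacks.Add(onMessage);
    }

    // Delivers the message to every subscriber of the topic
    // and returns how many subscribers received it.
    public int Publish(string topic, string message)
    {
        if (!_subscribers.TryGetValue(topic, out var callbacks))
        {
            return 0; // Nobody is listening to this topic.
        }

        foreach (var callback in callbacks)
        {
            callback(message);
        }
        return callbacks.Count;
    }
}

public static class Program
{
    public static void Main()
    {
        var broker = new InMemoryBroker();

        // Two independent subscribers on the same topic.
        broker.Subscribe("room-images", m => Console.WriteLine($"resizer: {m}"));
        broker.Subscribe("room-images", m => Console.WriteLine($"auditor: {m}"));

        // The publisher only knows the topic name, never the subscribers.
        int delivered = broker.Publish("room-images", "room-42.png uploaded");
        Console.WriteLine($"Delivered to {delivered} subscriber(s)");
    }
}
```

The publisher and the subscribers share nothing except the topic name, which is the decoupling the pattern is after. A real broker such as NCache adds what this sketch leaves out: delivery across processes and machines, expiration, and failure notifications.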
How to implement Pub/Sub messaging with NCache
Before moving to a sample application, let’s go through some terminology first.
With NCache, the channel to exchange messages is called a Topic. Each subscriber subscribes to a topic to receive the messages sent to it.
In Pub/Sub messaging, a subscription is the “interest” of one or more subscribers in a topic.
NCache has two types of subscriptions:
- Non-durable: With this subscription type, subscribers won’t receive any message sent while they are disconnected. This is the default subscription type.
- Durable: Subscribers won’t lose any messages while they’re disconnected. With Durable subscriptions, messages are stored until subscribers rejoin or the messages expire.
Also, NCache has these two types of policies:
- Shared: With this policy, a subscription can have more than one active subscriber at a time. If a subscriber leaves the network, the subscription remains active. This policy is only supported by Durable subscriptions.
- Exclusive: Unlike the Shared policy, a subscription with an Exclusive policy can have only one subscriber at a time. This policy is available for both Durable and Non-Durable subscriptions.
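As a rough sketch of how a subscription type and a policy come together, the snippet below creates a Durable subscription with the Shared policy. It assumes the CreateDurableSubscription() method and the SubscriptionPolicy enum as I recall them from the NCache SDK, so check them against the installed version. The cache name demoCache, the topic name newReleases, and the subscription name movie-fans are placeholders, and the snippet needs a running cache where the topic already exists.

```csharp
using System;
using Alachisoft.NCache.Client;
using Alachisoft.NCache.Runtime.Caching;

// Placeholder names: adjust them to your own cache and topic.
ICache cache = CacheManager.GetCache("demoCache");
ITopic topic = cache.MessagingService.GetTopic("newReleases");

if (topic == null)
{
    Console.WriteLine("Topic not found. Create it from a publisher first.");
    return;
}

// Durable + Shared: several subscribers can join the same named subscription,
// and messages are kept while subscribers are disconnected.
// Assumption: the last argument is an optional expiration for the subscription.
IDurableTopicSubscription subscription = topic.CreateDurableSubscription(
    "movie-fans",                      // hypothetical subscription name
    SubscriptionPolicy.Shared,         // or SubscriptionPolicy.Exclusive
    (sender, args) => Console.WriteLine($"Received: {args.Message.Payload}"),
    TimeSpan.FromMinutes(20));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
subscription.UnSubscribe();
```

Swapping SubscriptionPolicy.Shared for SubscriptionPolicy.Exclusive would limit the subscription to a single subscriber at a time.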
In Pub/Sub messaging, subscribers register to a topic by name. Later in our sample application, we will use this subscription method. But NCache also supports a pattern-based subscription method. This means a subscriber can listen to multiple topics in a single call. For example, a subscriber listening to the pattern new-* will receive messages sent to the new-movies and new-series topics.
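A pattern-based subscription could look roughly like the sketch below. It assumes the GetTopic() overload that takes TopicSearchOptions.ByPattern, as I recall it from the NCache SDK, so verify it against the installed version. The cache name demoCache is a placeholder, and the snippet needs a running cache.

```csharp
using System;
using Alachisoft.NCache.Client;
using Alachisoft.NCache.Runtime.Caching;

// Placeholder cache name: adjust it to your own environment.
ICache cache = CacheManager.GetCache("demoCache");

// Assumption: this overload returns a handle covering every topic whose
// name matches the pattern, e.g. new-movies and new-series.
ITopic patternTopic = cache.MessagingService.GetTopic("new-*", TopicSearchOptions.ByPattern);

if (patternTopic == null)
{
    Console.WriteLine("No topic handle returned for the pattern.");
    return;
}

// A single subscription receives messages from all the matching topics.
ITopicSubscription subscription = patternTopic.CreateSubscription(
    (sender, args) => Console.WriteLine($"Received: {args.Message.Payload}"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
subscription.UnSubscribe();
```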
For more details about topic priorities and message parameters, check Pub/Sub Messaging Components and Usage.
With this terminology in place, let’s build a sample .NET application to notify movie fans about every new movie release. Let’s write two Console applications: one as a publisher and another as a subscriber.
1. Publish a New Movie Message
First, let’s create a Console app to publish some movies. Also, let’s install the Alachisoft.NCache.SDK NuGet package, version 5.3.0.
In the Program.cs file, let’s publish some random movies from IMDb to a newReleases topic. Something like this:
    using Alachisoft.NCache.Client;
    using Alachisoft.NCache.Runtime.Caching;
    using Movies.Shared;
    using Movies.Shared.Entities;
    using Movies.Shared.Extensions;

    // 1. Create an NCache cache instance
    ICache cache = CacheManager.GetCache(Config.CacheName);

    // 2. Create a new topic
    string topicName = Config.Topics.NewReleases;
    ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName);

    // Attach a callback in case of delivery failures
    newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived;

    var newReleases = new List<Movie>
    {
        new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }),
        // Some other new movies here...
    };

    foreach (var movie in newReleases)
    {
        // 3. Create a Message
        var message = movie.ToMessage(Config.Expiration);

        // 4. Publish it
        await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true);
    }

    Console.WriteLine("Press any key to continue");
    Console.ReadKey();

    static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
    {
        Console.WriteLine($"[ERROR] Failed to deliver message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]");
    }
Let’s go through it. First, we started by grabbing a reference to an NCache cache with GetCache(). We’re using the default demoCache created during the NCache installation.
Then, we created a new topic using CreateTopic() with a topic name. This is the same name we will use later in the subscriber application. Also, we attached a callback to get notified of delivery failures.
To publish a message for every movie, we used PublishAsync() with three parameters: an NCache Message, the DeliveryOption.All to notify all subscribers, and true to get notified of failures.
The NCache Message is a wrapper around the object to send. We should mark the type of that object as [Serializable]. For example, here’s the Movie record:
    using Alachisoft.NCache.Runtime.Caching;

    namespace Movies.Shared.Entities;

    [Serializable]
    public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres)
    {
        public override string ToString()
        {
            return $"Movie: [{Name}] ({ReleaseYear})";
        }
    }

    public enum Genre
    {
        Action,
        // Some other movie genres
    }
To build the Message, we used the ToMessage() extension method with an expiration.
    using Alachisoft.NCache.Runtime.Caching;

    namespace Movies.Shared.Extensions;

    public static class MovieExtensions
    {
        // Wraps any object into an NCache Message with an optional expiration
        public static Message ToMessage(this object self, TimeSpan? expiration = null)
            => new Message(self, expiration);
    }
If one of our messages expires and nobody receives it, the OnFailureMessageReceived callback is called. For more details, check Messages Behavior and Properties.
For example, if we run our Publisher application without any subscriber (yet) and let the messages expire, the OnFailureMessageReceived callback prints an [ERROR] line for each one.
In our Publisher app, we used PublishAsync(). NCache also supports sending multiple messages in a single call with the PublishBulk() method.
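As a hedged sketch, a bulk publish could look like the snippet below. It assumes PublishBulk() takes a collection of message and delivery-option pairs and returns the messages that failed along with their exceptions, which is how I recall the NCache SDK; verify the signature against the installed version. The cache name, topic name, and movie titles are placeholders, and plain strings stand in for the Movie record to keep the snippet short.

```csharp
using System;
using System.Collections.Generic;
using Alachisoft.NCache.Client;
using Alachisoft.NCache.Runtime.Caching;

// Placeholder names: adjust them to your own cache and topic.
ICache cache = CacheManager.GetCache("demoCache");
ITopic topic = cache.MessagingService.CreateTopic("newReleases");

// Placeholder payloads instead of Movie records.
var titles = new[] { "Movie A", "Movie B", "Movie C" };

// Pair every message with its delivery option.
var batch = new List<Tuple<Message, DeliveryOption>>();
foreach (var title in titles)
{
    var message = new Message(title, TimeSpan.FromMinutes(5));
    batch.Add(Tuple.Create(message, DeliveryOption.All));
}

// Assumption: the result contains only the messages that could not be published.
IDictionary<Message, Exception> failures = topic.PublishBulk(batch, true);

foreach (var failure in failures)
{
    Console.WriteLine($"[ERROR] '{failure.Key.Payload}' was not published: {failure.Value.Message}");
}
```

Sending one batch instead of one call per message saves round trips to the cache, which matters when publishing many messages at once.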
The Publisher is ready. Let’s write the Subscriber application.
2. Subscribe to New Movie Messages
Let’s create another Console application to subscribe to the newReleases topic.
The Program.cs file looks like this:
    using Alachisoft.NCache.Client;
    using Alachisoft.NCache.Runtime.Caching;
    using Movies.Shared;
    using Movies.Shared.Entities;

    // 1. Create an NCache cache instance
    ICache cache = CacheManager.GetCache(Config.CacheName);

    // 2. Grab the same topic
    string topicName = Config.Topics.NewReleases;
    ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName);

    if (newReleasesTopic == null)
    {
        Console.WriteLine($"Ooops...Topic [{topicName}] not found.");
    }
    else
    {
        // Attach a callback if the topic gets deleted.
        // This goes after the null check to avoid a NullReferenceException.
        newReleasesTopic.OnTopicDeleted = OnTopicDeleted;

        // 3. Attach a callback for new movies
        ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async);

        Console.WriteLine("Press any key to continue");
        Console.ReadKey();

        // 4. Unsubscribe...
        newReleasesSubscriber.UnSubscribe();
    }

    void MessageReceived(object sender, MessageEventArgs args)
    {
        if (args.Message.Payload is Movie movie)
        {
            Console.WriteLine($"New Movie released: {movie}");
        }
    }

    void OnTopicDeleted(object sender, TopicDeleteEventArgs args)
    {
        Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]");
    }
This time, we retrieved the existing topic with the same topic name we used in the Publisher application and attached a callback in case of topic deletion.
Next, we created a non-durable subscription using CreateSubscription() with a MessageReceived callback and DeliveryMode.Async. With this delivery mode, NCache doesn’t guarantee the order of messages. To receive messages in order, we should use DeliveryMode.Sync.