Every day, we generate and consume an increasing amount of data. For businesses, the speed of data processing impacts the decisions they make. As the demand for efficient data processing grows, our software needs to keep up with that demand. Repeatedly polling for changes simply isn’t a viable alternative. Let’s look at one NCache feature, Continuous Query, designed to support real-time data processing.
Continuous Query
With Continuous Query, NCache provides a mechanism to monitor an observable data set within a specific time window. This way, NCache notifies us about all changes occurring to that data set while it’s cached. Continuous Query works as a mechanism to monitor changes, not to alter application data.
Thanks to its distributed architecture, NCache offers scalability, high availability, and storage efficiency. With its self-healing peer-to-peer clustering architecture, NCache handles large amounts of incoming data for real-time data processing.
How to Implement a Continuous Query
With Continuous Queries, NCache monitors the result of a query. Unlike relational databases, NCache requires indexes to search items using a SQL-like query. Without them, it would have to scan the entire cache to find the items we want to monitor, and that would make NCache slow.
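To make that cost difference concrete, here is a minimal sketch in plain C# that contrasts scanning every entry with looking entries up through a prebuilt index. It only illustrates the idea: the MeasurementStore class and its members are hypothetical and say nothing about how NCache implements its indexes internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory store for illustration; NOT NCache's internal design.
public sealed class MeasurementStore
{
    // Primary data: cache key -> machine model
    private readonly Dictionary<string, string> _modelByKey = new();

    // Index: machine model -> keys of the entries with that model
    private readonly Dictionary<string, HashSet<string>> _keysByModel = new();

    public void Put(string key, string machineModel)
    {
        // Drop the stale index entry when an existing key changes its model
        if (_modelByKey.TryGetValue(key, out var previous) &&
            _keysByModel.TryGetValue(previous, out var previousKeys))
        {
            previousKeys.Remove(key);
        }

        _modelByKey[key] = machineModel;

        if (!_keysByModel.TryGetValue(machineModel, out var keys))
        {
            keys = new HashSet<string>();
            _keysByModel[machineModel] = keys;
        }
        keys.Add(key);
    }

    // Without an index: every entry is inspected on every query
    public List<string> FindByScan(string machineModel) =>
        _modelByKey.Where(entry => entry.Value == machineModel)
                   .Select(entry => entry.Key)
                   .OrderBy(k => k, StringComparer.Ordinal)
                   .ToList();

    // With an index: one dictionary lookup instead of a full scan
    public List<string> FindByIndex(string machineModel) =>
        _keysByModel.TryGetValue(machineModel, out var keys)
            ? keys.OrderBy(k => k, StringComparer.Ordinal).ToList()
            : new List<string>();
}
```

Both methods return the same keys. The difference is that FindByScan touches every entry, while FindByIndex pays a small bookkeeping cost on each write to keep reads cheap.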
One easy way of indexing the entries we want to search is by adding attributes like QueryIndexable and QueryIndexed to our classes and properties. For example, let’s monitor the count of failed parts created by a given machine model after some maintenance work. To search our cached measurements by machine model, let’s annotate our Measurement class like this:
```csharp
using Alachisoft.NCache.Runtime.Caching;

namespace Acme.Monitoring.CacheItems;

public class Measurement
{
    // Indexed so that continuous queries can filter by machine model
    [QueryIndexed]
    public string MachineModel { get; set; }

    public int LastFailedPartCount { get; set; }

    public DateTimeOffset At { get; set; }
}
```
Step 1: Register Query and Notifications
Next, let’s install the Alachisoft.NCache.SDK NuGet package. With it, we can define a query to monitor and register a notification for changes in our cache entries.
For example, inside an ASP.NET Core hosted service or any other background processor, let’s write a query and a notification to monitor the measurements of all machines with a given model. Something like this:
```csharp
// 1. Define a continuous query
// Inside our query text, we need to fully qualify our objects
var queryText = "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?";
var queryCommand = new QueryCommand(queryText);
queryCommand.Parameters.Add("MachineModel", "ACME-001");
var query = new ContinuousQuery(queryCommand);

// 2. Register the notification
query.RegisterNotification(
    callback: new QueryDataNotificationCallback(OnQueryResultSetChanged),
    eventType: EventType.ItemAdded | EventType.ItemUpdated,
    datafilter: EventDataFilter.DataWithMetadata);

// 3. Register the continuous query
ICache cache = CacheManager.GetCache("demoCache");
cache.MessagingService.RegisterCQ(query);
```
Notice that we wrote a continuous query using SQL-like query text, a query command, and a parameter. We needed to fully qualify the name of our object inside the query text.
In the query text, we used the $VALUE$ projection to retrieve the actual object stored inside the cache, not only one of its properties. NCache supports other projections to retrieve tags, groups, and the result of aggregate functions like SUM, MIN, and MAX.
Then, we registered a notification passing a callback, event type, and data filter. Notice we registered the same callback for two event types. With the event type parameter, we specify the events we want to monitor. NCache supports three event types: ItemAdded, ItemUpdated, and ItemRemoved. And, with the data filter parameter, we specify the information we want inside our callback once an event is fired. NCache supports three data filters:
1. None: It only returns the key of the added, updated, or removed entry.
2. Metadata: It returns the affected key and the metadata, such as group name, item priority, and item version.
3. DataWithMetadata: It returns the cache entry and the associated metadata.
In our example, every time we add or update one measurement for a machine with the model “ACME-001”, NCache calls the OnQueryResultSetChanged method, passing the actual entry added or updated. We used ItemAdded and ItemUpdated as the event types and DataWithMetadata as the data filter.
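The DataWithMetadata filter is helpful to avoid fetching items by key again inside the callback. But let’s use it carefully: sending the whole entry over the network with every notification gets expensive, especially for large or frequently changing items.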
Step 2: Register Callbacks for Events
One of the parameters we pass while registering a notification is a callback. This is the action we want to perform once an item that satisfies our “monitoring” query gets affected by the event type we registered.
To continue with our example, this is a callback to listen to ItemAdded and ItemUpdated events:
```csharp
public void OnQueryResultSetChanged(string key, CQEventArg arg)
{
    switch (arg.EventType)
    {
        case EventType.ItemAdded:
            // A new measurement for an ACME-001 machine was added
            // Do something here...
            Console.WriteLine($"Item with key '{key}' has been added to the result set of the continuous query");
            break;

        case EventType.ItemUpdated:
            // A measurement for an ACME-001 machine was changed
            Console.WriteLine($"Item with key '{key}' has been updated in the result set of the continuous query");

            // Since we passed DataWithMetadata, we have access to the cache entry itself
            if (arg.Item != null)
            {
                // The previous value may be missing, so guard against null
                var old = arg.OldItem?.GetValue<Measurement>();
                var updated = arg.Item.GetValue<Measurement>();
                // Do something here with old and updated
                // Send a notification, push a message into a topic...
                Console.WriteLine($"Updated measurement '{key}' has '{updated.LastFailedPartCount}' failed parts");
            }
            break;
    }
}
```
With this callback in place, we can start ingesting measurements from our machines into our cache. NCache will monitor cache items and let us know when a measurement with model “ACME-001” gets added or updated. Then, we can send an email or generate an alert if the failed count of any machine exceeds a threshold.
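As a minimal sketch of that last step, the helper below decides whether a change should raise an alert. The FailedPartAlert class, its parameters, and the rule of alerting only when the count crosses the threshold are illustrative assumptions, not part of NCache.

```csharp
using System;

// Hypothetical helper; the names and the alerting rule are assumptions for illustration.
public static class FailedPartAlert
{
    // Returns true only when the failed-part count crosses the threshold,
    // so a machine that stays above it does not trigger an alert on every update.
    // previousCount is null when there is no earlier measurement (a newly added item).
    public static bool ShouldAlert(int? previousCount, int currentCount, int threshold)
    {
        bool wasAbove = previousCount.HasValue && previousCount.Value > threshold;
        bool isAbove = currentCount > threshold;
        return isAbove && !wasAbove;
    }
}
```

Inside the callback, we could pass null as the previous count for ItemAdded events and the old measurement’s LastFailedPartCount for ItemUpdated events, then send the email or alert only when ShouldAlert returns true.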
Step 3: Unregister Query and Notifications
Monitoring queries and notifying clients has a cost, even though it doesn’t block cache clients because it runs asynchronously. To use Continuous Queries efficiently, let’s unregister notifications and queries when we no longer need to monitor our query result, like this:
```csharp
query.UnRegisterNotification(
    callback: new QueryDataNotificationCallback(OnQueryResultSetChanged),
    eventType: EventType.ItemAdded | EventType.ItemUpdated);

ICache cache = CacheManager.GetCache("demoCache");
cache.MessagingService.UnRegisterCQ(query);
```
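Putting the three steps together, the following sketch ties registration and cleanup to the lifetime of an ASP.NET Core hosted service. It reuses only the NCache calls shown in this article. The MeasurementMonitor class name is hypothetical, and the using directives reflect our assumption about where the NCache SDK places these types. It needs the Alachisoft.NCache.SDK package and a running cache named "demoCache".

```csharp
using System.Threading;
using System.Threading.Tasks;
using Alachisoft.NCache.Client;          // assumed namespace for ICache, CacheManager, ContinuousQuery
using Alachisoft.NCache.Runtime.Events;  // assumed namespace for EventType, EventDataFilter
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service: registers the query on startup, unregisters on shutdown.
public sealed class MeasurementMonitor : IHostedService
{
    private ICache? _cache;
    private ContinuousQuery? _query;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var command = new QueryCommand(
            "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?");
        command.Parameters.Add("MachineModel", "ACME-001");

        _query = new ContinuousQuery(command);
        _query.RegisterNotification(
            new QueryDataNotificationCallback(OnQueryResultSetChanged),
            EventType.ItemAdded | EventType.ItemUpdated,
            EventDataFilter.DataWithMetadata);

        _cache = CacheManager.GetCache("demoCache");
        _cache.MessagingService.RegisterCQ(_query);
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        // Only clean up if startup got far enough to register the query
        if (_cache != null && _query != null)
        {
            _query.UnRegisterNotification(
                new QueryDataNotificationCallback(OnQueryResultSetChanged),
                EventType.ItemAdded | EventType.ItemUpdated);
            _cache.MessagingService.UnRegisterCQ(_query);
        }
        return Task.CompletedTask;
    }

    private void OnQueryResultSetChanged(string key, CQEventArg arg)
    {
        // React to the change here, as in the callback from Step 2
    }
}
```

We would register this class with services.AddHostedService<MeasurementMonitor>() so that the host stops monitoring the query when the application shuts down.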
Conclusion
That’s how to implement Continuous Query inside .NET applications. With a continuous query, we monitor a query result for additions, changes, and deletions during a window of time. Here, we wrote an example to watch the count of failed parts of machines. But we can use Continuous Query for risk management, fraud detection, log analysis, and other real-time scenarios.
To learn more about Continuous Query, check the Continuous Query Overview guide.