As data becomes more critical to businesses, the need for real-time data processing (particularly when data is arriving in chunks or streams) and analysis is increasing at an unprecedented rate. Applications that depend on real-time data processing require a reliable method of receiving real-time notifications when data changes occur without causing performance overheads that result from manual polling, specifically when working with large datasets. To address these issues, NCache provides its Continuous Query feature – where you can create queries that filter data as per your defined criteria.
Without the hassle of manual polling, CQ enables developers to track data changes in real-time and receive alerts. For example, in a real-time stock trading application, traders need constant updates on the changing stock prices. Without a suitable real-time monitoring system, they would have to manually check stock prices – resulting in lost time and missed opportunities. The traders can, therefore, create queries that filter the stock prices to receive real-time updates whenever the filtered data changes. This gives them the advantage of querying the cache only for the dataset of their choice (by using SQL- like OQL queries) within the distributed cache cluster rather than exhausting the cache. This blog will guide you through using Continuous Query to monitor data changes in NCache using Python.
Monitoring Changes through Continuous Query in NCache
With NCache’s Continuous Query feature, you can define a specific dataset in the distributed cache network and track changes to it using OQL queries. When changes occur within the dataset (due to write operations like add, update or remove operations), registered applications are updated through Cache Level Events, which prevents application overlap by filtering out data using OQL queries. By providing a method for tracking and sharing data between applications through events, CQ enables developers to specify their business logic.
When using CQ in Python, users can leverage the NCache Python Client Library to register for CQ events. You can also specify the amount of data returned upon event execution, i.e., None, Metadata, or DataWithMetadata. However, it’s essential to understand that Continuous Query only acts as a mechanism for monitoring changes rather than altering application data.
Configuring Continuous Query in NCache
The steps mentioned below explain the process of registering and unregistering callbacks, queries, and notifications to receive notifications against the dataset defined by you.
Step 1: Register Callback for Events
First, you need to register a callback for Cache Level Events, it can be either the Add, Update, Remove, or a combination, so that these callbacks are executed whenever the query is modified. The sample code below shows how to register a callback for an ITEM_ADDED event.
1 2 3 4 |
def query_item_callback(key: str, arg: ncache.CQEventArg): if arg.get_event_type() is ncache.EventType.ITEM_ADDED: # Key has been added to the cache print(key + " added to cache") |
You can also register callback events for item updated and removed events.
Step 2: Register Query and Notifications
The key processes to track data changes in NCache are registering callbacks and creating a Continuous Query that specifies the result set criteria. Following the creation of the query, the pre-defined callbacks are registered based on EventType and EventDataFilter. Next, the execute_reader is used to query cached data after the CQ has been registered on the server using register_cq. To trigger events, cache data can be modified to affect the result set. The sample code below demonstrates this scenario.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# Precondition: Cache is already connected # Query for required operation query = "SELECT $Value$ FROM FQN.Product WHERE category = ?" query_command = ncache.QueryCommand(query) query_command.set_parameters({"Category": "Beverages"}) # Create continuous query continuous_query = ncache.ContinuousQuery(query_command) event_type = [ncache.EventType.ITEM_REMOVED] # Item remove notification # EventDataFilter.Metadata returns cache keys + item metadata on updation continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE) # Register continuous query on server cache.get_messaging_service().register_cq(continuous_query) reader = cache.get_search_service().execute_reader(query_command) if reader.get_field_count() > 0: while reader.read(): result = reader.get_value(Product, 1) # Perform operations else: # None query result set returned print("Query result is None") # Update Product data in cache to trigger callback updated_product = Product() updated_product.set_product_id(1001) updated_product.set_product_name("Tea") key = "Product:" + updated_product.get_product_id() cache_item = ncache.CacheItem(updated_product) # Trigger add notifications version = cache.insert(key, cache_item) # This will add item to the result set as it matches query criteria |
Step 3: Unregister Notifications from Continuous Query
To stop receiving notifications from Continuous Query in your application, you have the choice to unregister them using the remove_data_modification_listener method. However, this is specific to the event types like Add and Remove. You can even unregister the item added notifications if, you previously registered for both, item added and removed events, but now only require notifications for the removed item.
1 2 3 |
# Unregister notifications for ItemAdded events only event_type = [ncache.EventType.ITEM_ADDED] c_query.remove_data_modification_listener(cq_event_listener, event_type) |
Step 4: Unregister Continuous Query from Server
To prevent resource utilization when this query is no longer required, you must unregister the Continuous Query from the server. By doing so, you will no longer receive notifications for changes in the query result set. The following code example shows how to unregister CQ from the cache server by using the un_register_cq method.
1 2 |
# Unregister cq from server cache.get_messaging_service().un_register_cq(c_query) |
Conclusion
With NCache’s Continuous Query feature, you can monitor changes in a specific dataset within a distributed cache cluster. It ensures that only the necessary data is retrieved by the application, improving efficiency and minimizing extraneous data processing via OQL queries and Cache Level Events. Such an easy and flexible way to monitor changes? Start your free trial today!