Aggregator Implementation and Usage with MapReduce [Deprecated]
This page provides step-by-step guide on implementing and using MapReduce Aggregator with NCache.
Note
This feature is only available in the NCache Enterprise.
Prerequisites for MapReduce Aggregator Implementation
- To learn about the standard prerequisites required to work with all NCache server-side features, please refer to the given page Server-Side API Prerequisites.
- The class must be implemented as a Class Library (.dll) in Visual Studio. This will be deployed on NCache cluster.
- For API details, refer to: ICache, IAggregator, IValueExtractor, Aggregate, AggregateAll, Extract, IExecutionService.
The Aggregator and Value Extractor logic can be implemented in classes implementing the interfaces IAggregator and IValueExtractor respectively. These implementations will contain the logic to be executed over the cache item(s) on the server side. Once implemented, this implementation will be deployed on NCache. You can then invoke the cache from your client application to perform the specified Entry Processor logic over the server.
Step 1: Implement IAggregator Interface
Your custom logic is provided in Aggregate and AggregateAll methods in the IAggregator implementation. Aggregate()
contains the logic of applying the aggregation operation on the same node (locally) as is with the Combiner. If you wish to combine the values using an aggregator before being sent for further processing in the Reducer, you can use the Aggregate()
call.
AggregateAll()
contains the logic of applying the aggregation operation in the Reduce Phase. If you wish to combine the values using an aggregator, you can use the AggregateAll()
call.
Important
Once implemented, deploy this class on NCache by referring to Deploy Providers in the Administrator’s Guide.
The following sample implementation processes give values accordingly and return the result where needed.
[Serializable]
public class IntAggregator : IAggregator
{
string function;
public IntAggregator(string function)
{
this.function = function;
}
public object Aggregate(object value)
{
return Calculate(value);
}
public object AggregateAll(object value)
{
return Calculate(value);
}
private object Calculate(object value)
{
switch (function)
{
case "MIN":
value = int.MinValue;
return value;
case "MAX":
value = int.MaxValue;
return value;
default:
return 0;
}
}
// This class is to be deployed on cache
}
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Step 2: Implement IValueExtractor Interface
Your custom logic is provided in the Extract method, which will extract meaningful information/ attributes from the object like the Mapper does in the MapReduce. The returned value may also be null.
The following sample implementation processes give values accordingly and return the result where needed.
[Serializable]
public class BasicValueExtractor : IValueExtractor
{
public object Extract(object value)
{
try
{
if (value.GetType() == typeof(int))
{
return value;
}
if (value.GetType() == typeof(float))
{
return 0.0;
}
}
catch (Exception e)
{
// Handle exceptions
}
return value;
}
// This class is to be deployed in cache
}
Step 3: Deploy Implementations on the Cache
Deploy these custom implementations and any dependent assemblies on NCache by referring to Deploy Providers in the Administrator’s Guide for help.
Step 4: Use Aggregator in the Application
Once the interfaces are implemented and deployed on the cache, you can execute the Aggregator using the Aggregate method in your client application.
The following code sample adds a bulk of items into the cache and then invokes the Aggregator using the keys added to the cache, for which implementation has been provided in IAggregator class.
// Preconditions: Cache is already connected
// Data exists in cache
int value;
// Get single value by custom implemented aggregator
value = (int)cache.ExecutionService.Aggregate(new BasicValueExtractor(), new IntAggregator("MIN"));
// Get single value by custom implemented aggregator
value = (int)cache.ExecutionService.Aggregate(new BasicValueExtractor(), new IntAggregator("MAX"));
// Using the built-in aggregator
value = (int)cache.ExecutionService.Aggregate(new BasicValueExtractor(), BuiltInAggregator.IntegerSum());
See Also
Aggregator (MapReduce) Components and Working
MapReduce
WAN Replication across Multi Datacenters through Bridge
Deploy Providers