Sample Implementation of MapReduce Interfaces
MapReduce related interfaces are provided in
Alachisoft.NCache.Runtime.Mapreduce
namespace.
Users can implement the following interfaces:
Mapper
Implement the interface IMapper
and provide implementation for Map()
method.
Member | Description |
---|---|
Map() |
This method will contain the logic to map the input from cache into more meaningful and goal specific key-value pairs which can be sent to the Reducer or optional Combiner. Referring to the workflow diagram, the string input is transformed by emitting each word with a key-value pair like <word, count> in the Mapper. |
public class WordCountMapper : IMapper
{
string[] parsedline;
string line;
public void Map(Object key, Object value, IOutputMap outputMap)
{
line = value.ToString();
parsedline = line.Split(' ');
for (int i = 0; i < parsedline.Length; i++)
{
outputMap.Emit(parsedline[i], 1);
}
}
public void Dispose()
{
//dispose resources
}
}
Combiner Factory
Implement the interface ICombinerFactory
and provide implementation for its
Create(key)
method. Providing Combiner Factory or Combiner is optional.
Member | Description |
---|---|
Create(key) |
This method will provide the incoming element with a new instance of the Combiner so that it merges the intermediate key-value pairs from the Mapper. |
public class WordCountCombinerFactory : ICombinerFactory
{
public ICombiner Create(object key)
{
WordCountCombiner wcCombiner = new WordCountCombiner();
return wcCombiner;
}
}
Combiner
Implement the interface ICombiner
and provide implementation for the methods:
Member | Description |
---|---|
BeginCombine() |
This method will provide the user with a starting point to initialize any parameters you want to be used before the actual combining of elements begins. |
Combine() |
This method reduces the task result locally in chunks so that the Reducer is not burdened with excessive processing. If the workflow diagram is referred, it means that the elements with the same keys are grouped together before being sent to the Reducer for counting the word occurrence. |
FinishChunk() |
This method marks the end of the Combiner functionality as when the number of combined results reaches the specified chunk size, it sends the tasks in the form of a chunk to the Reducer and resets its internal state for the next chunk. |
public class WordCountCombiner : ICombiner
{
int count = 0;
public void BeginCombine()
{
//any initialization
}
public void Combine(object value)
{
count += int.Parse(value.ToString());
}
public object FinishChunk()
{
return count;
}
public void Dispose()
{
//dispose resources
}
}
Reducer Factory
Implement the interface IReducerFactory
and provide implementation for its
Create(key)
method. Providing Reducer Factory or Reducer is optional.
Member | Description |
---|---|
Create(key) |
This method will provide the incoming element with a new instance of the Reducer so that it merges the intermediate key-value pairs from the Combiner. |
public class WordCountReducerFactory : IReducerFactory
{
public IReducer Create(object key)
{
WordCountReducer wcReducer = new WordCountReducer(key);
return wcReducer;
}
}
Reducer
Implement the interface IReducer
and provide implementation for the methods:
Member | Description |
---|---|
BeginReduce() |
This method will provide the user with a starting point to initialize any parameters you want to be used before the actual reducing of elements begins. |
Reduce() |
This method will reduce (process) the intermediate key-value pairs into further meaningful pairs. If the workflow diagram is referred, it means that the values of the grouped elements from Combiner are summed up to find the actual word count. |
FinishReduce() |
This method will provide the task with the final result of the map reduce operation for specific intermediate key/value pair. |
public class WordCountReducer : IReducer
{
int count = 0;
object reducerKey;
public WordCountReducer(object key)
{
reducerKey = key;
}
public void BeginReduce()
{
//perform operations
}
public void Reduce(object value)
{
count += int.Parse(value.ToString());
}
public KeyValuePair FinishReduce()
{
KeyValuePair kvp = new KeyValuePair();
kvp.Key = reducerKey;
kvp.Value = count;
return kvp;
}
public void Dispose()
{
//dispose resources
}
}
Key Filter
Implement the interface IKeyFilter
to provide implementation for the
FilterKey()
method. Providing this interface is optional.
Member | Description |
---|---|
FilterKey(key) |
This method will allow user to filter the input for Mapper by specifying the keys to be processed. If this option is not utilized, the whole cache data will be taken as input for the MapReduce tasks. |
public class WordCountKeyFilter : IKeyFilter
{
public bool FilterKey(object key)
{
try
{
if (key.ToString().Contains("hungry"))
{
return true;
}
}
catch (Exception exp)
{
//handle exception
}
return false;
}
}
Task Tracker
You do not need to implement this interface.
The ITrackableTask
interface implements the following members, which can be
used if you want to track the execution of your task:
Member | Description |
---|---|
event MapReduceCallback OnMapReduceComplete |
Users can register a callback on the MapReduce task that is called when task execution is completed, failed or cancelled with a parameter response that encapsulates status of the task and result (if completed). |
GetResult() |
This is a blocking call that waits for the callback from the server about the task’s completion, failure or cancellation in the form of an ITaskResult . The ITaskResult lets you get the following: |
GetResult(timeout) |
If no result is obtained within the specified timeout duration, OperationFailedException is thrown. |
TaskId |
This is a GUID identification ID of the task to mark it as unique. |
TaskStatus() |
Gets the task’s status and contains following values: Waiting , InProgress , Completed , Cancelled , Failed |
CancelTask() |
To cancel the already running task. |
GetEnumerator
: obtain the result in the form of a dictionaryStatus
: will return either failure, success, cancelled.TaskFailureReason
: the reason behind the failure of the task.
Note that TaskCallback()
and GetResult()
cannot be executed together because
GetResult
is a blocking call and this combination will throw an exception.
Important
Make sure to deploy the MapReduce task libraries after implementing the interfaces using NCache Manager as guided in Configuring MapReduce in Administrators' Guide.
See Also
Sample Usage of MapReduce
Aggregator
Entry Processor
Configure MapReduce