Run mapreduce on a Hadoop Cluster
Cluster Preparation
Before you can run mapreduce on a Hadoop® cluster, make sure that the cluster and client machine are properly
configured. Consult your system administrator, or see Configure for Hadoop Clusters (MATLAB Parallel Server).
Output Format and Order
When running mapreduce on a Hadoop cluster with binary output (the default), the resulting
KeyValueDatastore points to Hadoop Sequence files, instead of binary MAT files as generated by
mapreduce in other environments. For more information, see
the 'OutputType' argument description on the
mapreduce reference page.
When running mapreduce on a Hadoop cluster, the order of the key-value pairs in the output
differs from the order produced by mapreduce in other environments. If your
application depends on the arrangement of data in the output, you must sort the data
according to your own requirements.
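One way to impose a predictable order is to read the output into a table and sort it by key. The following is a minimal sketch, not part of the original example: the table contents are hypothetical and stand in for what readall returns on a KeyValueDatastore, which is a table with Key and Value variables.

```matlab
% Hypothetical stand-in for the table returned by readall on the output
% datastore. The keys and values here are made up for illustration.
result = table({'UA';'AA';'DL'},{12.4;7.1;9.8}, ...
    'VariableNames',{'Key','Value'});

% Sort the rows by key so that the arrangement no longer depends on
% the order in which the cluster wrote the key-value pairs.
sorted = sortrows(result,'Key');
disp(sorted)
```

With a real result, replace the hypothetical table with the output of readall on your own datastore. This approach assumes that the full output fits in memory.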
Calculate Mean Delay
This example shows how to modify the MATLAB® example for calculating mean airline delays to run on a Hadoop cluster.
First, set the environment variables and cluster properties that are appropriate for your specific Hadoop configuration. For the values of these and other properties needed to submit jobs to your cluster, contact your system administrator, or see Configure for Hadoop Clusters (MATLAB Parallel Server).
setenv('HADOOP_HOME','/path/to/hadoop/install')
cluster = parallel.cluster.Hadoop;
Note
The specified outputFolder must not already exist. The
mapreduce output from a Hadoop cluster cannot overwrite an existing folder.
You will lose your data if the mapreducer is changed or
deleted.
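Because the output folder must not already exist, one option is to give each run its own folder name. The following is a minimal sketch, assuming a hypothetical base location hdfs:///home/myuser. The out_ prefix and the timestamp format are illustrative choices, not requirements of mapreduce.

```matlab
% Hypothetical base location; replace it with a folder you can write to.
baseFolder = 'hdfs:///home/myuser';

% Append a timestamp so that each run targets a folder name that is
% unlikely to exist already.
stamp = char(datetime('now','Format','yyyyMMdd_HHmmss'));
outputFolder = [baseFolder '/out_' stamp];
disp(outputFolder)
```

Two runs started within the same second would still produce the same name, so treat this as a convenience rather than a guarantee.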
Create a MapReducer object to specify that
mapreduce must use your Hadoop cluster.
mr = mapreducer(cluster);
Create and preview the datastore.
ds = datastore('airlinesmall.csv','TreatAsMissing','NA',...
    'SelectedVariableNames','ArrDelay','ReadSize',1000);
preview(ds)
ArrDelay
________
8
8
21
13
4
59
3
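11
Next, specify your output folder and call
mapreduce to execute on the Hadoop cluster specified by mr. The general call pattern is
outds = mapreduce(ds,@myMapperFcn,@myReducerFcn,'OutputFolder',outputFolder), where outds is the output datastore
and myMapperFcn and myReducerFcn are placeholders for your own functions. This example uses the mapper and reducer
listed under Supporting Functions and names the output meanDelay.
outputFolder = 'hdfs:///home/myuser/out1';
meanDelay = mapreduce(ds,@meanArrivalDelayMapper,...
    @meanArrivalDelayReducer,mr,...
    'OutputFolder',outputFolder)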
Parallel mapreduce execution on the Hadoop cluster:
********************************
* MAPREDUCE PROGRESS *
********************************
Map 0% Reduce 0%
Map 66% Reduce 0%
Map 100% Reduce 66%
Map 100% Reduce 100%
meanDelay =
KeyValueDatastore with properties:
Files: {
' .../tmp/myuser/tpc00621b1_4eef_4abc_8078_646aa916e7d9/part0.seq'
}
ReadSize: 1 key-value pairs
FileType: 'seq'
Read the result.
readall(meanDelay)
Key Value
__________________ ________
'MeanArrivalDelay' [7.1201]
Although this example uses a local data set for demonstration purposes, when you use
Hadoop your data set is likely stored in an HDFS™ file system. Likewise, you might be required to store the
mapreduce output in HDFS. For details about accessing HDFS in MATLAB, see Work with Remote Data.
Supporting Functions
The meanArrivalDelayMapper mapper function finds the count
and sum of the arrival delays in each block of data. The mapper then stores
these values as the intermediate values associated with the key
"PartialCountSumDelay".
function meanArrivalDelayMapper(data,info,intermKVStore)
    % Data is an n-by-1 table of the ArrDelay. Remove missing values first:
    data(isnan(data.ArrDelay),:) = [];

    % Record the partial counts and sums and the reducer will accumulate them.
    partCountSum = [length(data.ArrDelay),sum(data.ArrDelay)];
    add(intermKVStore,"PartialCountSumDelay",partCountSum);
end
The meanArrivalDelayReducer reducer
function accepts the count and sum for each block stored by the mapper. It sums
up the values to obtain the total count and total sum. The overall mean arrival
delay is the total sum divided by the total count. mapreduce
calls this reducer only once, because the mapper adds only a single unique key. The
reducer uses add to add a single key-value pair to the
output.
function meanArrivalDelayReducer(intermKey,intermValIter,outKVStore)
    % Use names that do not shadow the built-in sum and count functions.
    totalCount = 0;
    totalSum = 0;
    while hasnext(intermValIter)
        countSum = getnext(intermValIter);
        totalCount = totalCount + countSum(1);
        totalSum = totalSum + countSum(2);
    end
    meanDelay = totalSum/totalCount;

    % The key-value pair added to outKVStore will become the output of mapreduce
    add(outKVStore,"MeanArrivalDelay",meanDelay);
end
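The count-and-sum strategy used by the mapper and reducer can be checked in plain MATLAB without a cluster. The following is a minimal sketch with hypothetical delay values. The two-block split is an assumption that imitates how mapreduce hands the mapper one block of data at a time.

```matlab
% Hypothetical arrival delays with one missing value, split into two
% blocks to imitate the blocks that the mapper receives.
delays = [8; 8; 21; NaN; 13; 4; 59; 3; 11];
blocks = {delays(1:4), delays(5:end)};

% Map step: one [count, sum] pair per block, ignoring missing values.
partials = zeros(numel(blocks),2);
for k = 1:numel(blocks)
    d = blocks{k}(~isnan(blocks{k}));
    partials(k,:) = [numel(d), sum(d)];
end

% Reduce step: accumulate the partial pairs, then divide once.
totals = sum(partials,1);
meanDelay = totals(2)/totals(1);

% Reference value computed directly on all non-missing delays.
directMean = mean(delays,'omitnan');
```

Combining partial counts and sums gives the same mean as the direct computation. Averaging the per-block means would not, in general, when the blocks have different sizes.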