Main Content

Run mapreduce on a Hadoop Cluster

Cluster Preparation

Before you can run mapreduce on a Hadoop® cluster, make sure that the cluster and client machine are properly configured. Consult your system administrator, or see Configure for Hadoop Clusters (MATLAB Parallel Server).

Output Format and Order

When running mapreduce on a Hadoop cluster with binary output (the default), the resulting KeyValueDatastore points to Hadoop Sequence files, instead of binary MAT files as generated by mapreduce in other environments. For more information, see the 'OutputType' argument description on the mapreduce reference page.

When running mapreduce on a Hadoop cluster, the order of the key-value pairs in the output is different compared to running mapreduce in other environments. If your application depends on the arrangement of data in the output, you must sort the data according to your own requirements.

Calculate Mean Delay

This example shows how to modify the MATLAB® example for calculating mean airline delays to run on a Hadoop cluster.

First, you must set environment variables and cluster properties as appropriate for your specific Hadoop configuration. Contact your system administrator for the values for these and other properties necessary for submitting jobs to your cluster or see Configure for Hadoop Clusters (MATLAB Parallel Server).

setenv('HADOOP_HOME','/path/to/hadoop/install')
cluster = parallel.cluster.Hadoop;

Note

The specified outputFolder must not already exist. The mapreduce output from a Hadoop cluster cannot overwrite an existing folder.

You will lose your data, if mapreducer is changed or deleted.

Create a MapReducer object to specify that mapreduce must use your Hadoop cluster.

mr = mapreducer(cluster);

Create and preview the datastore.

ds = datastore('airlinesmall.csv','TreatAsMissing','NA',...
     'SelectedVariableNames','ArrDelay','ReadSize',1000);
preview(ds)
    ArrDelay
    ________

     8
     8
    21
    13
     4
    59
     3
    11

Next, specify your output folder, output outds and call mapreduce to execute on the Hadoop cluster specified by mr.

outputFolder = 'hdfs:///home/myuser/out1';
outds = mapreduce(ds,@myMapperFcn,@myReducerFcn,...
'OutputFolder',outputFolder);
meanDelay = mapreduce(ds,@meanArrivalDelayMapper,...
@meanArrivalDelayReducer,mr,...
            'OutputFolder',outputFolder)
Parallel mapreduce execution on the Hadoop cluster:
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  66% Reduce   0%
Map 100% Reduce  66%
Map 100% Reduce 100%

meanDelay =

  KeyValueDatastore with properties:

       Files: {
              ' .../tmp/myuser/tpc00621b1_4eef_4abc_8078_646aa916e7d9/part0.seq'
              }
    ReadSize: 1 key-value pairs
    FileType: 'seq'

Read the result.

readall(meanDelay)
           Key             Value
    __________________    ________

    'MeanArrivalDelay'    [7.1201]

Although for demonstration purposes this example uses a local data set, it is likely when using Hadoop that your data set is stored in an HDFS™ file system. Likewise, you might be required to store the mapreduce output in HDFS. For details about accessing HDFS in MATLAB, see Work with Remote Data.

Supporting Functions

The meanArrivalDelayMapper mapper function finds the count and sum of the arrival delays in each block of data. The mapper then stores these values as the intermediate values associated with the key "PartialCountSumDelay".

function meanArrivalDelayMapper(data,info,intermKVStore)
  % Data is an n-by-1 table of the ArrDelay. Remove missing values first:
  data(isnan(data.ArrDelay),:) = [];

  % Record the partial counts and sums and the reducer will accumulate them.
  partCountSum = [length(data.ArrDelay),sum(data.ArrDelay)];
  add(intermKVStore,"PartialCountSumDelay",partCountSum);
end
The meanArrivalDelayReducer reducer function accepts the count and sum for each block stored by the mapper. It sums up the values to obtain the total count and total sum. The overall mean arrival delay is a simple division of the values. mapreduce only calls this reducer once, since the mapper only adds a single unique key. The reducer uses add to add a single key-value pair to the output.
function meanArrivalDelayReducer(intermKey,intermValIter,outKVStore)
  count = 0;
  sum = 0;
  while hasnext(intermValIter)
    countSum = getnext(intermValIter);
    count = count + countSum(1);
    sum = sum + countSum(2);
  end
  meanDelay = sum/count;

  % The key-value pair added to outKVStore will become the output of mapreduce 
  add(outKVStore,"MeanArrivalDelay",meanDelay);
end

See Also

Functions

Topics