Analyze Big Data in MATLAB Using MapReduce

This example shows how to use the datastore and mapreduce functions to process a large amount of file-based data. The MapReduce algorithm is a mainstay of many modern "big data" applications. This example operates on a single computer, but the code can scale up to use Hadoop®.

Throughout this example, the data set is a collection of records for US domestic airline flights between 1987 and 2008. If you have experimented with "big data" before, you may already be familiar with this data set. The full data set can be downloaded from http://stat-computing.org/dataexpo/2009/the-data.html. A small subset of the data set is also included with MATLAB® so that you can run this and other examples without downloading the entire data set.

Introduction to datastore

Creating a datastore allows you to access a collection of data in a chunk-based manner. A datastore can process arbitrarily large amounts of data, and the data can even be spread across multiple files. You can create a datastore for a collection of tabular text files (demonstrated next), a SQL database (Database Toolbox™ required) or a Hadoop® Distributed File System (HDFS™).

Create a datastore for a collection of tabular text files and preview the contents.

ds = datastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8x6 table
    FlightNum    TailNum    ActualElapsedTime    CRSElapsedTime    AirTime    ArrDelay
    _________    _______    _________________    ______________    _______    ________

    1503         'NA'        53                   57               'NA'        8      
    1550         'NA'        63                   56               'NA'        8      
    1589         'NA'        83                   82               'NA'       21      
    1655         'NA'        59                   58               'NA'       13      
    1702         'NA'        77                   72               'NA'        4      
    1729         'NA'        61                   65               'NA'       59      
    1763         'NA'        84                   79               'NA'        3      
    1800         'NA'       155                  143               'NA'       11      

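The datastore automatically parses the input data and makes a best guess as to the type of data in each column. In the preview above, the 'NA' entries cause columns such as AirTime to be imported as text. Use the 'TreatAsMissing' name-value pair argument so that datastore handles these missing values correctly. For numeric variables (such as 'AirTime'), datastore replaces every instance of 'NA' with NaN, which is the IEEE arithmetic representation for Not-a-Number. The code below also sets the format of the TailNum and CancellationCode variables to '%s' so that they continue to be read as text.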

ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s';
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s';
dsPreview = preview(ds);
dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8x4 table
    AirTime    TaxiIn    TailNum    CancellationCode
    _______    ______    _______    ________________

    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            
    NaN        NaN       'NA'       'NA'            

Scan for rows of interest

datastore objects contain an internal pointer to keep track of which chunk of data the read function returns next. Use the hasdata and read functions to step through the entire data set, and filter the data set to only the rows of interest. In this case, the rows of interest are flights on United Airlines ("UA") departing from Boston ("BOS").

subset = [];

while hasdata(ds)
    t = read(ds);
    t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :);
    subset = vertcat(subset, t);
end

subset(1:10,[9,10,15:17])
ans=10x5 table
    UniqueCarrier    FlightNum    ArrDelay    DepDelay    Origin
    _____________    _________    ________    ________    ______

    'UA'              121          -9          0          'BOS' 
    'UA'             1021          -9         -1          'BOS' 
    'UA'              519          15          8          'BOS' 
    'UA'              354           9          8          'BOS' 
    'UA'              701         -17          0          'BOS' 
    'UA'              673          -9         -1          'BOS' 
    'UA'               91          -3          2          'BOS' 
    'UA'              335          18          4          'BOS' 
    'UA'             1429           1         -2          'BOS' 
    'UA'               53          52         13          'BOS' 
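
The loop above grows subset with vertcat on every pass. As a rough sketch of an alternative, the filtered chunks can be collected in a cell array and concatenated once at the end. This assumes the same airlinesmall.csv file; the variable name parts and the choice to read only five columns are illustrative and not part of the original example.

```matlab
% Sketch: same filter as above, but concatenate the chunks only once.
% Assumes airlinesmall.csv (included with MATLAB) is on the path.
ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
% Reading only the columns needed here is an illustrative choice.
ds.SelectedVariableNames = {'UniqueCarrier', 'FlightNum', 'ArrDelay', 'DepDelay', 'Origin'};

parts = {};                      % hypothetical name: one filtered table per chunk
while hasdata(ds)
    t = read(ds);
    keep = strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS');
    parts{end+1} = t(keep, :);   %#ok<AGROW> only the small cell array grows
end
subset = vertcat(parts{:});      % single concatenation at the end
```

Either form returns the same rows; the difference only matters when the number of chunks is large.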

Introduction to mapreduce

MapReduce is an algorithmic technique to "divide and conquer" big data problems. In MATLAB, mapreduce requires three input arguments:

  1. A datastore to read data from

  2. A "mapper" function that is given a subset of the data to operate on. The output of the mapper function is a partial calculation. mapreduce calls the mapper function one time for each chunk in the datastore, with each call operating independently.

  3. A "reducer" function that is given the aggregate outputs from the mapper function. The reducer function finishes the computation begun by the mapper function, and outputs the final answer.

This is an over-simplification to some extent, since the output of a call to the mapper function can be shuffled and combined in interesting ways before being passed to the reducer function. This will be examined later in this example.

Use mapreduce to perform a computation

A simple use of mapreduce is to find the longest flight time in the entire airline data set. To do this:

  1. The "mapper" function computes the maximum of each chunk from the datastore.

  2. The "reducer" function then computes the maximum value among all of the maxima computed by the calls to the mapper function.

First, reset the datastore and filter the variables to the one column of interest.

reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};

Write the mapper function, maxTimeMapper.m. It takes three input arguments:

  1. The input data, which is a table obtained by applying the read function to the datastore.

  2. A collection of configuration and contextual information, info. This can be ignored in most cases, as it is here.

  3. An intermediate data storage object, which records the results of the calculations from the mapper function. Use the add function to add Key/Value pairs to this intermediate output. In this example, the name of the key ('MaxElapsedTime') is arbitrary.

Save the following mapper function (maxTimeMapper.m) in your current folder.

type maxTimeMapper
function maxTimeMapper(data, ~, intermKVStore)
% Copyright 2014 The MathWorks, Inc.

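% Maximum over this chunk only (the table holds a single selected variable).
maxElapsedTime = max(data{:,:});
% Store the partial result under one key so that all chunks are reduced together.
add(intermKVStore, 'MaxElapsedTime', maxElapsedTime);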
end

Next, write the reducer function. It also takes three input arguments:

  1. A set of input "keys". Keys will be discussed further below, but they can be ignored in some simple problems, as they are here.

  2. An intermediate data input object that mapreduce passes to the reducer function. This data is in the form of Key/Value pairs, and you use the hasnext and getnext functions to iterate through the values for each key.

  3. A final output data storage object. Use the add and addmulti functions to directly add Key/Value pairs to the output.

Save the following reducer function (maxTimeReducer.m) in your current folder.

type maxTimeReducer
function maxTimeReducer(~, intermValsIter, outKVStore)
% Copyright 2014 The MathWorks, Inc.

maxElapsedTime = -inf;
while hasnext(intermValsIter)
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
end
add(outKVStore, 'MaxElapsedTime', maxElapsedTime);
end

Once the mapper and reducer functions are written and saved in your current folder, you can call mapreduce using the datastore, mapper function, and reducer function. If you have Parallel Computing Toolbox (PCT), MATLAB will automatically start a pool and parallelize execution. Use the readall function to display the results of the MapReduce algorithm.

result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
readall(result)
ans=1x2 table
          Key           Value 
    ________________    ______

    'MaxElapsedTime'    [1650]
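
The reason this works is that the maximum of the per-chunk maxima equals the maximum of the whole data set. The following minimal sketch checks that idea on a small in-memory vector without calling mapreduce. The data values and the chunk size are made up for illustration.

```matlab
% Toy elapsed times in minutes; NaN marks a missing value.
elapsed = [53 63 NaN 83 59 77 61 84 155 NaN 42];
chunkSize = 4;                     % hypothetical chunk size

% "Map": one partial maximum per chunk. max ignores NaN by default.
numChunks = ceil(numel(elapsed) / chunkSize);
partialMax = nan(1, numChunks);
for c = 1:numChunks
    first = (c - 1) * chunkSize + 1;
    last = min(c * chunkSize, numel(elapsed));
    partialMax(c) = max(elapsed(first:last));
end

% "Reduce": maximum of the partial maxima, starting from -inf.
overallMax = -inf;
for c = 1:numChunks
    overallMax = max(overallMax, partialMax(c));
end
```

Because each chunk is handled independently, the order in which the chunks are processed does not change overallMax.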

Use of keys in mapreduce

The use of keys is an important and powerful feature of mapreduce. Each call to the mapper function adds intermediate results to one or more named "buckets", called keys. The number of calls to the mapper function by mapreduce corresponds to the number of chunks in the datastore.

If the mapper function adds values to multiple keys, this leads to multiple calls to the reducer function, with each call working on only one key's intermediate values. The mapreduce function automatically manages this data movement between the map and reduce phases of the algorithm.

This flexibility is useful in many contexts. The example below uses keys in a relatively obvious way for illustrative purposes.
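
To make the grouping step concrete, here is a minimal in-memory sketch of how values added under the same key end up together before the reduce phase. It does not call mapreduce; it only imitates the grouping with a containers.Map object on made-up data, and every variable name and number in it is illustrative.

```matlab
% Toy records: a carrier code and an elapsed time (minutes) per flight.
carrier = {'UA', 'WN', 'UA', 'AA', 'WN', 'UA', 'AA', 'WN'};
minutes = [ 53    63    83    59    77    61    84   155];
chunkSize = 3;                     % hypothetical chunk size

% "Map" phase: each chunk adds one partial maximum per carrier it contains.
interm = containers.Map('KeyType', 'char', 'ValueType', 'any');
for first = 1:chunkSize:numel(minutes)
    idx = first:min(first + chunkSize - 1, numel(minutes));
    [names, ~, group] = unique(carrier(idx));
    for g = 1:numel(names)
        partial = max(minutes(idx(group == g)));
        if isKey(interm, names{g})
            interm(names{g}) = [interm(names{g}), partial];  % same key, same bucket
        else
            interm(names{g}) = partial;
        end
    end
end

% "Reduce" phase: one pass per key, seeing only that key's values.
result = containers.Map('KeyType', 'char', 'ValueType', 'double');
carriers = keys(interm);
for k = 1:numel(carriers)
    result(carriers{k}) = max(interm(carriers{k}));
end
```

Here the reduce loop runs three times, once for each carrier code, which mirrors the one-call-per-key behavior described above.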

Calculating group-wise metrics with mapreduce

The behavior of the mapper function in this application is more complex. For every flight carrier found in the input data, use the add function to add a vector of values. This vector is a count of the number of flights for that carrier on each day in the 21+ years of data. The carrier code is the key for this vector of values. This ensures that all of the data for each carrier will be grouped together when mapreduce passes it to the reducer function.

Save the following mapper function (countFlightsMapper.m) in your current folder.

type countFlightsMapper
function countFlightsMapper(data, ~, intermKVStore)

% Copyright 2014 The MathWorks, Inc.

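% Index of each flight's day, counting 1 October 1987 as day 1.
dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
% Total number of days from 1 October 1987 through 31 December 2008.
daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;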

[airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
end
end
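
The following sketch shows the day-index and accumarray steps of this mapper on a tiny hand-made table, so the shape of each intermediate value is easier to see. The flights, the five-day window, and the names startDate, numDays, uaTotals, and wnTotals are assumptions made for illustration.

```matlab
% Toy chunk: three flights for 'UA' and one for 'WN' (made-up values).
data = table([1987; 1987; 1987; 1987], [10; 10; 10; 10], [1; 1; 3; 2], ...
    {'UA'; 'UA'; 'UA'; 'WN'}, ...
    'VariableNames', {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'});

startDate = datetime(1987, 10, 1);
numDays = 5;                       % shortened window for illustration

% Day index, counting 1 October 1987 as day 1.
dayNumber = days(datetime(data.Year, data.Month, data.DayofMonth) - startDate) + 1;

% airlineIndex maps each row to its position in airlineName.
[airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

% One count vector per carrier: element d is the number of flights on day d.
uaTotals = accumarray(dayNumber(airlineIndex == 1), 1, [numDays, 1]);
wnTotals = accumarray(dayNumber(airlineIndex == 2), 1, [numDays, 1]);
```

Every vector has the same fixed length regardless of which days appear in the chunk, which is what lets the reducer add the vectors element by element.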

The reducer function is less complex. It simply iterates over the intermediate values and adds the vectors together. At completion, it outputs the values in this aggregate vector. Note that the reducer function does not need to sort or examine the intermKeysIn values; each call to the reducer function by mapreduce only passes the values for one airline carrier.

Save the following reducer function (countFlightsReducer.m) in your current folder.

type countFlightsReducer
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
%countFlightsReducer Reducer function for mapreduce to count flights

% Copyright 2014 The MathWorks, Inc.

daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
dayArray = zeros(daysSinceEpoch, 1);

while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
end
add(outKVStore, intermKeysIn, dayArray);
end

Reset the datastore and select the variables of interest. Once the mapper and reducer functions are written and saved in your current folder, you can call mapreduce using the datastore, mapper function, and reducer function.

reset(ds);
ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};

result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  10%
Map 100% Reduce  21%
Map 100% Reduce  31%
Map 100% Reduce  41%
Map 100% Reduce  52%
Map 100% Reduce  62%
Map 100% Reduce  72%
Map 100% Reduce  83%
Map 100% Reduce  93%
Map 100% Reduce 100%
result = readall(result);

In case this example was run with only the sample data set, load the results of the mapreduce algorithm run on the entire data set.

load airlineResults

Visualizing the results

Using only the top 7 carriers, apply a 7-day moving-average filter to the data to smooth out the effects of weekend travel, which would otherwise clutter the visualization.

lines = result.Value;
lines = horzcat(lines{:});
[~,sortOrder] = sort(sum(lines), 'descend');
lines = lines(:,sortOrder(1:7));
result = result(sortOrder(1:7),:);

lines(lines==0) = nan;
for carrier=1:size(lines,2)
    lines(:,carrier) = filter(repmat(1/7, [7 1]), 1, lines(:,carrier));
end
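
As a small illustration of what the filter call does, the sketch below applies the same coefficients to a short made-up vector of daily counts. Each output is the sum of the current and previous six inputs divided by 7, so the first six outputs are a start-up ramp rather than full 7-day averages. The names counts and smoothed are illustrative.

```matlab
% Made-up daily counts with a two-day bump.
counts = [7 7 7 7 7 14 14 7 7 7]';

% Trailing 7-day moving average: seven equal weights of 1/7.
smoothed = filter(repmat(1/7, [7 1]), 1, counts);

% From the 7th sample on, the output matches the mean of the last 7 inputs.
fullWindow = mean(counts(1:7));
```

In this toy vector the bump of 14 is spread across the window, so the smoothed values from day 7 onward stay at 9 instead of jumping.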

Plot the data.

figure('Position',[1 1 800 600]);
plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines)
title('Domestic airline flights per day per carrier')
xlabel('Date')
ylabel('Flights per day (7-day moving average)')
legend(result.Key, 'Location', 'South')

The plot shows the emergence of Southwest Airlines (WN) during this time period.

Learning more

This example only scratches the surface of what is possible with mapreduce. See the documentation for mapreduce for more information, including information on using it with Hadoop and MATLAB Distributed Computing Server.
