Code covered by the BSD License  

Highlights from
Parallel Producer-Consumer for MATLABPOOL

Parallel Producer-Consumer for MATLABPOOL

by

 

Provides a simple interface to the MATLABPOOL with functionality similar to PCT PARALLEL_FUNCTION.

...
% [outputData, intervals] = pforfun(FUN, commonData, inputData, configInfo)
%   parallel for-loop over a function call
%
%   FUN = function handle
%   commonData = data used by FUN for all loop iterations
%   inputData = array sliced for each call to FUN,
%               or number of times to call FUN
%   configInfo = configuration struct (optional)
%     .timePerExc = estimate of time required for each loop cycle (execution)
%     .batchTime = target batch execution time (default = 1 second)
%     .batchSize = number of executions per batch ([] for auto)
%     .minBatchSize = minimum number of executions per batch ([] for none)
%     .maxBatchSize = maximum number of executions per batch ([] for none)
%     .Nthread = maxiumum number of threads (workers) to use
%     .enableWaitBar = display a wait bar? {true, false}
%     .nameWaitBar = wait bar name
%
%   outputData = accumulated results of fun
%   intervals = Nx3 array of execution intervals [start, stop, time]
%
% pforfun attempts to use multiple threads to evaluate the argument function
% FUN on all of the specified input data.  This is roughly equivalent to
%
%   for n = 1:numel(inputData)
%     outputData(n) = FUN(commonData, inputData(n));
%   end
%
% Furthermore, FUN itself must be able to loop over an inputData array,
% such that the same result is obtained by the above code as by
%
%   outputData = FUN(commonData, inputData);
%
% This is useful to avoid the overhead associated with function calls.
% That is, if each call to FUN on a single element of inputData is takes
% very little time, it is better to have FUN evaluate an array of inputData
% with each call.  The size of the inputData array passed to FUN is called
% the "batch size", and can be configured with the configInfo struct.
%
% If the batch size is not specified explicitly, it will be determined
% from the "time per execution" estimate and target "batch time".  If
% no time per execution estimate is given (or timePerExc = []), it is
% measured during pforfun execution and the batch size is adjusted dynamically.
% The range of automatically generated batch sizes can be limited by the
% minBatchSize and maxBatchSize settings.
%
% Of course, during parallel execution the requirements are somewhat more
% stringent: calls to FUN over any range of inputData must result in the
% outputData for that range.  That is outputData(n:m) in
%
%   outputData(n:m) = FUN(commonData, inputData(n:m));
%
% must be the same as outputData(n:m) in the previous 2 examples for any
% valid n, m pair.
%
% FUN must also satisfy some "workspace transparency" requirements: no
% globals, or references to other workspaces (e.g., with evalin or assignin).
% Furthermore, FUN has no access to graphical display, though printing to
% the console is supported. (see parfor or spmd for more info)
%
% Example: see example_pforfun

% by Matthew Evans, Feb 2010

function [outputData, intervals] = ...
    pforfun(fun, commonData, inputData, configInfo)
  
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % Validate Arguments
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

  % input data
  if isvector(inputData)
    Nexc = numel(inputData);
  elseif isnumeric(inputData) && isscalar(inputData)
    Nexc = inputData;
    inputData(1:Nexc, 1) = struct('dummy', []);
  else
    error('inputData must be a vector, or a number')
  end
  
  % config info
  if nargin < 4
    configInfo = struct;
  end
  
  if ~isfield(configInfo, 'timePerExc')
    configInfo.timePerExc = [];   % use time trials for execution time
  elseif configInfo.timePerExc <= 0
      error('timePerExc must be greater than zero.')
  end
  
  if ~isfield(configInfo, 'batchTime')
    configInfo.batchTime = 1;   % 1 second nominal batch execution time
  elseif configInfo.batchTime <= 0
      error('batchTime must be greater than zero.')
  end
  
  if ~isfield(configInfo, 'batchSize')
    configInfo.batchSize = [];    % decide dynamically
  elseif ~isempty(configInfo.batchSize) && ...
      ~iIsIntegerScalar(configInfo.batchSize, 1, Inf)
      error('batchSize must be an integer scalar > 0.')
  end
  
  if ~isfield(configInfo, 'minBatchSize') || isempty(configInfo.minBatchSize)
    configInfo.minBatchSize = 1;    % set minimum to 1
  elseif ~iIsIntegerScalar(configInfo.minBatchSize, 1, Inf)
      error('minBatchSize must be an integer scalar > 0.')
  end
  
  if ~isfield(configInfo, 'maxBatchSize')
    configInfo.maxBatchSize = [];    % decide later
  elseif ~isempty(configInfo.maxBatchSize) && ...
      ~iIsIntegerScalar(configInfo.maxBatchSize, 1, Inf)
      error('maxBatchSize must be an integer scalar > 0.')
  end
  
  if ~isfield(configInfo, 'Nthread') || isempty(configInfo.Nthread)
    configInfo.Nthread = matlabpool('size');    % use pool size
  elseif ~iIsIntegerScalar(configInfo.Nthread, 0, Inf)
      error('Nthread must be an integer scalar >= 0.')
  end

  if ~isfield(configInfo, 'enableWaitBar')
    configInfo.enableWaitBar = false;
  end
  
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % Setup Waitbar
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

  if ~configInfo.enableWaitBar
    waitInfo = [];
  else
    if ~isfield(configInfo, 'nameWaitBar')
      configInfo.nameWaitBar = 'Running Parallel Loop';
    elseif ~ischar(configInfo.nameWaitBar)
      error('nameWaitBar must be a string.')
    end
    waitInfo.name = configInfo.nameWaitBar;
    waitInfo.hWaitBar = [];
    waitInfo.tStart = tic;
    waitInfo.tLast = 0;
    waitInfo.Ndone = 0;
    waitInfo.Nexc = Nexc;
  end
  
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % Setup parProcess State Struct
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

  % fill in configInfo.maxBatchSize
  if isempty(configInfo.maxBatchSize)
    Nworkers = matlabpool('size');
    if Nworkers < 2
      configInfo.maxBatchSize = Nexc;
    else
      configInfo.maxBatchSize = max(configInfo.minBatchSize, ...
        ceil(Nexc / Nworkers));
    end
  end
  
  % fill in batchSize, if timePerExc is given
  if isempty(configInfo.batchSize) && ~isempty(configInfo.timePerExc)
    % make batch size to match nominal batch time
    timePerExc = configInfo.timePerExc;
    batchTime = configInfo.batchTime;
    configInfo.batchSize = ceil(batchTime / timePerExc);
    
    % enforce minium and maximum
    configInfo.batchSize = max(configInfo.minBatchSize, configInfo.batchSize);
    configInfo.batchSize = min(configInfo.maxBatchSize, configInfo.batchSize);
  end
  
  stateInfo.config = configInfo;
  stateInfo.wait = waitInfo;
  stateInfo.input = inputData;
  stateInfo.output = [];
  
  stateInfo.isReady = Nexc > 0;
  stateInfo.base = 0;
  stateInfo.limit = Nexc;
  stateInfo.interval = [];

  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % Call parProcess
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

  processInfo.common = commonData;
  processInfo.fun = fun;
  
  % call parallel_function
  stateInfo = parProcess(@iParFun, @iConsume, @iSupply, ...
    stateInfo, processInfo, configInfo.Nthread);
  
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % Clean up
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  outputData = stateInfo.output;
  intervals = stateInfo.interval;
  intervals(:, 1) = intervals(:, 1) + 1; % convert base to start
  
  % close the waitbar
  if configInfo.enableWaitBar
    if ~isempty(stateInfo.wait.hWaitBar)
      pause(0.2)  % pause for effect
      close(stateInfo.wait.hWaitBar);
      drawnow     % make sure waitbar is closed
    end
  end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% iParFun
%   the data processing function for parProcess
function output = iParFun(processInfo, inputData)
  % measure the cpu time required
  tStart = cputime;
  
  % this is run worker-side... just pass on through
  output.data = processInfo.fun(processInfo.common, inputData);
  
  % include execution time in output
  output.tExc = cputime - tStart;
  
  %fprintf('iParFun(%d data) took %.2fs\n', numel(output.data), output.tExc)  
end


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% iConsume
%   the data consumption function for parProcess
function stateInfo = iConsume(stateInfo, output, n)
  
  % extract base and limit for this interval
  base = stateInfo.interval(n, 1);
  limit = stateInfo.interval(n, 2);
  
  % if necessary, initilize stateInfo.output with the current data
  if isempty(stateInfo.output)
    stateInfo.output = output.data;
  end
  
  % add the current data to stateInfo.output
  stateInfo.output((base + 1):limit) = output.data;

  % and save the interval execution time
  stateInfo.interval(n, 3) = output.tExc;
  
  % update waitbar
  if stateInfo.config.enableWaitBar
    stateInfo.wait = iUpdateWait(stateInfo.wait, limit - base);
  end
  
  %fprintf('iConsume((%d, %d), (%d %s), %d)\n', base, limit, ...
  %  numel(outputData), class(outputData), n)
end


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% iSupply
%   the data supply function for parProcess
function [stateInfo, inputData] = iSupply(stateInfo)

  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % decide on next inerval size
  if ~isempty(stateInfo.config.batchSize)
    batchSize = stateInfo.config.batchSize;
  elseif isempty(stateInfo.interval)
    % start with minimum
    batchSize = stateInfo.config.minBatchSize;
  else
    % determine batch size dynamically, based on the last 100 batches
    nn = find(stateInfo.interval(:, 3) > 0, 100, 'last');
    
    if isempty(nn)
      % keep using minimum
      batchSize = stateInfo.config.minBatchSize;
    else
      % try to match the target batch time
      nEach = stateInfo.interval(nn, 2) - stateInfo.interval(nn, 1);
      tEach = stateInfo.interval(nn, 3) ./ nEach;
      
      % use the mean if there are only a few, otherwise use the median
      if numel(nn) < 4
        timePerExc = mean(tEach);
      else
        timePerExc = median(tEach);
      end
      
      % set the batch size
      batchSize = ceil(stateInfo.config.batchTime / timePerExc);
      
      % enforce minium and maximum
      batchSize = max(batchSize, stateInfo.config.minBatchSize);
      batchSize = min(batchSize, stateInfo.config.maxBatchSize);
    end
  end
    
  %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  % set the next interval
  
  % get current base (number of slices already done)
  if isempty(stateInfo.interval)
    base = stateInfo.base;
  else
    base = stateInfo.interval(end, 2);
  end
  
  % set the limit (last slice to process in this interval)
  limit = base + batchSize;
  if limit > stateInfo.limit
    limit = stateInfo.limit;
    stateInfo.isReady = false;
  end
  
  % record this interval
  % if necessary, initialize the interval array
  %   interval is [base, limit, tExc]
  %   where the execution time is filled in by iConsume
  if isempty(stateInfo.interval)
    stateInfo.interval = [base, limit, 0];
  else
    stateInfo.interval(end + 1, :) = [base, limit, 0];
  end
  
  % make data struct
  inputData = stateInfo.input((base + 1):limit);
  
  %fprintf('iSupply(%d, %d)\n', base, limit)
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% iUpdateWait
%   update the wait bar
function waitInfo = iUpdateWait(waitInfo, NdoneNow)
  
  % extract from state struct
  name = waitInfo.name;
  hWaitBar = waitInfo.hWaitBar;
  tStart = waitInfo.tStart;
  tLast = waitInfo.tLast;
  Nexc = waitInfo.Nexc;
  Ndone = waitInfo.Ndone + NdoneNow;
  
  % make time estimate
  tNow = toc(tStart);
  frac = Ndone / Nexc;
  tRem = tNow * (1 / frac - 1);
  
  if tNow > 5 && tNow - tLast > 0.5
    % wait bar string
    str = sprintf('%.0f s used, %.0f s left', tNow, tRem);
    
    % check and update waitbar
    if isempty(hWaitBar)
      % create wait bar if there are at least 10s remaining
      if tRem > 5
        try
          strWB = [str ' (close this window to stop)'];
          hWaitBar = waitbar(frac, strWB, 'Name', name);
          tLast = tNow;
        catch
          % can't make wait bar... use text
          if tNow - tLast > 5
            disp(str)
            tLast = tNow;
          end
        end
      end
    else
      try
        strWB = [str ' (close waitbar to stop)'];
        findobj(hWaitBar);			% error if wait bar closed
        waitbar(frac, hWaitBar, strWB);	% update wait string
        tLast = tNow;
      catch
        error('Wait bar closed by user.  Exiting.')
      end
    end
  end

  % update global
  waitInfo.hWaitBar = hWaitBar;
  waitInfo.tLast = tLast;
  waitInfo.Ndone = Ndone;
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% check if input is a vector of scalar within the specified bounds.
function valid = iIsIntegerScalar(value, lowerBound, upperBound)
  valid = isnumeric(value) && isreal(value) && isscalar(value) ...
    && (value >= lowerBound) && (value <= upperBound) ...
    && isfinite(value) && (fix(value) == value);
end

Contact us