from Use handle classes to model dependencies between computation steps by Volkmar Glauche
Trigger computations in one object once computation results from other objects become available.

cfg_item

classdef cfg_item < hgsetget

Schedule dependent computations

This example demonstrates how one can schedule computations that depend on each other. It is based on the following MATLAB constructs:

  • Handle objects. The class DEP_PARALLEL.CFG_ITEM is derived from HGSETGET, which is itself a handle class.
  • Events and listeners. If an object o1 of class DEP_PARALLEL.CFG_ITEM is assigned to the input property of another object o2 of the same class, object o2 will listen to the PostSet event for object o1's output property. The associated callback resolve will invoke o2's run method on the output data returned by o1.
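
The listener mechanism can be sketched in isolation. The class below is a hypothetical stand-in, not part of the original submission: the names postset_demo, value, seen, record and demo are made up for illustration. It only shows how a PostSet listener on a SetObservable property hands the new value to another object.

```matlab
% File: postset_demo.m (hypothetical name, not part of the original code)
classdef postset_demo < handle
    properties (SetObservable = true)
        value = {};      % observable property; an empty cell means "not set"
    end
    properties
        seen = {};       % values recorded by the listener callback
    end
    methods
        function record(obj, evt)
            % Listener callback: read the new value from the source object.
            obj.seen{end+1} = evt.AffectedObject.value;
        end
    end
    methods (Static)
        function sink = demo()
            src  = postset_demo;
            sink = postset_demo;
            % sink listens to the PostSet event of src.value
            addlistener(src, 'value', 'PostSet', @(prop, evt) record(sink, evt));
            src.value = 1;
            src.value = 1;   % the event fires again; AbortSet is not enabled
            src.value = 42;
        end
    end
end
```

Calling sink = postset_demo.demo() should leave the values 1, 1 and 42 in sink.seen. The repeated 1 illustrates why a listener callback has to check the data itself: the event reports that a set operation happened, not that the value is new or valid.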

This mechanism can be used to run independent computations in parallel if QSUB_SUBMIT_CM (see http://www.mathworks.com/matlabcentral/fileexchange/24512) or a MATLAB DCS jobmanager scheduler is available. DCS schedulers of types other than 'jobmanager' are not supported, because they only provide simplejob objects, which have no callbacks.

Switching between serial and parallel computation does not require any change to the code that sets up the object hierarchy in an application. It is sufficient to change the scheduler property of the DEP_PARALLEL.CFG_ITEM objects.
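
As an illustration, a two-step chain might be set up as follows. This is a sketch that assumes the class file is saved as +dep_parallel/cfg_item.m in a folder on the MATLAB path; the variable names and the two callbacks are made up for the example. Only the value of sched would change to request a different scheduler.

```matlab
% Assumption: cfg_item.m lives in a package folder +dep_parallel on the path.
sched = 'serial';   % change to 'qsub' or 'jobmanager' to request parallel execution

% Set 'scheduler' and 'prog' before 'input': assigning a numeric input
% starts the computation immediately.
step1 = dep_parallel.cfg_item('scheduler', sched, 'prog', @(x) x + 1);
step2 = dep_parallel.cfg_item('scheduler', sched, 'prog', @(x) 2 * x);

step2.input = step1;   % dependency: step2 waits for step1.output
step1.input = 3;       % numeric input: step1 runs, then triggers step2

% With the serial scheduler both steps have finished at this point.
result = step2.output;
```

With a parallel scheduler the run method returns before the job has finished, so step2.output may still be an empty cell when the last line is reached.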

To understand how dependency resolution works, breakpoints at the following places may be helpful:

  • at the beginning of the run method
  • at the beginning of the getresult_qsub or getresult_dcs method, if a parallel scheduler is used
  • at the beginning of the resolve method

This code has been developed as a demonstration for a batch job configuration system for MATLAB. See http://sourceforge.net/projects/matlabbatch for details about the original project.

    %_______________________________________________________________________
    % Copyright (C) 2007 Freiburg Brain Imaging

Adaptation to local environment

The example code needs to be adapted to the local environment in the following places:

  • To use MATLAB DCS, the set.scheduler method must be modified to find the jobmanager object.
  • To use QSUB_SUBMIT_CM, the jobdir variable in the run method must be set to a shared network drive that is accessible from all nodes in the cluster.

Module Input - set by end user

Meaning of input values

  • empty cell - input not set
  • double (matrix) - input set, do computation
  • cfg_item handle - input set, but depends on the output of another object

    properties
        input = {};
    end

Runtime Callback - set by application programmer

Invoked automatically once the input is set and all dependencies are resolved.

    properties
        prog = function_handle.empty;
    end
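
Judging from how the run method calls the callback (one data argument in, one output requested), a suitable prog takes a single input and returns a single output. The handles below are hypothetical examples, not part of the original code. The output should be numeric if other objects depend on it, because the resolve method only starts a dependent object when the source output is numeric.

```matlab
% Hypothetical example callbacks: one input argument, one output argument.
prog1 = @(x) x.^2;          % anonymous function
prog2 = @cumsum;            % handle to a built-in function

data = [1 2 3];
out1 = feval(prog1, data);  % same call form as the serial branch of run
out2 = feval(prog2, data);
```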

Scheduler

The value of this property determines how the computation done in this object is run. The following string values are allowed:

  • 'jobmanager' - on a MATLAB DCS scheduler. The scheduler is looked up by calling findResource('scheduler','type','jobmanager'); this call may need to be adapted to the local configuration.
  • 'qsub' - using QSUB_SUBMIT_CM (available from MATLAB Central, file id 24512, http://www.mathworks.com/matlabcentral/fileexchange/24512)
  • 'serial' - in the current MATLAB session

The actual scheduler is stored in the private, hidden property real_scheduler. If the requested scheduler is not available, real_scheduler falls back to 'serial'.

    properties (Dependent = true)
        scheduler;
    end
    properties (Access = private, Hidden = true)
        real_scheduler  = 'serial';
    end

Job Output - set by getresult callback/submit method

    properties (SetAccess = private, SetObservable = true)
        output = {};
    end

Status - set by run method, reset by getresult callback

    properties (SetAccess = private, SetObservable = true)
        running = false;
    end
    methods
        function obj = cfg_item(varargin)
            for k = 1:2:nargin
                obj.(varargin{k}) = varargin{k+1};
            end
        end

Input Assignment

If input is a cfg_item, this object is added as a listener to the PostSet event of the output property of the source object. The event triggers the resolve method, which reads the output and starts this object's run method. If input is numeric, or if the source object's output is already numeric, the run method is started immediately.

        function set.input(obj, val)
            if isa(val, 'dep_parallel.cfg_item')
                % Dependency - add listener to source module
                addlistener(val, 'output', 'PostSet', @(src,evt)resolve(obj, src, evt));
                runinput = val.output;
            else
                runinput = val;
            end
            obj.input = val;
            % run, if input is ready
            if isnumeric(runinput)
                run(obj, runinput);
            end
        end

Scheduler Assignment

The set function tests for validity of the requested scheduler. If a 'jobmanager' is requested, the internal property real_scheduler is set to the resolved MATLAB DCS scheduler. The findResource() call may need to be adapted to the actual configuration.

        function set.scheduler(obj, val)
            switch lower(val)
                case 'jobmanager'
                    try
                        obj.real_scheduler = findResource('scheduler','type','jobmanager');
                        if ~isa(obj.real_scheduler,'jobmanager')
                            obj.real_scheduler = 'serial';
                        end
                    catch
                        obj.real_scheduler = 'serial';
                    end
                case 'qsub'
                    if any(exist('qsub_submit_cm') == 2:6)
                        obj.real_scheduler = 'qsub';
                    else
                        obj.real_scheduler = 'serial';
                    end
                otherwise
                    obj.real_scheduler = 'serial';
            end
        end
        function val = get.scheduler(obj)
            if isa(obj.real_scheduler, 'jobmanager')
                val = 'jobmanager';
            else
                val = obj.real_scheduler;
            end
        end
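
Because set.scheduler falls back to 'serial' without raising an error, an application may want to read the property back after setting it. The following is a minimal sketch under the assumption that the class file is saved as +dep_parallel/cfg_item.m on the MATLAB path; the variable names are made up for the example.

```matlab
% Assumption: cfg_item.m lives in a package folder +dep_parallel on the path.
requested = 'qsub';
item = dep_parallel.cfg_item('scheduler', requested);

% set.scheduler does not report a failed request, so compare the stored value.
granted = item.scheduler;
if ~strcmp(granted, requested)
    fprintf('Scheduler ''%s'' not available, using ''%s''.\n', requested, granted);
end
```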

Run Method

Branch into job submission according to selected scheduler. For 'jobmanager' and 'qsub' schedulers, the run function will return immediately. Callback functions will collect the job outputs. If the scheduler is 'serial', this method will evaluate the .prog function directly. The assignment of a value to obj.output will trigger PostSet events for all objects that wait for this object to finish computation.

        function run(obj, data)
            switch obj.scheduler
                case 'jobmanager'
                    s = obj.real_scheduler;
                    j = s.createJob();
                    j.createTask(obj.prog, 1, {data}, 'FinishedFcn', @(task, evt)getresult_dcs(obj, task, evt));
                    j.submit;
                    obj.running = true;
                    fprintf('Submitted function %s.\n', func2str(obj.prog));
                case 'qsub'
                    j.fun      = obj.prog;
                    j.job      = {data};
                    j.noutputs = 1;
                    j.ctx.path = matlabpath;
                    jobdir     = tempdir;
                    finishcb   = @(out)getresult_qsub(obj, out);
                    qsub_submit_cm(j, jobdir, 'qsub_job', finishcb);
                    obj.running = true;
                    fprintf('Submitted function %s.\n', func2str(obj.prog));
                case 'serial'
                    fprintf('Running function %s.\n', func2str(obj.prog));
                    obj.output = feval(obj.prog, data);
            end
        end

DCS Callback

The assignment of the computation result to obj.output will trigger the listeners of other modules waiting for this result.

        function getresult_dcs(obj, task, evt) %#ok<INUSD>
            fprintf('Finished function %s.\n', func2str(obj.prog));
            if strcmpi(task.State, 'finished')
                obj.output = task.OutputArguments{1};
            end
            obj.running = false;
        end

Qsub Callback

The assignment of the computation result to obj.output will trigger the listeners of other modules waiting for this result.

        function getresult_qsub(obj, out)
            fprintf('Finished function %s.\n', func2str(obj.prog));
            if ~isa(out, 'MException')
                obj.output = out{1};
            end
            obj.running = false;
        end

Output Listener Callback

Check whether the data source has become valid. PostSet events occur after every set operation, so this object is run only if the source object's output is numeric.

        function resolve(obj, src, evt) %#ok<INUSL>
            if isnumeric(evt.AffectedObject.output)
                run(obj, evt.AffectedObject.output);
            end
        end
    end
end
