Contents
- Schedule dependent computations
- Adaptation to local environment
- Module Input - set by end user
- Runtime Callback - set by application programmer
- Scheduler
- Job Output - set by getresult callback/submit method
- Status - set by run method, reset by getresult callback
- Input Assignment
- Scheduler Assignment
- Run Method
- DCS Callback
- Qsub Callback
- Output Listener Callback
classdef cfg_item < hgsetget
Schedule dependent computations
This example demonstrates how one can schedule computations that depend on each other. It is based on the following MATLAB constructs:
- Handle objects. The class DEP_PARALLEL.CFG_ITEM is derived from HGSETGET and is therefore a handle class.
- Events and listeners. If an object o1 of class DEP_PARALLEL.CFG_ITEM is assigned to the input property of another object o2 of the same class, object o2 will listen to the PostSet event for object o1's output property. The associated callback resolve will invoke o2's run method on the output data returned by o1.
This mechanism can be used to run independent computations in parallel if QSUB_SUBMIT_CM (see http://www.mathworks.com/matlabcentral/fileexchange/24512) or a MATLAB DCS jobmanager scheduler is available. DCS schedulers of types other than 'jobmanager' are not supported, because they only provide simplejob objects without callbacks.
Switching between serial and parallel computation does not require any change to the application code that sets up the object hierarchy. It is sufficient to change the scheduler property of the DEP_PARALLEL.CFG_ITEM objects.
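As a minimal sketch of this idea, the script below builds a two-stage chain in which only the variable sched would need to change to move from serial to parallel execution. It assumes the class is saved as +dep_parallel/cfg_item.m on the MATLAB path; the variable names a, b and sched are illustrative and not part of the original code. The prog property is deliberately passed before input, since assigning a numeric input runs prog straight away.

```matlab
% Assumption: +dep_parallel/cfg_item.m (the class documented here) is on the path.
sched = 'serial';   % could also be 'qsub' or 'jobmanager' where available

% Stage a has no input yet, so nothing runs when it is created.
a = dep_parallel.cfg_item('scheduler', sched, 'prog', @sqrt);

% Stage b depends on a. a.output is still empty, so b only registers a listener.
b = dep_parallel.cfg_item('scheduler', sched, 'prog', @(x) x + 1, 'input', a);

% Supplying a's input starts a; once a.output is set, b is started via resolve.
a.input = 16;

% With the serial scheduler both stages have finished at this point.
disp(a.output)
disp(b.output)
```

With a parallel scheduler the last two lines could still show empty outputs, because run returns before the jobs have finished.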
To understand how dependency resolution works, the following debug break points may be helpful:
- at the beginning of the run method
- at the beginning of the getresult_qsub or getresult_dcs method, if a parallel scheduler is used
- at the beginning of the resolve method
This code has been developed as a demonstration for a batch job configuration system for MATLAB. See http://sourceforge.net/projects/matlabbatch for details about the original project.
%_______________________________________________________________________
% Copyright (C) 2007 Freiburg Brain Imaging
Adaptation to local environment
The example code needs to be adapted to the local environment in the following places:
- To use MATLAB DCS, modify the set.scheduler method so that it finds the jobmanager object.
- To use QSUB_SUBMIT_CM, set the jobdir variable in the run method to a shared network drive that is accessible from all nodes in the cluster.
Module Input - set by end user
Meaning of input values
- empty cell - input not set
- double (matrix) - input set, do computation
- cfg_item handle - input set, but depends on the output of another object
properties
input = {};
end
Runtime Callback - set by application programmer
Invoked automatically once the input is set and all dependencies are resolved.
properties
prog = function_handle.empty;
end
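Because the constructor assigns property/value pairs in the order given, and a numeric input starts the computation immediately, prog should be assigned before input. The following sketch shows the working order; it assumes the class is available as dep_parallel.cfg_item on the path, and the variable name m is illustrative.

```matlab
% Assumption: +dep_parallel/cfg_item.m (the class documented here) is on the path.
% 'prog' comes first, so the callback is already known when 'input' triggers run.
m = dep_parallel.cfg_item('prog', @(x) sum(x(:)), 'input', magic(3));

% Default scheduler is 'serial', so the result is available right away.
disp(m.output)

% Passing 'input' before 'prog' would call run while prog is still an empty
% function handle; that is expected to fail rather than defer the computation.
```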
Scheduler
The value of this property determines how the computation done in this object is run. The following string values are allowed:
- 'jobmanager' - on a MATLAB DCS scheduler. The scheduler is looked up in the set.scheduler method by calling findResource('scheduler','type','jobmanager'); this call may need to be adapted to the local configuration.
- 'qsub' - using QSUB_SUBMIT_CM (available from MATLAB Central, file id 24512, http://www.mathworks.com/matlabcentral/fileexchange/24512)
- 'serial' - in the current MATLAB session
The actual scheduler is stored in the hidden property real_scheduler.
properties (Dependent = true)
scheduler;
end
properties (Access = private, Hidden = true)
real_scheduler = 'serial';
end
Job Output - set by getresult callback/submit method
properties (SetAccess = private, SetObservable = true)
output = {};
end
Status - set by run method, reset by getresult callback
properties (SetAccess = private, SetObservable = true)
running = false;
end
methods
function obj = cfg_item(varargin)
    for k = 1:2:nargin
        obj.(varargin{k}) = varargin{k+1};
    end
end
Input Assignment
If input is a cfg_item, add this object as listener to the PostSet event of the output property of the source object. This will trigger the resolve method, which will read the output and start the execution of this object's run method.
function set.input(obj, val)
    if isa(val, 'dep_parallel.cfg_item')
        % Dependency - add listener to source module
        addlistener(val, 'output', 'PostSet', @(src,evt)resolve(obj, src, evt));
        runinput = val.output;
    else
        runinput = val;
    end
    obj.input = val;
    % run, if input is ready
    if isnumeric(runinput)
        run(obj, runinput);
    end
end
Scheduler Assignment
The set function tests for validity of the requested scheduler. If a 'jobmanager' is requested, the internal property real_scheduler is set to the resolved MATLAB DCS scheduler. The findResource() call may need to be adapted to the actual configuration.
function set.scheduler(obj, val)
    switch lower(val)
        case 'jobmanager'
            try
                obj.real_scheduler = findResource('scheduler','type','jobmanager');
                if ~isa(obj.real_scheduler,'jobmanager')
                    obj.real_scheduler = 'serial';
                end
            catch
                obj.real_scheduler = 'serial';
            end
        case 'qsub'
            if any(exist('qsub_submit_cm') == 2:6)
                obj.real_scheduler = 'qsub';
            else
                obj.real_scheduler = 'serial';
            end
        otherwise
            obj.real_scheduler = 'serial';
    end
end
function val = get.scheduler(obj)
    if isa(obj.real_scheduler, 'jobmanager')
        val = 'jobmanager';
    else
        val = obj.real_scheduler;
    end
end
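The fallback behaviour of the set method can be checked by reading the scheduler property back after assigning it. This is a small sketch, assuming the class is on the path as dep_parallel.cfg_item; the string 'no_such_scheduler' is a made-up value used only to reach the otherwise branch.

```matlab
% Assumption: +dep_parallel/cfg_item.m (the class documented here) is on the path.
m = dep_parallel.cfg_item('scheduler', 'no_such_scheduler');

% Unknown names fall through to the 'otherwise' branch.
disp(m.scheduler)

% The name is matched case-insensitively. Whether 'qsub' is kept depends on
% qsub_submit_cm being found on the path; otherwise 'serial' is reported.
m.scheduler = 'QSUB';
disp(m.scheduler)
```

Reading the property back before building a larger object hierarchy is one way to notice that a requested parallel scheduler was silently replaced by 'serial'.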
Run Method
Branches into job submission according to the selected scheduler. For the 'jobmanager' and 'qsub' schedulers, the run method returns immediately after submitting the job, and callback functions collect the job outputs later. If the scheduler is 'serial', this method evaluates the .prog function directly. Assigning a value to obj.output triggers PostSet events for all objects that are waiting for this object to finish its computation.
function run(obj, data)
    switch obj.scheduler
        case 'jobmanager'
            s = obj.real_scheduler;
            j = s.createJob();
            j.createTask(obj.prog, 1, {data}, 'FinishedFcn', @(task, evt)getresult_dcs(obj, task, evt));
            j.submit;
            obj.running = true;
            fprintf('Submitted function %s.\n', func2str(obj.prog));
        case 'qsub'
            j.fun = obj.prog;
            j.job = {data};
            j.noutputs = 1;
            j.ctx.path = matlabpath;
            jobdir = tempdir;
            finishcb = @(out)getresult_qsub(obj, out);
            qsub_submit_cm(j, jobdir, 'qsub_job', finishcb);
            obj.running = true;
            fprintf('Submitted function %s.\n', func2str(obj.prog));
        case 'serial'
            fprintf('Running function %s.\n', func2str(obj.prog));
            obj.output = feval(obj.prog, data);
    end
end
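Since every dependent object registers its own listener on the source's output property, a single assignment to obj.output can start several downstream computations. The sketch below illustrates such a fan-out with the default serial scheduler; it assumes the class is on the path as dep_parallel.cfg_item, and the names src, dbl and neg are illustrative.

```matlab
% Assumption: +dep_parallel/cfg_item.m (the class documented here) is on the path.
src = dep_parallel.cfg_item('prog', @(x) x + 1);

% Two independent consumers of the same source; neither runs yet,
% because src.output is still empty.
dbl = dep_parallel.cfg_item('prog', @(x) 2 * x, 'input', src);
neg = dep_parallel.cfg_item('prog', @(x) -x,    'input', src);

% One assignment starts src; its PostSet event then starts both consumers.
src.input = 1;

disp(src.output)
disp(dbl.output)
disp(neg.output)
```

The two consumers do not depend on each other, so they are the kind of computations that a 'qsub' or 'jobmanager' scheduler could execute concurrently.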
DCS Callback
The assignment of the computation result to obj.output will trigger the listeners of other modules waiting for this result.
function getresult_dcs(obj, task, evt) %#ok<INUSD>
    fprintf('Finished function %s.\n', func2str(obj.prog));
    if strcmpi(task.State, 'finished')
        obj.output = task.OutputArguments{1};
    end
    obj.running = false;
end
Qsub Callback
The assignment of the computation result to obj.output will trigger the listeners of other modules waiting for this result.
function getresult_qsub(obj, out)
    fprintf('Finished function %s.\n', func2str(obj.prog));
    if ~isa(out, 'MException')
        obj.output = out{1};
    end
    obj.running = false;
end
Output Listener Callback
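Checks whether the data source has become valid. PostSet events occur after any set operation on the output property, so the dependent computation is started only if the new output value is numeric.
function resolve(obj, src, evt) %#ok<INUSL>
    if isnumeric(evt.AffectedObject.output)
        run(obj, evt.AffectedObject.output);
    end
end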
end
end
