Code covered by the BSD License  

Highlights from
Parallel Distributed Processing of Weka Algorithms in Matlab


05 Oct 2011 (Updated )

Run Weka algorithms in parallel across distributed computers to exploit available hardware.

function runParallelWeka
% This function runs multiple executions of a Weka machine learning algorithm
% in parallel across distributed computers.  Each execution is run on the
% same data but with different algorithm parameters, each execution as a 
% separate job.
% The function returns as soon as jobs have been scheduled (asynchronous). 
% The result of each job, a classification model, is stored in a cell array, 
% and upon completion of all jobs, the models are saved to file on the local 
% machine.
% Before running, set variable "config" at the top of the code to the name
% of a defined job manager. (To define a job manager see "Configuring Parallel 
% Processing.txt".)  The name cannot be "local" as the local scheduler uses 
% jobs of type simplejob that do not support a job timeout parameter.
% Subfunctions: 
% * jobRun is the function that is executed within each job.
% * jobFinished is invoked upon completion of each job.
% This file and containing folder must reside at the same path on all 
% machines over which processing is distributed.
% Author: Jaspar Cahill, 30 Sept 2011.

% settings
config = 'localcores'; % <<< replace this with the name of defined job manager
cVals = [Inf 100 10 5 1 0.5 0.05];
timeoutMins = 2;
resultsFile = 'results.dat';
wekaJarFile = 'weka.jar';

% load dataset  
dat=dataset('XLSFile','dataset.xls', 'ReadVarNames', false); 
dat.Properties.Description = 'mydataset';

[pathstr, name, ext] = fileparts( mfilename('fullpath'));
javaaddpath([pathstr filesep wekaJarFile]);

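% look up the job manager named by "config" (assumes the R2011-era
% findResource API) and clear any jobs left over from earlier runs
% (destroying the old jobs is an assumption about the intended cleanup)
JM = findResource('scheduler', 'configuration', config);
if ~isempty(JM.Jobs)
    destroy(JM.Jobs);
end
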
finishedCount = 0;
models = {};
J = {};  % list of created jobs

% create and submit jobs to scheduler
for i=1:length(cVals)
    J{i} = createJob(JM, 'PathDependencies', getNonMatlabPaths(),... 
        'FinishedFcn', @jobFinished, 'Timeout', 60*timeoutMins);
    userdata.i = i; % job number
    set(J{i}, 'userdata', userdata);
    T = createTask(J{i}, @jobRun, 1, { dat cVals(i) });
    submit(J{i});
end

function retVal = jobRun(dat,cVal)
    % This function is executed within each job.  The SVM algorithm in
    % Weka (SMO) is applied to the supplied data using the specified C
    % value.  The trained SVM model is returned.
    %
    % The Weka performance evaluation code below is unused and left
    % commented out.  It can be uncommented; however, without a means of
    % dealing with the variable-length parameter of method
    % evaluateModel(..), the following Java method may be inserted into
    % weka.classifiers.Evaluation and used instead:
    %  public double[] evaluateModel1(Classifier classifier,
    %      Instances data) throws Exception {
    %      return evaluateModel(classifier, data);
    %  }

    % inform worker of the location of the Weka library.
    javaaddpath([pathstr filesep wekaJarFile]);
    % convert dataset to Weka format (class Instances).
    train = datasetToWeka(dat);
    test = datasetToWeka(dat);
    % use Weka to train classifier
    c = weka.classifiers.functions.SMO();
    k = weka.classifiers.functions.supportVector.PolyKernel();
    %k = weka.classifiers.functions.supportVector.RBFKernel();
    c.setKernel(k);
    c.setC(cVal);
    c.buildClassifier(train);
    % evaluate model (simple evaluation over training set)
    %ev = weka.classifiers.Evaluation(test);
    %ev.evaluateModel1(c, test);
    %f = ev.fMeasure(1);
    %tp = ev.truePositiveRate(1);
    %fp = ev.falsePositiveRate(1);
    retVal = c;
end

function jobFinished(job, event)
    % This function is called at the completion of a job.  It prints some
    % job information to the command window: description, finish status
    % (ok, error, empty result, timeout), and elapsed execution time.
    % The returned result of the job is saved to the cell array "models".
    % If this is the last job to finish, "models" is saved to file on
    % the local machine.
    u = get(job, 'userdata');
    msg = sprintf('JOB FINISHED: %s cval: %g', dat.Properties.Description, cVals(u.i));
    disp(msg);
    errmsg = get(job.Tasks(1), 'errormessage');
    if isempty(errmsg)
        v = getAllOutputArguments(job);
        if isempty(v{1})
            disp('EMPTY RESULT RETURNED');
        end
        models{u.i} = v{1};
    else
        disp('ERROR OR TIMEOUT: '); disp(dat.Properties.Description);
        models{u.i} = NaN;
    end
    % elapsed execution time, printed as hours:minutes:seconds
    tStart = datenum(job.StartTime,'ddd mmm dd HH:MM:SS zzz yyyy');
    tEnd = datenum(job.FinishTime,'ddd mmm dd HH:MM:SS zzz yyyy');
    secs = (tEnd-tStart) * 24 * 60 * 60;
    [h, m, s] = sec2hms(secs);
    disp(sprintf('%02.0f:%02.0f:%05.2f', h, m, s));
    finishedCount = finishedCount+1;
    % last job to finish: save the results on the local machine
    if finishedCount >= length(J)
        results.dat = dat;  % field name "dat" is an assumption
        results.models = models;
        save([pathstr filesep resultsFile], 'results');
    end
end

end  % runParallelWeka
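
Reading the saved results back is not shown in the listing above. The following is a minimal sketch of one way to do it, not part of the original submission. It relies on two details from the code: the file is written with save, so it is in MAT format despite its ".dat" extension, and failed or timed-out jobs are stored as NaN. To stay self-contained it writes a stand-in results struct (strings instead of Weka SMO objects) to a temporary folder; with real output, weka.jar would presumably need to be on the Java class path before loading.

```matlab
% Stand-in for the struct saved by runParallelWeka.  Real entries are Weka
% SMO objects; strings are used here so the sketch runs without Weka.
results.models = {'model-1', NaN, 'model-3'};   % NaN marks a failed or timed-out job
f = fullfile(tempdir, 'results.dat');
save(f, 'results');        % MAT format, regardless of the .dat extension

% load treats unknown extensions as ASCII text, so -mat must be given
S = load(f, '-mat');
models = S.results.models;

% a job failed if its entry is the scalar NaN placeholder
failed = cellfun(@(m) isnumeric(m) && isscalar(m) && isnan(m), models);
fprintf('%d of %d jobs produced a model\n', nnz(~failed), numel(models));
```

The entries of "models" are in the same order as cVals, so the C value for models{i} is cVals(i).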

