function PP_SERVER(PR_OPTIONS,PR_REQUEST_PATH)
%PP_SERVER(PR_OPTIONS,PR_REQUEST_PATH)
%
% This function starts a Parallel Processing server session
% which handles requests posted in the directory PR_REQUEST_PATH..
% Behavior of this function can be customized via the
% parameter PR_OPTIONS which is a text string of the
% form "opt1=val1&opt2=val2&...." The options allowed are:
%
% NAME - A text string to identify this server session.
% The default is 'PP_SERVER_computer'.
% PAUSE - When equal to a positive value,
% the function will pause the indicated number of seconds
% between checks for additional requests to process.
% Otherwise, this function will return as soon as all
% outstanding requests have been processed..
% This file and the DistributePP software package were created and
% are maintained by Michael D. DeVore. The package originated in
% November, 1999. Permission is granted by the author for anyone
% to use or modify this software provided that:
% (1) any modified files are documented internally to clearly indicate
% that they have been modified from the original release; and
% (2) it is recognized that neigher Michael D. DeVore nor Washington
% University assumes any liability for the use or misuse of the software.
% Default option values...
LV_PAUSE = 0;
LV_SERVER_NAME = ['PP_SERVER_' computer];
% Extract options...
LV_OPTION_TAIL = PR_OPTIONS;
while length(LV_OPTION_TAIL) > 0
[LV_OPT,LV_OPTION_TAIL] = strtok(LV_OPTION_TAIL,'=');
LV_OPT = deblank(strrep(LV_OPT,'&',''));
[LV_VAL,LV_OPTION_TAIL] = strtok(LV_OPTION_TAIL,'&');
LV_VAL = strrep(LV_VAL,'=','');
if strcmp('PAUSE',upper(LV_OPT)) == 1
LV_PAUSE = str2num(LV_VAL);
end
if strcmp('NAME',upper(LV_OPT)) == 1
LV_SERVER_NAME = LV_VAL;
end
end
% Generate a unique name for this server based upon the third parameter...
LV_SERVER_NAME = [LV_SERVER_NAME '_' num2str(PP_SEQUENCE([PR_REQUEST_PATH LV_SERVER_NAME '.seq']))];
% We will log all operations to this file...
LV_LOG_FILE = fopen([PR_REQUEST_PATH LV_SERVER_NAME '.log'],'w');
fprintf(LV_LOG_FILE,'PP Server %s commencing processing at %s...',LV_SERVER_NAME,datestr(now));
% Capture the path and working directory so we can set them back after each operation...
LV_DEFAULT_PATH = path;
LV_WORKING_DIR = pwd;
LV_STOP_FLAG = 0;
while ~LV_STOP_FLAG
[LV_STOP_FLAG,LV_REQUEST,LV_FID] = GET_NEXT_REQUEST(PR_REQUEST_PATH,LV_SERVER_NAME);
if LV_FID >= 0
fprintf(LV_LOG_FILE,'\n\nProcessing request %s at %s...',LV_REQUEST,datestr(now));
load(LV_REQUEST);
addpath(LV_PROC_SPEC.Path);
cd(LV_PROC_SPEC.WorkingDir);
lasterr('');
% Matlab 5.0 does not have try...catch...end blocks, so we resort to this for compatibility...
eval('LV_PROC_RESULT = feval(LV_PROC_SPEC.Command,LV_PROC_SPEC.Arguments{:}); LV_LAST_ERR = '''';',...
'LV_LAST_ERR = lasterr; LV_PROC_RESULT = '''';');
if isempty(findstr(LV_LAST_ERR,'Out of memory'))
save(LV_REQUEST,'LV_PROC_RESULT','LV_LAST_ERR','-append')
delete([LV_REQUEST '.req']);
fprintf(LV_LOG_FILE,'\n Completed request %s at %s. %s',LV_REQUEST,datestr(now),...
strrep(LV_LAST_ERR,sprintf('\n'),': '));
else
fprintf(LV_LOG_FILE,'\n Out of memory failure on request %s at %s.',LV_REQUEST,datestr(now));
end
clear global;
path(LV_DEFAULT_PATH);
cd(LV_WORKING_DIR);
PP_CLOSE_LOCK(LV_FID);
else
if (LV_PAUSE > 0) & (~LV_STOP_FLAG)
pause(LV_PAUSE);
else
LV_STOP_FLAG = 1;
end
end
end
fprintf(LV_LOG_FILE,'\n\nServer termination at %s.\n',datestr(now));
fclose(LV_LOG_FILE);
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% End of PP_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
function [RT_STOP_FLAG,RT_REQUEST,RT_FID] = GET_NEXT_REQUEST(PR_REQUEST_PATH,PR_SERVER_NAME)
% This function locks the next indicator file in the given path, creates a flag file
% indicating that processing has begun, and returns the request and file id of a path
% to that indicator.
RT_STOP_FLAG = 0;
RT_REQUEST = '';
RT_FID = -1;
if exist([PR_REQUEST_PATH 'STOP_ALL.ind'],'file') | exist([PR_REQUEST_PATH 'STOP_' PR_SERVER_NAME '.ind'],'file')
RT_STOP_FLAG = 1;
else
% Get a list of request files and sort them by date...
LV_DIR = dir([PR_REQUEST_PATH '*.req']);
if length(LV_DIR) ~= 0
LV_TIMESTAMP = [];
for LV_INDX =1:length(LV_DIR)
LV_TIMESTAMP(LV_INDX)=datenum(LV_DIR(LV_INDX).date);
end
[LV_TIMESTAMP,LV_INDX] = sort(LV_TIMESTAMP);
LV_DIR = LV_DIR(LV_INDX);
LV_IND_LIST = cell(length(LV_DIR),1);
[LV_IND_LIST{:}] = deal(LV_DIR.name);
LV_INDX = 0;
while (LV_INDX < length(LV_IND_LIST)) & (RT_FID < 0)
LV_INDX = LV_INDX + 1;
RT_FID = PP_OPEN_LOCK([PR_REQUEST_PATH LV_IND_LIST{LV_INDX}]);
% Ensure that we don't attempt to re-try a computation we've already failed at...
LV_PRC_FILE = [PR_REQUEST_PATH strrep(LV_IND_LIST{LV_INDX},'.req','') '.prc'];
if exist(LV_PRC_FILE,'file') & (RT_FID >= 0)
LV_ALREADY_TRIED = 0;
LV_PRC_FILE = fopen(LV_PRC_FILE,'rt');
while (~feof(LV_PRC_FILE)) & (~LV_ALREADY_TRIED)
LV_RECORD = fgetl(LV_PRC_FILE);
LV_ALREADY_TRIED = strcmp(PR_SERVER_NAME,strtok(LV_RECORD,','));
end
fclose(LV_PRC_FILE);
if LV_ALREADY_TRIED
PP_CLOSE_LOCK(RT_FID);
RT_FID = -1;
end
end
end
if RT_FID >=0
RT_REQUEST = [PR_REQUEST_PATH strrep(LV_IND_LIST{LV_INDX},'.req','')];
LV_PRC_FILE = fopen([RT_REQUEST '.prc'],'at');
fprintf(LV_PRC_FILE,'\n%s, %s',PR_SERVER_NAME,datestr(now));
fclose(LV_PRC_FILE);
end
end
end