-
Notifications
You must be signed in to change notification settings - Fork 6
/
batch_job_distrib.m
284 lines (272 loc) · 10.6 KB
/
batch_job_distrib.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
%BATCH_JOB_DISTRIB Distribute a MATLAB for loop across several PCs
%
% output = batch_job_distrib(func, input, [workers, [global_data]], ...)
%
% If you have a for loop which can be written as:
%
% for a = 1:size(input, 2)
% output(:,a) = func(input(:,a), global_data);
% end
%
% where input is a numeric array and output is a numeric or cell array,
% then batch_job_distrib() can parallelize the work across multiple worker
% MATLAB instances on multiple (unlimited) networked worker PCs as follows:
%
% output = batch_job_distrib(func, input, workers, global_data);
%
% This is a replacement for parfor in this use case, if you don't have the
% Parallel Computing Toolbox and/or Distributed Computing Server.
%
% There is also an asynchronous mode, which returns immediately, passing
% back a handle to a function which can load the output data later:
%
% output = batch_job_distrib(..., '-async'); % Start an asynchronous computation
% ... % Do other stuff here
% output = output(); % Get the results here
%
% The function can always spread the work across multiple MATLABs on the
% local PC, but the requirements for it to run on OTHER PCs are that:
% - There is an ssh executable on the system path of the local PC.
% - All worker PCs can be ssh'd into non-interactively (i.e. without
% manually entering a password).
% - MATLAB is on the system path of all worker PCs.
% - Every worker has a valid license for MATLAB and all required toolboxes
% for the user ssh'ing in.
% - The current directory can be written to from all worker PCs via the
% SAME path.
% - All the required functions are on the MATLAB paths of every worker PC.
% - The networked filesystem supports file locks (not crucial, but safer).
% - All the worker PCs honour the networked filesystem file locks (again,
% not crucial, but safer).
%
% The input arguments func and global_data may optionally be function
% names. When the latter is called it outputs the true global_data. Note
% that global_data could be incorporated into func before calling
% batch_job_distrib, using an anonymous function, but the functionality provided
% here is more flexible. For example, normally every worker loads a copy of
% global_data into its own memory space, but this can be avoided if
% global_data is a function which loads the data into shared memory via a
% memory mapped file. Indeed, this is the most efficient way of doing
% things - the data doesn't need to be saved to disk first (as it's already
% on the disk), and only one copy is loaded per PC, regardless of the
% number of workers on that PC. Passing global_data through a function call
% also allows the function to do further initializations, such as setting
% the path.
%
% Notes:
% - The workers need not all run the same operating system, but they must
% all have working versions of the required functions, including where
% these are platform-dependent, e.g. mex files.
% - If a single worker uses 100% of CPU due to the code already being
% parallelized internally, then there is little to be gained by having
% more than one MATLAB worker instance on each worker PC.
% - There is little point using this function if the for loop would
% complete in a single MATLAB instance faster than it takes to start a
% second MATLAB instance and load the necessary data in that instance.
%
%IN:
% func - a function handle or function name string
% input - Mx..xN numeric input data array, to be iterated over the
% trailing dimension.
% workers - Wx2 cell array, with each row being {hostname, num_workers},
% hostname being a text string indicating the name of a worker
% PC ('' for the local PC), and num_workers being the number of
% MATLAB worker instances to start on that PC. Default: {'',
% feature('numCores')}.
% global_data - a data structure, or function handle or function name
% string of a function which returns a data structure, to
% be passed to func. Default: global_data not passed to
% func.
% '-async' - flag indicating to operate in asynchronous mode, returning
% immediately, and completing the job in the background.
% '-progress' - flag indicating to display a progress bar.
% '-keep' - flag indicating intermediate result files should be kept.
% '-timeout', timeInSecs - option pair indicating a maximum time to allow
% each iteration to run before killing it. 0
% means no timeout is used. If non-zero, the
% current MATLAB instance is not used to run any
% iterations. If negative, the absolute value is
% used, but iterations are rerun if they
% previously timed out; otherwise timed-out
% iterations are skipped. Default: 0 (no
% timeout).
% '-chunk_lims', [min max] - option pair indicating the minimum and
% maximum number of loop iterations to run per
% chunk of work distributed to each worker.
% Default: [1 1e10].
%
%OUT:
% output - Px..xN numeric or cell output array, or if in asynchronous
% mode, a handle to a function which will return the output
% array when called (blocking while the batch job finishes, if
% necessary).
%
% See also BATCH_JOB, BATCH_JOB_SUBMIT, BATCH_JOB_COLLECT, BATCH_JOB_WORKER, PARFOR
function output = batch_job_distrib(varargin)
% Entry point: parses the flags and option pairs out of the argument list,
% submits the job via batch_job_submit, optionally shows a progress bar,
% starts the workers and then either collects the output (synchronous mode)
% or returns a handle that collects it when called ('-async').
%
% Positional arguments (after flags are removed):
%   1: func, 2: input (numeric), 3: workers (optional), 4: global_data (optional)
% Flags / option pairs: '-keep', '-async', '-progress',
%   '-timeout', value and '-chunk_lims', [min max].
%
% Errors are raised (via assert) if an option pair is missing its value, if
% fewer than two positional arguments are given, or if input, workers,
% timeout or chunk_lims fail validation.

% Default option values
chunk_lims = [1 1e10];
async = false;
progress = false;
keep = false;
timeout = 0;

% Separate the flags and option pairs from the positional arguments
nArgs = numel(varargin);
is_positional = true(1, nArgs);
a = 1;
while a <= nArgs
    V = varargin{a};
    % Only a character row vector can match one of the flags below
    if ischar(V) && ndims(V) == 2 && size(V, 1) == 1
        switch V
            case '-keep'
                keep = true;
                is_positional(a) = false;
            case '-async'
                async = true;
                is_positional(a) = false;
            case '-progress'
                progress = true;
                is_positional(a) = false;
            case '-timeout'
                % Fail with a clear message rather than an index error
                assert(a < nArgs, 'The %s option must be followed by a value', V);
                a = a + 1;
                timeout = varargin{a};
                assert(isscalar(timeout) && (isnumeric(timeout) || islogical(timeout)), 'timeout should be a numeric scalar');
                is_positional(a-1:a) = false;
            case '-chunk_lims'
                assert(a < nArgs, 'The %s option must be followed by a value', V);
                a = a + 1;
                chunk_lims = varargin{a};
                assert(numel(chunk_lims) == 2 && isposint(chunk_lims(1)) && isposint(chunk_lims(2)) && chunk_lims(2) >= chunk_lims(1), 'chunk_lims should be a 1x2 vector of positive integers');
                is_positional(a-1:a) = false;
        end
    end
    a = a + 1;
end
positional = varargin(is_positional);
N = numel(positional);

% Validate the required arguments before they are used
assert(N >= 2, 'batch_job_distrib requires at least the func and input arguments');
assert(isnumeric(positional{2}), 'input should be a numeric array');

% The progress bar is only shown when Java AWT is available
progress = progress && usejava('awt');

% Assemble the arguments for batch_job_submit:
% {directory, func, input, timeout, chunk_lims[, global_data]}
sargs = {cd(), positional{1}, positional{2}, timeout, chunk_lims};
if N > 3
    sargs{6} = positional{4};
end

% Get the worker list, defaulting to one worker per core on the local PC
if N > 2 && ~isempty(positional{3})
    workers = positional{3};
    assert(iscell(workers) && size(workers, 2) == 2, 'workers should be a Wx2 cell array with rows of {hostname, num_workers}');
else
    workers = {'', feature('numCores')};
end

% Check the worker array makes sense
use_this_instance = ~async && timeout == 0;
for w = 1:size(workers, 1)
    assert(ischar(workers{w,1}) && isposint(workers{w,2}), 'Each row of workers should be {hostname string, positive integer}');
    if isequal(workers{w,1}, '')
        % Start one less local worker if we use this MATLAB instance too
        workers{w,2} = workers{w,2} - use_this_instance;
    end
end

% Submit the job to the workers
s = batch_job_submit(sargs{:});

% Create the wait bar
hb = [];
if progress
    hb = waitbar(0, 'Starting...', 'Name', 'Batch job processing...');
    % Initialize the waitbar data read by progress_func
    info.bar = hb;
    info.time = tic();
    info.dir_str = [s.work_dir 'chunk*.mat'];
    info.nChunks = ceil(s.N / s.chunk_size);
    % Start a timer to update the waitbar; it is tagged with the work
    % directory so that cleanup_all can find it again
    info.timer = timer('ExecutionMode', 'fixedSpacing', 'Period', 2, 'StartDelay', 2, 'Tag', s.work_dir, 'TimerFcn', @progress_func);
    set(info.timer, 'UserData', info);
    start(info.timer);
end

% Make sure the directory gets deleted on exit or if we quit early
co = onCleanup(@() cleanup_all(s, hb, keep, workers));

% Start the workers
start_workers(s, workers);

if async
    % Return a handle to the function for getting the output. The cleanup
    % object is captured by the handle and passed on to batch_job_collect.
    output = @() batch_job_collect(s, co);
else
    % Return the output
    output = batch_job_collect(s, co);
end
end
function cleanup_all(s, hb, keep, workers)
% Tear down a batch job: stop its timers, close the waitbar, halt the
% workers if required, remove the work directory (unless it is being kept)
% and delete the command file on every remote host.
%
%   s       - job structure; s.work_dir is read here and s is passed to
%             kill_signal
%   hb      - waitbar handle, or [] if no waitbar was created
%   keep    - true to leave the work directory in place
%   workers - Wx2 cell array of {hostname, num_workers}; '' is the local PC

% Stop and delete every timer tagged with this job's work directory
job_timers = timerfindall('Tag', s.work_dir);
for k = 1:numel(job_timers)
    stop(job_timers(k));
    delete(job_timers(k));
end

% Close the waitbar, ignoring any failure (e.g. it was already closed)
try
    close(hb);
catch
end

% Check if we need to send the kill signal
% NOTE(review): kill_signal is defined elsewhere; presumably the first
% call reports whether the signal is already set and the second sets it -
% confirm against kill_signal
signal_state = kill_signal(s);
if ~signal_state
    % Send the signal
    kill_signal(s);
    fprintf('Please wait while the workers are halted.\n');
    % The work directory is always removed on this path
    keep = false;
    % Wait (up to 4 seconds) for the workers' lock files to disappear
    lock_pattern = [s.work_dir '*.lock'];
    tic;
    while true
        if isempty(dir(lock_pattern)) || toc >= 4
            break;
        end
        pause(0.05);
    end
end

if ~keep
    % Wait for all files to be closed
    pause(0.05);
    % Remove all the other files and the work directory
    rmdir(s.work_dir, 's');
end

% Delete the remote worker scripts
for w = 1:size(workers, 1)
    host = workers{w,1};
    if isequal(host, '')
        % Local PC: there is no remote command file to remove
        continue;
    end
    try
        % Remove the command file over ssh
        [status, cmdout] = system(sprintf('ssh %s "rm -f ./batch_job_distrib_cmd.bat"', host));
        assert(status == 0, cmdout);
    catch me
        % Report the failure but carry on with the remaining hosts
        fprintf('Could not delete batch script on host %s\n', host);
        fprintf('%s\n', getReport(me, 'basic'));
    end
end
end
function progress_func(ht, varargin)
% Timer callback which refreshes the waitbar from the number of chunk
% result files present in the work directory.
%
%   ht - the timer object; its UserData holds the info structure with
%        fields bar, time, dir_str, nChunks and timer
%   Any further callback arguments are ignored.

% Get the progress bar info
info = get(ht, 'UserData');

% Compute the proportion finished from the number of matching files
n_done = numel(dir(info.dir_str));
proportion = n_done / info.nChunks;

% Once everything is done, shut down the timer and the waitbar
if proportion >= 1
    stop(info.timer);
    delete(info.timer);
    close(info.bar);
    drawnow;
    return;
end

% Build the title, starting with the elapsed time
t_elapsed = toc(info.time);
newtitle = sprintf('Elapsed: %s', timestr(t_elapsed));

% Only add an estimate once some progress has been made
show_estimate = proportion > 0.01 || (t_elapsed > 30 && proportion ~= 0);
if show_estimate
    t_remaining = ((1 - proportion) * t_elapsed) / proportion;
    if t_remaining < 600
        % Short waits are shown as a remaining duration
        newtitle = sprintf('%s, Remaining: %s', newtitle, timestr(t_remaining));
    else
        % Longer waits are shown as a finish date; the factor converts
        % seconds to days for datenum arithmetic
        eta = datenum(clock()) + (t_remaining * 1.15741e-5);
        newtitle = sprintf('%s, ETA: %s', newtitle, datestr(eta, 0));
    end
end

% Protect against the waitbar being closed: if the update fails, stop
% and delete the timer so the callback is not invoked again
try
    waitbar(proportion, info.bar, newtitle);
catch
    stop(info.timer);
    delete(info.timer);
end
end