Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrapper: Monitor status of child task process even if parent exits #3307

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
29 changes: 29 additions & 0 deletions lib/proc_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,35 @@ void suspend_or_resume_process(int pid, bool resume) {
#endif
}

#ifdef _WIN32

// Handling of job objects
//

void get_job_object_processes(HANDLE job_handle, vector<int>& pids) {
int max_job_object_processes = 32;
PJOBOBJECT_BASIC_PROCESS_ID_LIST process_list;
process_list = (PJOBOBJECT_BASIC_PROCESS_ID_LIST)LocalAlloc(LPTR, sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST) + (max_job_object_processes - 1) * 32);
QueryInformationJobObject(job_handle, JobObjectBasicProcessIdList, process_list, sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST) + (max_job_object_processes - 1) * 32, NULL);
for (int i = 0; i < process_list->NumberOfProcessIdsInList; i++) {
pids.push_back((int)process_list->ProcessIdList[i]);
}
}

void suspend_or_resume_job_object(HANDLE job_handle, bool resume) {
vector<int> pids;
get_job_object_processes(job_handle, pids);
suspend_or_resume_threads(pids, 0, resume, false);
}

void kill_job_object(HANDLE job_handle) {
vector<int> pids;
get_job_object_processes(job_handle, pids);
kill_all(pids);
}

#endif

// return OS-specific value associated with priority code
//
int process_priority_value(int priority) {
Expand Down
5 changes: 5 additions & 0 deletions lib/proc_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ extern void kill_descendants(int child_pid=0);
#endif
extern void suspend_or_resume_descendants(bool resume);
extern void suspend_or_resume_process(int pid, bool resume);
#ifdef _WIN32
extern void get_job_object_processes(HANDLE job_handle, std::vector<int>& pids);
extern void suspend_or_resume_job_object(HANDLE job_handle, bool resume);
extern void kill_job_object(HANDLE job_handle);
#endif

extern int process_priority_value(int);

Expand Down
106 changes: 95 additions & 11 deletions samples/wrapper/wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ struct TASK {
bool forward_slashes;
double time_limit;
int priority;
bool wait_for_children;

// dynamic stuff follows
double current_cpu_time;
Expand All @@ -151,6 +152,8 @@ struct TASK {
double elapsed_time;
#ifdef _WIN32
HANDLE pid_handle;
HANDLE job_handle;
HANDLE ioport_handle;
DWORD pid;
struct _stat last_stat; // mod time of checkpoint file
#else
Expand Down Expand Up @@ -435,6 +438,7 @@ int TASK::parse(XML_PARSER& xp) {
forward_slashes = false;
time_limit = 0;
priority = PROCESS_PRIORITY_LOWEST;
wait_for_children = false;

while (!xp.get_tag()) {
if (!xp.is_tag) {
Expand Down Expand Up @@ -471,6 +475,7 @@ int TASK::parse(XML_PARSER& xp) {
else if (xp.parse_bool("append_cmdline_args", append_cmdline_args)) continue;
else if (xp.parse_double("time_limit", time_limit)) continue;
else if (xp.parse_int("priority", priority)) continue;
else if (xp.parse_bool("wait_for_children", wait_for_children)) continue;
}
return ERR_XML_PARSE;
}
Expand Down Expand Up @@ -741,6 +746,33 @@ int TASK::run(int argct, char** argvt) {
);

#ifdef _WIN32
if (wait_for_children) {
// Some processes will launch child processes and then exit; set up a
// job object to prevent this from ending the task
//
job_handle = CreateJobObject(NULL, NULL);
if (!job_handle) {
fprintf(stderr, "CreateJobObject failed: %d\n", GetLastError());
return ERR_FORK;
}

ioport_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL, 0, 1);
if (!ioport_handle) {
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return ERR_FORK;
}

JOBOBJECT_ASSOCIATE_COMPLETION_PORT Port;
Port.CompletionKey = job_handle;
Port.CompletionPort = ioport_handle;
if (!SetInformationJobObject(job_handle,
JobObjectAssociateCompletionPortInformation,
&Port, sizeof(Port))) {
fprintf(stderr, "SetInformation failed: %d\n", GetLastError());
return ERR_FORK;
}
}
PROCESS_INFORMATION process_info;
STARTUPINFO startup_info;
string command;
Expand Down Expand Up @@ -800,7 +832,7 @@ int TASK::run(int argct, char** argvt) {
NULL,
NULL,
TRUE, // bInheritHandles
CREATE_NO_WINDOW|process_priority_value(priority),
CREATE_NO_WINDOW|CREATE_SUSPENDED|process_priority_value(priority),
(LPVOID) env_vars,
exec_dir.empty()?NULL:exec_dir.c_str(),
&startup_info,
Expand All @@ -820,6 +852,18 @@ int TASK::run(int argct, char** argvt) {
if (env_vars) delete [] env_vars;
pid_handle = process_info.hProcess;
pid = process_info.dwProcessId;
if (wait_for_children) {
// Use a job object if completion of child processes is required
//
if (!AssignProcessToJobObject(job_handle, pid_handle)) {
fprintf(stderr, "Failed to assign process to job: %d\n", GetLastError());
return ERR_FORK;
}
}
// Process was created suspended in case it needed to be attached to a job
// object; it can now be resumed
//
ResumeThread(process_info.hThread);
#else
int retval;
char* argv[256];
Expand Down Expand Up @@ -925,15 +969,37 @@ bool TASK::poll(int& status) {
}
#ifdef _WIN32
unsigned long exit_code;
if (GetExitCodeProcess(pid_handle, &exit_code)) {
if (exit_code != STILL_ACTIVE) {
status = exit_code;
final_cpu_time = current_cpu_time;
fprintf(stderr, "%s %s exited; CPU time %f\n",
boinc_msg_prefix(buf, sizeof(buf)),
application.c_str(), final_cpu_time
);
return true;
if (wait_for_children) {
DWORD completion_code;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
exit_code = 0;
if (GetQueuedCompletionStatus(ioport_handle, &completion_code, &completion_key, &overlapped, 3000)) {
if ((HANDLE)completion_key == job_handle && completion_code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO) {
status = exit_code;
final_cpu_time = current_cpu_time;
fprintf(stderr, "%s %s all processes exited; CPU time %f\n",
boinc_msg_prefix(buf, sizeof(buf)),
application.c_str(), final_cpu_time
);
return true;
}
else if ((HANDLE)completion_key == job_handle && completion_code == JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS) {
exit_code = JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS;
}
}
}
else {
if (GetExitCodeProcess(pid_handle, &exit_code)) {
if (exit_code != STILL_ACTIVE) {
status = exit_code;
final_cpu_time = current_cpu_time;
fprintf(stderr, "%s %s exited; CPU time %f\n",
boinc_msg_prefix(buf, sizeof(buf)),
application.c_str(), final_cpu_time
);
return true;
}
}
}
#else
Expand Down Expand Up @@ -966,13 +1032,24 @@ bool TASK::poll(int& status) {
//
void TASK::kill() {
#ifdef _WIN32
kill_descendants();
if (wait_for_children) {
kill_job_object(job_handle);
} else {
kill_descendants();
}
#else
kill_descendants(pid);
#endif
}

void TASK::stop() {
#ifdef _WIN32
if (wait_for_children) {
suspend_or_resume_job_object(job_handle, false);
suspended = true;
return;
}
#endif
if (multi_process) {
suspend_or_resume_descendants(false);
} else {
Expand All @@ -982,6 +1059,13 @@ void TASK::stop() {
}

void TASK::resume() {
#ifdef _WIN32
if (wait_for_children) {
suspend_or_resume_job_object(job_handle, true);
suspended = false;
return;
}
#endif
if (multi_process) {
suspend_or_resume_descendants(true);
} else {
Expand Down