From 43ad15a498caa3ae08a64188c0877d3aee43af4c Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Tue, 24 Sep 2019 23:57:33 -0400 Subject: [PATCH 1/8] Wrapper: Monitor status of child task process even if parent exits On Windows, CreateProcess() is used to launch tasks, but this on its own does not handle child processes; if the parent task process exits, the workunit will be terminated. If is set in the job file, attach the task process to a job object instead, which can then be monitored to determine when all child processes are finished. --- samples/wrapper/wrapper.cpp | 81 ++++++++++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 10 deletions(-) diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index d8c89abadc3..f06f6044437 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -139,6 +139,7 @@ struct TASK { bool forward_slashes; double time_limit; int priority; + bool wait_for_children; // dynamic stuff follows double current_cpu_time; @@ -151,6 +152,8 @@ struct TASK { double elapsed_time; #ifdef _WIN32 HANDLE pid_handle; + HANDLE job_handle; + HANDLE ioport_handle; DWORD pid; struct _stat last_stat; // mod time of checkpoint file #else @@ -435,6 +438,7 @@ int TASK::parse(XML_PARSER& xp) { forward_slashes = false; time_limit = 0; priority = PROCESS_PRIORITY_LOWEST; + wait_for_children = false; while (!xp.get_tag()) { if (!xp.is_tag) { @@ -471,6 +475,7 @@ int TASK::parse(XML_PARSER& xp) { else if (xp.parse_bool("append_cmdline_args", append_cmdline_args)) continue; else if (xp.parse_double("time_limit", time_limit)) continue; else if (xp.parse_int("priority", priority)) continue; + else if (xp.parse_bool("wait_for_children", wait_for_children)) continue; } return ERR_XML_PARSE; } @@ -741,6 +746,33 @@ int TASK::run(int argct, char** argvt) { ); #ifdef _WIN32 + if (wait_for_children) { + // Some processes will launch child processes and then exit; set up a + // job object to prevent this from ending the task + // + job_handle = CreateJobObject(nullptr, nullptr); + if (!job_handle) { + fprintf(stderr, "CreateJobObject failed: %d\n", GetLastError()); + return ERR_FORK; + } + + ioport_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + nullptr, 0, 1); + if (!ioport_handle) { + fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError()); + return ERR_FORK; + } + + JOBOBJECT_ASSOCIATE_COMPLETION_PORT Port; + Port.CompletionKey = job_handle; + Port.CompletionPort = ioport_handle; + if (!SetInformationJobObject(job_handle, + JobObjectAssociateCompletionPortInformation, + &Port, sizeof(Port))) { + fprintf(stderr, "SetInformation failed: %d\n", GetLastError()); + return ERR_FORK; + } + } PROCESS_INFORMATION process_info; STARTUPINFO startup_info; string command; @@ -800,7 +832,7 @@ int TASK::run(int argct, char** argvt) { NULL, NULL, TRUE, // bInheritHandles - CREATE_NO_WINDOW|process_priority_value(priority), + CREATE_NO_WINDOW|CREATE_SUSPENDED|process_priority_value(priority), (LPVOID) env_vars, exec_dir.empty()?NULL:exec_dir.c_str(), &startup_info, @@ -820,6 +852,18 @@ int TASK::run(int argct, char** argvt) { if (env_vars) delete [] env_vars; pid_handle = process_info.hProcess; pid = process_info.dwProcessId; + if (wait_for_children) { + // Use a job object if completion of child processes is required + // + if (!AssignProcessToJobObject(job_handle, pid_handle)) { + fprintf(stderr, "Failed to assign process to job: %d\n", GetLastError()); + return ERR_FORK; + } + } + // Process was created suspended in case it needed to be attached to a job + // object; it can now be resumed + // + ResumeThread(process_info.hThread); #else int retval; char* argv[256]; @@ -925,15 +969,32 @@ bool TASK::poll(int& status) { } #ifdef _WIN32 unsigned long exit_code; - if (GetExitCodeProcess(pid_handle, &exit_code)) { - if (exit_code != STILL_ACTIVE) { - status = exit_code; - final_cpu_time = current_cpu_time; - fprintf(stderr, "%s %s exited; CPU time %f\n", - boinc_msg_prefix(buf, sizeof(buf)), - application.c_str(), final_cpu_time - ); - return true; + if (wait_for_children) { + ULONG_PTR completion_key; + LPOVERLAPPED overlapped; + if (GetQueuedCompletionStatus(ioport_handle, &exit_code, &completion_key, &overlapped, INFINITE)) { + if ((HANDLE)completion_key == job_handle && exit_code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO) { + status = exit_code; + final_cpu_time = current_cpu_time; + fprintf(stderr, "%s %s exited; CPU time %f\n", + boinc_msg_prefix(buf, sizeof(buf)), + application.c_str(), final_cpu_time + ); + return true; + } + } + } + else { + if (GetExitCodeProcess(pid_handle, &exit_code)) { + if (exit_code != STILL_ACTIVE) { + status = exit_code; + final_cpu_time = current_cpu_time; + fprintf(stderr, "%s %s exited; CPU time %f\n", + boinc_msg_prefix(buf, sizeof(buf)), + application.c_str(), final_cpu_time + ); + return true; + } } } #else From 8665946a353b5817ab586c2ec66102ff6846c745 Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Wed, 25 Sep 2019 15:43:14 -0400 Subject: [PATCH 2/8] Wrapper: Replace undeclared use of nullptr with NULL --- samples/wrapper/wrapper.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index f06f6044437..a8ec74d4fb3 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -750,14 +750,14 @@ int TASK::run(int argct, char** argvt) { // Some processes will launch child processes and then exit; set up a // job object to prevent this from ending the task // - job_handle = CreateJobObject(nullptr, nullptr); + job_handle = CreateJobObject(NULL, NULL); if (!job_handle) { fprintf(stderr, "CreateJobObject failed: %d\n", GetLastError()); return ERR_FORK; } ioport_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, - nullptr, 0, 1); + NULL, 0, 1); if (!ioport_handle) { fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError()); return ERR_FORK; From ece6bdd20b207f7ebf5e3e37595f5d3044668d24 Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Sun, 6 Oct 2019 00:38:25 -0400 Subject: [PATCH 3/8] Wrapper: Fix status code when child processes complete When using a job object to handle child processes, the status should still return 0 on success. If a child exits abnormally, the completion code is set to JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, so just return that for now. --- samples/wrapper/wrapper.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index a8ec74d4fb3..ad9fc73c222 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -970,10 +970,12 @@ bool TASK::poll(int& status) { #ifdef _WIN32 unsigned long exit_code; if (wait_for_children) { + DWORD completion_code; ULONG_PTR completion_key; LPOVERLAPPED overlapped; - if (GetQueuedCompletionStatus(ioport_handle, &exit_code, &completion_key, &overlapped, INFINITE)) { - if ((HANDLE)completion_key == job_handle && exit_code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO) { + exit_code = 0; + if (GetQueuedCompletionStatus(ioport_handle, &completion_code, &completion_key, &overlapped, INFINITE)) { + if ((HANDLE)completion_key == job_handle && completion_code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO) { status = exit_code; final_cpu_time = current_cpu_time; fprintf(stderr, "%s %s exited; CPU time %f\n", @@ -982,6 +984,9 @@ bool TASK::poll(int& status) { ); return true; } + else if ((HANDLE)completion_key == job_handle && completion_code == JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS) { + exit_code = JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS; + } } } else { From d0ec1c530f508b6409fec35e05287f30831eb129 Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Thu, 10 Oct 2019 13:01:59 -0400 Subject: [PATCH 4/8] Wrapper: Add different stderr log message on task exit if waiting for children --- samples/wrapper/wrapper.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index ad9fc73c222..a19aae20ffa 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -978,7 +978,7 @@ bool TASK::poll(int& status) { if ((HANDLE)completion_key == job_handle && completion_code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO) { status = exit_code; final_cpu_time = current_cpu_time; - fprintf(stderr, "%s %s exited; CPU time %f\n", + fprintf(stderr, "%s %s all processes exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); From 649ac78adf97d8127ef9b789b1ae0fdeed20fd4d Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Fri, 18 Oct 2019 23:39:54 -0400 Subject: [PATCH 5/8] Wrapper: Add process control for job objects Add routines for handling kill(), stop(), and resume() calls on tasks that use the option --- lib/proc_control.cpp | 27 +++++++++++++++++++++++++++ lib/proc_control.h | 5 +++++ samples/wrapper/wrapper.cpp | 17 ++++++++++++++--- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/lib/proc_control.cpp b/lib/proc_control.cpp index 8c87e8f351a..f34af7de0ca 100644 --- a/lib/proc_control.cpp +++ b/lib/proc_control.cpp @@ -277,6 +277,33 @@ void suspend_or_resume_process(int pid, bool resume) { #endif } +#ifdef _WIN32 + +// Handling of job objects +// + +void get_job_object_processes(HANDLE job_handle, vector& pids) { + JOBOBJECT_BASIC_PROCESS_ID_LIST process_list; + QueryInformationJobObject(job_handle, JobObjectBasicProcessIdList, &process_list, sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST), NULL); + for (int i = 0; i < process_list.NumberOfProcessIdsInList; i++) { + pids.push_back(process_list.ProcessIdList[i]); + } +} + +void suspend_or_resume_job_object(HANDLE job_handle, bool resume) { + vector pids; + get_job_object_processes(job_handle, pids); + suspend_or_resume_threads(pids, 0, resume, false); +} + +void kill_job_object(HANDLE job_handle) { + vector pids; + get_job_object_processes(job_handle, pids); + kill_all(pids); +} + +#endif + // return OS-specific value associated with priority code // int process_priority_value(int priority) { diff --git a/lib/proc_control.h b/lib/proc_control.h index 972e87b63fc..1a0f1f845e6 100644 --- a/lib/proc_control.h +++ b/lib/proc_control.h @@ -36,6 +36,11 @@ extern void kill_descendants(int child_pid=0); #endif extern void suspend_or_resume_descendants(bool resume); extern void suspend_or_resume_process(int pid, bool resume); +#ifdef _WIN32 +extern void get_job_object_processes(HANDLE job_handle, std::vector& pids); +extern void suspend_or_resume_job_object(HANDLE job_handle, bool resume); +extern void kill_job_object(HANDLE job_handle); +#endif extern int process_priority_value(int); diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index a19aae20ffa..adec055061e 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -1032,14 +1032,22 @@ bool TASK::poll(int& status) { // void TASK::kill() { #ifdef _WIN32 - kill_descendants(); + if (wait_for_children) { + kill_job_object(job_handle); + } + else { + kill_descendants(); + } #else kill_descendants(pid); #endif } void TASK::stop() { - if (multi_process) { + if (wait_for_children) { + suspend_or_resume_job_object(job_handle, false); + } + else if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); @@ -1048,7 +1056,10 @@ void TASK::stop() { } void TASK::resume() { - if (multi_process) { + if (wait_for_children) { + suspend_or_resume_job_object(job_handle, true); + } + else if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); From e75d1002d614bd59734b04bf6d067189044a289f Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Fri, 18 Oct 2019 23:46:41 -0400 Subject: [PATCH 6/8] Wrapper: Fix whitespace around else --- samples/wrapper/wrapper.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index adec055061e..afd5db186e9 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -1034,8 +1034,7 @@ void TASK::kill() { #ifdef _WIN32 if (wait_for_children) { kill_job_object(job_handle); - } - else { + } else { kill_descendants(); } #else @@ -1046,8 +1045,7 @@ void TASK::kill() { void TASK::stop() { if (wait_for_children) { suspend_or_resume_job_object(job_handle, false); - } - else if (multi_process) { + } else if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); @@ -1058,8 +1056,7 @@ void TASK::stop() { void TASK::resume() { if (wait_for_children) { suspend_or_resume_job_object(job_handle, true); - } - else if (multi_process) { + } else if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); From efaffa13daac39800b66d52a6dca87dcd030131b Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Tue, 22 Oct 2019 11:24:21 -0400 Subject: [PATCH 7/8] Wrapper: Fix out of scope variable on Linux and Mac The job_handle for job objects is only relevant on Windows, so should not be referenced outside of _WIN32 blocks --- samples/wrapper/wrapper.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index afd5db186e9..9141c6ed6e8 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -1043,9 +1043,14 @@ void TASK::kill() { } void TASK::stop() { +#ifdef _WIN32 if (wait_for_children) { suspend_or_resume_job_object(job_handle, false); - } else if (multi_process) { + suspended = true; + return; + } +#endif + if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); @@ -1054,9 +1059,14 @@ void TASK::stop() { } void TASK::resume() { +#ifdef _WIN32 if (wait_for_children) { suspend_or_resume_job_object(job_handle, true); - } else if (multi_process) { + suspended = false; + return; + } +#endif + if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); From 471b0a01f47c0691621039242e745a52dfe5bf8c Mon Sep 17 00:00:00 2001 From: Tristan Olive Date: Thu, 31 Oct 2019 13:22:29 -0400 Subject: [PATCH 8/8] Wrapper: Fix job control for job objects The get_job_object_processes() function was not providing a complete list of PIDs, as the cbJobObjectInformationLength parameter passed to QueryInformationJobObject() needed to be larger. It should now accomodate up to 32 processes in the job object. Also related to job control, having no timeout set in the GetQueuedCompletionStatus() call was causing task polling to hang indefinitely when a child process launched another child process. Set the timeout to 3000ms to prevent this. --- lib/proc_control.cpp | 10 ++++++---- samples/wrapper/wrapper.cpp | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/proc_control.cpp b/lib/proc_control.cpp index f34af7de0ca..311249dcce1 100644 --- a/lib/proc_control.cpp +++ b/lib/proc_control.cpp @@ -283,10 +283,12 @@ void suspend_or_resume_process(int pid, bool resume) { // void get_job_object_processes(HANDLE job_handle, vector& pids) { - JOBOBJECT_BASIC_PROCESS_ID_LIST process_list; - QueryInformationJobObject(job_handle, JobObjectBasicProcessIdList, &process_list, sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST), NULL); - for (int i = 0; i < process_list.NumberOfProcessIdsInList; i++) { - pids.push_back(process_list.ProcessIdList[i]); + int max_job_object_processes = 32; + PJOBOBJECT_BASIC_PROCESS_ID_LIST process_list; + process_list = (PJOBOBJECT_BASIC_PROCESS_ID_LIST)LocalAlloc(LPTR, sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST) + (max_job_object_processes - 1) * 32); + QueryInformationJobObject(job_handle, JobObjectBasicProcessIdList, process_list, sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST) + (max_job_object_processes - 1) * 32, NULL); + for (int i = 0; i < process_list->NumberOfProcessIdsInList; i++) { + pids.push_back((int)process_list->ProcessIdList[i]); } } diff --git a/samples/wrapper/wrapper.cpp b/samples/wrapper/wrapper.cpp index 9141c6ed6e8..5e216ccfd36 100644 --- a/samples/wrapper/wrapper.cpp +++ b/samples/wrapper/wrapper.cpp @@ -974,7 +974,7 @@ bool TASK::poll(int& status) { ULONG_PTR completion_key; LPOVERLAPPED overlapped; exit_code = 0; - if (GetQueuedCompletionStatus(ioport_handle, &completion_code, &completion_key, &overlapped, INFINITE)) { + if (GetQueuedCompletionStatus(ioport_handle, &completion_code, &completion_key, &overlapped, 3000)) { if ((HANDLE)completion_key == job_handle && completion_code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO) { status = exit_code; final_cpu_time = current_cpu_time;