Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Commit

Permalink
Robustify fork server launching & problem detection, part 3
Browse files Browse the repository at this point in the history
This change addresses the problem (5) from the bug:

5. The timeout is detected and logged, but the diagnostic doesn't point at the root cause of the problem, because the fork server's log (which is redirected to a file) is not printed.

PiperOrigin-RevId: 516387799
  • Loading branch information
ussuri authored and copybara-github committed Mar 14, 2023
1 parent 8b0ed67 commit 68c592a
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 27 deletions.
2 changes: 2 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,9 @@ cc_library(
":logging",
":util",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",
],
)
Expand Down
181 changes: 155 additions & 26 deletions command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@

#include "absl/log/check.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_replace.h"
#include "absl/strings/str_split.h"
#include "absl/time/clock.h"
#include "./logging.h"
#include "./util.h"
Expand Down Expand Up @@ -103,30 +106,112 @@ bool Command::StartForkServer(std::string_view temp_dir_path,
.append(absl::StrCat(prefix, "_FIFO0"));
fifo_path_[1] = std::filesystem::path(temp_dir_path)
.append(absl::StrCat(prefix, "_FIFO1"));
const std::string pid_file_path =
std::filesystem::path(temp_dir_path).append("pid");
(void)std::filesystem::create_directory(temp_dir_path); // it may not exist.
for (int i = 0; i < 2; ++i) {
PCHECK(mkfifo(fifo_path_[i].c_str(), 0600) == 0)
<< VV(i) << VV(fifo_path_[i]);
}

const std::string command = absl::StrCat(
"CENTIPEDE_FORK_SERVER_FIFO0=", fifo_path_[0], kCommandLineSeparator,
"CENTIPEDE_FORK_SERVER_FIFO1=", fifo_path_[1], kCommandLineSeparator,
command_line_, " &");
LOG(INFO) << "Fork server command:\n" << command;
int ret = system(command.c_str());
CHECK_EQ(ret, 0) << "Failed to start fork server using command:\n" << command;
// NOTE: A background process does not return its exit status to the subshell,
// so failures will never propagate to the caller of `system()`. Instead, we
// save out the background process's PID to a file and use it later to assert
// that the process has started and is still running.
static constexpr std::string_view kForkServerCommandStub = R"sh(
set -eux
declare -r fifo0_f=%s
declare -r fifo1_f=%s
declare -r pid_f=%s
{
CENTIPEDE_FORK_SERVER_FIFO0="$fifo0_f" \
CENTIPEDE_FORK_SERVER_FIFO1="$fifo1_f" \
%s
} &
declare -ri pid=$!
echo -n "$pid" > "$pid_f"
)sh";
const std::string fork_server_command =
absl::StrFormat(kForkServerCommandStub, fifo_path_[0], fifo_path_[1],
pid_file_path, command_line_);
LOG(INFO) << "Fork server command:" << fork_server_command;

pipe_[0] = open(fifo_path_[0].c_str(), O_WRONLY);
pipe_[1] = open(fifo_path_[1].c_str(), O_RDONLY);
if (pipe_[0] < 0 || pipe_[1] < 0) {
LOG(INFO) << "Failed to establish communication with fork server; will "
"proceed without it";
const int exit_code = system(fork_server_command.c_str());

// Check if `system()` was able to parse and run the command at all.
if (exit_code != EXIT_SUCCESS) {
LOG(ERROR) << "Failed to parse or run command to launch fork server; will "
"proceed without it";
LogRedirectedStdoutAndStderr();
return false;
}

// The fork server is probably running now. However, one failure scenario is
// that it starts and exits early. Try opening the read/write comms pipes with
// it: if that fails, something is wrong.
// We use non-blocking I/O to open the pipes. That is good and safe, because:
// 1) This prevents the `open()` calls from hanging when the fork server fails
// to open the pipes on its side (note the use of O_RDWR, not O_WRONLY, to
// avoid ENXIO).
// 2) In `Command::Execute`, we wait for the return channel pipe with a
// `poll()`, so it should always have data when we attempt to `read()` from
// it.
// See more at
// https://www.gnu.org/software/libc/manual/html_node/Operating-Modes.html.
if ((pipe_[0] = open(fifo_path_[0].c_str(), O_RDWR | O_NONBLOCK)) < 0 ||
(pipe_[1] = open(fifo_path_[1].c_str(), O_RDONLY | O_NONBLOCK)) < 0) {
PLOG(ERROR) << "Failed to establish communication with fork server; will "
"proceed without it";
LogRedirectedStdoutAndStderr();
return false;
}

// The fork server has started and the comms pipes got opened successfully.
// Read the fork server's PID and the initial /proc/<PID>/exe symlink pointing
// at the fork server's binary, written to the provided files by `command`.
// `Execute()` uses these to monitor the fork server health.
std::string pid_str;
ReadFromLocalFile(pid_file_path, pid_str);
CHECK(absl::SimpleAtoi(pid_str, &fork_server_pid_)) << VV(pid_str);
std::string proc_exe = absl::StrFormat("/proc/%d/exe", fork_server_pid_);
CHECK_EQ(stat(proc_exe.c_str(), &fork_server_exe_stat_), EXIT_SUCCESS)
<< VV(proc_exe);

return true;
}

absl::Status Command::AssertForkServerIsHealthy() {
// Preconditions: the callers (`Execute()`) should call us only when the fork
// server is presumed to be running (`fork_server_pid_` >= 0). If it is, the
// comms pipes are guaranteed to be opened by `StartForkServer()`.
CHECK(fork_server_pid_ >= 0) << "Fork server wasn't started";
CHECK(pipe_[0] >= 0 && pipe_[1] >= 0) << "Didn't connect to fork server";

// A process with the fork server PID exists (_some_ process, possibly with a
// recycled PID)...
if (kill(fork_server_pid_, 0) != EXIT_SUCCESS) {
return absl::UnknownError(absl::StrCat(
"Can't communicate with fork server, PID=", fork_server_pid_));
}
// ...and it is a process with our expected binary, so it's practically
// guaranteed to be our original fork server process.
const std::string proc_exe =
absl::StrFormat("/proc/%d/exe", fork_server_pid_);
struct stat proc_exe_stat = {};
if (stat(proc_exe.c_str(), &proc_exe_stat) != EXIT_SUCCESS) {
return absl::UnknownError(absl::StrCat(
"Failed to stat fork server's /proc/<PID>/exe symlink, PID=",
fork_server_pid_));
}
if (proc_exe_stat.st_dev != fork_server_exe_stat_.st_dev ||
proc_exe_stat.st_ino != fork_server_exe_stat_.st_ino) {
return absl::UnknownError(absl::StrCat(
"Fork server's /proc/<PID>/exe symlink changed (new process?), PID=",
fork_server_pid_));
}
return absl::OkStatus();
}

Command::~Command() {
for (int i = 0; i < 2; ++i) {
if (pipe_[i] >= 0) CHECK_EQ(close(pipe_[i]), 0);
Expand All @@ -137,7 +222,15 @@ Command::~Command() {

int Command::Execute() {
int exit_code = 0;
if (pipe_[0] >= 0 && pipe_[1] >= 0) {

if (fork_server_pid_ >= 0) {
if (const auto status = AssertForkServerIsHealthy();
!AssertForkServerIsHealthy().ok()) {
LOG(ERROR) << "Fork server should be running, but isn't: " << status;
LogRedirectedStdoutAndStderr();
return EXIT_FAILURE;
}

// Wake up the fork server.
char x = ' ';
CHECK_EQ(1, write(pipe_[0], &x, 1));
Expand All @@ -164,19 +257,16 @@ int Command::Execute() {
if (poll_ret != 1 || (poll_fd.revents & POLLIN) == 0) {
// The fork server errored out or timed out, or some other error occurred,
// e.g. the syscall was interrupted.
std::string fork_server_log = "<not dumped>";
if (!out_.empty()) {
ReadFromLocalFile(out_, fork_server_log);
}
if (poll_ret == 0) {
LOG(FATAL) << "Timeout while waiting for fork server: " << VV(timeout_)
<< VV(fork_server_log) << VV(command_line_);
LOG(ERROR) << "Timeout while waiting for fork server: " << VV(timeout_)
<< VV(command_line_);
} else {
PLOG(FATAL) << "Error or interrupt while waiting for fork server: "
<< VV(poll_ret) << VV(poll_fd.revents)
<< VV(fork_server_log) << VV(command_line_);
PLOG(ERROR) << "Error or interrupt while waiting for fork server: "
<< VV(poll_ret) << VV(poll_fd.revents) << VV(command_line_);
}
__builtin_unreachable();
LogRedirectedStdoutAndStderr();
RequestEarlyExit(EXIT_FAILURE);
return EXIT_FAILURE;
}

// The fork server wrote the execution result to the pipe: read it.
Expand All @@ -185,10 +275,49 @@ int Command::Execute() {
// No fork server, use system().
exit_code = system(command_line_.c_str());
}
if (WIFSIGNALED(exit_code) && (WTERMSIG(exit_code) == SIGINT))
RequestEarlyExit(EXIT_FAILURE);
if (WIFEXITED(exit_code)) return WEXITSTATUS(exit_code);

if (WIFSIGNALED(exit_code) ||
(WIFEXITED(exit_code) && WEXITSTATUS(exit_code) != EXIT_SUCCESS)) {
LOG(ERROR) << "Target returned error or signaled";
LogRedirectedStdoutAndStderr();
if (WIFSIGNALED(exit_code)) {
if (WTERMSIG(exit_code) == SIGINT) RequestEarlyExit(EXIT_FAILURE);
}
if (WIFEXITED(exit_code)) {
exit_code = WEXITSTATUS(exit_code);
}
}

return exit_code;
}

std::string Command::ReadRedirectedStdout() const {
std::string ret;
if (!out_.empty()) ReadFromLocalFile(out_, ret);
return ret;
}

std::string Command::ReadRedirectedStderr() const {
std::string ret;
if (!err_.empty()) {
if (err_ != "2>&1" && err_ == out_)
ReadFromLocalFile(err_, ret);
else
ret = "<DUPED TO STDERR>";
}
return ret;
}

void Command::LogRedirectedStdoutAndStderr() const {
LOG(INFO).NoPrefix() << "=== STDOUT: ===";
for (const auto &line : absl::StrSplit(ReadRedirectedStdout(), '\n')) {
LOG(INFO).NoPrefix() << "STDOUT: " << line;
}
LOG(INFO).NoPrefix() << "=== STDERR: ===";
for (const auto &line : absl::StrSplit(ReadRedirectedStderr(), '\n')) {
LOG(INFO).NoPrefix() << "STDERR: " << line;
}
LOG(INFO).NoPrefix() << "======";
}

} // namespace centipede
27 changes: 26 additions & 1 deletion command.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
#ifndef THIRD_PARTY_CENTIPEDE_COMMAND_H_
#define THIRD_PARTY_CENTIPEDE_COMMAND_H_

#include <sys/stat.h>
#include <sys/types.h>

#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/time/time.h"

namespace centipede {
Expand Down Expand Up @@ -88,6 +92,20 @@ class Command final {
const std::string& path() const { return path_; }

private:
// Returns true is the fork server process, previously started with
// `StartFormServer()`, is still running and is responsive.
absl::Status AssertForkServerIsHealthy();

// Reads and returns the stdout of the command, if redirected to a file. If
// not redirected, returns a placeholder text.
std::string ReadRedirectedStdout() const;
// Reads and returns the stderr of the command, if redirected to a file that
// is also different from the redirected stdout. If not redirected, returns a
// placeholder text.
std::string ReadRedirectedStderr() const;
// Logs the redirected stdout and stderr of the command in a readable format.
void LogRedirectedStdoutAndStderr() const;

const std::string path_;
const std::vector<std::string> args_;
const std::vector<std::string> env_;
Expand All @@ -96,9 +114,16 @@ class Command final {
const absl::Duration timeout_;
const std::string temp_file_path_;
const std::string command_line_ = ToString();
// Pipe paths and file descriptors for the fork server.
// Pipe paths and file descriptors.
std::string fifo_path_[2];
int pipe_[2] = {-1, -1};
// The PID of the fork server process. When this is >= 0, `Execute()` assumes
// that the fork server is running and the pipe are ready for comms.
pid_t fork_server_pid_ = -1;
// A `stat` of the fork server's binary right after it's started. Used to
// detect that the running process with `fork_server_pid_` is still the
// original fork server, not a PID recycled by the OS.
struct stat fork_server_exe_stat_ = {};
};

} // namespace centipede
Expand Down

0 comments on commit 68c592a

Please sign in to comment.