Skip to content

Commit

Permalink
Add Win32 implementation for GNUmakeTokenPool
Browse files Browse the repository at this point in the history
GNU make uses a semaphore as jobserver protocol on Win32. See also

   https://www.gnu.org/software/make/manual/html_node/Windows-Jobserver.html

Usage is pretty simple and straightforward, i.e. WaitForSingleObject()
to obtain a token and ReleaseSemaphore() to return it.

Unfortunately subprocess-win32.cc uses an I/O completion port (IOCP).
IOCPs aren't waitable objects, i.e. we can't use WaitForMultipleObjects()
to wait on the IOCP and the token semaphore at the same time.

Therefore GNUmakeTokenPoolWin32 creates a child thread that waits on the
token semaphore and posts a dummy I/O completion status on the IOCP when
it was able to obtain a token. That unblocks SubprocessSet::DoWork() and
it can then check if a token became available or not.

- split existing GNUmakeTokenPool into common and platform bits
- add GNUmakeTokenPool interface
- move the Posix bits to GNUmakeTokenPoolPosix
- add the Win32 bits as GNUmakeTokenPoolWin32
- move Setup() method up to TokenPool interface
- update Subprocess & TokenPool tests accordingly
  • Loading branch information
stefanb2 committed Feb 19, 2023
1 parent f99d747 commit 71ad7da
Show file tree
Hide file tree
Showing 11 changed files with 653 additions and 227 deletions.
8 changes: 5 additions & 3 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,12 +538,13 @@ def has_re2c():
'state',
'status',
'string_piece_util',
'tokenpool-gnu-make',
'util',
'version']:
objs += cxx(name, variables=cxxvariables)
if platform.is_windows():
for name in ['subprocess-win32',
'tokenpool-none',
'tokenpool-gnu-make-win32',
'includes_normalize-win32',
'msvc_helper-win32',
'msvc_helper_main-win32']:
Expand All @@ -552,8 +553,9 @@ def has_re2c():
objs += cxx('minidump-win32', variables=cxxvariables)
objs += cc('getopt')
else:
objs += cxx('subprocess-posix')
objs += cxx('tokenpool-gnu-make')
for name in ['subprocess-posix',
'tokenpool-gnu-make-posix']:
objs += cxx(name)
if platform.is_aix():
objs += cc('getopt')
if platform.is_msvc():
Expand Down
11 changes: 8 additions & 3 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,14 @@ struct RealCommandRunner : public CommandRunner {

RealCommandRunner::RealCommandRunner(const BuildConfig& config) : config_(config) {
max_load_average_ = config.max_load_average;
tokens_ = TokenPool::Get(config_.parallelism_from_cmdline,
config_.verbosity == BuildConfig::VERBOSE,
max_load_average_);
if ((tokens_ = TokenPool::Get()) != NULL) {
if (!tokens_->Setup(config_.parallelism_from_cmdline,
config_.verbosity == BuildConfig::VERBOSE,
max_load_average_)) {
delete tokens_;
tokens_ = NULL;
}
}
}

RealCommandRunner::~RealCommandRunner() {
Expand Down
9 changes: 9 additions & 0 deletions src/subprocess-win32.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "subprocess.h"
#include "tokenpool.h"

#include <assert.h>
#include <stdio.h>
Expand Down Expand Up @@ -256,6 +257,9 @@ bool SubprocessSet::DoWork(struct TokenPool* tokens) {
Subprocess* subproc;
OVERLAPPED* overlapped;

if (tokens)
tokens->WaitForTokenAvailability(ioport_);

if (!GetQueuedCompletionStatus(ioport_, &bytes_read, (PULONG_PTR)&subproc,
&overlapped, INFINITE)) {
if (GetLastError() != ERROR_BROKEN_PIPE)
Expand All @@ -266,6 +270,11 @@ bool SubprocessSet::DoWork(struct TokenPool* tokens) {
// delivered by NotifyInterrupted above.
return true;

if (tokens && tokens->TokenIsAvailable((ULONG_PTR)subproc)) {
token_available_ = true;
return false;
}

subproc->OnPipeReady();

if (subproc->Done()) {
Expand Down
34 changes: 30 additions & 4 deletions src/subprocess_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,16 @@ struct TokenPoolTest : public TokenPool {
void Reserve() {}
void Release() {}
void Clear() {}
bool Setup(bool ignore_unused, bool verbose, double& max_load_average) { return false; }

#ifdef _WIN32
// @TODO
bool _token_available;
void WaitForTokenAvailability(HANDLE ioport) {
if (_token_available)
// unblock GetQueuedCompletionStatus()
PostQueuedCompletionStatus(ioport, 0, (ULONG_PTR) this, NULL);
}
bool TokenIsAvailable(ULONG_PTR key) { return key == (ULONG_PTR) this; }
#else
int _fd;
int GetMonitorFd() { return _fd; }
Expand Down Expand Up @@ -297,34 +304,48 @@ TEST_F(SubprocessTest, ReadStdin) {
}
#endif // _WIN32

// @TODO: remove once TokenPool implementation for Windows is available
#ifndef _WIN32
TEST_F(SubprocessTest, TokenAvailable) {
Subprocess* subproc = subprocs_.Add(kSimpleCommand);
ASSERT_NE((Subprocess *) 0, subproc);

// simulate GNUmake jobserver pipe with 1 token
#ifdef _WIN32
tokens_._token_available = true;
#else
int fds[2];
ASSERT_EQ(0u, pipe(fds));
tokens_._fd = fds[0];
ASSERT_EQ(1u, write(fds[1], "T", 1));
#endif

subprocs_.ResetTokenAvailable();
subprocs_.DoWork(&tokens_);
#ifdef _WIN32
tokens_._token_available = false;
// we need to loop here as we have no conrol where the token
// I/O completion post ends up in the queue
while (!subproc->Done() && !subprocs_.IsTokenAvailable()) {
subprocs_.DoWork(&tokens_);
}
#endif

EXPECT_TRUE(subprocs_.IsTokenAvailable());
EXPECT_EQ(0u, subprocs_.finished_.size());

// remove token to let DoWork() wait for command again
#ifndef _WIN32
char token;
ASSERT_EQ(1u, read(fds[0], &token, 1));
#endif

while (!subproc->Done()) {
subprocs_.DoWork(&tokens_);
}

#ifndef _WIN32
close(fds[1]);
close(fds[0]);
#endif

EXPECT_EQ(ExitSuccess, subproc->Finish());
EXPECT_NE("", subproc->GetOutput());
Expand All @@ -337,22 +358,27 @@ TEST_F(SubprocessTest, TokenNotAvailable) {
ASSERT_NE((Subprocess *) 0, subproc);

// simulate GNUmake jobserver pipe with 0 tokens
#ifdef _WIN32
tokens_._token_available = false;
#else
int fds[2];
ASSERT_EQ(0u, pipe(fds));
tokens_._fd = fds[0];
#endif

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork(&tokens_);
}

#ifndef _WIN32
close(fds[1]);
close(fds[0]);
#endif

EXPECT_FALSE(subprocs_.IsTokenAvailable());
EXPECT_EQ(ExitSuccess, subproc->Finish());
EXPECT_NE("", subproc->GetOutput());

EXPECT_EQ(1u, subprocs_.finished_.size());
}
#endif // _WIN32
203 changes: 203 additions & 0 deletions src/tokenpool-gnu-make-posix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2016-2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "tokenpool-gnu-make.h"

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

// TokenPool implementation for GNU make jobserver - POSIX implementation
// (http://make.mad-scientist.net/papers/jobserver-implementation/)
struct GNUmakeTokenPoolPosix : public GNUmakeTokenPool {
GNUmakeTokenPoolPosix();
virtual ~GNUmakeTokenPoolPosix();

virtual int GetMonitorFd();

virtual const char *GetEnv(const char *name) { return getenv(name); };
virtual bool ParseAuth(const char *jobserver);
virtual bool AcquireToken();
virtual bool ReturnToken();

private:
int rfd_;
int wfd_;

struct sigaction old_act_;
bool restore_;

static int dup_rfd_;
static void CloseDupRfd(int signum);

bool CheckFd(int fd);
bool SetAlarmHandler();
};

GNUmakeTokenPoolPosix::GNUmakeTokenPoolPosix() : rfd_(-1), wfd_(-1), restore_(false) {
}

GNUmakeTokenPoolPosix::~GNUmakeTokenPoolPosix() {
Clear();
if (restore_)
sigaction(SIGALRM, &old_act_, NULL);
}

bool GNUmakeTokenPoolPosix::CheckFd(int fd) {
if (fd < 0)
return false;
int ret = fcntl(fd, F_GETFD);
if (ret < 0)
return false;
return true;
}

int GNUmakeTokenPoolPosix::dup_rfd_ = -1;

void GNUmakeTokenPoolPosix::CloseDupRfd(int signum) {
close(dup_rfd_);
dup_rfd_ = -1;
}

bool GNUmakeTokenPoolPosix::SetAlarmHandler() {
struct sigaction act;
memset(&act, 0, sizeof(act));
act.sa_handler = CloseDupRfd;
if (sigaction(SIGALRM, &act, &old_act_) < 0) {
perror("sigaction:");
return(false);
} else {
restore_ = true;
return(true);
}
}

bool GNUmakeTokenPoolPosix::ParseAuth(const char *jobserver) {
int rfd = -1;
int wfd = -1;
if ((sscanf(jobserver, "%*[^=]=%d,%d", &rfd, &wfd) == 2) &&
CheckFd(rfd) &&
CheckFd(wfd) &&
SetAlarmHandler()) {
rfd_ = rfd;
wfd_ = wfd;
return true;
}

return false;
}

bool GNUmakeTokenPoolPosix::AcquireToken() {
// Please read
//
// http://make.mad-scientist.net/papers/jobserver-implementation/
//
// for the reasoning behind the following code.
//
// Try to read one character from the pipe. Returns true on success.
//
// First check if read() would succeed without blocking.
#ifdef USE_PPOLL
pollfd pollfds[] = {{rfd_, POLLIN, 0}};
int ret = poll(pollfds, 1, 0);
#else
fd_set set;
struct timeval timeout = { 0, 0 };
FD_ZERO(&set);
FD_SET(rfd_, &set);
int ret = select(rfd_ + 1, &set, NULL, NULL, &timeout);
#endif
if (ret > 0) {
// Handle potential race condition:
// - the above check succeeded, i.e. read() should not block
// - the character disappears before we call read()
//
// Create a duplicate of rfd_. The duplicate file descriptor dup_rfd_
// can safely be closed by signal handlers without affecting rfd_.
dup_rfd_ = dup(rfd_);

if (dup_rfd_ != -1) {
struct sigaction act, old_act;
int ret = 0;

// Temporarily replace SIGCHLD handler with our own
memset(&act, 0, sizeof(act));
act.sa_handler = CloseDupRfd;
if (sigaction(SIGCHLD, &act, &old_act) == 0) {
struct itimerval timeout;

// install a 100ms timeout that generates SIGALARM on expiration
memset(&timeout, 0, sizeof(timeout));
timeout.it_value.tv_usec = 100 * 1000; // [ms] -> [usec]
if (setitimer(ITIMER_REAL, &timeout, NULL) == 0) {
char buf;

// Now try to read() from dup_rfd_. Return values from read():
//
// 1. token read -> 1
// 2. pipe closed -> 0
// 3. alarm expires -> -1 (EINTR)
// 4. child exits -> -1 (EINTR)
// 5. alarm expired before entering read() -> -1 (EBADF)
// 6. child exited before entering read() -> -1 (EBADF)
// 7. child exited before handler is installed -> go to 1 - 3
ret = read(dup_rfd_, &buf, 1);

// disarm timer
memset(&timeout, 0, sizeof(timeout));
setitimer(ITIMER_REAL, &timeout, NULL);
}

sigaction(SIGCHLD, &old_act, NULL);
}

CloseDupRfd(0);

// Case 1 from above list
if (ret > 0)
return true;
}
}

// read() would block, i.e. no token available,
// cases 2-6 from above list or
// select() / poll() / dup() / sigaction() / setitimer() failed
return false;
}

bool GNUmakeTokenPoolPosix::ReturnToken() {
const char buf = '+';
while (1) {
int ret = write(wfd_, &buf, 1);
if (ret > 0)
return true;
if ((ret != -1) || (errno != EINTR))
return false;
// write got interrupted - retry
}
}

int GNUmakeTokenPoolPosix::GetMonitorFd() {
return(rfd_);
}

struct TokenPool *TokenPool::Get() {
return new GNUmakeTokenPoolPosix;
}
Loading

0 comments on commit 71ad7da

Please sign in to comment.