Skip to content

Commit

Permalink
Process may exit, but pipe buffer may still have
Browse files Browse the repository at this point in the history
output buffered (JDK or OS), so having exit code
is not necessarily meaning we are done, we need
to wait for streams as well.
  • Loading branch information
cstamas committed Feb 4, 2025
1 parent 61efc04 commit ed2c48b
Showing 1 changed file with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

import org.apache.maven.api.annotations.Nullable;
import org.apache.maven.api.cli.Executor;
Expand Down Expand Up @@ -99,15 +100,17 @@ public String mavenVersion(ExecutorRequest executorRequest) throws ExecutorExcep
protected void validate(ExecutorRequest executorRequest) throws ExecutorException {}

@Nullable
protected Consumer<Process> wrapRequestStandardStreams(ExecutorRequest executorRequest) {
protected Function<Process, CountDownLatch> wrapRequestStandardStreams(ExecutorRequest executorRequest) {
if (executorRequest.stdIn().isEmpty()
&& executorRequest.stdOut().isEmpty()
&& executorRequest.stdErr().isEmpty()) {
return null;
} else {
return p -> {
CountDownLatch latch = new CountDownLatch(3);
String suffix = "-pump-" + p.pid();
executorRequest.stdOut().ifPresent(stdout -> {
if (executorRequest.stdOut().isPresent()) {
OutputStream stdout = executorRequest.stdOut().orElseThrow();
Thread pump = new Thread(() -> {
try (InputStream out = p.getInputStream()) {
int c;
Expand All @@ -117,13 +120,18 @@ protected Consumer<Process> wrapRequestStandardStreams(ExecutorRequest executorR
stdout.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
latch.countDown();
}
});
pump.setName("stdout" + suffix);
pump.setDaemon(true);
pump.start();
});
executorRequest.stdErr().ifPresent(stderr -> {
} else {
latch.countDown();
}
if (executorRequest.stdErr().isPresent()) {
OutputStream stderr = executorRequest.stdErr().orElseThrow();
Thread pump = new Thread(() -> {
try (InputStream err = p.getErrorStream()) {
int c;
Expand All @@ -133,13 +141,18 @@ protected Consumer<Process> wrapRequestStandardStreams(ExecutorRequest executorR
stderr.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
latch.countDown();
}
});
pump.setName("stderr" + suffix);
pump.setDaemon(true);
pump.start();
});
executorRequest.stdIn().ifPresent(stdin -> {
} else {
latch.countDown();
}
if (executorRequest.stdIn().isPresent()) {
InputStream stdin = executorRequest.stdIn().orElseThrow();
Thread pump = new Thread(() -> {
try (OutputStream in = p.getOutputStream()) {
int c;
Expand All @@ -149,17 +162,22 @@ protected Consumer<Process> wrapRequestStandardStreams(ExecutorRequest executorR
in.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
latch.countDown();
}
});
pump.setName("stdin" + suffix);
pump.setDaemon(true);
pump.start();
});
} else {
latch.countDown();
}
return latch;
};
}
}

protected int doExecute(ExecutorRequest executorRequest, Consumer<Process> processConsumer)
protected int doExecute(ExecutorRequest executorRequest, Function<Process, CountDownLatch> processConsumer)
throws ExecutorException {
ArrayList<String> cmdAndArguments = new ArrayList<>();
cmdAndArguments.add(executorRequest
Expand Down Expand Up @@ -212,11 +230,14 @@ protected int doExecute(ExecutorRequest executorRequest, Consumer<Process> proce
pb.environment().putAll(env);
}

CountDownLatch latch = new CountDownLatch(0);
Process process = pb.start();
if (processConsumer != null) {
processConsumer.accept(process);
latch = processConsumer.apply(process);
}
return process.waitFor();
int exitCode = process.waitFor();
latch.await();
return exitCode;
} catch (IOException e) {
throw new ExecutorException("IO problem while executing command: " + cmdAndArguments, e);
} catch (InterruptedException e) {
Expand Down

0 comments on commit ed2c48b

Please sign in to comment.