From ed2c48b07fe44e43d55ce21f9f470764a69d0678 Mon Sep 17 00:00:00 2001 From: Tamas Cservenak Date: Tue, 4 Feb 2025 12:30:43 +0100 Subject: [PATCH] Process may exit, but pipe buffer may still have output buffered (JDK or OS), so having exit code is not necessarily meaning we are done, we need to wait for streams as well. --- .../executor/forked/ForkedMavenExecutor.java | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/impl/maven-executor/src/main/java/org/apache/maven/cling/executor/forked/ForkedMavenExecutor.java b/impl/maven-executor/src/main/java/org/apache/maven/cling/executor/forked/ForkedMavenExecutor.java index ea0f4b00cc1..254dfd7140a 100644 --- a/impl/maven-executor/src/main/java/org/apache/maven/cling/executor/forked/ForkedMavenExecutor.java +++ b/impl/maven-executor/src/main/java/org/apache/maven/cling/executor/forked/ForkedMavenExecutor.java @@ -30,7 +30,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.function.Consumer; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; import org.apache.maven.api.annotations.Nullable; import org.apache.maven.api.cli.Executor; @@ -99,15 +100,17 @@ public String mavenVersion(ExecutorRequest executorRequest) throws ExecutorExcep protected void validate(ExecutorRequest executorRequest) throws ExecutorException {} @Nullable - protected Consumer wrapRequestStandardStreams(ExecutorRequest executorRequest) { + protected Function wrapRequestStandardStreams(ExecutorRequest executorRequest) { if (executorRequest.stdIn().isEmpty() && executorRequest.stdOut().isEmpty() && executorRequest.stdErr().isEmpty()) { return null; } else { return p -> { + CountDownLatch latch = new CountDownLatch(3); String suffix = "-pump-" + p.pid(); - executorRequest.stdOut().ifPresent(stdout -> { + if (executorRequest.stdOut().isPresent()) { + OutputStream stdout = executorRequest.stdOut().orElseThrow(); Thread pump = new Thread(() -> { try (InputStream out = p.getInputStream()) { int c; @@ -117,13 +120,18 @@ protected Consumer wrapRequestStandardStreams(ExecutorRequest executorR stdout.flush(); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + latch.countDown(); } }); pump.setName("stdout" + suffix); pump.setDaemon(true); pump.start(); - }); - executorRequest.stdErr().ifPresent(stderr -> { + } else { + latch.countDown(); + } + if (executorRequest.stdErr().isPresent()) { + OutputStream stderr = executorRequest.stdErr().orElseThrow(); Thread pump = new Thread(() -> { try (InputStream err = p.getErrorStream()) { int c; @@ -133,13 +141,18 @@ protected Consumer wrapRequestStandardStreams(ExecutorRequest executorR stderr.flush(); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + latch.countDown(); } }); pump.setName("stderr" + suffix); pump.setDaemon(true); pump.start(); - }); - executorRequest.stdIn().ifPresent(stdin -> { + } else { + latch.countDown(); + } + if (executorRequest.stdIn().isPresent()) { + InputStream stdin = executorRequest.stdIn().orElseThrow(); Thread pump = new Thread(() -> { try (OutputStream in = p.getOutputStream()) { int c; @@ -149,17 +162,22 @@ protected Consumer wrapRequestStandardStreams(ExecutorRequest executorR in.flush(); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + latch.countDown(); } }); pump.setName("stdin" + suffix); pump.setDaemon(true); pump.start(); - }); + } else { + latch.countDown(); + } + return latch; }; } } - protected int doExecute(ExecutorRequest executorRequest, Consumer processConsumer) + protected int doExecute(ExecutorRequest executorRequest, Function processConsumer) throws ExecutorException { ArrayList cmdAndArguments = new ArrayList<>(); cmdAndArguments.add(executorRequest @@ -212,11 +230,14 @@ protected int doExecute(ExecutorRequest executorRequest, Consumer proce pb.environment().putAll(env); } + CountDownLatch latch = new CountDownLatch(0); Process process = pb.start(); if (processConsumer != null) { - processConsumer.accept(process); + latch = processConsumer.apply(process); } - return process.waitFor(); + int exitCode = process.waitFor(); + latch.await(); + return exitCode; } catch (IOException e) { throw new ExecutorException("IO problem while executing command: " + cmdAndArguments, e); } catch (InterruptedException e) {