From 67d3bc255473016a26e68114671e7a5aab2f3127 Mon Sep 17 00:00:00 2001 From: Jan Ypma Date: Mon, 31 Oct 2016 11:57:36 +0100 Subject: [PATCH 1/2] DO NOT MERGE reproducer of concurrency bug in akka 100-continue handling --- .../testkit/HttpMinimalFailureSpec.java | 121 ++++++++++++++++++ .../src/test/resources/application.conf | 5 + .../src/test/resources/log4j.xml | 24 ++++ .../reaktive/testkit/HttpIntegrationSpec.java | 2 +- .../reaktive/testkit/TestHttpClient.java | 13 +- 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 ts-reaktive-marshal-akka/src/test/java/com/tradeshift/reaktive/testkit/HttpMinimalFailureSpec.java create mode 100644 ts-reaktive-marshal-akka/src/test/resources/application.conf create mode 100644 ts-reaktive-marshal-akka/src/test/resources/log4j.xml diff --git a/ts-reaktive-marshal-akka/src/test/java/com/tradeshift/reaktive/testkit/HttpMinimalFailureSpec.java b/ts-reaktive-marshal-akka/src/test/java/com/tradeshift/reaktive/testkit/HttpMinimalFailureSpec.java new file mode 100644 index 00000000..c3f13e80 --- /dev/null +++ b/ts-reaktive-marshal-akka/src/test/java/com/tradeshift/reaktive/testkit/HttpMinimalFailureSpec.java @@ -0,0 +1,121 @@ +package com.tradeshift.reaktive.testkit; + +import static akka.http.javadsl.server.Directives.complete; +import static akka.http.javadsl.server.Directives.extractRequest; +import static akka.http.javadsl.server.Directives.onSuccess; +import static akka.http.javadsl.server.Directives.post; +import static org.forgerock.cuppa.Cuppa.describe; +import static org.forgerock.cuppa.Cuppa.it; +import static org.forgerock.cuppa.Cuppa.when; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.forgerock.cuppa.junit.CuppaRunner; +import org.junit.runner.RunWith; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import com.tradeshift.reaktive.marshal.stream.AaltoReader; +import com.tradeshift.reaktive.marshal.stream.StaxWriter; + +import akka.Done; +import akka.http.javadsl.model.HttpEntity; +import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.server.Route; +import akka.stream.javadsl.AsPublisher; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; + +@RunWith(CuppaRunner.class) +public class HttpMinimalFailureSpec extends HttpIntegrationSpec { + + private final Sink> counter = + Flow.of(ByteString.class) + .toMat(Sink.fold(0l, (count, b) -> count + b.size()), (m1,m2) -> m2); + + /** + * Returns a sink that immediately fails asynchronously when materialized. + * This is a somewhat convoluted example, but it's used in HttpFlow to allow us to expose an HTTP upload target as a Sink. + **/ + Sink> failingSink() { + Sink> in = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT); + Source> out = Source.asSubscriber(); + + return Flow.fromSinkAndSourceMat(in, out, Keep.both()).mapMaterializedValue(pair -> { + return CompletableFuture.supplyAsync(() -> { + RuntimeException x = new RuntimeException("Simulated failure"); + pair.second().onError(x); + throw x; + }); + }).toMat(Sink.ignore(), (m1,m2) -> m1); + } + + CompletionStage failImmediate(HttpEntity entity) { + Sink> sink = failingSink(); + + return entity.getDataBytes() + .log("data") + .alsoToMat(counter, (m1,m2) -> m2) + .via(AaltoReader.instance) + .log("counted") + .via(StaxWriter.flow()) + ., CompletionStage>toMat(sink, (m1,m2) -> m2) + .run(materializer); + } + + { + describe("A route that parses its request as XML, re-serializes it, and then tries to write it to a non-open HTTP port", () -> { + final Route route = post(() -> + extractRequest(request -> { + System.out.println(request); + return onSuccess(() -> failImmediate(request.entity()), result -> + complete(StatusCodes.NO_CONTENT) + ); + }) + ); + + when("Uploading a rather large stream", () -> { + it("should complete without logging extra errors", () -> { + serve(route, client -> { + int port = Integer.parseInt(client.baseUrl.substring(client.baseUrl.lastIndexOf(':') + 1)); + Socket s = new Socket("localhost", port); + OutputStream out = s.getOutputStream(); + out.write(( + "POST / HTTP/1.1\r\n" + + "Host: localhost:" + port + "\r\n" + + "User-Agent: curl/7.50.3\r\n" + + "Accept: */*\r\n" + + "Content-Type:application/octet-stream\r\n" + + "Content-Length: 1000000\r\n" + + "Expect: 100-continue\r\n" + + "\r\n").getBytes()); + out.flush(); + + InputStream in = s.getInputStream(); + int length; + byte[] buf = new byte[256*256]; + while ((length = in.read(buf)) != -1) { + System.out.print(new String(buf, 0, length)); + } + + // Console logs the 100-Continue response and then the 500-Internal server error, but ALSO the following: + /* +11:34:44,790 ERROR [a.s.Materializer] akka.stream.Log(akka://http-integration-spec/user/StreamSupervisor-0) - [timed] Upstream failed. +java.lang.IllegalArgumentException: requirement failed: Cannot pull port (requestParsingIn) twice + at scala.Predef$.require(Predef.scala:224) + at akka.stream.stage.GraphStageLogic.pull(GraphStage.scala:355) + at akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage$$anon$12$$anon$15.onPush(HttpServerBluePrint.scala:432) + */ + }); + }); + }); + }); + } +} diff --git a/ts-reaktive-marshal-akka/src/test/resources/application.conf b/ts-reaktive-marshal-akka/src/test/resources/application.conf new file mode 100644 index 00000000..7c28a0e7 --- /dev/null +++ b/ts-reaktive-marshal-akka/src/test/resources/application.conf @@ -0,0 +1,5 @@ +akka { + loglevel = "DEBUG" + loggers = ["akka.event.slf4j.Slf4jLogger"] + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" +} \ No newline at end of file diff --git a/ts-reaktive-marshal-akka/src/test/resources/log4j.xml b/ts-reaktive-marshal-akka/src/test/resources/log4j.xml new file mode 100644 index 00000000..4796d677 --- /dev/null +++ b/ts-reaktive-marshal-akka/src/test/resources/log4j.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/HttpIntegrationSpec.java b/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/HttpIntegrationSpec.java index b9fb189c..a536dfbe 100644 --- a/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/HttpIntegrationSpec.java +++ b/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/HttpIntegrationSpec.java @@ -22,7 +22,7 @@ * error scenarios. */ public abstract class HttpIntegrationSpec { - public static final Config config = ConfigFactory.defaultReference(); + public static final Config config = ConfigFactory.defaultApplication(); public static final ActorSystem system = ActorSystem.create("http-integration-spec", config); public static final Materializer materializer = ActorMaterializer.create(system); diff --git a/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/TestHttpClient.java b/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/TestHttpClient.java index fc62d9bf..5ed1992b 100644 --- a/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/TestHttpClient.java +++ b/ts-reaktive-testkit/src/main/java/com/tradeshift/reaktive/testkit/TestHttpClient.java @@ -13,6 +13,7 @@ import akka.actor.ActorSystem; import akka.http.javadsl.Http; import akka.http.javadsl.model.ContentTypes; +import akka.http.javadsl.model.HttpEntities; import akka.http.javadsl.model.HttpEntity; import akka.http.javadsl.model.HttpEntity.Strict; import akka.http.javadsl.model.HttpRequest; @@ -20,7 +21,9 @@ import akka.http.javadsl.model.MediaRanges; import akka.http.javadsl.model.MediaTypes; import akka.http.javadsl.model.headers.Accept; +import akka.http.javadsl.model.headers.RawHeader; import akka.stream.Materializer; +import akka.stream.javadsl.Source; import akka.util.ByteString; /** @@ -29,7 +32,8 @@ public class TestHttpClient { private final ActorSystem system; private final Materializer materializer; - private final String baseUrl; + + protected final String baseUrl; private long timeout = 10000; @@ -57,6 +61,13 @@ public HttpResponse tryPostJSON(String path, String content) { .withEntity(ContentTypes.APPLICATION_JSON, content.getBytes(Charset.forName("UTF-8")))); } + public HttpResponse tryPostSource(String path, Source data) { + return send(HttpRequest + .POST(getHttpUri(path)) + .addHeader(RawHeader.create("Expect", "100-continue")) + .withEntity(HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, data))); + } + public void putXML(String path, String content) { perform(HttpRequest .PUT(getHttpUri(path)) From e14f69c5d0e2232523d19a6325d9d841978b36dc Mon Sep 17 00:00:00 2001 From: Jan Ypma Date: Mon, 31 Oct 2016 13:47:02 +0100 Subject: [PATCH 2/2] Depend on akka snapshot for further testing --- build.sbt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index a16acc8f..2624fb50 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ import sbtprotobuf.{ProtobufPlugin=>PB} lazy val projectSettings = PB.protobufSettings ++ Seq( licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))), organization := "com.tradeshift", - version := "0.0.16", + version := "0.0.17-SNAPSHOT", scalaVersion := "2.11.8", publishMavenStyle := true, javacOptions ++= Seq("-source", "1.8"), @@ -32,6 +32,7 @@ lazy val commonSettings = projectSettings ++ Seq( libraryDependencies ++= { val akkaVersion = "2.4.10" val kamonVersion = "0.6.2" + val httpVersion = "3.0.0-SNAPSHOT" Seq( "com.google.guava" % "guava" % "18.0", @@ -41,11 +42,11 @@ lazy val commonSettings = projectSettings ++ Seq( "com.typesafe.akka" %% "akka-persistence" % akkaVersion, "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, "com.typesafe.akka" %% "akka-stream" % akkaVersion, - "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion, - "com.typesafe.akka" %% "akka-http-core" % akkaVersion, - "com.typesafe.akka" %% "akka-http-jackson-experimental" % akkaVersion, + "com.typesafe.akka" % "akka-http" % httpVersion, + "com.typesafe.akka" % "akka-http-core" % httpVersion, + "com.typesafe.akka" % "akka-http-jackson" % httpVersion, "com.typesafe.akka" %% "akka-persistence-query-experimental" % akkaVersion, - "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test", + "com.typesafe.akka" % "akka-http-testkit" % httpVersion % "test", "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.18", "com.readytalk" % "metrics3-statsd" % "4.1.0", // to log cassandra (codahale / dropwizard) metrics into statsd "io.kamon" %% "kamon-core" % kamonVersion,