Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE reproducer of concurrency bug in akka 100-continue handling #41

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import sbtprotobuf.{ProtobufPlugin=>PB}
lazy val projectSettings = PB.protobufSettings ++ Seq(
licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))),
organization := "com.tradeshift",
version := "0.0.16",
version := "0.0.17-SNAPSHOT",
scalaVersion := "2.11.8",
publishMavenStyle := true,
javacOptions ++= Seq("-source", "1.8"),
Expand All @@ -32,6 +32,7 @@ lazy val commonSettings = projectSettings ++ Seq(
libraryDependencies ++= {
val akkaVersion = "2.4.10"
val kamonVersion = "0.6.2"
val httpVersion = "3.0.0-SNAPSHOT"

Seq(
"com.google.guava" % "guava" % "18.0",
Expand All @@ -41,11 +42,11 @@ lazy val commonSettings = projectSettings ++ Seq(
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-http-core" % akkaVersion,
"com.typesafe.akka" %% "akka-http-jackson-experimental" % akkaVersion,
"com.typesafe.akka" % "akka-http" % httpVersion,
"com.typesafe.akka" % "akka-http-core" % httpVersion,
"com.typesafe.akka" % "akka-http-jackson" % httpVersion,
"com.typesafe.akka" %% "akka-persistence-query-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test",
"com.typesafe.akka" % "akka-http-testkit" % httpVersion % "test",
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.18",
"com.readytalk" % "metrics3-statsd" % "4.1.0", // to log cassandra (codahale / dropwizard) metrics into statsd
"io.kamon" %% "kamon-core" % kamonVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.tradeshift.reaktive.testkit;

import static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.extractRequest;
import static akka.http.javadsl.server.Directives.onSuccess;
import static akka.http.javadsl.server.Directives.post;
import static org.forgerock.cuppa.Cuppa.describe;
import static org.forgerock.cuppa.Cuppa.it;
import static org.forgerock.cuppa.Cuppa.when;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.forgerock.cuppa.junit.CuppaRunner;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import com.tradeshift.reaktive.marshal.stream.AaltoReader;
import com.tradeshift.reaktive.marshal.stream.StaxWriter;

import akka.Done;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.server.Route;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

@RunWith(CuppaRunner.class)
public class HttpMinimalFailureSpec extends HttpIntegrationSpec {

    /**
     * Sink that counts the total number of bytes flowing through it,
     * exposing the count as its materialized value.
     */
    private final Sink<ByteString, CompletionStage<Long>> counter =
        Flow.of(ByteString.class)
            .toMat(Sink.fold(0L, (count, b) -> count + b.size()), (m1, m2) -> m2);

    /**
     * Returns a sink that immediately fails asynchronously when materialized.
     * This is a somewhat convoluted example, but it's used in HttpFlow to allow us to expose an
     * HTTP upload target as a Sink.
     *
     * @return a sink whose materialized value is a CompletionStage that always completes
     *         exceptionally with a simulated RuntimeException
     */
    Sink<ByteString, CompletionStage<Done>> failingSink() {
        Sink<ByteString, Publisher<ByteString>> in = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT);
        Source<ByteString, Subscriber<ByteString>> out = Source.asSubscriber();

        return Flow.fromSinkAndSourceMat(in, out, Keep.both()).mapMaterializedValue(pair -> {
            // Fail the downstream subscriber asynchronously, and complete the materialized
            // future exceptionally with the same error.
            return CompletableFuture.<Done>supplyAsync(() -> {
                RuntimeException x = new RuntimeException("Simulated failure");
                pair.second().onError(x);
                throw x;
            });
        }).toMat(Sink.ignore(), (m1, m2) -> m1);
    }

    /**
     * Runs the request entity through an XML parse / re-serialize pipeline into
     * {@link #failingSink()}, so the returned stage fails almost immediately.
     *
     * @param entity the incoming HTTP request entity whose data bytes are consumed
     * @return a stage that completes exceptionally with the sink's simulated failure
     */
    CompletionStage<Done> failImmediate(HttpEntity entity) {
        Sink<ByteString, CompletionStage<Done>> sink = failingSink();

        return entity.getDataBytes()
            .log("data")
            .alsoToMat(counter, (m1, m2) -> m2)
            .via(AaltoReader.instance)
            .log("counted")
            .via(StaxWriter.flow())
            .<CompletionStage<Done>, CompletionStage<Done>>toMat(sink, (m1, m2) -> m2)
            .run(materializer);
    }

    {
        describe("A route that parses its request as XML, re-serializes it, and then tries to write it to a non-open HTTP port", () -> {
            final Route route = post(() ->
                extractRequest(request -> {
                    System.out.println(request);
                    return onSuccess(() -> failImmediate(request.entity()), result ->
                        complete(StatusCodes.NO_CONTENT)
                    );
                })
            );

            when("Uploading a rather large stream", () -> {
                it("should complete without logging extra errors", () -> {
                    serve(route, client -> {
                        int port = Integer.parseInt(client.baseUrl.substring(client.baseUrl.lastIndexOf(':') + 1));
                        // Talk raw HTTP over a plain socket so we fully control the
                        // Expect: 100-continue handshake; try-with-resources closes the
                        // socket (and its streams) even if an assertion fails below.
                        try (Socket s = new Socket("localhost", port)) {
                            OutputStream out = s.getOutputStream();
                            // Announce a large body with Expect: 100-continue, but never
                            // send any body bytes: the route fails before consuming them.
                            out.write((
                                "POST / HTTP/1.1\r\n" +
                                "Host: localhost:" + port + "\r\n" +
                                "User-Agent: curl/7.50.3\r\n" +
                                "Accept: */*\r\n" +
                                "Content-Type:application/octet-stream\r\n" +
                                "Content-Length: 1000000\r\n" +
                                "Expect: 100-continue\r\n" +
                                "\r\n").getBytes(StandardCharsets.US_ASCII)); // HTTP/1.1 headers are ASCII
                            out.flush();

                            // Echo whatever the server sends until it closes the connection.
                            InputStream in = s.getInputStream();
                            int length;
                            byte[] buf = new byte[256 * 256];
                            while ((length = in.read(buf)) != -1) {
                                // Decode with an explicit charset instead of the platform default.
                                System.out.print(new String(buf, 0, length, StandardCharsets.US_ASCII));
                            }
                        }

                        // Console logs the 100-Continue response and then the 500-Internal server error, but ALSO the following:
                        /*
                        11:34:44,790 ERROR [a.s.Materializer] akka.stream.Log(akka://http-integration-spec/user/StreamSupervisor-0) - [timed] Upstream failed.
                        java.lang.IllegalArgumentException: requirement failed: Cannot pull port (requestParsingIn) twice
                        at scala.Predef$.require(Predef.scala:224)
                        at akka.stream.stage.GraphStageLogic.pull(GraphStage.scala:355)
                        at akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage$$anon$12$$anon$15.onPush(HttpServerBluePrint.scala:432)
                        */
                    });
                });
            });
        });
    }
}
5 changes: 5 additions & 0 deletions ts-reaktive-marshal-akka/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Test-only Akka logging configuration: route all Akka logging through SLF4J
# at DEBUG level so the log4j console appender captures stream/HTTP internals
# while reproducing the 100-continue concurrency bug.
akka {
loglevel = "DEBUG"
# Forward Akka's internal logging events to SLF4J (backed by log4j here).
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Filter log events using the SLF4J backend's levels before they are published.
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
24 changes: 24 additions & 0 deletions ts-reaktive-marshal-akka/src/test/resources/log4j.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<!--
  Test logging configuration: everything goes to the console at DEBUG level
  (so akka-stream / akka-http internals are visible when reproducing the
  100-continue bug), while known-noisy third-party libraries are capped at WARN.
-->
<log4j:configuration>
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<param name="Threshold" value="DEBUG" />
<layout class="org.apache.log4j.EnhancedPatternLayout">
<!-- %X{akkaSource} prints the actor path Akka stores in the MDC. -->
<param name="ConversionPattern" value="%d{ABSOLUTE} %-5p [%c{1.}] %X{akkaSource} - %m%n" />
</layout>
</appender>

<!-- Quiet down chatty dependencies; only akka stays at DEBUG. -->
<logger name="org.apache.cassandra"><level value="WARN" /></logger>
<logger name="com.datastax.driver"><level value="WARN" /></logger>
<logger name="io.netty"><level value="WARN" /></logger>
<logger name="Sigar"><level value="WARN"/></logger>
<logger name="org.apache.http"><level value="WARN"/></logger>
<logger name="org.mortbay"><level value="WARN"/></logger>
<logger name="akka"><level value="DEBUG"/></logger>

<root>
<priority value="DEBUG" />
<appender-ref ref="CONSOLE" />
</root>
</log4j:configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* error scenarios.
*/
public abstract class HttpIntegrationSpec {
public static final Config config = ConfigFactory.defaultReference();
public static final Config config = ConfigFactory.defaultApplication();
public static final ActorSystem system = ActorSystem.create("http-integration-spec", config);
public static final Materializer materializer = ActorMaterializer.create(system);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpEntity.Strict;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.MediaRanges;
import akka.http.javadsl.model.MediaTypes;
import akka.http.javadsl.model.headers.Accept;
import akka.http.javadsl.model.headers.RawHeader;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

/**
Expand All @@ -29,7 +32,8 @@
public class TestHttpClient {
private final ActorSystem system;
private final Materializer materializer;
private final String baseUrl;

protected final String baseUrl;

private long timeout = 10000;

Expand Down Expand Up @@ -57,6 +61,13 @@ public HttpResponse tryPostJSON(String path, String content) {
.withEntity(ContentTypes.APPLICATION_JSON, content.getBytes(Charset.forName("UTF-8"))));
}

/**
 * POSTs the given byte stream to [path] as an application/octet-stream entity,
 * adding an "Expect: 100-continue" header so the server's 100-continue handling
 * is exercised, and returns the raw response.
 * NOTE(review): presumably, like the other try* methods here, this returns the
 * response without asserting a successful status code — confirm against send().
 *
 * @param path path relative to this client's base URL
 * @param data source of the request body bytes (streamed, not buffered)
 * @return the server's raw HttpResponse, whatever its status
 */
public HttpResponse tryPostSource(String path, Source<ByteString,?> data) {
return send(HttpRequest
.POST(getHttpUri(path))
.addHeader(RawHeader.create("Expect", "100-continue"))
.withEntity(HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, data)));
}

public void putXML(String path, String content) {
perform(HttpRequest
.PUT(getHttpUri(path))
Expand Down