diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala index 2d84893f..a6a2346a 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala @@ -17,7 +17,7 @@ import org.apache.pekko import pekko.actor.ActorSystem import pekko.grpc.internal.{ GrpcProtocolNative, GrpcRequestHelpers, Identity } import pekko.grpc.scaladsl.headers.`Status` -import pekko.http.scaladsl.model.{ AttributeKeys, HttpEntity, HttpRequest, HttpResponse } +import pekko.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } import pekko.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk, Strict } import pekko.grpc.GrpcProtocol import pekko.stream.scaladsl.{ Sink, Source } @@ -58,7 +58,7 @@ class GrpcExceptionDefaultHandleSpec case Seq(LastChunk("", List(`Status`("3")))) => // ok } case _: Strict => - response.attribute(AttributeKeys.trailer).get.headers.contains("grpc-status" -> "3") + response.headers.find(_.is("grpc-status")).map(_.value()) shouldBe Some("3") case other => fail(s"Unexpected [$other]") } @@ -131,14 +131,18 @@ class GrpcExceptionDefaultHandleSpec val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue - val lastChunk = reply.entity.asInstanceOf[Chunked].chunks.runWith(Sink.last).futureValue.asInstanceOf[LastChunk] + reply.entity shouldBe a[Strict] + val strict = reply.entity.asInstanceOf[Strict] + strict.contentType.mediaType.toString shouldBe "application/grpc+proto" + strict.dataBytes.runFold(ByteString.empty)(_ ++ _).futureValue.seq.isEmpty shouldBe true + // Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md - val statusHeader = lastChunk.trailer.find { _.name == "grpc-status" } + val statusHeader = reply.headers.find(_.is("grpc-status")) statusHeader.map(_.value()) should be(Some("3")) - val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" } + val statusMessageHeader = reply.headers.find(_.is("grpc-message")) statusMessageHeader.map(_.value()) should be(Some("No name found")) - val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer) + val metadata = MetadataBuilder.fromHeaders(reply.headers) metadata.getText("test-text") should be(Some("test-text-data")) metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data"))) } diff --git a/project/PekkoHttpDependency.scala b/project/PekkoHttpDependency.scala index f7941bb8..d89422ad 100644 --- a/project/PekkoHttpDependency.scala +++ b/project/PekkoHttpDependency.scala @@ -22,5 +22,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency object PekkoHttpDependency extends PekkoDependency { override val checkProject: String = "pekko-http-testkit" override val module: Option[String] = Some("http") - override val currentVersion: String = "1.1.0" + override val currentVersion: String = "1.1.0+13-750e8f96-SNAPSHOT" } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala index e129b5b3..aa75ead0 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala @@ -112,6 +112,7 @@ private final class PekkoNettyGrpcClientGraphStage[I, O]( override def onMessage(message: O): Unit = callback.invoke(message) override def onClose(status: Status, trailers: Metadata): Unit = { + if (!matVal.isCompleted) onHeaders(trailers) trailerPromise.success(trailers) callback.invoke(Closed(status, trailers)) }