Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

share trailers through matval #303

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.grpc.internal

import io.grpc._
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
Expand All @@ -21,15 +22,15 @@ import pekko.stream
import pekko.stream.{ Attributes => _, _ }
import pekko.stream.stage._
import pekko.util.FutureConverters._
import io.grpc._

import scala.concurrent.{ Future, Promise }
import scala.util.Success

@InternalApi
private object PekkoNettyGrpcClientGraphStage {
sealed trait ControlMessage
case object ReadyForSending extends ControlMessage
case class Closed(status: Status, trailer: Metadata) extends ControlMessage
case class Closed(status: Status) extends ControlMessage
}

/**
Expand Down Expand Up @@ -64,8 +65,6 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
inheritedAttributes: stream.Attributes): (GraphStageLogic, Future[GrpcResponseMetadata]) = {
import PekkoNettyGrpcClientGraphStage._
val matVal = Promise[GrpcResponseMetadata]()
val trailerPromise = Promise[Metadata]()

val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
// this is here just to fail single response requests getting more responses
// duplicating behavior in io.grpc.stub.ClientCalls
Expand All @@ -76,8 +75,8 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
val callback = getAsyncCallback[Any] {
case msg: ControlMessage =>
msg match {
case ReadyForSending => if (!isClosed(in) && !hasBeenPulled(in)) tryPull(in)
case Closed(status, trailer) => onCallClosed(status, trailer)
case ReadyForSending => if (!isClosed(in) && !hasBeenPulled(in)) tryPull(in)
case Closed(status) => onCallClosed(status)
}
case element: O @unchecked =>
if (!streamingResponse) {
Expand All @@ -94,29 +93,34 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
val listener = new ClientCall.Listener[O] {
override def onReady(): Unit =
callback.invoke(ReadyForSending)
override def onHeaders(responseHeaders: Metadata): Unit =

override def onHeaders(responseHeaders: Metadata): Unit = {
matVal.success(new GrpcResponseMetadata {
private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(responseHeaders)
private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(responseHeaders)
def headers = sMetadata
def getHeaders() = jMetadata

private lazy val sTrailers =
trailerPromise.future.map(MetadataImpl.scalaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
private lazy val jTrailers = trailerPromise.future
.map(MetadataImpl.javaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
.asJava
private lazy val sTrailers = Future.successful(sMetadata)
private lazy val jTrailers = Future.successful(jMetadata).asJava
def trailers = sTrailers
def getTrailers() = jTrailers
})
}

override def onMessage(message: O): Unit =
callback.invoke(message)

override def onClose(status: Status, trailers: Metadata): Unit = {
trailerPromise.success(trailers)
callback.invoke(Closed(status, trailers))
if (!matVal.isCompleted) {
onHeaders(trailers)
}
callback.invoke(Closed(status))
}
}

override def preStart(): Unit = {

call = channel.newCall(descriptor, options)
call.start(listener, headers.toGoogleGrpcMetadata())

Expand All @@ -134,12 +138,14 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
// request so pull early to get things going
pull(in)
}

override def onPush(): Unit = {
call.sendMessage(grab(in))
if (call.isReady && !hasBeenPulled(in)) {
pull(in)
}
}

override def onUpstreamFinish(): Unit = {
call.halfClose()
if (isClosed(out)) {
Expand All @@ -148,6 +154,7 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
completeStage()
}
}

override def onUpstreamFailure(ex: Throwable): Unit = {
call.cancel("Failure from upstream", ex)
call = null
Expand All @@ -159,19 +166,22 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
call.request(1)
requested += 1
}

override def onDownstreamFinish(cause: Throwable): Unit =
if (isClosed(out)) {
call.cancel("Downstream cancelled", cause)
call = null
completeStage()
}

def onCallClosed(status: Status, trailers: Metadata): Unit = {
def onCallClosed(status: Status): Unit = {
if (status.isOk()) {
// FIXME share trailers through matval
completeStage()
} else {
failStage(status.asRuntimeException(trailers))
matVal.future.onComplete {
case Success(metadata) => failStage(status.asRuntimeException(metadata.headers.raw.orNull))
case _ => failStage(status.asRuntimeException())
}(ExecutionContexts.parasitic)
}
call = null
}
Expand Down