Skip to content

Commit

Permalink
Fix asFlow() leak (#2164) (#2175)
Browse files Browse the repository at this point in the history
  • Loading branch information
lwasyl authored Apr 16, 2020
1 parent b15f11b commit 744365e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import kotlinx.coroutines.flow.callbackFlow
*/
@ExperimentalCoroutinesApi
fun <T> ApolloCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
clone().enqueue(
val clone = clone()
clone.enqueue(
object : ApolloCall.Callback<T>() {
override fun onResponse(response: Response<T>) {
runCatching {
Expand All @@ -42,7 +43,7 @@ fun <T> ApolloCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
}
}
)
awaitClose { this@toFlow.cancel() }
awaitClose { clone.cancel() }
}

/**
Expand All @@ -53,7 +54,8 @@ fun <T> ApolloCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
*/
@ExperimentalCoroutinesApi
fun <T> ApolloQueryWatcher<T>.toFlow(): Flow<Response<T>> = callbackFlow {
clone().enqueueAndWatch(
val clone = clone()
clone.enqueueAndWatch(
object : ApolloCall.Callback<T>() {
override fun onResponse(response: Response<T>) {
runCatching {
Expand All @@ -72,7 +74,7 @@ fun <T> ApolloQueryWatcher<T>.toFlow(): Flow<Response<T>> = callbackFlow {
}
}
)
awaitClose { this@toFlow.cancel() }
awaitClose { clone.cancel() }
}


Expand Down Expand Up @@ -116,7 +118,8 @@ fun <T> ApolloCall<T>.toDeferred(): Deferred<Response<T>> {
*/
@ExperimentalCoroutinesApi
fun <T> ApolloSubscriptionCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
clone().execute(
val clone = clone()
clone.execute(
object : ApolloSubscriptionCall.Callback<T> {
override fun onConnected() {
}
Expand All @@ -140,7 +143,7 @@ fun <T> ApolloSubscriptionCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
}
}
)
awaitClose { this@toFlow.cancel() }
awaitClose { clone.cancel() }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import com.apollographql.apollo.coroutines.toFlow
import com.apollographql.apollo.coroutines.toJob
import com.apollographql.apollo.exception.ApolloException
import com.apollographql.apollo.integration.normalizer.EpisodeHeroNameQuery
import com.apollographql.apollo.integration.normalizer.HeroNameQuery
import com.apollographql.apollo.integration.normalizer.HeroNameWithIdQuery
import com.apollographql.apollo.integration.normalizer.type.Episode
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -149,6 +151,29 @@ class CoroutinesApolloTest {
assertThat(response.data!!.hero()!!.name()).isEqualTo("R2-D2")
}

@Test
@ExperimentalCoroutinesApi
fun watcherFlowCancellationCancelsWatcher(): Unit = runBlocking {
server.enqueue(mockResponse("HeroNameWithIdResponse.json"))
apolloClient
.query(HeroNameWithIdQuery())
.watcher()
.toFlow()
.first() // Cancels the flow after first response

apolloClient.clearNormalizedCache()
apolloClient.clearHttpCache()

server.enqueue(mockResponse("HeroNameResponse.json"))
apolloClient
.query(HeroNameQuery())
.watcher()
.toFlow()
.first()

assertThat(server.requestCount).isEqualTo(2)
}.let { }

companion object {

private const val FILE_EPISODE_HERO_NAME_WITH_ID = "EpisodeHeroNameResponseWithId.json"
Expand Down

0 comments on commit 744365e

Please sign in to comment.