diff --git a/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala b/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala index e8bbe6f416..ae870b77ec 100644 --- a/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala @@ -11,6 +11,7 @@ import akka.pattern.AskTimeoutException import kamon.Kamon import kamon.instrumentation.executor.ExecutorInstrumentation import kamon.tag.TagSet +import monix.eval.Task import monix.execution.Scheduler import monix.execution.schedulers.SchedulerService import net.ceedubs.ficus.Ficus._ @@ -145,7 +146,6 @@ final class QueryActor(memStore: TimeSeriesStore, .map { res => // below prevents from calling FiloDB directly (without Query Service) FiloSchedulers.assertThreadName(QuerySchedName) - querySession.close() replyTo ! res res match { case QueryResult(_, _, vectors, _, _, _) => resultVectors.record(vectors.length) @@ -171,9 +171,10 @@ final class QueryActor(memStore: TimeSeriesStore, } queryExecuteSpan.finish() } + .guarantee(Task.eval(querySession.close())) + logger.debug(s"Will now run query execution pipeline for $q") execTask.runToFuture(queryScheduler).recover { case ex => - querySession.close() // Unhandled exception in query, should be rare logger.error(s"queryId ${q.queryContext.queryId} Unhandled Query Error," + s" query was ${q.queryContext.origQueryParams}", ex) diff --git a/core/src/main/scala/filodb.core/query/QueryContext.scala b/core/src/main/scala/filodb.core/query/QueryContext.scala index 94571bceaa..97b45ca9f1 100644 --- a/core/src/main/scala/filodb.core/query/QueryContext.scala +++ b/core/src/main/scala/filodb.core/query/QueryContext.scala @@ -156,6 +156,10 @@ object QueryContext { * * IMPORTANT: The param catchMultipleLockSetErrors should be false * only in unit test code for ease of use. + * + * IMPORTANT: QuerySession object should be closed after use as such + * `monixTask.guarantee(Task.eval(querySession.close()))` + * */ case class QuerySession(qContext: QueryContext, queryConfig: QueryConfig, diff --git a/memory/src/main/scala/filodb.memory/EvictionLock.scala b/memory/src/main/scala/filodb.memory/EvictionLock.scala index 67c2108425..47a58f5460 100644 --- a/memory/src/main/scala/filodb.memory/EvictionLock.scala +++ b/memory/src/main/scala/filodb.memory/EvictionLock.scala @@ -68,8 +68,9 @@ class EvictionLock(trackQueriesHoldingEvictionLock: Boolean = false, def releaseExclusive(): Unit = reclaimLock.releaseExclusive() def acquireSharedLock(timeoutMs: Long, holderId: String, promQL: String): Boolean = { - if (trackQueriesHoldingEvictionLock) runningQueries.put(holderId, promQL) - reclaimLock.tryAcquireSharedNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMs)) + val acquired = reclaimLock.tryAcquireSharedNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMs)) + if (trackQueriesHoldingEvictionLock && acquired) runningQueries.put(holderId, promQL) + acquired } def releaseSharedLock(holderId: String): Unit = { diff --git a/query/src/main/scala/filodb/query/exec/ExecPlan.scala b/query/src/main/scala/filodb/query/exec/ExecPlan.scala index f3a69489a1..dab0300388 100644 --- a/query/src/main/scala/filodb/query/exec/ExecPlan.scala +++ b/query/src/main/scala/filodb/query/exec/ExecPlan.scala @@ -92,6 +92,10 @@ trait ExecPlan extends QueryCommand { * The returned task can be used to perform post-execution steps * such as sending off an asynchronous response message etc. * + * Typically the caller creates the QuerySession parameter object. Remember + * that the creator is also responsible for closing it with + * `returnTask.guarantee(Task.eval(querySession.close()))` + * */ // scalastyle:off method.length def execute(source: ChunkSource, diff --git a/query/src/main/scala/filodb/query/exec/InProcessPlanDispatcher.scala b/query/src/main/scala/filodb/query/exec/InProcessPlanDispatcher.scala index 1511686418..8c021bc4e5 100644 --- a/query/src/main/scala/filodb/query/exec/InProcessPlanDispatcher.scala +++ b/query/src/main/scala/filodb/query/exec/InProcessPlanDispatcher.scala @@ -37,7 +37,10 @@ import filodb.query.Query.qLogger Kamon.runWithSpan(Kamon.currentSpan(), false) { // translate implicit ExecutionContext to monix.Scheduler val querySession = QuerySession(plan.execPlan.queryContext, queryConfig, catchMultipleLockSetErrors = true) - plan.execPlan.execute(source, querySession).timeout(plan.clientParams.deadline.milliseconds).onErrorRecover { + plan.execPlan.execute(source, querySession) + .timeout(plan.clientParams.deadline.milliseconds) + .guarantee(Task.eval(querySession.close())) + .onErrorRecover { case e: TimeoutException if (plan.execPlan.queryContext.plannerParams.allowPartialResults) => qLogger.warn(s"Swallowed TimeoutException for query id: ${plan.execPlan.queryContext.queryId} " + diff --git a/version.sbt b/version.sbt index 69e1694708..9af36853e1 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.9.17.1" +version in ThisBuild := "0.9.17.2"