Skip to content

Commit

Permalink
cherry-pick from develop
Browse files Browse the repository at this point in the history
Merge pull request #1466 from alextheimer/main
  • Loading branch information
alextheimer authored Oct 15, 2022
2 parents eb91eab + 5f879dc commit 044b5e2
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.pattern.AskTimeoutException
import kamon.Kamon
import kamon.instrumentation.executor.ExecutorInstrumentation
import kamon.tag.TagSet
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import net.ceedubs.ficus.Ficus._
Expand Down Expand Up @@ -145,7 +146,6 @@ final class QueryActor(memStore: TimeSeriesStore,
.map { res =>
// below prevents from calling FiloDB directly (without Query Service)
FiloSchedulers.assertThreadName(QuerySchedName)
querySession.close()
replyTo ! res
res match {
case QueryResult(_, _, vectors, _, _, _) => resultVectors.record(vectors.length)
Expand All @@ -171,9 +171,10 @@ final class QueryActor(memStore: TimeSeriesStore,
}
queryExecuteSpan.finish()
}
.guarantee(Task.eval(querySession.close()))

logger.debug(s"Will now run query execution pipeline for $q")
execTask.runToFuture(queryScheduler).recover { case ex =>
querySession.close()
// Unhandled exception in query, should be rare
logger.error(s"queryId ${q.queryContext.queryId} Unhandled Query Error," +
s" query was ${q.queryContext.origQueryParams}", ex)
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/filodb.core/query/QueryContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ object QueryContext {
*
* IMPORTANT: The param catchMultipleLockSetErrors should be false
* only in unit test code for ease of use.
*
* IMPORTANT: QuerySession object should be closed after use as such
* `monixTask.guarantee(Task.eval(querySession.close()))`
*
*/
case class QuerySession(qContext: QueryContext,
queryConfig: QueryConfig,
Expand Down
5 changes: 3 additions & 2 deletions memory/src/main/scala/filodb.memory/EvictionLock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ class EvictionLock(trackQueriesHoldingEvictionLock: Boolean = false,
def releaseExclusive(): Unit = reclaimLock.releaseExclusive()

def acquireSharedLock(timeoutMs: Long, holderId: String, promQL: String): Boolean = {
if (trackQueriesHoldingEvictionLock) runningQueries.put(holderId, promQL)
reclaimLock.tryAcquireSharedNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMs))
val acquired = reclaimLock.tryAcquireSharedNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMs))
if (trackQueriesHoldingEvictionLock && acquired) runningQueries.put(holderId, promQL)
acquired
}

def releaseSharedLock(holderId: String): Unit = {
Expand Down
4 changes: 4 additions & 0 deletions query/src/main/scala/filodb/query/exec/ExecPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ trait ExecPlan extends QueryCommand {
* The returned task can be used to perform post-execution steps
* such as sending off an asynchronous response message etc.
*
* Typically the caller creates the QuerySession parameter object. Remember
* that the creator is also responsible for closing it with
* `returnTask.guarantee(Task.eval(querySession.close()))`
*
*/
// scalastyle:off method.length
def execute(source: ChunkSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import filodb.query.Query.qLogger
Kamon.runWithSpan(Kamon.currentSpan(), false) {
// translate implicit ExecutionContext to monix.Scheduler
val querySession = QuerySession(plan.execPlan.queryContext, queryConfig, catchMultipleLockSetErrors = true)
plan.execPlan.execute(source, querySession).timeout(plan.clientParams.deadline.milliseconds).onErrorRecover {
plan.execPlan.execute(source, querySession)
.timeout(plan.clientParams.deadline.milliseconds)
.guarantee(Task.eval(querySession.close()))
.onErrorRecover {
case e: TimeoutException if (plan.execPlan.queryContext.plannerParams.allowPartialResults)
=>
qLogger.warn(s"Swallowed TimeoutException for query id: ${plan.execPlan.queryContext.queryId} " +
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.9.17.1"
version in ThisBuild := "0.9.17.2"

0 comments on commit 044b5e2

Please sign in to comment.