Skip to content

Commit

Permalink
Merge pull request #516 from broneill/master
Browse files Browse the repository at this point in the history
Version 0.8.7
  • Loading branch information
broneill authored Oct 9, 2019
2 parents 2c6da9a + 047a7db commit ba1f004
Show file tree
Hide file tree
Showing 112 changed files with 3,653 additions and 731 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
language: scala
dist: trusty
env:
global:
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=256m"
Expand Down
19 changes: 11 additions & 8 deletions cli/src/main/scala/filodb.cli/CliMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import monix.reactive.Observable

import filodb.coordinator._
import filodb.coordinator.client._
import filodb.coordinator.client.QueryCommands.{SpreadChange, SpreadProvider, StaticSpreadProvider}
import filodb.coordinator.client.QueryCommands.StaticSpreadProvider
import filodb.coordinator.queryengine2.{PromQlQueryParams, TsdbQueryParams, UnavailablePromQlQueryParams}
import filodb.core._
import filodb.core.metadata.Column
import filodb.memory.format.RowReader
Expand Down Expand Up @@ -211,35 +212,37 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
timeParams: TimeRangeParams,
options: QOptions): Unit = {
val logicalPlan = Parser.metadataQueryToLogicalPlan(query, timeParams)
executeQuery2(client, dataset, logicalPlan, options)
executeQuery2(client, dataset, logicalPlan, options, UnavailablePromQlQueryParams)
}

def parseLabelValuesQuery(client: LocalClient, labelNames: Seq[String], constraints: Map[String, String], dataset: String,
timeParams: TimeRangeParams,
options: QOptions): Unit = {
val logicalPlan = LabelValues(labelNames, constraints, 3.days.toMillis)
executeQuery2(client, dataset, logicalPlan, options)
executeQuery2(client, dataset, logicalPlan, options, UnavailablePromQlQueryParams)
}

def parsePromQuery2(client: LocalClient, query: String, dataset: String,
timeParams: TimeRangeParams,
options: QOptions): Unit = {
val logicalPlan = Parser.queryRangeToLogicalPlan(query, timeParams)
executeQuery2(client, dataset, logicalPlan, options)
executeQuery2(client, dataset, logicalPlan, options, PromQlQueryParams(query,timeParams.start, timeParams.step,
timeParams.end))
}

def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = {
def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions, tsdbQueryParams:
TsdbQueryParams): Unit = {
val ref = DatasetRef(dataset)
val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))
val qOpts = QueryCommands.QueryOptions(spreadProvider, options.sampleLimit)
val qOpts = QueryOptions(spreadProvider, options.sampleLimit)
.copy(queryTimeoutSecs = options.timeout.toSeconds.toInt,
shardOverrides = options.shardOverrides)
println(s"Sending query command to server for $ref with options $qOpts...")
println(s"Query Plan:\n$plan")
options.everyN match {
case Some(intervalSecs) =>
val fut = Observable.intervalAtFixedRate(intervalSecs.seconds).foreach { n =>
client.logicalPlan2Query(ref, plan, qOpts) match {
client.logicalPlan2Query(ref, plan, tsdbQueryParams, qOpts) match {
case QueryResult(_, schema, result) => result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
case err: QueryError => throw new ClientException(err)
}
Expand All @@ -250,7 +253,7 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
while (!fut.isCompleted) { Thread sleep 1000 }
case None =>
try {
client.logicalPlan2Query(ref, plan, qOpts) match {
client.logicalPlan2Query(ref, plan, tsdbQueryParams, qOpts) match {
case QueryResult(_, schema, result) => println(s"Number of Range Vectors: ${result.size}")
result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
case QueryError(_,ex) => println(s"QueryError: ${ex.getClass.getSimpleName} ${ex.getMessage}")
Expand Down
14 changes: 10 additions & 4 deletions coordinator/src/main/scala/filodb.coordinator/QueryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader
import scala.util.control.NonFatal

import filodb.coordinator.queryengine2.QueryEngine
import filodb.coordinator.queryengine2.{EmptyFailureProvider, QueryEngine}
import filodb.core._
import filodb.core.memstore.{FiloSchedulers, MemStore, TermInfo}
import filodb.core.metadata.Dataset
import filodb.core.query.ColumnFilter
import filodb.core.store.CorruptVectorException
import filodb.query._
import filodb.query.exec.ExecPlan

Expand Down Expand Up @@ -77,7 +78,8 @@ final class QueryActor(memStore: MemStore,
}.getOrElse { x: Seq[ColumnFilter] => Seq(SpreadChange(defaultSpread)) }
val functionalSpreadProvider = FunctionalSpreadProvider(spreadFunc)

val queryEngine2 = new QueryEngine(dataset, shardMapFunc)
val queryEngine2 = new QueryEngine(dataset, shardMapFunc,
EmptyFailureProvider, functionalSpreadProvider)
val queryConfig = new QueryConfig(config.getConfig("filodb.query"))
val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor") * sys.runtime.availableProcessors)
val queryScheduler = Scheduler.fixedPool(s"$QuerySchedName-${dataset.ref}", numSchedThreads.toInt)
Expand All @@ -103,6 +105,10 @@ final class QueryActor(memStore: MemStore,
case e: QueryError =>
queryErrors.increment
logger.debug(s"queryId ${q.id} Normal QueryError returned from query execution: $e")
e.t match {
case cve: CorruptVectorException => memStore.analyzeAndLogCorruptPtr(dataset.ref, cve)
case t: Throwable =>
}
}
span.finish()
}(queryScheduler).recover { case ex =>
Expand All @@ -121,7 +127,7 @@ final class QueryActor(memStore: MemStore,
// This is for CLI use only. Always prefer clients to materialize logical plan
lpRequests.increment
try {
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, getSpreadProvider(q.queryOptions))
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, q.tsdbQueryParams)
self forward execPlan
} catch {
case NonFatal(ex) =>
Expand All @@ -133,7 +139,7 @@ final class QueryActor(memStore: MemStore,

private def processExplainPlanQuery(q: ExplainPlan2Query, replyTo: ActorRef) = {
try {
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, getSpreadProvider(q.queryOptions))
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, q.tsdbQueryParams)
replyTo ! execPlan
} catch {
case NonFatal(ex) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package filodb.coordinator.client

import filodb.core.query.{ColumnFilter, Filter}
import filodb.query.{LogicalPlan => LogicalPlan2, QueryCommand}
import filodb.coordinator.queryengine2.TsdbQueryParams
import filodb.core.query.ColumnFilter
import filodb.query.{LogicalPlan => LogicalPlan2, QueryCommand, QueryOptions}

object QueryCommands {
import filodb.core._
Expand Down Expand Up @@ -33,11 +34,8 @@ object QueryCommands {
submitTime: Long = System.currentTimeMillis()) extends QueryCommand


final case class SpreadChange(time: Long = 0L, spread: Int = 1)

trait SpreadProvider {
def spreadFunc(filter: Seq[ColumnFilter]): Seq[SpreadChange]
}


final case class StaticSpreadProvider(spreadChange: SpreadChange = SpreadChange()) extends SpreadProvider {
def spreadFunc(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
Expand All @@ -58,48 +56,6 @@ object QueryCommands {
}
}

/**
* This class provides general query processing parameters
* @param spreadFunc a function that returns chronologically ordered spread changes for the filter
*/
final case class QueryOptions(spreadProvider: Option[SpreadProvider] = None,
parallelism: Int = 16,
queryTimeoutSecs: Int = 30,
sampleLimit: Int = 1000000,
shardOverrides: Option[Seq[Int]] = None)

object QueryOptions {
def apply(constSpread: Option[SpreadProvider], sampleLimit: Int): QueryOptions =
QueryOptions(spreadProvider = constSpread, sampleLimit = sampleLimit)

/**
* Creates a spreadFunc that looks for a particular filter with keyName Equals a value, and then maps values
* present in the spreadMap to specific spread values, with a default if the filter/value not present in the map
*/
def simpleMapSpreadFunc(keyName: String,
spreadMap: collection.mutable.Map[collection.Map[String, String], Int],
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = {
filters: Seq[ColumnFilter] =>
filters.collectFirst {
case ColumnFilter(key, Filter.Equals(filtVal: String)) if key == keyName => filtVal
}.map { tagValue =>
Seq(SpreadChange(spread = spreadMap.getOrElse(collection.mutable.Map(keyName->tagValue), defaultSpread)))
}.getOrElse(Seq(SpreadChange(defaultSpread)))
}

import collection.JavaConverters._

def simpleMapSpreadFunc(keyName: String,
spreadMap: java.util.Map[java.util.Map[String, String], Integer],
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = {
val spreadAssignment: collection.mutable.Map[collection.Map[String, String], Int]= spreadMap.asScala.map {
case (d, v) => d.asScala -> v.toInt
}

simpleMapSpreadFunc(keyName, spreadAssignment, defaultSpread)
}
}

/**
* Executes a query using a LogicalPlan and returns the result as one message to the client.
* Depends on queryOptions, the query will fan out to multiple nodes and shards as needed to gather
Expand All @@ -111,11 +67,13 @@ object QueryCommands {
*/
final case class LogicalPlan2Query(dataset: DatasetRef,
logicalPlan: LogicalPlan2,
tsdbQueryParams: TsdbQueryParams,
queryOptions: QueryOptions = QueryOptions(),
submitTime: Long = System.currentTimeMillis()) extends QueryCommand

final case class ExplainPlan2Query(dataset: DatasetRef,
logicalPlan: LogicalPlan2,
tsdbQueryParams: TsdbQueryParams,
queryOptions: QueryOptions = QueryOptions(),
submitTime: Long = System.currentTimeMillis()) extends QueryCommand
// Error responses from query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import scala.concurrent.duration._

import com.typesafe.scalalogging.StrictLogging

import filodb.coordinator.queryengine2.TsdbQueryParams
import filodb.core._
import filodb.query.{LogicalPlan => LogicalPlan2}
import filodb.query.{QueryResponse => QueryResponse2}
import filodb.query.{LogicalPlan => LogicalPlan2, QueryOptions, QueryResponse => QueryResponse2}

trait QueryOps extends ClientBase with StrictLogging {
import QueryCommands._
Expand Down Expand Up @@ -53,8 +53,9 @@ trait QueryOps extends ClientBase with StrictLogging {
*/
def logicalPlan2Query(dataset: DatasetRef,
plan: LogicalPlan2,
tsdbQueryParams: TsdbQueryParams,
options: QueryOptions = QueryOptions()): QueryResponse2 = {
val qCmd = LogicalPlan2Query(dataset, plan, options)
val qCmd = LogicalPlan2Query(dataset, plan, tsdbQueryParams, options)
// NOTE: It's very important to extend the query timeout for the ask itself, because the queryTimeoutSecs is
// the internal FiloDB scatter-gather timeout. We need additional time for the proper error to get transmitted
// back in case of internal timeouts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import filodb.core.binaryrecord2.{RecordSchema => RecordSchema2}
import filodb.core.metadata.Column
import filodb.core.query.ColumnInfo
import filodb.memory.format.ZeroCopyUTF8String
import filodb.query.QueryOptions

/**
* Register commonly used classes for efficient Kryo serialization. If this is not done then Kryo might have to
Expand Down Expand Up @@ -91,7 +92,7 @@ class KryoInit {
kryo.register(classOf[ShardSplit])

kryo.register(classOf[QueryCommands.BadQuery])
kryo.register(classOf[QueryCommands.QueryOptions])
kryo.register(classOf[QueryOptions])
kryo.register(classOf[QueryCommands.FilteredPartitionQuery])
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import monix.reactive.Observable
import org.scalactic._

import filodb.coordinator.ShardMapper
import filodb.core.{ErrorResponse, Types}
import filodb.core.{ErrorResponse, SpreadProvider, Types}
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.Dataset
import filodb.core.query.{ColumnFilter, Filter}
import filodb.core.store._
import filodb.query.QueryOptions

final case class ChildErrorResponse(source: ActorRef, resp: ErrorResponse) extends
Exception(s"From [$source] - $resp")
Expand All @@ -26,7 +27,6 @@ final case class ChildErrorResponse(source: ActorRef, resp: ErrorResponse) exten
* Logical -> Physical Plan conversion and implementing the Distribute* physical primitives
*/
object Utils extends StrictLogging {
import filodb.coordinator.client.QueryCommands._
import TrySugar._
import filodb.coordinator.client.QueryCommands._

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package filodb.coordinator.queryengine2

import filodb.core.DatasetRef

/**
* A provider to get failure ranges. Query engine can use failure ranges while preparing physical
* plan to reroute or skip failure ranges. Ranges are based on dataset and over all clusters.
* Provider will filter failure ranges by current cluster and its replicas. Failures which do not
* belong to current cluster or its replica, will be skipped.
*/
trait FailureProvider {
def getFailures(datasetRef: DatasetRef, queryTimeRange: TimeRange): Seq[FailureTimeRange]
}

object EmptyFailureProvider extends FailureProvider {
override def getFailures(datasetRef: DatasetRef, queryTimeRange: TimeRange): Seq[FailureTimeRange] = Nil
}

/**
* Time range.
*
* @param startInMillis epoch time in millis.
* @param endInMillis epoch time in millis.
*/
case class TimeRange(startInMillis: Long, endInMillis: Long)

/**
* Failure details.
*
* @param clusterName cluster name.
* @param datasetRef Dataset reference for database and dataset.
* @param timeRange time range.
*/
case class FailureTimeRange(clusterName: String, datasetRef: DatasetRef, timeRange: TimeRange,
isRemote: Boolean)

/**
* For rerouting queries for failure ranges, Route trait will offer more context in the form of corrective
* ranges for queries or alternative dispatchers.
* A local route indicates a non-failure range on local cluster. A remote route indicates a non-failure
* range on remote cluster.
*/
trait Route

case class LocalRoute(timeRange: Option[TimeRange]) extends Route

case class RemoteRoute(timeRange: Option[TimeRange]) extends Route
Loading

0 comments on commit ba1f004

Please sign in to comment.