Skip to content

Commit

Permalink
Merge branch '0.8.3-integration' to master 0.8.3 release
Browse files Browse the repository at this point in the history
  • Loading branch information
jackson-paul committed Mar 19, 2019
2 parents 4bb89c0 + 3f331b9 commit 6de90f9
Show file tree
Hide file tree
Showing 33 changed files with 2,010 additions and 1,825 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ abstract class ClusterSeedDiscovery(val cluster: Cluster,
response = Http(seedsEndpoint).timeout(2000, 2000).asString
logger.info(s"Seeds endpoint returned a ${response.code}. Response body was ${response.body}")
} catch {
case NonFatal(e) =>
case NonFatal(e) => {
if (e.isInstanceOf[java.net.ConnectException]) {
// Don't bother logging the full stack trace for something which is expected.
e.setStackTrace(new Array[StackTraceElement](0))
}
logger.info(s"Seeds endpoint $seedsEndpoint failed. This is expected on cluster bootstrap", e)
}
}
retriesRemaining -= 1
if (retriesRemaining > 0) Thread.sleep(settings.seedsHttpSleepBetweenRetries.toMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ final class QueryActor(memStore: MemStore,
def execPhysicalPlan2(q: ExecPlan, replyTo: ActorRef): Unit = {
epRequests.increment
Kamon.currentSpan().tag("query", q.getClass.getSimpleName)
val span = Kamon.buildSpan(s"execplan2-${q.getClass.getSimpleName}").start()
val span = Kamon.buildSpan(s"execplan2-${q.getClass.getSimpleName}")
.withTag("query-id", q.id)
.start()
implicit val _ = queryConfig.askTimeout
q.execute(memStore, dataset, queryConfig)
.foreach { res =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ class QueryEngine(dataset: Dataset,
def dispatchExecPlan(execPlan: ExecPlan)
(implicit sched: ExecutionContext,
timeout: FiniteDuration): Task[QueryResponse] = {
execPlan.dispatcher.dispatch(execPlan)
val currentSpan = Kamon.currentSpan()
Kamon.withSpan(currentSpan) {
execPlan.dispatcher.dispatch(execPlan)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}

import filodb.core.AbstractSpec

object ActorSpecConfig {
  /**
   * Creates a new ActorSystem after pausing the calling thread for a fixed interval.
   *
   * The pause gives a previous test's port binding some time to be released before the
   * next system starts. Retrying the bind automatically would be preferable; the fixed
   * sleep is simply the easier fix.
   *
   * @param name   name handed to the ActorSystem factory
   * @param config configuration handed to the ActorSystem factory
   * @return the newly created ActorSystem
   */
  def getNewSystem(name: String, config: Config): ActorSystem = {
    val portBindingGraceMillis = 5000L
    Thread.sleep(portBindingGraceMillis)
    ActorSystem(name, config)
  }
}

trait ActorSpecConfig {

val defaultConfig = """
Expand All @@ -27,8 +36,7 @@ trait ActorSpecConfig {
.withFallback(ConfigFactory.parseResources("application_test.conf"))
.withFallback(ConfigFactory.load("filodb-defaults.conf"))

def getNewSystem = ActorSystem("test", config)

def getNewSystem = ActorSpecConfig.getNewSystem("test", config)
}

trait SeedNodeConfig {
Expand Down Expand Up @@ -85,8 +93,9 @@ object AkkaSpec extends SeedNodeConfig {

val settings = new FilodbSettings(userConfig.withFallback(serverConfig))

def getNewSystem(c: Option[Config] = None): ActorSystem =
ActorSystem("test", c.map(_.withFallback(settings.allConfig)) getOrElse settings.allConfig)
def getNewSystem(c: Option[Config] = None): ActorSystem = {
ActorSpecConfig.getNewSystem("test", c.map(_.withFallback(settings.allConfig)) getOrElse settings.allConfig)
}
}

abstract class AkkaSpec(system: ActorSystem) extends AbstractTestKit(system)
Expand All @@ -97,8 +106,9 @@ abstract class AkkaSpec(system: ActorSystem) extends AbstractTestKit(system)
with Matchers
with ScalaFutures {

def this() = this(ActorSystem("akka-test", AkkaSpec.settings.allConfig))
def this(config: Config) = this(ActorSystem("akka-test", config.withFallback(AkkaSpec.settings.allConfig)))
def this() = this(ActorSpecConfig.getNewSystem("akka-test", AkkaSpec.settings.allConfig))
def this(config: Config) =
this(ActorSpecConfig.getNewSystem("akka-test", config.withFallback(AkkaSpec.settings.allConfig)))

}

Expand Down
65 changes: 37 additions & 28 deletions core/src/main/scala/filodb.core/memstore/OnDemandPagingShard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import scala.concurrent.ExecutionContext

import debox.Buffer
import kamon.Kamon
import kamon.trace.Span
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.{Observable, OverflowStrategy}
Expand Down Expand Up @@ -38,6 +39,10 @@ TimeSeriesShard(dataset, storeConfig, shardNum, rawStore, metastore, evictionPol
// TODO: make this configurable
private val strategy = OverflowStrategy.BackPressure(1000)

/**
 * Builds and starts a Kamon span named "odp-cassandra-latency", tagged with this shard's
 * dataset name and shard number. The span is returned already started; the code that
 * obtains it is expected to call finish() on it.
 */
private def startODPSpan: Span = {
  val spanBuilder = Kamon.buildSpan("odp-cassandra-latency")
    .withTag("dataset", dataset.name)
    .withTag("shard", shardNum)
  spanBuilder.start()
}
// NOTE: the current implementation is as follows
// 1. Fetch partitions from memStore
// 2. Determine, one at a time, what chunks are missing and could be fetched from disk
Expand Down Expand Up @@ -73,42 +78,46 @@ TimeSeriesShard(dataset, storeConfig, shardNum, rawStore, metastore, evictionPol
}
}
shardStats.partitionsQueried.increment(inMemoryPartitions.length)
val span = Kamon.buildSpan(s"odp-cassandra-latency")
.withTag("dataset", dataset.name)
.withTag("shard", shardNum)
.start()
logger.debug(s"dataset=${dataset.ref} shard=$shardNum Querying ${inMemoryPartitions.length} in memory " +
s"partitions, ODPing ${methods.length}")

// NOTE: multiPartitionODP mode does not work with AllChunkScan and unit tests; namely missing partitions will not
// return data that is in memory. TODO: fix
(if (storeConfig.multiPartitionODP) {
Observable.fromIterable(inMemoryPartitions) ++
Observable.fromTask(odpPartTask(indexIt, partKeyBytesToPage, methods, chunkMethod)).flatMap { odpParts =>
val multiPart = MultiPartitionScan(partKeyBytesToPage, shardNum)
shardStats.partitionsQueried.increment(partKeyBytesToPage.length)
if (partKeyBytesToPage.nonEmpty) {
rawStore.readRawPartitions(dataset, allDataCols, multiPart, computeBoundingMethod(methods))
// NOTE: this executes the partMaker single threaded. Needed for now due to concurrency constraints.
// In the future optimize this if needed.
.mapAsync { rawPart => partitionMaker.populateRawChunks(rawPart).executeOn(singleThreadPool) }
// This is needed so future computations happen in a different thread
.asyncBoundary(strategy)
} else { Observable.empty }
}
} else {
Observable.fromIterable(inMemoryPartitions) ++
Observable.fromTask(odpPartTask(indexIt, partKeyBytesToPage, methods, chunkMethod)).flatMap { odpParts =>
shardStats.partitionsQueried.increment(partKeyBytesToPage.length)
Observable.fromIterable(partKeyBytesToPage.zip(methods))
.mapAsync(storeConfig.demandPagingParallelism) { case (partBytes, method) =>
rawStore.readRawPartitions(dataset, allDataCols, SinglePartitionScan(partBytes, shardNum), method)
Observable.fromIterable(inMemoryPartitions) ++ {
if (storeConfig.multiPartitionODP) {
Observable.fromTask(odpPartTask(indexIt, partKeyBytesToPage, methods, chunkMethod)).flatMap { odpParts =>
val multiPart = MultiPartitionScan(partKeyBytesToPage, shardNum)
shardStats.partitionsQueried.increment(partKeyBytesToPage.length)
if (partKeyBytesToPage.nonEmpty) {
val span = startODPSpan
rawStore.readRawPartitions(dataset, allDataCols, multiPart, computeBoundingMethod(methods))
// NOTE: this executes the partMaker single threaded. Needed for now due to concurrency constraints.
// In the future optimize this if needed.
.mapAsync { rawPart => partitionMaker.populateRawChunks(rawPart).executeOn(singleThreadPool) }
.defaultIfEmpty(getPartition(partBytes).get)
.headL
.doOnComplete(() => span.finish())
// This is needed so future computations happen in a different thread
.asyncBoundary(strategy)
} else { Observable.empty }
}
} else {
Observable.fromTask(odpPartTask(indexIt, partKeyBytesToPage, methods, chunkMethod)).flatMap { odpParts =>
shardStats.partitionsQueried.increment(partKeyBytesToPage.length)
if (partKeyBytesToPage.nonEmpty) {
val span = startODPSpan
Observable.fromIterable(partKeyBytesToPage.zip(methods))
.mapAsync(storeConfig.demandPagingParallelism) { case (partBytes, method) =>
rawStore.readRawPartitions(dataset, allDataCols, SinglePartitionScan(partBytes, shardNum), method)
.mapAsync { rawPart => partitionMaker.populateRawChunks(rawPart).executeOn(singleThreadPool) }
.defaultIfEmpty(getPartition(partBytes).get)
.headL
}
.doOnComplete(() => span.finish())
} else {
Observable.empty
}
}
}
}).doOnComplete(() => span.finish())
}
}

// 3. Deal with partitions no longer in memory but still indexed in Lucene.
Expand Down
68 changes: 35 additions & 33 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import scalaxy.loops._
import filodb.core.Types._
import filodb.core.metadata.Dataset
import filodb.core.store._
import filodb.memory.{BinaryRegion, BinaryRegionLarge, BlockMemFactory}
import filodb.memory.data.{MapHolder, OffheapLFSortedIDMapMutator}
import filodb.memory.{BinaryRegion, BinaryRegionLarge, BlockMemFactory, MemFactory}
import filodb.memory.data.ChunkMap
import filodb.memory.format._

object TimeSeriesPartition extends StrictLogging {
Expand Down Expand Up @@ -44,25 +44,24 @@ final case class InfoAppenders(info: ChunkSetInfo, appenders: TimeSeriesPartitio
* This allows for safe and cheap write buffer churn without losing any data.
* switchBuffers() is called before flush() is called in another thread, possibly.
*
* The main data structure used is the "infoMap" - an OffheapLFSortedIDMap, an extremely efficient, offheap sorted map
* Note that other than the variables used in this class, there is NO heap memory used for managing chunks. Thus
* the amount of heap memory used for a partition is O(1) constant regardless of the number of chunks in a TSPartition.
* The partition key and infoMap are both in offheap write buffers, and chunks and chunk metadata are kept in
* offheap block memory.
* The main data structure used is inherited from ChunkMap, an efficient, offheap sorted map.
* Note that other than the variables used in this class, there is NO JVM-managed memory used
* for managing chunks. Thus the amount of JVM-managed memory used for a partition is constant
* regardless of the number of chunks in a TSPartition. The partition key and infoMap are both
* in offheap write buffers, and chunks and chunk metadata are kept in offheap block memory.
*
* Note: Inheritance is chosen over composition to avoid an extra object allocation, which
* speeds up GC and reduces memory overhead a bit.
*/
class TimeSeriesPartition(val partID: Int,
val dataset: Dataset,
partitionKey: BinaryRegion.NativePointer,
val shard: Int,
bufferPool: WriteBufferPool,
val shardStats: TimeSeriesShardStats,
// Volatile pointer to infoMap structure. Name of field MUST match mapKlazz method above
var mapPtr: BinaryRegion.NativePointer,
// Shared class for mutating the infoMap / OffheapLFSortedIDMap given mapPtr above
offheapInfoMap: OffheapLFSortedIDMapMutator,
// Lock state used by OffheapLFSortedIDMap.
var lockState: Int = 0)
extends ReadablePartition with MapHolder {
memFactory: MemFactory,
initMapSize: Int)
extends ChunkMap(memFactory, initMapSize) with ReadablePartition {
import TimeSeriesPartition._

require(bufferPool.dataset == dataset) // Really important that buffer pool schema matches
Expand Down Expand Up @@ -237,21 +236,21 @@ extends ReadablePartition with MapHolder {
appenders = appenders.filterNot(_ == ia)
}

def numChunks: Int = offheapInfoMap.length(this)
def numChunks: Int = chunkmapSize // inherited from ChunkMap
def appendingChunkLen: Int = if (currentInfo != nullInfo) currentInfo.numRows else 0

/**
* Number of unflushed chunksets lying around. Goes up every time a new writebuffer is allocated and goes down
* when flushes happen. Computed dynamically from current infosChunks state.
* NOTE: since sliceToEnd is inclusive, we need to start just past the newestFlushedID
*/
def unflushedChunksets: Int = offheapInfoMap.sliceToEnd(this, newestFlushedID + 1).count
def unflushedChunksets: Int = chunkmapSliceToEnd(newestFlushedID + 1).count

private def allInfos: ChunkInfoIterator = new ElementChunkInfoIterator(offheapInfoMap.iterate(this))
private def allInfos: ChunkInfoIterator = new ElementChunkInfoIterator(chunkmapIterate)

// NOT including currently flushing writeBuffer chunks if there are any
private[memstore] def infosToBeFlushed: ChunkInfoIterator =
new ElementChunkInfoIterator(offheapInfoMap.sliceToEnd(this, newestFlushedID + 1))
new ElementChunkInfoIterator(chunkmapSliceToEnd(newestFlushedID + 1))
.filter(_ != currentInfo) // filter out the appending chunk

def infos(method: ChunkScanMethod): ChunkInfoIterator = method match {
Expand All @@ -266,7 +265,7 @@ extends ReadablePartition with MapHolder {
try {
new OneChunkInfo(currentInfo)
} catch {
case e: Throwable => offheapInfoMap.releaseShared(this); throw e;
case e: Throwable => chunkmapReleaseShared(); throw e;
}
}
}
Expand All @@ -293,7 +292,7 @@ extends ReadablePartition with MapHolder {

private def doClose(): Unit = {
closed = true
offheapInfoMap.releaseShared(TimeSeriesPartition.this)
chunkmapReleaseShared()
}

def hasNext: Boolean = {
Expand All @@ -304,19 +303,22 @@ extends ReadablePartition with MapHolder {
def nextInfo: ChunkSetInfo = {
if (closed) throw new NoSuchElementException()
if (!valueSeen) {
offheapInfoMap.acquireShared(TimeSeriesPartition.this)
chunkmapAcquireShared()
valueSeen = true
}
return info
}

// Delegates to chunkmapAcquireShared().
// NOTE(review): presumably takes the shared lock on the enclosing partition's chunk map -- confirm against ChunkMap.
final def lock(): Unit = chunkmapAcquireShared()
// Delegates to chunkmapReleaseShared(); the counterpart of lock().
// NOTE(review): presumably releases the shared lock on the enclosing partition's chunk map -- confirm against ChunkMap.
final def unlock(): Unit = chunkmapReleaseShared()
}

final def earliestTime: Long = {
if (numChunks == 0) {
Long.MinValue
} else {
// Acquire shared lock to safely access the native pointer.
offheapInfoMap.withShared(this, ChunkSetInfo(offheapInfoMap.first(this)).startTime)
chunkmapWithShared(ChunkSetInfo(chunkmapDoGetFirst).startTime)
}
}

Expand All @@ -331,27 +333,27 @@ extends ReadablePartition with MapHolder {
currentInfo.endTime
} else if (numChunks > 0) {
// Acquire shared lock to safely access the native pointer.
offheapInfoMap.withShared(this, infoLast.endTime)
chunkmapWithShared(infoLast.endTime)
} else {
-1
}
}

// Disabled for now. Requires a shared lock on offheapInfoMap.
// Disabled for now. Requires a shared lock on the inherited map.
//def dataChunkPointer(id: ChunkID, columnID: Int): BinaryVector.BinaryVectorPtr = infoGet(id).vectorPtr(columnID)

final def removeChunksAt(id: ChunkID): Unit = {
offheapInfoMap.withExclusive(this, offheapInfoMap.remove(this, id))
chunkmapWithExclusive(chunkmapDoRemove(id))
shardStats.chunkIdsEvicted.increment()
}

final def hasChunksAt(id: ChunkID): Boolean = offheapInfoMap.contains(this, id)
final def hasChunksAt(id: ChunkID): Boolean = chunkmapContains(id)

// Used for adding chunksets that are paged in, ie that are already persisted
// Atomic and multi-thread safe; only mutates state if chunkID not present
final def addChunkInfoIfAbsent(id: ChunkID, infoAddr: BinaryRegion.NativePointer): Boolean = {
offheapInfoMap.withExclusive(this, {
val inserted = offheapInfoMap.putIfAbsent(this, id, infoAddr)
chunkmapWithExclusive({
val inserted = chunkmapDoPutIfAbsent(infoAddr)
// Make sure to update newestFlushedID so that flushes work correctly and don't try to flush these chunksets
if (inserted) updateFlushedID(infoGet(id))
inserted
Expand All @@ -362,14 +364,14 @@ extends ReadablePartition with MapHolder {
newestFlushedID = Math.max(newestFlushedID, info.id)
}

// Caller must hold lock on offheapInfoMap.
private def infoGet(id: ChunkID): ChunkSetInfo = ChunkSetInfo(offheapInfoMap(this, id))
// Caller must hold lock on the inherited map.
private def infoGet(id: ChunkID): ChunkSetInfo = ChunkSetInfo(chunkmapDoGet(id))

// Caller must hold lock on offheapInfoMap.
private[core] def infoLast(): ChunkSetInfo = ChunkSetInfo(offheapInfoMap.last(this))
// Caller must hold lock on the inherited map.
private[core] def infoLast(): ChunkSetInfo = ChunkSetInfo(chunkmapDoGetLast)

private def infoPut(info: ChunkSetInfo): Unit = {
offheapInfoMap.withExclusive(this, offheapInfoMap.put(this, info.infoAddr))
chunkmapWithExclusive(chunkmapDoPut(info.infoAddr))
}
}

Expand Down
Loading

0 comments on commit 6de90f9

Please sign in to comment.