Skip to content

Commit

Permalink
Release 0.9.3
Browse files Browse the repository at this point in the history
  • Loading branch information
tjackpaul authored Jan 27, 2020
2 parents daa230e + ca49c74 commit 64523d4
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ class RecordBuilder(memFactory: MemFactory,
recHash = -1
}

/**
* If somehow the state is inconsistent, and only a partial record is written,
* rewind the curRecordOffset back to the curRecEndOffset. In other words, rewind the write pointer
* back to the end of previous record. Partially written data is lost, but state is consistent again.
*/
def rewind(): Unit = {
curRecEndOffset = curRecordOffset
}

// Check that we are at end of a record. If a partial record is written, just rewind so state is not inconsistent.
private def checkPointers(): Unit = {
if (curRecEndOffset != curRecordOffset) {
logger.warn(s"Partial record was written, perhaps exception occurred. Rewinding to end of previous record.")
rewind()
}
}

// Only reset the container offsets, but not the fieldNo, mapOffset, recHash
private def resetContainerPointers(): Unit = {
curRecordOffset = containers.last.offset + ContainerHeaderLen
Expand All @@ -87,7 +104,7 @@ class RecordBuilder(memFactory: MemFactory,
* for partition keys. However for ingestion records it would be the same.
*/
private[core] final def startNewRecord(recSchema: RecordSchema, schemaID: Int): Unit = {
require(curRecEndOffset == curRecordOffset, s"Illegal state: $curRecEndOffset != $curRecordOffset")
checkPointers()

// Set schema, hashoffset, and write schema ID if needed
setSchema(recSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ final case class InfluxHistogramRecord(bytes: Array[Byte],
InfluxProtocolParser.parseKeyValues(bytes, fieldDelims, fieldEnd, visitor)

// Only create histogram record if we are able to parse above and it contains +Inf bucket
// This also ensures that it's not a blank histogram, which cannot be ingested
if (visitor.gotInf) {
val buckets = CustomBuckets(visitor.bucketTops)
val hist = LongHistogram(buckets, visitor.bucketVals)
Expand Down
30 changes: 16 additions & 14 deletions gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,22 @@ object InputRecord {
case (k, v) => (k.toDouble, v.toLong)
}.sorted

// Built up custom histogram objects and scheme, then encode
val buckets = CustomBuckets(sortedBuckets.map(_._1).toArray)
val hist = LongHistogram(buckets, sortedBuckets.map(_._2).toArray)

// Now, write out histogram
builder.startNewRecord(promHistogram)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
if (sortedBuckets.nonEmpty) {
// Built up custom histogram objects and scheme, then encode
val buckets = CustomBuckets(sortedBuckets.map(_._1).toArray)
val hist = LongHistogram(buckets, sortedBuckets.map(_._2).toArray)

// Now, write out histogram
builder.startNewRecord(promHistogram)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ class InputRecordBuilderSpec extends FunSpec with Matchers {
"shard" -> "0")
val metric = "my_hist"

val counts = Array(10L, 20L, 25, 38, 50, 66)
val sum = counts.sum.toDouble
val count = 50.0
val sumCountKVs = Seq("sum" -> sum, "count" -> count)

it("should writePromHistRecord to BR and be able to deserialize it") {
val buckets = Array(0.5, 1, 2.5, 5, 10, Double.PositiveInfinity)
val counts = Array(10L, 20L, 25, 38, 50, 66)
val expected = LongHistogram(CustomBuckets(buckets), counts)

val sum = counts.sum.toDouble
val count = 50.0
val bucketKVs = buckets.zip(counts).map {
case (Double.PositiveInfinity, c) => "+Inf" -> c.toDouble
case (b, c) => b.toString -> c.toDouble
}.toSeq
val sumCountKVs = Seq("sum" -> sum, "count" -> count)

// 1 - sum/count at end
InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, bucketKVs ++ sumCountKVs)
builder.allContainers.head.iterate(Schemas.promHistogram.ingestionSchema).foreach { row =>
Expand All @@ -36,4 +36,13 @@ class InputRecordBuilderSpec extends FunSpec with Matchers {
row.getHistogram(3) shouldEqual expected
}
}

it("should skip empty histograms via writePromHistRecord, and write subsequent records") {
builder.reset()
InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs)
InputRecord.writeGaugeRecord(builder, metric, baseTags, 100000L, 5.5)

// The empty histogram should have been skipped, so we should have only one record
builder.allContainers.head.countRecords shouldEqual 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,16 @@ object BinaryHistogram extends StrictLogging {
val formatCode = buckets match {
case g: GeometricBuckets if g.minusOne => HistFormat_Geometric1_Delta
case g: GeometricBuckets => HistFormat_Geometric_Delta
case c: CustomBuckets if c.numBuckets == 0 => HistFormat_Null
case c: CustomBuckets => HistFormat_Custom_Delta
}

buf.putByte(2, formatCode)
val valuesIndex = buckets.serialize(buf, 3)
val finalPos = NibblePack.packDelta(values, buf, valuesIndex)

val finalPos = if (formatCode == HistFormat_Null) { 3 }
else {
val valuesIndex = buckets.serialize(buf, 3)
NibblePack.packDelta(values, buf, valuesIndex)
}
require(finalPos <= 65535, s"Histogram data is too large: $finalPos bytes needed")
buf.putShort(0, (finalPos - 2).toShort)
finalPos
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.9.2"
version in ThisBuild := "0.9.3"

0 comments on commit 64523d4

Please sign in to comment.