Skip to content

Commit

Permalink
Server: Support for request compression. (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm committed May 31, 2016
1 parent b399262 commit 65bc805
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
8 changes: 8 additions & 0 deletions docs/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ This API is synchronous and will block until your messages are either sent to Dr

If there is an error sending the data, you will get an HTTP error code (4xx or 5xx).

### Request compression

Tranquility Server supports request compression. To use request compression, set the "Content-Encoding" header of your
request and compress your payload. Currently supported options are:

- gzip: Same scheme as HTTP's gzip response encoding.
- identity: No compression.

### Direct object option

You can also POST objects directly to Tranquility Server without going through the string-oriented parser. To do this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package com.metamx.tranquility.server.http
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.smile.SmileFactory
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes
import com.metamx.common.CompressionUtils
import com.metamx.common.scala.Abort
import com.metamx.common.scala.Jackson
import com.metamx.common.scala.Logging
Expand All @@ -33,8 +34,10 @@ import com.metamx.tranquility.tranquilizer.MessageDroppedException
import com.twitter.util.Return
import com.twitter.util.Throw
import io.druid.data.input.InputRow
import java.io.InputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.GZIPInputStream
import javax.ws.rs.core.MediaType
import org.jboss.netty.handler.codec.http.HttpResponseStatus
import org.scalatra.ScalatraServlet
Expand Down Expand Up @@ -109,9 +112,10 @@ class TranquilityServlet(

private def doV1Post(forceDataSource: Option[String], async: Boolean): Array[Byte] = {
val objectMapper = getObjectMapper()
val decompressor = getRequestDecompressor()
val messages: Walker[(String, InputRow)] = request.contentType match {
case Some(JsonContentType) | Some(SmileContentType) =>
Messages.fromObjectStream(request.inputStream, forceDataSource, objectMapper) map {
Messages.fromObjectStream(decompressor(request.inputStream), forceDataSource, objectMapper) map {
case (dataSource, d) =>
val row = getBundle(dataSource).mapParser.parse(d.asJava.asInstanceOf[java.util.Map[String, AnyRef]])
(dataSource, row)
Expand All @@ -124,7 +128,7 @@ class TranquilityServlet(
s"Must include dataSource in URL for contentType[$TextContentType]"
)
}
Messages.fromStringStream(request.inputStream, getBundle(dataSource).stringParser) map { row =>
Messages.fromStringStream(decompressor(request.inputStream), getBundle(dataSource).stringParser) map { row =>
(dataSource, row)
}

Expand Down Expand Up @@ -154,6 +158,17 @@ class TranquilityServlet(
}
}

private def getRequestDecompressor(): InputStream => InputStream = {
request.header("Content-Encoding") match {
case Some("gzip") | Some("x-gzip") =>
in => CompressionUtils.gzipInputStream(in)
case Some("identity") | None =>
identity
case Some(x) =>
throw new HttpException(HttpResponseStatus.BAD_REQUEST, "Unrecognized request Content-Encoding")
}
}

private def doSend(messages: Walker[(String, InputRow)], async: Boolean): (Long, Long) = {
val myBundles = mutable.HashMap[String, DataSourceBundle]()
val received = new AtomicLong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package com.metamx.tranquility.server

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.dataformat.smile.SmileFactory
import com.google.common.base.Charsets
import com.metamx.common.scala.Jackson
import com.metamx.common.scala.collection.implicits._
import com.metamx.common.scala.gz.gzip
import com.metamx.common.scala.untyped._
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.MemoryBeam
Expand Down Expand Up @@ -221,6 +223,55 @@ class TranquilityServletTest extends FunSuite with ShouldMatchers
)
}

test("/v1/post/dataSource, application/x-jackson-smile, gzip encoded, array") {
val objectMapper = Jackson.newObjectMapper(new SmileFactory())
val events = withBeams { beams =>
withTester(beams) { tester =>
val body = gzip(
objectMapper.writeValueAsBytes(
Seq(
Dict("dataSource" -> "foo", "n" -> 1),
Dict("dataSource" -> "foo", "n" -> 2),
Dict("feed" -> "bar", "n" -> 3),
Dict("dataSource" -> "bar", "n" -> 4, FailableBeam.ActionKey -> "__drop__"),
Dict("dataSource" -> "bar", "n" -> 5)
)
)
)

val headers = Map(
"Content-Type" -> "application/x-jackson-smile",
"Content-Encoding" -> "gzip"
)

tester.post("/v1/post/foo", body, headers) {
tester.status should be(200)
tester.header("Content-Type") should startWith("application/x-jackson-smile;")
val response = objectMapper.readValue(tester.bodyBytes, classOf[Dict])
response should be(
Dict(
"result" -> Dict(
"received" -> 5,
"sent" -> 4
)
)
)
}
}
}

events should be(
Map(
"foo" -> Seq(
Dict("dataSource" -> "foo", "n" -> 1),
Dict("dataSource" -> "foo", "n" -> 2),
Dict("feed" -> "bar", "n" -> 3),
Dict("dataSource" -> "bar", "n" -> 5)
)
)
)
}

test("/v1/post/dataSource, text/plain, csv") {
val events = withBeams { beams =>
val parseSpec = new CSVParseSpec(
Expand Down

0 comments on commit 65bc805

Please sign in to comment.