From 65bc805b7e0edb1a1728fb4de5ca30f8ba8d115b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 31 May 2016 15:44:06 -0700 Subject: [PATCH] Server: Support for request compression. (#175) --- docs/server.md | 8 +++ .../server/http/TranquilityServlet.scala | 19 ++++++- .../server/TranquilityServletTest.scala | 51 +++++++++++++++++++ 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/docs/server.md b/docs/server.md index a52c13c..bebe0db 100644 --- a/docs/server.md +++ b/docs/server.md @@ -33,6 +33,14 @@ This API is synchronous and will block until your messages are either sent to Dr If there is an error sending the data, you will get an HTTP error code (4xx or 5xx). +### Request compression + +Tranquility Server supports request compression. To use request compression, set the "Content-Encoding" header of your +request and compress your payload. Currently supported options are: + +- gzip: Same scheme as HTTP's gzip response encoding. +- identity: No compression. + ### Direct object option You can also POST objects directly to Tranquility Server without going through the string-oriented parser. To do this, diff --git a/server/src/main/scala/com/metamx/tranquility/server/http/TranquilityServlet.scala b/server/src/main/scala/com/metamx/tranquility/server/http/TranquilityServlet.scala index 08ebcec..45a439b 100644 --- a/server/src/main/scala/com/metamx/tranquility/server/http/TranquilityServlet.scala +++ b/server/src/main/scala/com/metamx/tranquility/server/http/TranquilityServlet.scala @@ -22,6 +22,7 @@ package com.metamx.tranquility.server.http import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.dataformat.smile.SmileFactory import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes +import com.metamx.common.CompressionUtils import com.metamx.common.scala.Abort import com.metamx.common.scala.Jackson import com.metamx.common.scala.Logging @@ -33,8 +34,10 @@ import com.metamx.tranquility.tranquilizer.MessageDroppedException import com.twitter.util.Return import com.twitter.util.Throw import io.druid.data.input.InputRow +import java.io.InputStream import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference +import java.util.zip.GZIPInputStream import javax.ws.rs.core.MediaType import org.jboss.netty.handler.codec.http.HttpResponseStatus import org.scalatra.ScalatraServlet @@ -109,9 +112,10 @@ class TranquilityServlet( private def doV1Post(forceDataSource: Option[String], async: Boolean): Array[Byte] = { val objectMapper = getObjectMapper() + val decompressor = getRequestDecompressor() val messages: Walker[(String, InputRow)] = request.contentType match { case Some(JsonContentType) | Some(SmileContentType) => - Messages.fromObjectStream(request.inputStream, forceDataSource, objectMapper) map { + Messages.fromObjectStream(decompressor(request.inputStream), forceDataSource, objectMapper) map { case (dataSource, d) => val row = getBundle(dataSource).mapParser.parse(d.asJava.asInstanceOf[java.util.Map[String, AnyRef]]) (dataSource, row) @@ -124,7 +128,7 @@ class TranquilityServlet( s"Must include dataSource in URL for contentType[$TextContentType]" ) } - Messages.fromStringStream(request.inputStream, getBundle(dataSource).stringParser) map { row => + Messages.fromStringStream(decompressor(request.inputStream), getBundle(dataSource).stringParser) map { row => (dataSource, row) } @@ -154,6 +158,17 @@ class TranquilityServlet( } } + private def getRequestDecompressor(): InputStream => InputStream = { + request.header("Content-Encoding") match { + case Some("gzip") | Some("x-gzip") => + in => CompressionUtils.gzipInputStream(in) + case Some("identity") | None => + identity + case Some(x) => + throw new HttpException(HttpResponseStatus.BAD_REQUEST, "Unrecognized request Content-Encoding") + } + } + private def doSend(messages: Walker[(String, InputRow)], async: Boolean): (Long, Long) = { val myBundles = mutable.HashMap[String, DataSourceBundle]() val received = new AtomicLong diff --git a/server/src/test/scala/com/metamx/tranquility/server/TranquilityServletTest.scala b/server/src/test/scala/com/metamx/tranquility/server/TranquilityServletTest.scala index cbc1b81..1429df3 100644 --- a/server/src/test/scala/com/metamx/tranquility/server/TranquilityServletTest.scala +++ b/server/src/test/scala/com/metamx/tranquility/server/TranquilityServletTest.scala @@ -20,9 +20,11 @@ package com.metamx.tranquility.server import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.dataformat.smile.SmileFactory import com.google.common.base.Charsets import com.metamx.common.scala.Jackson import com.metamx.common.scala.collection.implicits._ +import com.metamx.common.scala.gz.gzip import com.metamx.common.scala.untyped._ import com.metamx.tranquility.beam.Beam import com.metamx.tranquility.beam.MemoryBeam @@ -221,6 +223,55 @@ class TranquilityServletTest extends FunSuite with ShouldMatchers ) } + test("/v1/post/dataSource, application/x-jackson-smile, gzip encoded, array") { + val objectMapper = Jackson.newObjectMapper(new SmileFactory()) + val events = withBeams { beams => + withTester(beams) { tester => + val body = gzip( + objectMapper.writeValueAsBytes( + Seq( + Dict("dataSource" -> "foo", "n" -> 1), + Dict("dataSource" -> "foo", "n" -> 2), + Dict("feed" -> "bar", "n" -> 3), + Dict("dataSource" -> "bar", "n" -> 4, FailableBeam.ActionKey -> "__drop__"), + Dict("dataSource" -> "bar", "n" -> 5) + ) + ) + ) + + val headers = Map( + "Content-Type" -> "application/x-jackson-smile", + "Content-Encoding" -> "gzip" + ) + + tester.post("/v1/post/foo", body, headers) { + tester.status should be(200) + tester.header("Content-Type") should startWith("application/x-jackson-smile;") + val response = objectMapper.readValue(tester.bodyBytes, classOf[Dict]) + response should be( + Dict( + "result" -> Dict( + "received" -> 5, + "sent" -> 4 + ) + ) + ) + } + } + } + + events should be( + Map( + "foo" -> Seq( + Dict("dataSource" -> "foo", "n" -> 1), + Dict("dataSource" -> "foo", "n" -> 2), + Dict("feed" -> "bar", "n" -> 3), + Dict("dataSource" -> "bar", "n" -> 5) + ) + ) + ) + } + test("/v1/post/dataSource, text/plain, csv") { val events = withBeams { beams => val parseSpec = new CSVParseSpec(