diff --git a/bundles/io.github.linkedfactory.service/src/main/scala/io/github/linkedfactory/service/KvinService.scala b/bundles/io.github.linkedfactory.service/src/main/scala/io/github/linkedfactory/service/KvinService.scala index 694b3c9b..23e6e41a 100644 --- a/bundles/io.github.linkedfactory.service/src/main/scala/io/github/linkedfactory/service/KvinService.scala +++ b/bundles/io.github.linkedfactory.service/src/main/scala/io/github/linkedfactory/service/KvinService.scala @@ -15,11 +15,11 @@ */ package io.github.linkedfactory.service -import io.github.linkedfactory.core.kvin.util.{AsyncExtendedIterator, CsvFormatParser} +import io.github.linkedfactory.core.kvin.util.{AsyncExtendedIterator, CsvFormatParser, JsonFormatWriter} import io.github.linkedfactory.core.kvin.{Kvin, KvinTuple, Record} import io.github.linkedfactory.core.rdf4j.FederatedServiceComponent import io.github.linkedfactory.service.util.{JsonFormatParser, LineProtocolParser} -import net.enilink.commons.iterator.IExtendedIterator +import net.enilink.commons.iterator.{IExtendedIterator, NiceIterator, WrappedIterator} import net.enilink.komma.core.{URI, URIs} import net.enilink.platform.lift.util.Globals import net.liftweb.common.Box.box2Iterable @@ -35,7 +35,8 @@ import org.apache.commons.csv.{CSVFormat, CSVPrinter} import java.io.{InputStream, OutputStream, OutputStreamWriter} import java.text.SimpleDateFormat -import java.util.Date +import java.util +import java.util.{Collections, Date} import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -74,6 +75,9 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga case list Options req if list.endsWith("values" :: Nil) || list.endsWith("properties" :: Nil) || // list.endsWith("**" :: Nil) || list.endsWith("values" :: "size" :: Nil) => InMemoryResponse(Array(), responseHeaders, S.responseCookies, 200) + case list Get req if list.endsWith("values" :: Nil) => serveValues(list, responseType(req)) + case list Post req if list.endsWith("values" :: Nil) && + req.contentType.exists(_ == "application/x-www-form-urlencoded") => serveValues(list, responseType(req)) case list Post req if list.endsWith("values" :: Nil) => val result = req.contentType match { case Full("application/influxdb-line") => @@ -82,163 +86,151 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga req.rawInputStream.flatMap(saveCsvValues(_, path ++ list.dropRight(1), System.currentTimeMillis)) case _ => req.json.flatMap(saveValues(_, path ++ list.dropRight(1), System.currentTimeMillis)) - // req.rawInputStream.flatMap(saveValues(_, path ++ list.dropRight(1), System.currentTimeMillis)) + // req.rawInputStream.flatMap(saveValues(_, path ++ list.dropRight(1), System.currentTimeMillis)) } result match { case Failure(msg, _, _) => FailureResponse(msg) case _ => OkResponse() } - case list Get req if list.endsWith("values" :: Nil) => - val limit = S.param("limit") flatMap (v => tryo(v.toLong)) filter (_ > 0) openOr 10000L - - if (limit > MAX_LIMIT) { - FailureResponse("The maximum limit is " + MAX_LIMIT + ". Please use multiple request if you require more data points.") - } else { - val values = getValues(path ++ list.dropRight(1), limit) - - def filename(defaultExt: String) = S.param("filename") openOr "values." + defaultExt - - val formatDate: Long => String = { - S.param("dateformat").map { f => - val timezoneoffset = S.param("timezoneoffset").map(_.toInt) openOr 0 - val formatter = new SimpleDateFormat(f) - (timestamp: Long) => { - val adjustedTime = timestamp - 60000 * timezoneoffset - formatter.format(new Date(adjustedTime)) - } - } openOr { - // the following does not work for some reason with Scala 2.13 - // (ts : Long) => ts.toString - def toString(ts: Long) = ts.toString - toString _ + case list Get _ if list.endsWith("properties" :: Nil) => createJsonResponse(getProperties(path ++ list.dropRight(1))) + case list Get _ if list.endsWith("**" :: Nil) => createJsonResponse(getDescendants(path ++ list.dropRight(1))) + + case list Delete _ if list.endsWith("values" :: Nil) => createJsonResponse(deleteValues(path ++ list.dropRight(1))) + // case list Get _ => // TODO return RDF description + }) + + def serveValues(path: List[String], contentType: Box[String]): LiftResponse = { + val limit = S.param("limit") flatMap (v => tryo(v.toLong)) filter (_ > 0) openOr 10000L + + if (limit > MAX_LIMIT) { + FailureResponse("The maximum limit is " + MAX_LIMIT + ". Please use multiple request if you require more data points.") + } else { + def filename(defaultExt: String) = S.param("filename") openOr "values." + defaultExt + + val formatDate: Long => String = { + S.param("dateformat").map { f => + val timezoneoffset = S.param("timezoneoffset").map(_.toInt) openOr 0 + val formatter = new SimpleDateFormat(f) + (timestamp: Long) => { + val adjustedTime = timestamp - 60000 * timezoneoffset + formatter.format(new Date(adjustedTime)) } - } + } openOr { + // the following does not work for some reason with Scala 2.13 + // (ts : Long) => ts.toString + def toString(ts: Long) = ts.toString - def recordToJson(r : Record) : JObject = JObject(r.iterator().asScala.map { e => - JField(e.getProperty.toString, e.getValue match { - case r : Record => recordToJson(r) - case uri : URI => JObject(JField("@id", uri.toString)) - case other => decompose(other) - }) - }.toList) - - // fast path to avoid value->JSON->string conversion for simple types - def value2Str(value: Any, quoteStrings: Boolean = true) = value match { - case null => "null" - case b: Boolean => b.toString - case n: Number => n.toString - case uri : URI => compactRender(JObject(JField("@id", uri.toString))) - case x: JValue => compactRender(x) - case e: Record => compactRender(recordToJson(e)) - case other if quoteStrings => compactRender(JString(other.toString)) - case other => other.toString + toString _ } + } - val response = responseType(req) map { - case "application/json" => - // { "item" : { "property1" : [ { "time" : 123, "seqNr" : 2, "value" : 1.3 } ], "property2" : [ { "time" : 123, "seqNr" : 5, "value" : 3.2 } ] } } - val streamer = (os: OutputStream) => { - val streamWriter = new OutputStreamWriter(os) - // open JSON result object, then iterate over the items - streamWriter.write("{") - for (((item, itemData), i) <- values.view.zipWithIndex) { - // open item object, then iterate over the properties - streamWriter.write(s"""\n "$item":{""") - for (((property, propertyData), p) <- itemData.view.zipWithIndex) { - // open array of property values, then fully consume the iterator - streamWriter.write(s"""\n "$property":[""") - for ((entry, k) <- propertyData.iterator.asScala.zipWithIndex) { - if (propertyData.hasNext && k % 100 == 0) streamWriter.write("\n ") - streamWriter.write(s"""{"time":${entry.time},"seqNr":${entry.seqNr},"value":${ value2Str(entry.value) }}""") - if (propertyData.hasNext) streamWriter.write(",") - } - propertyData.close() // close the iterator - streamWriter.write("\n ]") // close array of property values - if (p < itemData.size - 1) streamWriter.write(",") - } - streamWriter.write("\n }") // close item object - if (i < values.size - 1) streamWriter.write(",") - } - streamWriter.write("\n}") // close JSON result object - streamWriter.close() + def recordToJson(r: Record): JObject = JObject(r.iterator().asScala.map { e => + JField(e.getProperty.toString, e.getValue match { + case r: Record => recordToJson(r) + case uri: URI => JObject(JField("@id", uri.toString)) + case other => decompose(other) + }) + }.toList) + + // fast path to avoid value->JSON->string conversion for simple types + def value2Str(value: Any, quoteStrings: Boolean = true) = value match { + case null => "null" + case b: Boolean => b.toString + case n: Number => n.toString + case uri: URI => compactRender(JObject(JField("@id", uri.toString))) + case x: JValue => compactRender(x) + case e: Record => compactRender(recordToJson(e)) + case other if quoteStrings => compactRender(JString(other.toString)) + case other => other.toString + } + + val response = contentType map { + case "application/json" => + val values = getValues(path ++ path.dropRight(1), limit) + // { "item" : { "property1" : [ { "time" : 123, "seqNr" : 2, "value" : 1.3 } ], "property2" : [ { "time" : 123, "seqNr" : 5, "value" : 3.2 } ] } } + val streamer = (os: OutputStream) => { + val writer = new JsonFormatWriter(os, false) + try { + values.forEach(writer.writeTuple(_)) + } finally { + writer.close() } - OutputStreamResponse(streamer, -1, ("Content-Type", "application/json; charset=utf-8") :: - ("Content-Disposition", s"""inline; filename=${filename("json")}""") :: responseHeaders, S.responseCookies, 200) - case "text/csv" => - // { "item" : { "property1" : [ { "time" : 123, "seqNr" : 2, "value" : 1.3 } ], "property2" : [ { "time" : 123, "seqNr" : 5, "value" : 3.2 } ] } } - val streamer = (os: OutputStream) => { - val streamWriter = new OutputStreamWriter(os) - val csvFormat = CSVFormat.EXCEL - val csvPrinter = new CSVPrinter(streamWriter, csvFormat) - - // either use supplied properties or properties retrieved from database - val propertiesParam = S.param("properties").map(_.split("\\s+").toList) - - val itemProperties = values.map(v => { - (propertiesParam openOr v._2.map(_._1).toSet.toList.sorted).map((v._1, _)) - }).flatten.toList - - // print header row - csvPrinter.printRecord(("time" :: itemProperties.map(p => s"<${p._1}>@<${p._2}>")).asJava) - - var itemData = values.map(v => { - val ps = propertiesParam openOr v._2.map(_._1).toSet.toList.sorted - ps.flatMap(p => v._2.get(p).map(it => (if (it.hasNext()) it.next() else null, it))) - }).flatten - - val ordering: Ordering[KvinTuple] = (a: KvinTuple, b: KvinTuple) => { - // compare first by time and then by seqNr - val diffTime = a.time - b.time - if (diffTime != 0) diffTime.toInt else a.seqNr - b.seqNr - } - var finished = false - while (!finished) { - val nextTuples = itemData.map(_._1).filter(_ != null) - val maxTuple = if (nextTuples.isEmpty) null else nextTuples.max(ordering) - if (maxTuple != null) { - // print the row, properties without values at row timestamp stay unset - csvPrinter.printRecord((formatDate(maxTuple.time) :: itemData.map(d => { - if (d._1 != null && ordering.compare(maxTuple, d._1) == 0 && d._1.value != null) value2Str(d._1.value, false) else null - }).toList).asJava) - - // select next tuple from iterators - itemData = itemData.map(d => { - if (d._1 != null && ordering.compare(maxTuple, d._1) == 0) { - val it = d._2 - val next = if (it.hasNext()) it.next() else { - it.close() - null - } - (next, if (next != null) it else null) - } else d - }) - } else { - finished = true - } + } + OutputStreamResponse(streamer, -1, ("Content-Type", "application/json; charset=utf-8") :: + ("Content-Disposition", s"""inline; filename=${filename("json")}""") :: responseHeaders, S.responseCookies, 200) + case "text/csv" => + // { "item" : { "property1" : [ { "time" : 123, "seqNr" : 2, "value" : 1.3 } ], "property2" : [ { "time" : 123, "seqNr" : 5, "value" : 3.2 } ] } } + val streamer = (os: OutputStream) => { + // TODO find solution to not materialize everything as map + val valuesMap = getValuesMap(path ++ path.dropRight(1), limit) + + val streamWriter = new OutputStreamWriter(os) + val csvFormat = CSVFormat.EXCEL + val csvPrinter = new CSVPrinter(streamWriter, csvFormat) + + // either use supplied properties or properties retrieved from database + val propertiesParam = S.param("properties").map(_.split("\\s+").toList) + + val itemProperties = valuesMap.map(v => { + (propertiesParam openOr v._2.map(_._1).toSet.toList.sorted).map((v._1, _)) + }).flatten.toList + + // print header row + csvPrinter.printRecord(("time" :: itemProperties.map(p => s"<${p._1}>@<${p._2}>")).asJava) + + var itemData = valuesMap.flatMap(v => { + val ps = propertiesParam openOr v._2.map(_._1).toSet.toList.sorted + ps.flatMap(p => v._2.get(p).map(it => (if (it.hasNext()) it.next() else null, it))) + }) + + val ordering: Ordering[KvinTuple] = (a: KvinTuple, b: KvinTuple) => { + // compare first by time and then by seqNr + val diffTime = a.time - b.time + if (diffTime != 0) diffTime.toInt else a.seqNr - b.seqNr + } + var finished = false + while (!finished) { + val nextTuples = itemData.map(_._1).filter(_ != null) + val maxTuple = if (nextTuples.isEmpty) null else nextTuples.max(ordering) + if (maxTuple != null) { + // print the row, properties without values at row timestamp stay unset + csvPrinter.printRecord((formatDate(maxTuple.time) :: itemData.map(d => { + if (d._1 != null && ordering.compare(maxTuple, d._1) == 0 && d._1.value != null) value2Str(d._1.value, false) else null + }).toList).asJava) + + // select next tuple from iterators + itemData = itemData.map(d => { + if (d._1 != null && ordering.compare(maxTuple, d._1) == 0) { + val it = d._2 + val next = if (it.hasNext()) it.next() else { + it.close() + null + } + (next, if (next != null) it else null) + } else d + }) + } else { + finished = true } - - csvPrinter.close() } - OutputStreamResponse(streamer, -1, ("Content-Type", "text/csv; charset=utf-8") :: - ("Content-Disposition", s"""inline; filename=${filename("csv")}""") :: responseHeaders, S.responseCookies, 200) - case _ => BadRequestResponse() - } openOr BadRequestResponse() - response - } - case list Get _ if list.endsWith("properties" :: Nil) => createJsonResponse(getProperties(path ++ list.dropRight(1))) - case list Get _ if list.endsWith("**" :: Nil) => createJsonResponse(getDescendants(path ++ list.dropRight(1))) - case list Delete _ if list.endsWith("values" :: Nil) => createJsonResponse(deleteValues(path ++ list.dropRight(1))) - // case list Get _ => // TODO return RDF description - }) + csvPrinter.close() + } + OutputStreamResponse(streamer, -1, ("Content-Type", "text/csv; charset=utf-8") :: + ("Content-Disposition", s"""inline; filename=${filename("csv")}""") :: responseHeaders, S.responseCookies, 200) + case _ => BadRequestResponse() + } openOr BadRequestResponse() + response + } + } // handle JSON post content def saveValues(json: JValue, path: List[String], currentTime: Long): Box[_] = { var parentUri = Data.pathToURI(path) if (parentUri.lastSegment != "") parentUri = parentUri.appendSegment("") - JsonFormatParser.parseItem(parentUri, contextModelUri, json, currentTime) map ( _.foreach { tuple => - store.put(tuple) + JsonFormatParser.parseItem(parentUri, contextModelUri, json, currentTime) map (_.foreach { tuple => + store.put(tuple) }) } @@ -248,11 +240,11 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga if (parentUri.lastSegment != "") parentUri = parentUri.appendSegment("") try { - val tuples : IExtendedIterator[KvinTuple] = new io.github.linkedfactory.core.kvin.util.JsonFormatParser(in).parse(currentTime) + val tuples: IExtendedIterator[KvinTuple] = new io.github.linkedfactory.core.kvin.util.JsonFormatParser(in).parse(currentTime) store.put(tuples) Empty } catch { - case e : Exception => new Failure(e.getMessage(), Full(e), Empty) + case e: Exception => new Failure(e.getMessage(), Full(e), Empty) } } @@ -265,11 +257,11 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga val separator = S.param("separator").map(_.trim).filter(_.nonEmpty).map(_.charAt(0)).getOrElse(',') val parser = new CsvFormatParser(parentUri, separator, in) parser.setContext(contextModelUri) - val tuples : IExtendedIterator[KvinTuple] = parser.parse() + val tuples: IExtendedIterator[KvinTuple] = parser.parse() store.put(tuples) Empty } catch { - case e : Exception => new Failure(e.getMessage(), Full(e), Empty) + case e: Exception => new Failure(e.getMessage(), Full(e), Empty) } } @@ -278,14 +270,14 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga var parentUri = Data.pathToURI(path) if (parentUri.lastSegment != "") parentUri = parentUri.appendSegment("") - LineProtocolParser.parseLines(parentUri, contextModelUri, is, currentTime) map ( _.foreach { tuple => - store.put(tuple) + LineProtocolParser.parseLines(parentUri, contextModelUri, is, currentTime) map (_.foreach { tuple => + store.put(tuple) }) } def getSingleItem(path: List[String]): URI = S.param("item") flatMap { s => tryo(URIs.createURI(s)) } openOr Data.pathToURI(path) - def getValues(path: List[String], limit: Long): Map[String, Map[String, IExtendedIterator[KvinTuple]]] = { + def getValues(path: List[String], limit: Long): IExtendedIterator[KvinTuple] = { val items = (S.param("item") or S.param("items")).map { _.split("\\s+").flatMap { i => tryo(URIs.createURI(i)) }.toList } openOr List(Data.pathToURI(path)) @@ -299,25 +291,57 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga val executorService = FederatedServiceComponent.getExecutorService() val modelUri = contextModelUri - val results = items map { item => - val itemData = for ( - property <- { - (S.param("property") or S.param("properties")).map { - _.split("\\s+").flatMap { s => tryo(URIs.createURI(s)) }.toList - } openOr store.properties(item, contextModelUri).toList.asScala - } - ) yield { - val propertyData = (interval, op) match { - case (_, Full(op)) if interval > 0 => - new AsyncExtendedIterator(() => store.fetch(item, property, modelUri, end, begin, limit, interval, op), () => executorService) - case _ => - new AsyncExtendedIterator(() => store.fetch(item, property, modelUri, end, begin, limit, 0, null), () => executorService) + + val properties = (S.param("property") or S.param("properties")).map { + _.split("\\s+").flatMap { s => tryo(URIs.createURI(s)) }.toList + } openOr Nil + + if (limit > 0 && items.size < 5) { + // for now only optimize case where few items are + // for multiple items it may be faster to query as batch + items.foldLeft(NiceIterator.emptyIterator[KvinTuple]) { (it, item) => + val is = util.List.of(item) + val ps = if (properties.isEmpty) store.properties(item, contextModelUri).toList.asScala else properties + ps.foldLeft(it) { (it, property) => + it.andThen(new AsyncExtendedIterator[KvinTuple]( + () => store.fetch(is, util.List.of(property), modelUri, end, begin, limit, interval, op openOr null), + () => executorService)) } - property.toString -> propertyData } - item.toString -> itemData.toMap + } else { + store.fetch(items.asJava, properties.asJava, modelUri, end, begin, limit, interval, op openOr null) } - results.toMap + } + + def getValuesMap(path: List[String], limit: Long): Map[String, Map[String, IExtendedIterator[KvinTuple]]] = { + val items = (S.param("item") or S.param("items")).map { + _.split("\\s+").flatMap { i => tryo(URIs.createURI(i)) }.toList + } openOr List(Data.pathToURI(path)) + + val end = S.param("to") flatMap (v => tryo(v.toLong)) openOr KvinTuple.TIME_MAX_VALUE + val begin = S.param("from") flatMap (v => tryo(v.toLong)) openOr 0L + + // allow also fractional intervals + val interval = S.param("interval") flatMap (v => tryo(v.toDouble.longValue)) openOr 0L + val op = S.param("op") map (_.trim) + + val executorService = FederatedServiceComponent.getExecutorService() + val modelUri = contextModelUri + + val properties = (S.param("property") or S.param("properties")).map { + _.split("\\s+").flatMap { s => tryo(URIs.createURI(s)) }.toList + } openOr Nil + + items.map { item => + val is = util.List.of(item) + val ps = if (properties.isEmpty) store.properties(item, contextModelUri).toList.asScala else properties + val propertyData = ps.map { property => + (property.toString, new AsyncExtendedIterator[KvinTuple]( + () => store.fetch(is, util.List.of(property), modelUri, end, begin, limit, interval, op openOr null), + () => executorService)) + }.toMap + (item.toString, propertyData) + }.toMap } def deleteValues(path: List[String]): JObject = {