Skip to content

Commit

Permalink
Bump NU to 1.2-preview / latest staging (#207)
Browse files Browse the repository at this point in the history
* Bump: nussknacker 1.1.0

* Bump nu to 1.2-previe / latest version

* Remove empty lines

* Update prinz version to 1.2.0-preview-staging

Co-authored-by: Łukasz Ciołecki <[email protected]>
  • Loading branch information
lciolecki and Łukasz Ciołecki authored Dec 14, 2021
1 parent fce076b commit 4f46e4a
Show file tree
Hide file tree
Showing 23 changed files with 77 additions and 94 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ If you are already signed to GitHub in your project, just add any of these lines
to add Prinz dependencies to your `.sbt` project

```sbt
"pl.touk.nussknacker.prinz" %% "prinz" % "1.0.0-SNAPSHOT"
"pl.touk.nussknacker.prinz" %% "prinz-mlflow" % "1.0.0-SNAPSHOT"
"pl.touk.nussknacker.prinz" %% "prinz-pmml" % "1.0.0-SNAPSHOT"
"pl.touk.nussknacker.prinz" %% "prinz-h2o" % "1.0.0-SNAPSHOT"
"pl.touk.nussknacker.prinz" %% "prinz-proxy" % "1.0.0-SNAPSHOT"
"pl.touk.nussknacker.prinz" %% "prinz" % "1.2.0-preview-staging"
"pl.touk.nussknacker.prinz" %% "prinz-mlflow" % "1.2.0-preview-staging"
"pl.touk.nussknacker.prinz" %% "prinz-pmml" % "1.2.0-preview-staging"
"pl.touk.nussknacker.prinz" %% "prinz-h2o" % "1.2.0-preview-staging"
"pl.touk.nussknacker.prinz" %% "prinz-proxy" % "1.2.0-preview-staging"
```

## Authors
Expand Down
15 changes: 7 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import sbtassembly.MergeStrategy

val prinzV = "1.0.0-SNAPSHOT"
val prinzV = "1.2.0-preview-staging"
val prinzOrg = "pl.touk.nussknacker.prinz"
val repositoryOwner = "prinz-nussknacker"
val repositoryName = "prinz"

// Dependency versions
val scalaV = "2.12.10"
val nussknackerV = "1.0.0"
val nussknackerV = "1.2.0-staging-2021-12-09-5983-f15ac11992e3a82d32f651ef4e05b5d9458f46aa-SNAPSHOT"
val sttpV = "3.0.0-RC7"
val scalatestV = "3.2.2"
val minioS3V = "8.0.0"
val circeV = "0.11.1"
val circeV = "0.14.1"
val circeYamlV = "0.11.0-M1"
val jpmmlV = "1.5.11"
val jpmmlTranspilerV = "1.1.7"
Expand Down Expand Up @@ -69,7 +69,8 @@ lazy val commonSettings = Seq(
Seq(
"ch.qos.logback" % "logback-classic" % logbackV,
"com.typesafe.scala-logging" %% "scala-logging" % typesafeLogV,
"pl.touk.nussknacker" %% "nussknacker-process" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-util" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-api" % nussknackerV,
)
},

Expand Down Expand Up @@ -200,13 +201,11 @@ lazy val prinz_sample = (project in file("prinz_sample"))
// "pl.touk.nussknacker.prinz" %% "prinz-pmml" % prinzV,
// "pl.touk.nussknacker.prinz" %% "prinz-h2o" % prinzV,

"pl.touk.nussknacker" %% "nussknacker-process" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-model-flink-util" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-kafka-flink-util" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-flink-engine" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-flink-kafka-util" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-ui" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-flink-manager" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-flink-api" % nussknackerV,
"pl.touk.nussknacker" %% "nussknacker-flink-util" % nussknackerV,
)
},
// add GitHub packages resolver dependency with GitHub token declared to download prinz
Expand Down
2 changes: 1 addition & 1 deletion dev-environment/create_environment.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

scalaV="2.12"
prinzV="1.0.0-SNAPSHOT"
prinzV="1.2.0-preview-staging"

COMP_FILES=""
ENV_FILE="-f docker-compose-env.yaml"
Expand Down
2 changes: 1 addition & 1 deletion dev-environment/docker-compose-env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ services:
# this is needed to be able to verify savepoints during deployments
- storage_flink:/opt/flink/data
- ./nussknacker/opt/prinz-sample/prinz.conf:/opt/nussknacker/conf/prinz.conf
- ./nussknacker/opt/prinz-sample/prinz-sample-assembly-1.0.0-SNAPSHOT.jar:/opt/prinz-sample/prinz-sample.jar
- ./nussknacker/opt/prinz-sample/prinz-sample-assembly-1.2.0-preview-staging.jar:/opt/prinz-sample/prinz-sample.jar
networks:
- dev-bridge-net

Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@
package pl.touk.nussknacker.prinz.engine

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.definition.{Parameter, ServiceWithExplicitMethod}
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.test.InvocationCollectors
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.{ContextId, MetaData}
import pl.touk.nussknacker.engine.util.service.ServiceWithStaticParametersAndReturnType
import pl.touk.nussknacker.prinz.model.Model
import pl.touk.nussknacker.prinz.model.ModelInstance.ModelInputData
import pl.touk.nussknacker.prinz.util.collection.immutable.VectorMultimap

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}

final case class PrinzEnricher(private val model: Model)
extends ServiceWithExplicitMethod
extends ServiceWithStaticParametersAndReturnType
with LazyLogging {

private lazy val modelInstance = {
model.toModelInstance
}

override def invokeService(params: List[AnyRef])
(implicit ec: ExecutionContext,
collector: InvocationCollectors.ServiceInvocationCollector,
metaData: MetaData,
contextId: ContextId): Future[AnyRef] = {
val inputMap = createInputMap(params)
def invoke(params: Map[String, Any])(implicit ec: ExecutionContext,
collector: InvocationCollectors.ServiceInvocationCollector,
contextId: ContextId,
metaData: MetaData): Future[Any] = {
val inputMap = createInputMap(params.values.toList)
modelInstance.run(inputMap).map {
case Right(runResult) => runResult
case Left(exc) => throw exc
}
}(ec)
}

override def parameterDefinition: List[Parameter] =
override def parameters: List[Parameter] =
model
.getMetadata
.signature
Expand All @@ -44,6 +43,6 @@ final case class PrinzEnricher(private val model: Model)
.signature
.toOutputTypedObjectTypingResult

def createInputMap(inputs: List[AnyRef]): ModelInputData =
VectorMultimap(parameterDefinition.map(_.name) zip inputs)
def createInputMap(inputs: List[Any]): ModelInputData =
VectorMultimap(parameters.map(_.name) zip inputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ object ModelInstance {

type ModelRunResult = Future[Either[ModelRunException, JMap[String, _]]]

type ModelInputData = VectorMultimap[String, AnyRef]
type ModelInputData = VectorMultimap[String, Any]
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait ApiIntegrationSpec extends UnitTest with TestModelsManager {
val model = getModel().get
val instance = getModelInstance().get
val signature = model.getMetadata.signature
val sampleInput = constructInputMap(0.415.asInstanceOf[AnyRef], signature)
val sampleInput = constructInputMap(0.415.asInstanceOf[Any], signature)

val response = Await.result(instance.run(sampleInput), awaitTimeout)
response.toOption.isDefined shouldBe true
Expand Down Expand Up @@ -82,7 +82,7 @@ trait ApiIntegrationSpec extends UnitTest with TestModelsManager {
("gender", "F"),
("category", "es_transportation"),
("amount", 800.0),
).mapValues(_.asInstanceOf[AnyRef])
).mapValues(_.asInstanceOf[Any])

val response = Await.result(instance.run(sampleInput), awaitTimeout)
response.toOption.isDefined shouldBe true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait TestModelsManager {
def getElasticnetWineModelModel(modelId: Int)(models: List[Model]): Model =
models.filter(_.getMetadata.modelName.toString.contains("ElasticnetWineModel-" + modelId)).head

def constructInputMap(value: AnyRef, signature: ModelSignature): VectorMultimap[String, AnyRef] = {
def constructInputMap(value: Any, signature: ModelSignature): VectorMultimap[String, Any] = {
val names = signature.getSignatureInputs.map(_.signatureName.name)
val data = List.fill(names.length)(value)
VectorMultimap(names.zip(data))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ object H2ODataConverter extends LazyLogging {
def inputToTypedModelInput(input: ModelInputData, signature: ModelSignature): ModelInputData =
input.mapValuesWithKeys(wrappedByDefinitionFrom(signature))

private def wrappedByDefinitionFrom(signature: ModelSignature)(key: String, value: AnyRef): AnyRef =
private def wrappedByDefinitionFrom(signature: ModelSignature)(key: String, value: Any): Any =
signature.getInputValueType(SignatureName(key)) match {
case Some(signatureType) => mapValueBySignature(value, signatureType)
case None => throw new IllegalStateException(s"Found data column not defined in signature with name: $key")
}

private def mapValueBySignature(value: AnyRef, signatureType: SignatureType): AnyRef =
private def mapValueBySignature(value: Any, signatureType: SignatureType): Any =
signatureType.typingResult match {
case t: TypingResult if t.canBeSubclassOf(Typed[String]) => wrapStringInput(value)
case t: TypingResult if t.canBeSubclassOf(Typed[Double]) => value
case t: TypingResult => throw new IllegalStateException(s"Found not expected type in signature of H2O model: $t")
}

private def wrapStringInput(input: AnyRef): AnyRef = {
private def wrapStringInput(input: Any): Any = {
input match {
case stringValue: String =>
val beginFixed = if (stringValue.startsWith(STRING_WRAPPER)) stringValue else STRING_WRAPPER + stringValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ case class H2OModelInstance(private val modelWrapper: EasyPredictModelWrapper,
}
}

private def evaluateRow(row: Map[String, AnyRef]): AbstractPrediction = {
private def evaluateRow(row: Map[String, Any]): AbstractPrediction = {
val rowData = new RowData()
rowData.putAll(row.asJava)
rowData.putAll(row.map{case (key, value) => key -> value.asInstanceOf[AnyRef]}.asJava)
val result = modelWrapper.predict(rowData)
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ object MLFDataConverter extends LazyLogging {
Dataframe(columns, data)
}

private def isMultimapConvertible(multimap: VectorMultimap[String, AnyRef]): Boolean =
private def isMultimapConvertible(multimap: VectorMultimap[String, Any]): Boolean =
multimap.values.map(_.size).toSet.size <= 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder


case class MLFInputDataTypeWrapper private(typing: TypingResult, dataValue: AnyRef) {
case class MLFInputDataTypeWrapper private(typing: TypingResult, dataValue: Any) {
override def toString: String = s"MLFInputDataTypeWrapper($dataValue: ${typing.display})"
}

Expand All @@ -23,7 +23,7 @@ object MLFInputDataTypeWrapper {
case _ => throw new IllegalArgumentException(s"Unknown mlflow data type wrapper type: ${data.typing}")
}

def apply(signature: ModelSignature, columnName: String, value: AnyRef): MLFInputDataTypeWrapper = {
def apply(signature: ModelSignature, columnName: String, value: Any): MLFInputDataTypeWrapper = {
val columnType = extractColumnType(signature, columnName)
new MLFInputDataTypeWrapper(columnType, value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ case class MLFModelInstance(config: MLFConfig,

private val invokeRestClient = MLFInvokeRestClient(config.servedModelsUrl.toString, model)

override protected def runVerified(inputMap: VectorMultimap[String, AnyRef]): ModelRunResult = {
override protected def runVerified(inputMap: VectorMultimap[String, Any]): ModelRunResult = {
val dataframe = MLFDataConverter.inputToDataframe(inputMap, model.getMetadata.signature)
invokeRestClient.invoke(dataframe, model.getMetadata.signature, config.modelLocationStrategy)
.map { response =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ class EncodeInputTest extends UnitTest {
ModelSignature(input, List())
}

private def createInputMultimap(input: List[AnyRef], signature: ModelSignature): VectorMultimap[String, AnyRef] = {
private def createInputMultimap(input: List[AnyRef], signature: ModelSignature): VectorMultimap[String, Any] = {
VectorMultimap(signature.getSignatureInputs.map(_.signatureName.name).zip(input))
}

private def buildMultipleInput(inputs: List[List[AnyRef]], signature: ModelSignature): VectorMultimap[String, AnyRef] = {
private def buildMultipleInput(inputs: List[List[AnyRef]], signature: ModelSignature): VectorMultimap[String, Any] = {
val colValuesList = signature.getSignatureInputs.map(_.signatureName.name).zip(inputs)
val colValues = colValuesList.flatMap { case (col, colValues) => colValues.map((col, _)) }
val colValues = colValuesList.flatMap { case (col, colValues) => colValues.map(v => (col, v.asInstanceOf[Any])) }
VectorMultimap(colValues)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ case class PMMLModelInstance(private val evaluator: Evaluator,
}
}

def evaluateRow(row: Map[String, AnyRef]): Map[String, _] = {
def evaluateRow(row: Map[String, Any]): Map[String, _] = {
val args = EvaluatorUtil.encodeKeys(row.asJava)
val results = evaluator.evaluate(args)
val decodeResult = EvaluatorUtil.decodeAll(results).asScala.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ProxiedInputModel private(originalModel: Model,

def this(model: Model,
proxiedParams: Iterable[ProxiedModelInputParam],
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]) {
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]) {
this(
model,
CompositeProxiedInputModelName(model),
Expand Down Expand Up @@ -76,7 +76,7 @@ object ProxiedInputModel {

def apply(model: Model,
proxiedParams: Iterable[ProxiedModelInputParam],
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]): ProxiedInputModel =
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]): ProxiedInputModel =
new ProxiedInputModel(model, proxiedParams, compositeProxiedParams)

def apply(model: Model,
Expand All @@ -89,14 +89,14 @@ object ProxiedInputModel {
new ProxiedInputModel(model, transformer)

private def collectRemovedParams(proxiedParams: Iterable[ProxiedModelInputParam],
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]): Iterable[SignatureName] = {
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]): Iterable[SignatureName] = {
val proxiedNames = proxiedParams.map(_.paramName)
val composedProxiedNames = compositeProxiedParams.flatMap(_.proxiedParams)
proxiedNames ++ composedProxiedNames
}

private def filteredTransform(proxiedParams: Iterable[ProxiedModelInputParam],
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]
): FilteredSignatureTransformer =
new FilteredSignatureTransformer(collectRemovedParams(proxiedParams, compositeProxiedParams))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ProxiedInputModelBuilder(private val model: Model) {

protected val params: mutable.Map[SignatureName, ProxiedModelInputParam] = mutable.Map[SignatureName, ProxiedModelInputParam]()

protected val composedParams: mutable.MutableList[ProxiedModelCompositeInputParam[_ <: AnyRef]] = mutable.MutableList()
protected val composedParams: mutable.MutableList[ProxiedModelCompositeInputParam[_ <: Any]] = mutable.MutableList()

def proxyParam(paramName: String)(paramSupplier: ParamSupplier): this.type = {
val signatureName = SignatureName(paramName)
Expand All @@ -21,7 +21,7 @@ class ProxiedInputModelBuilder(private val model: Model) {
this
}

def proxyComposedParam[T <: AnyRef](paramSupplier: ComposedParamsSupplier[T],
def proxyComposedParam[T <: Any](paramSupplier: ComposedParamsSupplier[T],
paramsExtractor: ParamsExtractor[T],
proxiedParams: Iterable[SignatureName]): this.type = {
val createdInputParam = ProxiedModelCompositeInputParam(paramSupplier,paramsExtractor, proxiedParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,33 @@ class ProxiedInputModelInstance(originalModelMetadata: ModelMetadata,
originalModelInstance: ModelInstance,
proxiedModel: ProxiedInputModel,
proxiedParams: Iterable[ProxiedModelInputParam],
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]])
compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]])
extends ModelInstance(proxiedModel) {

override protected def runVerified(inputMap: VectorMultimap[String, AnyRef]): ModelRunResult = {
override protected def runVerified(inputMap: VectorMultimap[String, Any]): ModelRunResult = {
val addInputParams = supplyNonProvidedInputs(inputMap)
val addComposedParams = addInputParams.flatMap(supplyNonProvidedComposedInputs)
addComposedParams.flatMap(originalModelInstance.run)
}

private def supplyNonProvidedInputs(inputMap: VectorMultimap[String, AnyRef]): Future[VectorMultimap[String, AnyRef]] =
private def supplyNonProvidedInputs(inputMap: VectorMultimap[String, Any]): Future[VectorMultimap[String, Any]] =
Future.sequence(
proxiedParams
.filter(inputsToBeReplacedIn(inputMap))
.map(_.supplyParamValue(originalModelMetadata))
).map(addExtraInputsTo(inputMap))

private def supplyNonProvidedComposedInputs(inputMap: VectorMultimap[String, AnyRef]): Future[VectorMultimap[String, AnyRef]] =
private def supplyNonProvidedComposedInputs(inputMap: VectorMultimap[String, Any]): Future[VectorMultimap[String, Any]] =
Future.sequence(compositeProxiedParams
.map(_.supplyCompositeParamValues(originalModelMetadata))
)
.map(_.foldLeft(inputMap) { (acc, composedValues) =>
addExtraInputsTo(acc)(composedValues)
})

private def inputsToBeReplacedIn(inputMap: VectorMultimap[String, AnyRef]): ProxiedModelInputParam => Boolean =
private def inputsToBeReplacedIn(inputMap: VectorMultimap[String, Any]): ProxiedModelInputParam => Boolean =
param => !inputMap.containsKey(param.paramName.name)

private def addExtraInputsTo(inputMap: VectorMultimap[String, AnyRef])(extraInputs: Iterable[(String, AnyRef)]): VectorMultimap[String, AnyRef] =
private def addExtraInputsTo(inputMap: VectorMultimap[String, Any])(extraInputs: Iterable[(String, Any)]): VectorMultimap[String, Any] =
extraInputs.foldLeft(inputMap) { (acc, value) => acc.add(value._1, value._2) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait ModelsProxySpec extends UnitTest
val sampleInput = VectorMultimap(
("age", "4"),
("category", "es_transportation"),
).mapValues(_.asInstanceOf[AnyRef])
).mapValues(_.asInstanceOf[Any])

val response = Await.result(instance.run(sampleInput), awaitTimeout)
assertRunResult(response)
Expand Down Expand Up @@ -57,7 +57,7 @@ trait ModelsProxySpec extends UnitTest
val sampleInput = VectorMultimap(
("age", "4"),
("category", "es_transportation"),
).mapValues(_.asInstanceOf[AnyRef])
).mapValues(_.asInstanceOf[Any])

val response = Await.result(instance.run(sampleInput), awaitTimeout)
assertRunResult(response)
Expand All @@ -75,7 +75,7 @@ trait ModelsProxySpec extends UnitTest
(s"${tableName}_id", 1),
("age", "4"),
("category", "es_transportation"),
).mapValues(_.asInstanceOf[AnyRef])
)

val response = Await.result(instance.run(sampleInput), awaitTimeout)
assertRunResult(response)
Expand Down
Loading

0 comments on commit 4f46e4a

Please sign in to comment.