Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an edge case when repeating or(deferred, signalDeferred).await() #269

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions .github/workflows/engine-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,31 @@ on:
pull_request:
branches:
- main
paths:
- ".github/workflows/engine-ci.yml"
- "infinitic-*/**"

jobs:
build:
test:
runs-on: ubuntu-latest

steps:
- name: Checkout Repository
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Setup Java 17
uses: actions/setup-java@v1
uses: actions/setup-java@v3
with:
java-version: 17
distribution: 'adopt'

- name: Assemble with Gradle
run: ./gradlew build --no-daemon -x test -x spotlessCheck --parallel --max-workers=8 # <= -d used to debug if needed
timeout-minutes: 5 # max time allocated (useful if some tests hang)

test:
runs-on: ubuntu-latest

steps:
- name: Checkout Repository
uses: actions/checkout@v2

- name: Setup Java 17
uses: actions/setup-java@v1
- name: Cache Gradle packages
uses: gradle/actions/setup-gradle@v3
with:
java-version: 17
gradle-version: wrapper
cache-read-only: false

- name: Test with Gradle
run: ./gradlew test --no-daemon --parallel --max-workers=8 # <= -d used to debug if needed
run: ./gradlew test --info --scan # <= -d used to debug if needed
timeout-minutes: 12 # max time allocated (useful if some tests hang)

- name: Publish Build Scan
if: success()
run: echo "Build Scan URL is ${{ steps.build.outputs.build-scan-url }}"
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Ci.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object Ci {
private const val SNAPSHOT = "-SNAPSHOT"

// base version number
private const val BASE = "0.16.0"
private const val BASE = "0.16.1"

// GitHub run number
private val githubRunNumber = System.getenv("GITHUB_RUN_NUMBER")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,17 @@ data class SerializedData(

fun toJson() = json.parseToJsonElement(toJsonString())

fun toJsonString(): String = when (dataType) {
fun toJsonString(encodeAvro: Boolean = true): String = when (dataType) {
SerializedDataType.NULL -> "null"
SerializedDataType.JSON,
SerializedDataType.JSON_JACKSON,
SerializedDataType.JSON_KOTLIN -> String(bytes, Charsets.UTF_8)

SerializedDataType.AVRO_WITH_SCHEMA -> JsonPrimitive(
Base64.getEncoder().encodeToString(bytes),
when (encodeAvro) {
true -> Base64.getEncoder().encodeToString(bytes)
false -> decodeAvroWithSchemaFingerprint().toString()
},
).toString()
}

Expand All @@ -185,7 +188,7 @@ data class SerializedData(

/** Readable version */
override fun toString() = mapOf(
"bytes" to toJsonString().replace("\n", ""),
"bytes" to toJsonString(false).replace("\n", ""),
"type" to dataType,
"meta" to meta.mapValues { String(it.value) },
).toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import io.infinitic.common.workflows.data.commands.CommandStatus.Unknown
import io.infinitic.common.workflows.data.commands.PastCommand
import io.infinitic.common.workflows.data.commands.ReceiveSignalPastCommand
import io.infinitic.common.workflows.data.workflowTasks.WorkflowTaskIndex
import io.infinitic.exceptions.workflows.OutOfBoundAwaitException
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
Expand All @@ -64,9 +63,6 @@ sealed class Step {
// increase wait index and update current status
abstract fun nextAwaitIndex()

// check wait index is valid
abstract fun checkAwaitIndex()

/** hash provides a unique hash linked to the structure of the step (excluding commandStatus) */
abstract fun hash(): StepHash

Expand Down Expand Up @@ -139,24 +135,18 @@ sealed class Step {
is StepStatus.Completed -> true
}

override fun checkAwaitIndex() {
// user is asking more than the limit, we consider it as a failure
if (commandStatuses != null &&
commandStatusLimit != null &&
awaitIndex >= commandStatusLimit!!) {
throw OutOfBoundAwaitException
}
}

override fun nextAwaitIndex() {
awaitIndex++

// update commandStatus if needed
if (commandStatuses != null) {
// update current status
commandStatus = commandStatuses!!.firstOrNull {
(it is Completed) && (it.returnIndex == awaitIndex)
} ?: Ongoing
// if there is no limit, or there is a limit but not yet reached
if (commandStatusLimit == null || awaitIndex + 1 < commandStatusLimit!!) {
awaitIndex++

// update commandStatus if needed
// update current status
commandStatus = commandStatuses!!.firstOrNull {
(it is Completed) && (it.returnIndex == awaitIndex)
} ?: Ongoing
}
}
}

Expand Down Expand Up @@ -214,7 +204,7 @@ sealed class Step {
is StepStatus.Completed -> status.returnValue.deserialize(
returnValueType,
returnValueJsonViewClass,
)
).also { nextAwaitIndex() }

else -> thisShouldNotHappen(status.toString())
}
Expand All @@ -235,10 +225,6 @@ sealed class Step {
override fun isTerminatedAt(index: WorkflowTaskIndex) =
steps.all { it.isTerminatedAt(index) }

override fun checkAwaitIndex() {
steps.map { it.checkAwaitIndex() }
}

override fun nextAwaitIndex() {
steps.map { it.nextAwaitIndex() }
}
Expand Down Expand Up @@ -300,10 +286,6 @@ sealed class Step {
override fun isTerminatedAt(index: WorkflowTaskIndex) =
steps.any { it.isTerminatedAt(index) }

override fun checkAwaitIndex() {
steps.map { it.checkAwaitIndex() }
}

override fun nextAwaitIndex() {
steps.map { it.nextAwaitIndex() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ data object MultipleCustomIdException : WorkflowUserException(msg = "", help = "
private fun readResolve(): Any = MultipleCustomIdException
}

data object OutOfBoundAwaitException : WorkflowUserException(msg = "", help = "") {
private fun readResolve(): Any = OutOfBoundAwaitException
}

class NonIdempotentChannelGetterException(workflow: String, method: String) : WorkflowUserException(
msg =
"in workflow $workflow, method $method should return the same object when called multiple times",
Expand Down
Loading
Loading