Skip to content

Commit

Permalink
Merge pull request #725 from modelix/feature/modelix-630-adaptions
Browse files Browse the repository at this point in the history
Changes in modelix to make MODELIX-630 work
  • Loading branch information
benedekh authored Jun 4, 2024
2 parents 73402ad + 3b24507 commit 29cfda7
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,44 @@ import org.modelix.model.operations.OTBranch

/**
* Dispose should be called on this, as otherwise a regular polling will go on.
*
* @property client the model client to connect to the model server
* @property branchRef the model server branch to fetch the data from
* @property providedScope the CoroutineScope to use for the suspendable tasks
* @property initialRemoteVersion the last version on the server from which we want to start the synchronization
*/
class ReplicatedModel(
val client: IModelClientV2,
val branchRef: BranchReference,
private val providedScope: CoroutineScope? = null,
initialRemoteVersion: CLVersion? = null,
) {
private val scope = providedScope ?: CoroutineScope(Dispatchers.Default)
private var state = State.New
private lateinit var localModel: LocalModel
private val remoteVersion = RemoteVersion(client, branchRef)
private var localModel: LocalModel? = null
private val remoteVersion = RemoteVersion(client, branchRef, initialRemoteVersion)
private var pollingJob: Job? = null

init {
if (initialRemoteVersion != null) {
localModel = LocalModel(initialRemoteVersion, client.getIdGenerator()) { client.getUserId() }
}
}

private fun getLocalModel(): LocalModel = checkNotNull(localModel) { "Model is not initialized yet" }

fun getBranch(): IBranch {
if (state != State.Started) throw IllegalStateException("state is $state")
return localModel.otBranch
return getLocalModel().otBranch
}

suspend fun start(): IBranch {
if (state != State.New) throw IllegalStateException("already started")
state = State.Starting

val initialVersion = remoteVersion.pull()
localModel = LocalModel(initialVersion, client.getIdGenerator(), { client.getUserId() })
if (localModel == null) {
val initialVersion = remoteVersion.pull()
localModel = LocalModel(initialVersion, client.getIdGenerator()) { client.getUserId() }
}

// receive changes from the server
pollingJob = scope.launch {
Expand All @@ -66,7 +81,7 @@ class ReplicatedModel(
}
}

localModel.rawBranch.addListener(object : IBranchListener {
getLocalModel().rawBranch.addListener(object : IBranchListener {
override fun treeChanged(oldTree: ITree?, newTree: ITree) {
if (isDisposed()) return
scope.launch {
Expand All @@ -80,7 +95,7 @@ class ReplicatedModel(
}

suspend fun resetToServerVersion() {
localModel.resetToVersion(client.pull(branchRef, lastKnownVersion = null).upcast())
getLocalModel().resetToVersion(client.pull(branchRef, lastKnownVersion = null).upcast())
}

fun isDisposed(): Boolean = state == State.Disposed
Expand All @@ -102,11 +117,11 @@ class ReplicatedModel(
if (isDisposed()) return

val mergedVersion = try {
localModel.mergeRemoteVersion(newRemoteVersion)
getLocalModel().mergeRemoteVersion(newRemoteVersion)
} catch (ex: Exception) {
val currentLocalVersion = localModel.getCurrentVersion()
val currentLocalVersion = getLocalModel().getCurrentVersion()
LOG.warn(ex) { "Failed to merge remote version $newRemoteVersion into local version $currentLocalVersion. Resetting to remote version." }
localModel.resetToVersion(newRemoteVersion)
getLocalModel().resetToVersion(newRemoteVersion)
newRemoteVersion
}

Expand All @@ -121,15 +136,15 @@ class ReplicatedModel(
private suspend fun pushLocalChanges() {
if (isDisposed()) return

val version = localModel.createNewLocalVersion() ?: localModel.getCurrentVersion()
val version = getLocalModel().createNewLocalVersion() ?: getLocalModel().getCurrentVersion()
val received = remoteVersion.push(version)
if (received.getContentHash() != version.getContentHash()) {
remoteVersionReceived(received)
}
}

suspend fun getCurrentVersion(): CLVersion {
return localModel.getCurrentVersion()
return getLocalModel().getCurrentVersion()
}

private enum class State {
Expand Down Expand Up @@ -250,8 +265,11 @@ private class LocalModel(initialVersion: CLVersion, val idGenerator: IIdGenerato
}
}

private class RemoteVersion(val client: IModelClientV2, val branchRef: BranchReference) {
private var lastKnownRemoteVersion: CLVersion? = null
private class RemoteVersion(
val client: IModelClientV2,
val branchRef: BranchReference,
private var lastKnownRemoteVersion: CLVersion? = null,
) {
private val unconfirmedVersions: MutableSet<String> = LinkedHashSet()

fun getNumberOfUnconfirmed() = runSynchronized(unconfirmedVersions) { unconfirmedVersions.size }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright (c) 2024.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.modelix.model.server

import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.server.testing.testApplication
import io.ktor.util.reflect.instanceOf
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.junit.Assert.assertFalse
import org.modelix.authorization.installAuthentication
import org.modelix.model.api.ChildLinkFromName
import org.modelix.model.api.ConceptReference
import org.modelix.model.api.IBranch
import org.modelix.model.api.ITree
import org.modelix.model.api.PBranch
import org.modelix.model.api.getRootNode
import org.modelix.model.client2.ModelClientV2
import org.modelix.model.client2.ReplicatedModel
import org.modelix.model.lazy.BranchReference
import org.modelix.model.lazy.CLTree
import org.modelix.model.lazy.CLVersion
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.operations.OTBranch
import org.modelix.model.server.handlers.ModelReplicationServer
import org.modelix.model.server.store.InMemoryStoreClient
import org.modelix.model.server.store.forContextRepository
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class ReplicatedModelTest {

@Test
fun startsWithLatestVersion() = runTest {
val client = createModelClient()
val repositoryId = RepositoryId(UUID.randomUUID().toString())

// Step 1: prepare repository with two versions beside the initial version
// Step 1.1: create an empty repository
val initialVersion = client.initRepository(repositoryId) as CLVersion

// Step 1.2: add a new child node to get a new version
val defaultBranchReference = repositoryId.getBranchReference()
addHelloChild(initialVersion, client, defaultBranchReference)

// Step 2: in a new client, fetch the latest repository data
val newClient = createModelClient()
// we do not provide an initial version, so we expect to fetch the latest one (with one "hello" child)
val scope = CoroutineScope(Dispatchers.Default)
val replicatedModel = ReplicatedModel(newClient, defaultBranchReference, scope)
try {
replicatedModel.getBranch()
// if we get here, then we missed an expected exception
assertFalse(true)
} catch (ex: Exception) {
/*
Expected exception, because we did not specify an initial version.
So without an explicit start we do not expect anything useful here.
*/
assertTrue(ex.instanceOf(IllegalStateException::class))
}

val branch = replicatedModel.start()
// Step 3: wait a bit so replicated model can fetch the new versions from the server
waitUntilChildArrives(branch, scope, 500)

// Step 4: check, eventually we must have the one "hello" child
val children = getHelloChildrenOfRootNode(branch)
assertEquals(1, children.size)
}

@Test
fun startsWithSpecificVersion() = runTest {
val client = createModelClient()
val repositoryId = RepositoryId(UUID.randomUUID().toString())
val defaultBranchReference = repositoryId.getBranchReference()

// Step 1: prepare repository with two versions beside the initial version
// Step 1.1: create an empty repository
val initialVersion = client.initRepository(repositoryId) as CLVersion

// Step 1.2: add a new child node to get a new version
addHelloChild(initialVersion, client, defaultBranchReference)

// Step 2: in a new client, fetch the oneHelloChildVersion
val newClient = createModelClient()

// Step 2.1: to avoid version was not created by this client exception
val initialVersionClone = newClient.loadVersion(repositoryId, initialVersion.getContentHash(), null)

val scope = CoroutineScope(Dispatchers.Default)
// Step 2.2: we provide an initial version, so we expect to fetch it first (0 "hello" child)
val replicatedModel = ReplicatedModel(
newClient,
defaultBranchReference,
providedScope = scope,
initialRemoteVersion = initialVersionClone as CLVersion,
)
val branch = replicatedModel.getBranch()

// Step 3: check, here we must have 0 "hello" child
val emptyChildren = getHelloChildrenOfRootNode(branch)
assertTrue(emptyChildren.isEmpty())

replicatedModel.start()
// Step 4: wait a bit so replicated model can fetch the new versions from the server
waitUntilChildArrives(branch, scope, 500)

// Step 5: check, eventually we must have 1 "hello" child
val children = getHelloChildrenOfRootNode(branch)
assertEquals(1, children.size)
}

private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication {
application {
installAuthentication(unitTestMode = true)
installDefaultServerPlugins()
ModelReplicationServer(InMemoryStoreClient().forContextRepository()).init(this)
}
block()
}

private fun waitUntilChildArrives(branch: IBranch, scope: CoroutineScope, timeout: Long) {
val barrier = CountDownLatch(1)
scope.launch {
var childArrived = false
while (!childArrived) {
childArrived = getHelloChildrenOfRootNode(branch).isNotEmpty()
}
barrier.countDown()
}
// wait at most timeout ms for the child to arrive
barrier.await(timeout, TimeUnit.MILLISECONDS)
}

private suspend fun addHelloChild(
baseVersion: CLVersion,
client: ModelClientV2,
branchReference: BranchReference,
): CLVersion {
val branch =
OTBranch(
PBranch(baseVersion.getTree(), client.getIdGenerator()),
client.getIdGenerator(),
baseVersion.store,
)
branch.runWriteT { t ->
t.addNewChild(ITree.ROOT_ID, "hello", -1, null as ConceptReference?)
}
val (ops, newTree) = branch.getPendingChanges()
val newVersion = CLVersion.createRegularVersion(
client.getIdGenerator().generate(),
null,
null,
newTree as CLTree,
baseVersion,
ops.map { it.getOriginalOp() }.toTypedArray(),
)
return client.push(branchReference, newVersion, baseVersion) as CLVersion
}

private fun getHelloChildrenOfRootNode(branch: IBranch) =
branch.computeReadT { it.branch.getRootNode().getChildren(ChildLinkFromName("hello")).toList() }
}

0 comments on commit 29cfda7

Please sign in to comment.