Skip to content

Commit

Permalink
Merge pull request #1235 from modelix/MODELIX-1050-Deadlock-in-model-…
Browse files Browse the repository at this point in the history
…server

fix(model-server): deadlock caused by non-existing lock ordering
  • Loading branch information
slisson authored Dec 12, 2024
2 parents 3556438 + cfebf70 commit f3b4e9f
Show file tree
Hide file tree
Showing 37 changed files with 1,533 additions and 624 deletions.
10 changes: 7 additions & 3 deletions model-server/src/main/kotlin/org/modelix/model/server/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.modelix.model.server.handlers.ui.RepositoryOverview
import org.modelix.model.server.store.IgniteStoreClient
import org.modelix.model.server.store.InMemoryStoreClient
import org.modelix.model.server.store.IsolatingStore
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.forGlobalRepository
import org.modelix.model.server.store.loadDump
import org.modelix.model.server.store.writeDump
Expand Down Expand Up @@ -149,9 +150,12 @@ object Main {
}
var i = 0
val globalStoreClient = storeClient.forGlobalRepository()
while (i < cmdLineArgs.setValues.size) {
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
i += 2
@OptIn(RequiresTransaction::class)
globalStoreClient.getTransactionManager().runWrite {
while (i < cmdLineArgs.setValues.size) {
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
i += 2
}
}
val repositoriesManager = RepositoriesManager(storeClient)
val modelServer = KeyValueLikeModelServer(repositoriesManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.ktor.server.application.call
import io.ktor.server.response.respondText
import io.ktor.util.pipeline.PipelineContext
import org.modelix.model.server.handlers.KeyValueLikeModelServer.Companion.PROTECTED_PREFIX
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager

class HealthApiImpl(
Expand All @@ -25,9 +26,12 @@ class HealthApiImpl(

private fun isHealthy(): Boolean {
val store = stores.getGlobalStoreClient()
val value = toLong(store[HEALTH_KEY]) + 1
store.put(HEALTH_KEY, java.lang.Long.toString(value))
return toLong(store[HEALTH_KEY]) >= value
@OptIn(RequiresTransaction::class)
return store.getTransactionManager().runWrite {
val value = toLong(store[HEALTH_KEY]) + 1
store.put(HEALTH_KEY, java.lang.Long.toString(value))
toLong(store[HEALTH_KEY]) >= value
}
}

private fun toLong(value: String?): Long {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import org.modelix.model.lazy.CLVersion
import org.modelix.model.lazy.IDeserializingKeyValueStore
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.ITransactionManager
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager

interface IRepositoriesManager {
Expand All @@ -18,31 +20,38 @@ interface IRepositoriesManager {
* If the server ID was created previously but is only stored under a legacy database key,
* it also gets stored under the current and all legacy database keys.
*/
suspend fun maybeInitAndGetSeverId(): String
@RequiresTransaction
fun maybeInitAndGetSeverId(): String

@RequiresTransaction
fun getRepositories(): Set<RepositoryId>
suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion
suspend fun removeRepository(repository: RepositoryId): Boolean

fun getBranches(repositoryId: RepositoryId): Set<BranchReference>
@RequiresTransaction
fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion

suspend fun removeBranches(repository: RepositoryId, branchNames: Set<String>)
@RequiresTransaction
fun removeRepository(repository: RepositoryId): Boolean

@RequiresTransaction
fun getBranches(repositoryId: RepositoryId): Set<BranchReference>

/**
* Same as [removeBranches] but blocking.
* Caller is expected to execute it outside the request thread.
*/
fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set<String>)
suspend fun getVersion(branch: BranchReference): CLVersion?
suspend fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?
suspend fun getVersionHash(branch: BranchReference): String?
@RequiresTransaction
fun removeBranches(repository: RepositoryId, branchNames: Set<String>)

@RequiresTransaction
fun getVersion(branch: BranchReference): CLVersion?
fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?

@RequiresTransaction
fun getVersionHash(branch: BranchReference): String?
suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String
suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String

/**
* Same as [mergeChanges] but blocking.
* Caller is expected to execute it outside the request thread.
*/
fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String
@RequiresTransaction
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData

/**
Expand All @@ -54,14 +63,16 @@ interface IRepositoriesManager {
fun isIsolated(repository: RepositoryId): Boolean?

fun getStoreManager(): StoreManager
fun getTransactionManager(): ITransactionManager
}

@RequiresTransaction
fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set<String> {
return getBranches(repositoryId).map { it.branchName }.toSet()
}

fun IRepositoriesManager.getStoreClient(repository: RepositoryId?): IStoreClient {
return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false })
fun IRepositoriesManager.getStoreClient(repository: RepositoryId?, immutable: Boolean): IStoreClient {
return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false }, immutable)
}

fun IRepositoriesManager.getAsyncStore(repository: RepositoryId?): IAsyncObjectStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import org.modelix.authorization.getUserName
import org.modelix.authorization.requiresLogin
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.runReadIO

/**
* Implementation of the REST API that is responsible for handling client and server IDs.
Expand All @@ -24,7 +26,11 @@ class IdsApiImpl(
//
// Functionally, it does not matter if the server ID is created eagerly or lazily,
// as long as the same server ID is returned from the same server.
val serverId = repositoriesManager.maybeInitAndGetSeverId()
val serverId =
@OptIn(RequiresTransaction::class)
repositoriesManager.getTransactionManager().runReadIO {
repositoriesManager.maybeInitAndGetSeverId()
}
call.respondText(serverId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import io.ktor.server.resources.put
import io.ktor.server.response.respondText
import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import kotlinx.coroutines.runBlocking
import kotlinx.html.br
import kotlinx.html.div
import kotlinx.html.h1
Expand All @@ -29,9 +28,11 @@ import org.modelix.model.lazy.BranchReference
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.ModelServerPermissionSchema
import org.modelix.model.server.store.ObjectInRepository
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager
import org.modelix.model.server.store.pollEntry
import org.modelix.model.server.store.runTransactionSuspendable
import org.modelix.model.server.store.runReadIO
import org.modelix.model.server.store.runWriteIO
import org.modelix.model.server.templates.PageWithMenuBar
import java.io.IOException
import java.util.*
Expand Down Expand Up @@ -61,7 +62,8 @@ class KeyValueLikeModelServer(
// request to initialize it lazily, would make the code less robust.
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
// the special conditions in the affected requests to be updated.
runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
@OptIn(RequiresTransaction::class)
repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() }
application.apply {
modelServerModule()
}
Expand Down Expand Up @@ -89,7 +91,8 @@ class KeyValueLikeModelServer(
get<Paths.getKeyGet> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.READ)
val value = stores.getGlobalKeyValueStore()[key]
@OptIn(RequiresTransaction::class)
val value = runRead { stores.getGlobalStoreClient()[key] }
respondValue(key, value)
}
get<Paths.pollKeyGet> {
Expand All @@ -106,21 +109,25 @@ class KeyValueLikeModelServer(
post<Paths.counterKeyPost> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.WRITE)
val value = stores.getGlobalStoreClient().generateId(key)
val value = stores.getGlobalStoreClient(false).generateId(key)
call.respondText(text = value.toString())
}

get<Paths.getRecursivelyKeyGet> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.READ)
call.respondText(collect(key, this).toString(2), contentType = ContentType.Application.Json)
@OptIn(RequiresTransaction::class)
call.respondText(runRead { collect(key, this) }.toString(2), contentType = ContentType.Application.Json)
}

put<Paths.putKeyPut> {
val key = call.parameters["key"]!!
val value = call.receiveText()
try {
putEntries(mapOf(key to value))
@OptIn(RequiresTransaction::class)
runWrite {
putEntries(mapOf(key to value))
}
call.respondText("OK")
} catch (e: NotFoundException) {
throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e)
Expand All @@ -139,7 +146,10 @@ class KeyValueLikeModelServer(
}
entries = sortByDependency(entries)
try {
putEntries(entries)
@OptIn(RequiresTransaction::class)
runWrite {
putEntries(entries)
}
call.respondText(entries.size.toString() + " entries written")
} catch (e: NotFoundException) {
throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e)
Expand All @@ -158,7 +168,8 @@ class KeyValueLikeModelServer(
checkKeyPermission(key, EPermissionType.READ)
keys.add(key)
}
val values = stores.getGlobalStoreClient().getAll(keys)
@OptIn(RequiresTransaction::class)
val values = runRead { stores.getGlobalStoreClient(false).getAll(keys) }
for (i in keys.indices) {
val respEntry = JSONObject()
respEntry.put("key", keys[i])
Expand Down Expand Up @@ -199,6 +210,7 @@ class KeyValueLikeModelServer(
return sorted
}

@RequiresTransaction
fun collect(rootKey: String, callContext: CallContext?): JSONArray {
val result = JSONArray()
val processed: MutableSet<String> = HashSet()
Expand All @@ -210,7 +222,7 @@ class KeyValueLikeModelServer(
if (callContext != null) {
keys.forEach { callContext.checkKeyPermission(it, EPermissionType.READ) }
}
val values = stores.getGlobalStoreClient().getAll(keys)
val values = stores.getGlobalStoreClient(false).getAll(keys)
for (i in keys.indices) {
val key = keys[i]
val value = values[i]
Expand Down Expand Up @@ -240,7 +252,8 @@ class KeyValueLikeModelServer(
return result
}

private suspend fun CallContext.putEntries(newEntries: Map<String, String?>) {
@RequiresTransaction
private fun CallContext.putEntries(newEntries: Map<String, String?>) {
val referencedKeys: MutableSet<String> = HashSet()
for ((key, value) in newEntries) {
checkKeyPermission(key, EPermissionType.WRITE)
Expand Down Expand Up @@ -300,17 +313,15 @@ class KeyValueLikeModelServer(
// We could try to move the objects later, but since this API is deprecated, it's not worth the effort.
}

stores.getGlobalStoreClient().runTransactionSuspendable {
stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) })
stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) })
for ((branch, value) in branchChanges) {
if (value == null) {
checkPermission(ModelServerPermissionSchema.branch(branch).delete)
repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName))
} else {
checkPermission(ModelServerPermissionSchema.branch(branch).push)
repositoriesManager.mergeChangesBlocking(branch, value)
}
stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) })
stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) })
for ((branch, value) in branchChanges) {
if (value == null) {
checkPermission(ModelServerPermissionSchema.branch(branch).delete)
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
} else {
checkPermission(ModelServerPermissionSchema.branch(branch).push)
repositoriesManager.mergeChanges(branch, value)
}
}
}
Expand Down Expand Up @@ -363,4 +374,12 @@ class KeyValueLikeModelServer(
else -> unknown()
}
}

private suspend fun <R> runRead(body: () -> R): R {
return repositoriesManager.getTransactionManager().runReadIO(body)
}

private suspend fun <R> runWrite(body: () -> R): R {
return repositoriesManager.getTransactionManager().runWriteIO(body)
}
}
Loading

0 comments on commit f3b4e9f

Please sign in to comment.