fix: EXPOSED-694 Entities insertion could fail if batches have different column sets #2365

Merged 1 commit on Jan 31, 2025
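The underlying problem: `EntityCache.flushInserts` pushes all pending entities of a table through one `batchInsert`, but a single batch insert statement has one fixed column list, so pending entities whose `writeValues` cover different column sets cannot safely share it. The sketch below (plain Kotlin with hypothetical row maps, not the Exposed types or the PR's code) shows the batching rule the fix applies: each batch only takes rows whose column set matches the first remaining row, and everything else is deferred to a later iteration.

```kotlin
fun main() {
    // Hypothetical pending rows: the second row sets an extra column.
    val rows: List<Map<String, Any?>> = listOf(
        mapOf("value" to 1),                          // only `value` is set
        mapOf("value" to 2, "valueWithDefault" to 1), // both columns are set
        mapOf("value" to 3),                          // same shape as the first row
    )

    var remaining = rows
    while (remaining.isNotEmpty()) {
        val firstColumns = remaining.first().keys
        // Rows matching the first row's column set form the current batch;
        // everything else waits for a later batch with its own column list.
        val (batch, deferred) = remaining.partition { it.keys == firstColumns }
        println("INSERT INTO example (${firstColumns.joinToString()}) <- ${batch.size} row(s)")
        remaining = deferred
    }
}
```

Note that rows sharing a shape are flushed together, so the insert order can differ from the creation order.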
@@ -228,18 +228,15 @@ class EntityCache(private val transaction: Transaction) {

@Suppress("TooGenericExceptionCaught")
internal fun flushInserts(table: IdTable<*>) {
var toFlush: List<Entity<*>> = inserts.remove(table)?.toList().orEmpty()
while (toFlush.isNotEmpty()) {
val partition = toFlush.partition { entity ->
entity.writeValues.none {
val (key, value) = it
key.referee == table.id && value is EntityID<*> && value._value == null
}
}
toFlush = partition.first
var entitiesToInsert = inserts.remove(table)?.toList().orEmpty()

while (entitiesToInsert.isNotEmpty()) {
val (currentBatch, nextBatch) = partitionEntitiesForInsert(entitiesToInsert, table)
entitiesToInsert = nextBatch

val ids = try {
executeAsPartOfEntityLifecycle {
table.batchInsert(toFlush) { entry ->
table.batchInsert(currentBatch) { entry ->
for ((c, v) in entry.writeValues) {
this[c] = v
}
@@ -250,16 +247,15 @@ class EntityCache(private val transaction: Transaction) {
                 // this try/catch should help to get information about the flaky test.
                 // try/catch can be safely removed after the fixing the issue
                 // TooGenericExceptionCaught suppress also can be removed
-                val toFlushString = toFlush.joinToString("; ") {
-                        entry ->
+                val toFlushString = currentBatch.joinToString("; ") { entry ->
                     entry.writeValues.map { writeValue -> "${writeValue.key.name}=${writeValue.value}" }.joinToString { ", " }
                 }
 
                 exposedLogger.error("ArrayIndexOutOfBoundsException on attempt to make flush inserts. Table: ${table.tableName}, entries: ($toFlushString)", cause)
                 throw cause
             }
 
-            for ((entry, genValues) in toFlush.zip(ids)) {
+            for ((entry, genValues) in currentBatch.zip(ids)) {
                 if (entry.id._value == null) {
                     val id = genValues[table.id]
                     entry.id._value = id._value
@@ -274,12 +270,30 @@ class EntityCache(private val transaction: Transaction) {
                 transaction.registerChange(entry.klass, entry.id, EntityChangeType.Created)
                 pendingInitializationLambdas[entry]?.forEach { it(entry) }
             }
-
-            toFlush = partition.second
         }
         transaction.alertSubscribers()
     }
 
+    /**
+     * Places entities whose `writeValues` column sets differ into separate partitions,
+     * so that every batch insert statement is built from a consistent column list.
+     *
+     * Entities that reference another entity in the same table whose id has not been
+     * generated yet are also moved to the second partition, to be inserted in a later batch.
+     */
+    private fun partitionEntitiesForInsert(entities: Collection<Entity<*>>, table: IdTable<*>): Pair<List<Entity<*>>, List<Entity<*>>> {
+        val firstEntityColumns = entities.first().writeValues.keys
+        return entities.partition { entity ->
+            val refereeFromSameTableAlreadyCreated = entity.writeValues.none { (key, value) ->
+                key.referee == table.id && value is EntityID<*> && value._value == null
+            }
+
+            val columnSetAlignedWithFirstEntity = entity.writeValues.keys == firstEntityColumns
+
+            refereeFromSameTableAlreadyCreated && columnSetAlignedWithFirstEntity
+        }
+    }
+
     /**
      * Clears this [EntityCache] of all stored data, including any reference mappings.
      *
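To illustrate the second rule in the `partitionEntitiesForInsert` helper above, here is a plain-Kotlin sketch with hypothetical stand-ins (`PendingEntity`, `PendingRef`) for `Entity.writeValues` and `EntityID`: an entity whose write values reference a same-table row that has not received a generated id yet is pushed into the second partition, so the referenced row is inserted in an earlier batch.

```kotlin
// Hypothetical model: a value is an unresolved self-reference when it points
// at a same-table row whose id has not been generated yet.
data class PendingRef(val generatedId: Int?)
data class PendingEntity(val name: String, val writeValues: Map<String, Any?>)

fun partitionForInsert(entities: List<PendingEntity>): Pair<List<PendingEntity>, List<PendingEntity>> {
    val firstEntityColumns = entities.first().writeValues.keys
    return entities.partition { entity ->
        // Same two checks as the helper: no unresolved same-table reference,
        // and a column set matching the first entity's.
        val refereeAlreadyCreated = entity.writeValues.values.none { it is PendingRef && it.generatedId == null }
        val sameColumnsAsFirst = entity.writeValues.keys == firstEntityColumns
        refereeAlreadyCreated && sameColumnsAsFirst
    }
}

fun main() {
    val resolvedRef = PendingEntity("a", mapOf("value" to 1, "parent" to PendingRef(generatedId = 7)))
    val unresolvedRef = PendingEntity("b", mapOf("value" to 2, "parent" to PendingRef(generatedId = null)))

    val (currentBatch, nextBatch) = partitionForInsert(listOf(resolvedRef, unresolvedRef))
    println(currentBatch.map { it.name }) // [a] -> safe to insert now
    println(nextBatch.map { it.name })    // [b] -> deferred until its parent row has an id
}
```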
@@ -3,8 +3,11 @@ package org.jetbrains.exposed.sql.tests.shared.entities
 import org.jetbrains.exposed.dao.IntEntity
 import org.jetbrains.exposed.dao.IntEntityClass
 import org.jetbrains.exposed.dao.entityCache
+import org.jetbrains.exposed.dao.flushCache
 import org.jetbrains.exposed.dao.id.EntityID
+import org.jetbrains.exposed.dao.id.IdTable
 import org.jetbrains.exposed.dao.id.IntIdTable
+import org.jetbrains.exposed.sql.Column
 import org.jetbrains.exposed.sql.SchemaUtils
 import org.jetbrains.exposed.sql.selectAll
 import org.jetbrains.exposed.sql.tests.DatabaseTestsBase
@@ -170,4 +173,46 @@ class EntityCacheTests : DatabaseTestsBase() {
             assertEquals(entity, TestEntity.testCache(entity.id))
         }
     }
+
+    object TableWithDefaultValue : IdTable<Int>() {
+        val value = integer("value")
+        val valueWithDefault = integer("valueWithDefault")
+            .default(10)
+
+        override val id: Column<EntityID<Int>> = integer("id")
+            .clientDefault { Random.nextInt() }
+            .entityId()
+
+        override val primaryKey: PrimaryKey = PrimaryKey(id)
+    }
+
+    class TableWithDefaultValueEntity(id: EntityID<Int>) : IntEntity(id) {
+        var value by TableWithDefaultValue.value
+
+        var valueWithDefault by TableWithDefaultValue.valueWithDefault
+
+        companion object : IntEntityClass<TableWithDefaultValueEntity>(TableWithDefaultValue)
+    }
+
+    @Test
+    fun entitiesWithDifferentAmountOfFieldsCouldBeCreated() {
+        withTables(TableWithDefaultValue) {
+            TableWithDefaultValueEntity.new {
+                value = 1
+            }
+            TableWithDefaultValueEntity.new {
+                value = 2
+                valueWithDefault = 1
+            }
+
+            // This flush is the key assertion: it must not fail with an inconsistent batch
+            // insert statement. The table also needs a client-side default value; otherwise
+            // `writeValues` would be extended with default values inside `EntityClass::new()`.
+            flushCache()
+            entityCache.clear()
+
+            val entity = TableWithDefaultValueEntity.find { TableWithDefaultValue.value eq 1 }.first()
+            assertEquals(10, entity.valueWithDefault)
+        }
+    }
 }