From bb9fa46d0ebd7e42dcd9207cee3bf52e97c08e25 Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 11:41:45 +0900 Subject: [PATCH 01/10] fixing dirty read occurence problem at update index: WIP. --- lib/execution/executors/update_executor.go | 20 ++-------- lib/storage/access/table_heap.go | 5 +-- lib/storage/access/table_page.go | 3 -- lib/storage/index/skip_list_index.go | 44 +++++++++++++++++----- lib/storage/index/uniq_skip_list_index.go | 40 +++++++++++++++----- 5 files changed, 70 insertions(+), 42 deletions(-) diff --git a/lib/execution/executors/update_executor.go b/lib/execution/executors/update_executor.go index 41435ec4..f9e188da 100644 --- a/lib/execution/executors/update_executor.go +++ b/lib/execution/executors/update_executor.go @@ -71,7 +71,7 @@ func (e *UpdateExecutor) Next() (*tuple.Tuple, Done, error) { common.NewRIDAtNormal = true } - if !is_updated && updateErr != access.ErrPartialUpdate { + if !is_updated && updateErr != nil { err_ := errors.New("tuple update failed. PageId:SlotNum = " + string(rid.GetPageId()) + ":" + fmt.Sprint(rid.GetSlotNum())) e.txn.SetState(access.ABORTED) return nil, false, err_ @@ -86,27 +86,13 @@ func (e *UpdateExecutor) Next() (*tuple.Tuple, Done, error) { } else { index_ := ret if updateIdxs == nil || samehada_util.IsContainList[int](updateIdxs, ii) { - if updateErr == access.ErrPartialUpdate { - // when tuple is moved page location on update, RID is changed to new value - // removing index entry is done at commit phase because delete operation uses marking technique - - fmt.Println("DeleteEntry due to ErrPartialUpdate occured.") - index_.DeleteEntry(t, *rid, e.txn) - - // do nothing - } else if new_rid != nil { + if new_rid != nil { index_.UpdateEntry(t, *rid, updateTuple, *new_rid, e.txn) } else { index_.UpdateEntry(t, *rid, updateTuple, *rid, e.txn) } } else { - if updateErr == access.ErrPartialUpdate { - // when tuple is moved page location on update, RID is changed to new value - // removing index entry is done at commit phase because delete operation uses marking technique - - fmt.Println("DeleteEntry due to ErrPartialUpdate occured.") - index_.DeleteEntry(t, *rid, e.txn) - } else if new_rid != nil { + if new_rid != nil { index_.UpdateEntry(t, *rid, updateTuple, *new_rid, e.txn) } else { // do nothing diff --git a/lib/storage/access/table_heap.go b/lib/storage/access/table_heap.go index 8f58a6c9..a65d1311 100644 --- a/lib/storage/access/table_heap.go +++ b/lib/storage/access/table_heap.go @@ -197,10 +197,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche var err2 error = nil _, err2 = t.InsertTuple(need_follow_tuple, txn, oid) if err2 != nil { - fmt.Println("TableHeap::UpdateTuple(): InsertTuple failed") - txn.SetState(ABORTED) - //txn.AddIntoWriteSet(NewWriteRecord(&rid, nil, DELETE, old_tuple, nil, t, oid)) - return false, nil, ErrPartialUpdate, nil, old_tuple + panic("InsertTuple does not fail on normal system condition!!!") } // change return values to success diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 37235353..39cf4550 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -36,9 +36,6 @@ const ErrNotEnoughSpace = errors.Error("there is not enough space.") const ErrSelfDeletedCase = errors.Error("encont self deleted tuple1.") const ErrGeneral = errors.Error("some error is occured!") -// delete and insert are needed, but delete is only succeeded case -const ErrPartialUpdate = errors.Error("update with new 
rid is succeeded partially") - // Slotted page format: // // --------------------------------------------------------- diff --git a/lib/storage/index/skip_list_index.go b/lib/storage/index/skip_list_index.go index 9e317363..1e304fff 100644 --- a/lib/storage/index/skip_list_index.go +++ b/lib/storage/index/skip_list_index.go @@ -10,6 +10,7 @@ import ( "github.com/ryogrid/SamehadaDB/lib/storage/tuple" "github.com/ryogrid/SamehadaDB/lib/types" "math" + "sync" ) type SkipListIndex struct { @@ -17,6 +18,8 @@ type SkipListIndex struct { metadata *IndexMetadata // idx of target column on table col_idx uint32 + // UpdateEntry only get Write lock + updateMtx sync.RWMutex } func NewSkipListIndex(metadata *IndexMetadata, buffer_pool_manager *buffer.BufferPoolManager, col_idx uint32) *SkipListIndex { @@ -27,41 +30,59 @@ func NewSkipListIndex(metadata *IndexMetadata, buffer_pool_manager *buffer.Buffe // for the thechnique, key type is fixed to Varchar (comparison is done on dict order as byte array) ret.container = *skip_list.NewSkipList(buffer_pool_manager, types.Varchar) ret.col_idx = col_idx + ret.updateMtx = sync.RWMutex{} return ret } -func (slidx *SkipListIndex) InsertEntry(key *tuple.Tuple, rid page.RID, txn interface{}) { +func (slidx *SkipListIndex) insertEntryInner(key *tuple.Tuple, rid page.RID, txn interface{}, isNoLock bool) { tupleSchema_ := slidx.GetTupleSchema() orgKeyVal := key.GetValue(tupleSchema_, slidx.col_idx) convedKeyVal := samehada_util.EncodeValueAndRIDToDicOrderComparableVarchar(&orgKeyVal, &rid) + if isNoLock == false { + slidx.updateMtx.RLock() + defer slidx.updateMtx.RUnlock() + } slidx.container.Insert(convedKeyVal, samehada_util.PackRIDtoUint64(&rid)) } -func (slidx *SkipListIndex) DeleteEntry(key *tuple.Tuple, rid page.RID, txn interface{}) { +func (slidx *SkipListIndex) InsertEntry(key *tuple.Tuple, rid page.RID, txn interface{}) { + slidx.insertEntryInner(key, rid, txn, false) +} + +func (slidx *SkipListIndex) deleteEntryInner(key *tuple.Tuple, rid page.RID, txn interface{}, isNoLock bool) { tupleSchema_ := slidx.GetTupleSchema() orgKeyVal := key.GetValue(tupleSchema_, slidx.col_idx) convedKeyVal := samehada_util.EncodeValueAndRIDToDicOrderComparableVarchar(&orgKeyVal, &rid) - revertedOrgKey := samehada_util.ExtractOrgKeyFromDicOrderComparableEncodedVarchar(convedKeyVal, orgKeyVal.ValueType()) - if !revertedOrgKey.CompareEquals(orgKeyVal) { - panic("key conversion may fail!") - } + //revertedOrgKey := samehada_util.ExtractOrgKeyFromDicOrderComparableEncodedVarchar(convedKeyVal, orgKeyVal.ValueType()) + //if !revertedOrgKey.CompareEquals(orgKeyVal) { + // panic("key conversion may fail!") + //} + if isNoLock == false { + slidx.updateMtx.RLock() + defer slidx.updateMtx.RUnlock() + } isSuccess := slidx.container.Remove(convedKeyVal, 0) if isSuccess == false { - panic(fmt.Sprintf("SkipListIndex::DeleteEntry: %v %v\n", convedKeyVal.ToIFValue(), rid)) + panic(fmt.Sprintf("SkipListIndex::deleteEntryInner: %v %v\n", convedKeyVal.ToIFValue(), rid)) } } +func (slidx *SkipListIndex) DeleteEntry(key *tuple.Tuple, rid page.RID, txn interface{}) { + slidx.deleteEntryInner(key, rid, txn, false) +} + func (slidx *SkipListIndex) ScanKey(key *tuple.Tuple, txn interface{}) []page.RID { tupleSchema_ := slidx.GetTupleSchema() orgKeyVal := key.GetValue(tupleSchema_, slidx.col_idx) smallestKeyVal := samehada_util.EncodeValueAndRIDToDicOrderComparableVarchar(&orgKeyVal, &page.RID{0, 0}) biggestKeyVal := samehada_util.EncodeValueAndRIDToDicOrderComparableVarchar(&orgKeyVal, 
&page.RID{math.MaxInt32, math.MaxUint32}) + slidx.updateMtx.RLock() // Attention: returned itr's containing keys are string type Value which is constructed with byte arr of concatenated original key and value rangeItr := slidx.container.Iterator(smallestKeyVal, biggestKeyVal) @@ -69,13 +90,16 @@ func (slidx *SkipListIndex) ScanKey(key *tuple.Tuple, txn interface{}) []page.RI for done, _, _, rid := rangeItr.Next(); !done; done, _, _, rid = rangeItr.Next() { retArr = append(retArr, *rid) } + slidx.updateMtx.RUnlock() return retArr } func (slidx *SkipListIndex) UpdateEntry(oldKey *tuple.Tuple, oldRID page.RID, newKey *tuple.Tuple, newRID page.RID, txn interface{}) { - slidx.DeleteEntry(oldKey, oldRID, txn) - slidx.InsertEntry(newKey, newRID, txn) + slidx.updateMtx.Lock() + defer slidx.updateMtx.Unlock() + slidx.deleteEntryInner(oldKey, oldRID, txn, true) + slidx.insertEntryInner(newKey, newRID, txn, true) } // get iterator which iterates entry in key sorted order @@ -96,6 +120,8 @@ func (slidx *SkipListIndex) GetRangeScanIterator(start_key *tuple.Tuple, end_key biggestKeyVal = samehada_util.EncodeValueAndRIDToDicOrderComparableVarchar(&orgEndKeyVal, &page.RID{math.MaxInt32, math.MaxUint32}) } + slidx.updateMtx.RLock() + defer slidx.updateMtx.RUnlock() return slidx.container.Iterator(smallestKeyVal, biggestKeyVal) } diff --git a/lib/storage/index/uniq_skip_list_index.go b/lib/storage/index/uniq_skip_list_index.go index f3b8169e..f9f7bd2e 100644 --- a/lib/storage/index/uniq_skip_list_index.go +++ b/lib/storage/index/uniq_skip_list_index.go @@ -10,6 +10,7 @@ import ( "github.com/ryogrid/SamehadaDB/lib/storage/tuple" "github.com/ryogrid/SamehadaDB/lib/types" "math" + "sync" ) type UniqSkipListIndex struct { @@ -17,6 +18,8 @@ type UniqSkipListIndex struct { metadata *IndexMetadata // idx of target column on table col_idx uint32 + // UpdateEntry only get Write lock + updateMtx sync.RWMutex } func NewUniqSkipListIndex(metadata *IndexMetadata, buffer_pool_manager *buffer.BufferPoolManager, col_idx uint32) *UniqSkipListIndex { @@ -24,36 +27,51 @@ func NewUniqSkipListIndex(metadata *IndexMetadata, buffer_pool_manager *buffer.B ret.metadata = metadata ret.container = *skip_list.NewSkipList(buffer_pool_manager, ret.metadata.GetTupleSchema().GetColumn(col_idx).GetType()) ret.col_idx = col_idx + ret.updateMtx = sync.RWMutex{} return ret } -func (slidx *UniqSkipListIndex) InsertEntry(key *tuple.Tuple, rid page.RID, transaction interface{}) { +func (slidx *UniqSkipListIndex) insertEntryInner(key *tuple.Tuple, rid page.RID, txn interface{}, isNoLock bool) { tupleSchema_ := slidx.GetTupleSchema() keyVal := key.GetValue(tupleSchema_, slidx.col_idx) - //fmt.Println("UniqSkipListIndex::InsertEntry: ", keyVal.ToString(), rid) - + if isNoLock == false { + slidx.updateMtx.RLock() + defer slidx.updateMtx.RUnlock() + } slidx.container.Insert(&keyVal, samehada_util.PackRIDtoUint64(&rid)) } -func (slidx *UniqSkipListIndex) DeleteEntry(key *tuple.Tuple, rid page.RID, transaction interface{}) { +func (slidx *UniqSkipListIndex) InsertEntry(key *tuple.Tuple, rid page.RID, transaction interface{}) { + slidx.insertEntryInner(key, rid, transaction, false) +} + +func (slidx *UniqSkipListIndex) deleteEntryInner(key *tuple.Tuple, rid page.RID, txn interface{}, isNoLock bool) { tupleSchema_ := slidx.GetTupleSchema() keyVal := key.GetValue(tupleSchema_, slidx.col_idx) - //fmt.Println("UniqSkipListIndex::DeleteEntry: ", keyVal.ToString(), rid) - + if isNoLock == false { + slidx.updateMtx.RLock() + defer slidx.updateMtx.RUnlock() + } 
isSuccess := slidx.container.Remove(&keyVal, samehada_util.PackRIDtoUint64(&rid)) if isSuccess == false { - panic(fmt.Sprintf("UniqSkipListIndex::DeleteEntry: %v %v\n", keyVal.ToIFValue(), rid)) + panic(fmt.Sprintf("UniqSkipListIndex::deleteEntryInner: %v %v\n", keyVal.ToIFValue(), rid)) } } +func (slidx *UniqSkipListIndex) DeleteEntry(key *tuple.Tuple, rid page.RID, transaction interface{}) { + slidx.deleteEntryInner(key, rid, transaction, false) +} + func (slidx *UniqSkipListIndex) ScanKey(key *tuple.Tuple, transaction interface{}) []page.RID { tupleSchema_ := slidx.GetTupleSchema() keyVal := key.GetValue(tupleSchema_, slidx.col_idx) ret_arr := make([]page.RID, 0) + slidx.updateMtx.RLock() packed_value := slidx.container.GetValue(&keyVal) + slidx.updateMtx.RUnlock() if packed_value != math.MaxUint64 { // when packed_vale == math.MaxUint32 => true, keyVal is not found on index ret_arr = append(ret_arr, samehada_util.UnpackUint64toRID(packed_value)) @@ -62,8 +80,10 @@ func (slidx *UniqSkipListIndex) ScanKey(key *tuple.Tuple, transaction interface{ } func (slidx *UniqSkipListIndex) UpdateEntry(oldKey *tuple.Tuple, oldRID page.RID, newKey *tuple.Tuple, newRID page.RID, transaction interface{}) { - slidx.DeleteEntry(oldKey, oldRID, transaction) - slidx.InsertEntry(newKey, newRID, transaction) + slidx.updateMtx.Lock() + defer slidx.updateMtx.Unlock() + slidx.deleteEntryInner(oldKey, oldRID, transaction, true) + slidx.insertEntryInner(newKey, newRID, transaction, true) } // get iterator which iterates entry in key sorted order @@ -81,6 +101,8 @@ func (slidx *UniqSkipListIndex) GetRangeScanIterator(start_key *tuple.Tuple, end end_val = samehada_util.GetPonterOfValue(end_key.GetValue(tupleSchema_, slidx.col_idx)) } + slidx.updateMtx.RLock() + defer slidx.updateMtx.RUnlock() return slidx.container.Iterator(start_val, end_val) } From 7c6768a2a08a3026d6a888198c8e8b5c7cdf402f Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 11:46:22 +0900 Subject: [PATCH 02/10] fixing dirty read occurence problem at update index: resolved degrade... --- lib/execution/executors/update_executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/execution/executors/update_executor.go b/lib/execution/executors/update_executor.go index f9e188da..6e1ec459 100644 --- a/lib/execution/executors/update_executor.go +++ b/lib/execution/executors/update_executor.go @@ -71,7 +71,7 @@ func (e *UpdateExecutor) Next() (*tuple.Tuple, Done, error) { common.NewRIDAtNormal = true } - if !is_updated && updateErr != nil { + if !is_updated || updateErr != nil { err_ := errors.New("tuple update failed. PageId:SlotNum = " + string(rid.GetPageId()) + ":" + fmt.Sprint(rid.GetSlotNum())) e.txn.SetState(access.ABORTED) return nil, false, err_ From cdd65b36da05a210dde7373a9651eb6f5e200045 Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 12:52:44 +0900 Subject: [PATCH 03/10] fixing dirty read occurence problem at update index: finished? 
--- lib/catalog/table_catalog.go | 4 +- lib/execution/executors/delete_executor.go | 2 +- .../skiplist_index_executor_test.go | 1 + lib/execution/executors/insert_executor.go | 2 +- lib/planner/optimizer/optimizer_test.go | 2 +- .../recovery_test/log_recovery_test.go | 14 +-- lib/storage/access/lock_manager.go | 14 +-- lib/storage/access/table_heap.go | 38 ++++---- lib/storage/access/table_heap_test.go | 4 +- lib/storage/access/table_page.go | 20 ++-- lib/storage/access/transaction.go | 12 ++- lib/storage/access/transaction_manager.go | 93 +++++++++++++------ .../testing_tbl_gen/table_generator.go | 2 +- 13 files changed, 125 insertions(+), 83 deletions(-) diff --git a/lib/catalog/table_catalog.go b/lib/catalog/table_catalog.go index f0ee1259..78418dcd 100644 --- a/lib/catalog/table_catalog.go +++ b/lib/catalog/table_catalog.go @@ -199,7 +199,7 @@ func (c *Catalog) insertTable(tableMetadata *TableMetadata, txn *access.Transact first_tuple := tuple.NewTupleFromSchema(row, TableCatalogSchema()) // insert entry to TableCatalogPage (PageId = 0) - c.tableHeap.InsertTuple(first_tuple, txn, tableMetadata.OID()) + c.tableHeap.InsertTuple(first_tuple, txn, tableMetadata.OID(), false) for _, column_ := range tableMetadata.schema.GetColumns() { row := make([]types.Value, 0) row = append(row, types.NewInteger(int32(tableMetadata.oid))) @@ -214,7 +214,7 @@ func (c *Catalog) insertTable(tableMetadata *TableMetadata, txn *access.Transact new_tuple := tuple.NewTupleFromSchema(row, ColumnsCatalogSchema()) // insert entry to ColumnsCatalogPage (PageId = 1) - c.tableIds[ColumnsCatalogOID].Table().InsertTuple(new_tuple, txn, ColumnsCatalogOID) + c.tableIds[ColumnsCatalogOID].Table().InsertTuple(new_tuple, txn, ColumnsCatalogOID, false) } // flush a page having table definitions c.bpm.FlushPage(TableCatalogPageId) diff --git a/lib/execution/executors/delete_executor.go b/lib/execution/executors/delete_executor.go index 3517f99a..f777b92d 100644 --- a/lib/execution/executors/delete_executor.go +++ b/lib/execution/executors/delete_executor.go @@ -52,7 +52,7 @@ func (e *DeleteExecutor) Next() (*tuple.Tuple, Done, error) { rid := t.GetRID() tableMetadata := e.child.GetTableMetaData() - is_marked := tableMetadata.Table().MarkDelete(rid, tableMetadata.OID(), e.txn) + is_marked := tableMetadata.Table().MarkDelete(rid, tableMetadata.OID(), e.txn, false) if !is_marked { err := errors.New("marking tuple deleted failed. 
PageId:SlotNum = " + string(rid.GetPageId()) + ":" + fmt.Sprint(rid.GetSlotNum())) e.txn.SetState(access.ABORTED) diff --git a/lib/execution/executors/executor_test/skiplist_index_executor_test.go b/lib/execution/executors/executor_test/skiplist_index_executor_test.go index 63c4ad41..b3933420 100644 --- a/lib/execution/executors/executor_test/skiplist_index_executor_test.go +++ b/lib/execution/executors/executor_test/skiplist_index_executor_test.go @@ -871,6 +871,7 @@ func testParallelTxnsQueryingSkipListIndexUsedColumns[T int32 | float32 | string var updateKeyValBase T isFound, updateKeyValBaseP := getKeyAndMarkItInsValsAndDeletedValsForDelete() if !isFound || keyType == types.Varchar { + //if !isFound { if execType == PARALLEL_EXEC { ch <- 1 } diff --git a/lib/execution/executors/insert_executor.go b/lib/execution/executors/insert_executor.go index c4851b19..6070c322 100644 --- a/lib/execution/executors/insert_executor.go +++ b/lib/execution/executors/insert_executor.go @@ -39,7 +39,7 @@ func (e *InsertExecutor) Next() (*tuple.Tuple, Done, error) { for _, values := range e.plan.GetRawValues() { tuple_ := tuple.NewTupleFromSchema(values, e.tableMetadata.Schema()) tableHeap := e.tableMetadata.Table() - rid, err := tableHeap.InsertTuple(tuple_, e.context.txn, e.tableMetadata.OID()) + rid, err := tableHeap.InsertTuple(tuple_, e.context.txn, e.tableMetadata.OID(), false) if err != nil { return nil, true, err } diff --git a/lib/planner/optimizer/optimizer_test.go b/lib/planner/optimizer/optimizer_test.go index 1a6b4832..823fe55e 100644 --- a/lib/planner/optimizer/optimizer_test.go +++ b/lib/planner/optimizer/optimizer_test.go @@ -59,7 +59,7 @@ func SetupTableWithMetadata(exec_ctx *executors.ExecutorContext, tableMeta *Setu vals = append(vals, types.NewValue(genFunc(ii))) } tuple_ := tuple.NewTupleFromSchema(vals, schema_) - rid, _ := tm.Table().InsertTuple(tuple_, txn, tm.OID()) + rid, _ := tm.Table().InsertTuple(tuple_, txn, tm.OID(), false) for jj, colMeta := range tableMeta.Columns { if colMeta.IdxKind != index_constants.INDEX_KIND_INVALID { tm.GetIndex(jj).InsertEntry(tuple_, *rid, txn) diff --git a/lib/recovery/recovery_test/log_recovery_test.go b/lib/recovery/recovery_test/log_recovery_test.go index d89da5ec..adcde33c 100644 --- a/lib/recovery/recovery_test/log_recovery_test.go +++ b/lib/recovery/recovery_test/log_recovery_test.go @@ -59,10 +59,10 @@ func TestRedo(t *testing.T) { val1_1 := tuple1_.GetValue(schema_, 1) val1_0 := tuple1_.GetValue(schema_, 0) - rid, _ = test_table.InsertTuple(tuple_, txn, math.MaxUint32) + rid, _ = test_table.InsertTuple(tuple_, txn, math.MaxUint32, false) // TODO: (SDB) insert index entry if needed testingpkg.Assert(t, rid != nil, "") - rid1, _ = test_table.InsertTuple(tuple1_, txn, math.MaxUint32) + rid1, _ = test_table.InsertTuple(tuple1_, txn, math.MaxUint32, false) // TODO: (SDB) insert index entry if needed testingpkg.Assert(t, rid != nil, "") @@ -172,7 +172,7 @@ func TestUndo(t *testing.T) { val1_1 := tuple1.GetValue(schema_, 1) var rid1 *page.RID fmt.Println("tuple1: ", tuple1.Data()) - rid1, _ = test_table.InsertTuple(tuple1, txn, math.MaxUint32) + rid1, _ = test_table.InsertTuple(tuple1, txn, math.MaxUint32, false) testingpkg.Assert(t, rid1 != nil, "") tuple2 := ConstructTuple(schema_) @@ -180,7 +180,7 @@ func TestUndo(t *testing.T) { val2_1 := tuple2.GetValue(schema_, 1) var rid2 *page.RID - rid2, _ = test_table.InsertTuple(tuple2, txn, math.MaxUint32) + rid2, _ = test_table.InsertTuple(tuple2, txn, math.MaxUint32, false) testingpkg.Assert(t, 
rid2 != nil, "") bf_commit_tuple2, _ := test_table.GetTuple(rid2, txn) @@ -197,7 +197,7 @@ func TestUndo(t *testing.T) { fmt.Println("bf_commit_tuple2_: ", bf_commit_tuple2__.Data()) // tuple deletion (rid1) - test_table.MarkDelete(rid1, math.MaxUint32, txn) + test_table.MarkDelete(rid1, math.MaxUint32, txn, false) bf_commit_tuple2___, _ := test_table.GetTuple(rid2, txn) fmt.Println("bf_commit_tuple2_: ", bf_commit_tuple2___.Data()) @@ -215,7 +215,7 @@ func TestUndo(t *testing.T) { // tuple insertion (rid3) tuple3 := ConstructTuple(schema_) var rid3 *page.RID - rid3, _ = test_table.InsertTuple(tuple3, txn, math.MaxUint32) + rid3, _ = test_table.InsertTuple(tuple3, txn, math.MaxUint32, false) // TODO: (SDB) insert index entry if needed testingpkg.Assert(t, rid3 != nil, "") @@ -345,7 +345,7 @@ func TestCheckpoint(t *testing.T) { // insert a ton of tuples txn1 := samehada_instance.GetTransactionManager().Begin(nil) for i := 0; i < 1000; i++ { - rid, err := test_table.InsertTuple(tuple_, txn1, math.MaxUint32) + rid, err := test_table.InsertTuple(tuple_, txn1, math.MaxUint32, false) if err != nil { fmt.Println(err) } diff --git a/lib/storage/access/lock_manager.go b/lib/storage/access/lock_manager.go index 2ca6264b..b6b6a387 100644 --- a/lib/storage/access/lock_manager.go +++ b/lib/storage/access/lock_manager.go @@ -125,11 +125,11 @@ func isContainTxnID(list []types.TxnID, txnID types.TxnID) bool { /** * Acquire a lock on RID in shared mode. See [LOCK_NOTE] in header file. * @param txn the transaction requesting the shared lock -* @param rid the RID to be locked in shared mode +* @param rid1 the RID to be locked in shared mode * @return true if the lock is granted, false otherwise */ func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) bool { - //fmt.Printf("called LockShared, %v\n", rid) + //fmt.Printf("called LockShared, %v\n", rid1) lock_manager.mutex.Lock() defer lock_manager.mutex.Unlock() @@ -170,11 +170,11 @@ func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) boo /** * Acquire a lock on RID in exclusive mode. See [LOCK_NOTE] in header file. * @param txn the transaction requesting the exclusive lock -* @param rid the RID to be locked in exclusive mode +* @param rid1 the RID to be locked in exclusive mode * @return true if the lock is granted, false otherwise */ func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID) bool { - //fmt.Printf("called LockExclusive, %v\n", rid) + //fmt.Printf("called LockExclusive, %v\n", rid1) lock_manager.mutex.Lock() defer lock_manager.mutex.Unlock() @@ -207,11 +207,11 @@ func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID) /** * Upgrade a lock from a shared lock to an exclusive access. * @param txn the transaction requesting the lock upgrade -* @param rid the RID that should already be locked in shared mode by the requesting transaction +* @param rid1 the RID that should already be locked in shared mode by the requesting transaction * @return true if the upgrade is successful, false otherwise */ func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bool { - //fmt.Printf("called LockUpgrade %v\n", rid) + //fmt.Printf("called LockUpgrade %v\n", rid1) lock_manager.mutex.Lock() defer lock_manager.mutex.Unlock() @@ -249,7 +249,7 @@ func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bo /** * Release the lock held by the access. 
* @param txn the transaction releasing the lock, it should actually hold the lock -* @param rid the RID that is locked by the transaction +* @param rid1 the RID that is locked by the transaction * @return true if the unlock is successful, false otherwise */ func (lock_manager *LockManager) Unlock(txn *Transaction, rid_list []page.RID) bool { diff --git a/lib/storage/access/table_heap.go b/lib/storage/access/table_heap.go index a65d1311..64c039d9 100644 --- a/lib/storage/access/table_heap.go +++ b/lib/storage/access/table_heap.go @@ -60,7 +60,7 @@ func (t *TableHeap) GetFirstPageId() types.PageID { // If the tuple1 is too large (>= page_size): // 1. It tries to insert in the next page // 2. If there is no next page, it creates a new page and insert in it -func (t *TableHeap) InsertTuple(tuple_ *tuple.Tuple, txn *Transaction, oid uint32) (rid *page.RID, err error) { +func (t *TableHeap) InsertTuple(tuple_ *tuple.Tuple, txn *Transaction, oid uint32, isForUpdate bool) (rid *page.RID, err error) { if common.EnableDebug { if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { fmt.Printf("TableHeap::InsertTuple called. txn.txn_id:%v dbgInfo:%s tuple_:%v\n", txn.txn_id, txn.dbgInfo, *tuple_) @@ -129,8 +129,12 @@ func (t *TableHeap) InsertTuple(tuple_ *tuple.Tuple, txn *Transaction, oid uint3 } currentPage.RemoveWLatchRecord(int32(txn.txn_id)) currentPage.WUnlatch() - // Update the transaction's write set. - txn.AddIntoWriteSet(NewWriteRecord(rid, INSERT, tuple_, nil, t, oid)) + if !isForUpdate { + // Update the transaction's write set. + txn.AddIntoWriteSet(NewWriteRecord(rid, nil, INSERT, tuple_, nil, t, oid)) + } + //note: when isForUpdate is true, write record of Update is created in caller + return rid, nil } @@ -139,7 +143,7 @@ func (t *TableHeap) InsertTuple(tuple_ *tuple.Tuple, txn *Transaction, oid uint3 func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, schema_ *schema.Schema, oid uint32, rid page.RID, txn *Transaction, isRollback bool) (is_success bool, new_rid_ *page.RID, err_ error, update_tuple_ *tuple.Tuple, old_tuple_ *tuple.Tuple) { if common.EnableDebug { if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TableHeap::UpadteTuple called. txn.txn_id:%v dbgInfo:%s update_col_idxs:%v rid:%v\n", txn.txn_id, txn.dbgInfo, update_col_idxs, rid) + fmt.Printf("TableHeap::UpadteTuple called. txn.txn_id:%v dbgInfo:%s update_col_idxs:%v rid1:%v\n", txn.txn_id, txn.dbgInfo, update_col_idxs, rid) } if common.ActiveLogKindSetting&common.BUFFER_INTERNAL_STATE > 0 { t.bpm.PrintBufferUsageState(fmt.Sprintf("TableHeap::UpdateTuple start. 
txn.txn_id: %d dbgInfo:%s", txn.txn_id, txn.dbgInfo)) @@ -172,7 +176,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche var new_rid *page.RID var isUpdateWithDelInsert = false if is_updated == false && err == ErrNotEnoughSpace { - // delete old_tuple(rid) + // delete old_tuple(rid1) // and insert need_follow_tuple(new_rid) // as updating @@ -185,7 +189,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche // above ApplyDelete does not fail is_deleted = true } else { - is_deleted = t.MarkDelete(&rid, oid, txn) + is_deleted = t.MarkDelete(&rid, oid, txn, true) } if !is_deleted { @@ -195,7 +199,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche } var err2 error = nil - _, err2 = t.InsertTuple(need_follow_tuple, txn, oid) + new_rid, err2 = t.InsertTuple(need_follow_tuple, txn, oid, true) if err2 != nil { panic("InsertTuple does not fail on normal system condition!!!") } @@ -210,11 +214,12 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche // when err == ErrNotEnoughSpace route and old tuple delete is only succeeded, DELETE write set entry is added above (no come here) if is_updated && txn.GetState() != ABORTED { if isUpdateWithDelInsert { - // adding write record of UPDATE is not needed + t.lastPageId = t.firstPageId + txn.AddIntoWriteSet(NewWriteRecord(&rid, new_rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) } else { // reset seek start point of Insert to first page t.lastPageId = t.firstPageId - txn.AddIntoWriteSet(NewWriteRecord(&rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) + txn.AddIntoWriteSet(NewWriteRecord(&rid, &rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) } } @@ -222,10 +227,10 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche } // when isForUpdate arg is true, write record is not created -func (t *TableHeap) MarkDelete(rid *page.RID, oid uint32, txn *Transaction) bool { +func (t *TableHeap) MarkDelete(rid *page.RID, oid uint32, txn *Transaction, isForUpdate bool) bool { if common.EnableDebug { if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TableHeap::MarkDelete called. txn.txn_id:%v rid:%v dbgInfo:%s\n", txn.txn_id, *rid, txn.dbgInfo) + fmt.Printf("TableHeap::MarkDelete called. txn.txn_id:%v rid1:%v dbgInfo:%s\n", txn.txn_id, *rid, txn.dbgInfo) } if common.ActiveLogKindSetting&common.BUFFER_INTERNAL_STATE > 0 { t.bpm.PrintBufferUsageState(fmt.Sprintf("TableHeap::MarkDelete start. txn.txn_id: %d dbgInfo:%s", txn.txn_id, txn.dbgInfo)) @@ -251,10 +256,11 @@ func (t *TableHeap) MarkDelete(rid *page.RID, oid uint32, txn *Transaction) bool } page_.RemoveWLatchRecord(int32(txn.txn_id)) page_.WUnlatch() - if is_marked { + if is_marked && !isForUpdate { // Update the transaction's write set. - txn.AddIntoWriteSet(NewWriteRecord(rid, DELETE, markedTuple, nil, t, oid)) + txn.AddIntoWriteSet(NewWriteRecord(rid, nil, DELETE, markedTuple, nil, t, oid)) } + //note: when isForUpdate is true, write record of Update is created in caller return is_marked } @@ -262,7 +268,7 @@ func (t *TableHeap) MarkDelete(rid *page.RID, oid uint32, txn *Transaction) bool func (t *TableHeap) ApplyDelete(rid *page.RID, txn *Transaction) { if common.EnableDebug { if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TableHeap::ApplyDelete called. txn.txn_id:%v rid:%v dbgInfo:%s\n", txn.txn_id, *rid, txn.dbgInfo) + fmt.Printf("TableHeap::ApplyDelete called. 
txn.txn_id:%v rid1:%v dbgInfo:%s\n", txn.txn_id, *rid, txn.dbgInfo) } if common.ActiveLogKindSetting&common.BUFFER_INTERNAL_STATE > 0 { t.bpm.PrintBufferUsageState(fmt.Sprintf("TableHeap::ApplyDelete start. txn.txn_id: %d dbgInfo:%s", txn.txn_id, txn.dbgInfo)) @@ -293,7 +299,7 @@ func (t *TableHeap) ApplyDelete(rid *page.RID, txn *Transaction) { func (t *TableHeap) RollbackDelete(rid *page.RID, txn *Transaction) { if common.EnableDebug { if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TableHeap::RollBackDelete called. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, *rid) + fmt.Printf("TableHeap::RollBackDelete called. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, *rid) } if common.ActiveLogKindSetting&common.BUFFER_INTERNAL_STATE > 0 { t.bpm.PrintBufferUsageState(fmt.Sprintf("TableHeap::RollBackDelete start. txn.txn_id: %d dbgInfo:%s", txn.txn_id, txn.dbgInfo)) @@ -321,7 +327,7 @@ func (t *TableHeap) RollbackDelete(rid *page.RID, txn *Transaction) { func (t *TableHeap) GetTuple(rid *page.RID, txn *Transaction) (*tuple.Tuple, error) { if common.EnableDebug { if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TableHeap::GetTuple called. txn.txn_id:%v rid:%v dbgInfo:%s\n", txn.txn_id, *rid, txn.dbgInfo) + fmt.Printf("TableHeap::GetTuple called. txn.txn_id:%v rid1:%v dbgInfo:%s\n", txn.txn_id, *rid, txn.dbgInfo) } if common.ActiveLogKindSetting&common.BUFFER_INTERNAL_STATE > 0 { t.bpm.PrintBufferUsageState(fmt.Sprintf("TableHeap::GetTuple start. txn.txn_id: %d dbgInfo:%s", txn.txn_id, txn.dbgInfo)) diff --git a/lib/storage/access/table_heap_test.go b/lib/storage/access/table_heap_test.go index c61a92ed..fe0de3f4 100644 --- a/lib/storage/access/table_heap_test.go +++ b/lib/storage/access/table_heap_test.go @@ -48,7 +48,7 @@ func TestTableHeap(t *testing.T) { row = append(row, types.NewInteger(int32((i+1)*2))) tuple_ := tuple.NewTupleFromSchema(row, schema_) - _, err := th.InsertTuple(tuple_, txn, math.MaxUint32) + _, err := th.InsertTuple(tuple_, txn, math.MaxUint32, false) testingpkg.Ok(t, err) } @@ -121,7 +121,7 @@ func TestTableHeapFourCol(t *testing.T) { row = append(row, types.NewInteger(int32((i+3)*2))) tuple_ := tuple.NewTupleFromSchema(row, schema_) - _, err := th.InsertTuple(tuple_, txn, math.MaxUint32) + _, err := th.InsertTuple(tuple_, txn, math.MaxUint32, false) testingpkg.Ok(t, err) } diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 39cf4550..ec8c21fa 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -107,7 +107,7 @@ func (tp *TablePage) InsertTuple(tuple *tuple.Tuple, log_manager *recovery.LogMa if !locked { //txn.SetState(ABORTED) return nil, errors.Error("could not acquire an exclusive lock of found slot (=RID)") - // fmt.Printf("Locking a new tuple1 should always work. rid: %v\n", rid) + // fmt.Printf("Locking a new tuple1 should always work. rid1: %v\n", rid1) // lock_manager.PrintLockTables() // os.Stdout.Sync() // panic("") @@ -118,7 +118,7 @@ func (tp *TablePage) InsertTuple(tuple *tuple.Tuple, log_manager *recovery.LogMa if common.EnableDebug { setFSP := tp.GetFreeSpacePointer() - tuple.Size() - common.SH_Assert(setFSP <= common.PageSize, fmt.Sprintf("illegal pointer value!! txnId:%d txnState:%d txn.dbgInfo:%s rid:%v GetPageId():%d setFSP:%d", txn.txn_id, txn.state, txn.dbgInfo, *rid, tp.GetPageId(), setFSP)) + common.SH_Assert(setFSP <= common.PageSize, fmt.Sprintf("illegal pointer value!! 
txnId:%d txnState:%d txn.dbgInfo:%s rid1:%v GetPageId():%d setFSP:%d", txn.txn_id, txn.state, txn.dbgInfo, *rid, tp.GetPageId(), setFSP)) } tp.SetFreeSpacePointer(tp.GetFreeSpacePointer() - tuple.Size()) @@ -157,7 +157,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } }() if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TablePage::UpdateTuple called. pageId:%d txn.txn_id:%v dbgInfo:%s new_tuple:%v update_col_idxs:%v rid:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *new_tuple, update_col_idxs, *rid) + fmt.Printf("TablePage::UpdateTuple called. pageId:%d txn.txn_id:%v dbgInfo:%s new_tuple:%v update_col_idxs:%v rid1:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *new_tuple, update_col_idxs, *rid) } } common.SH_Assert(new_tuple.Size() > 0, "Cannot have empty tuples.") @@ -278,7 +278,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, return true, nil, update_tuple } -// rid is not null when caller is Redo +// rid1 is not null when caller is Redo func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager) *page.RID { maxSlotNum := tp.GetTupleCount() buf := make([]byte, size) @@ -325,7 +325,7 @@ func (tp *TablePage) MarkDelete(rid *page.RID, txn *Transaction, lock_manager *L } }() if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TablePage::MarkDelete called. pageId:%d txn.txn_id:%v dbgInfo:%s rid:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) + fmt.Printf("TablePage::MarkDelete called. pageId:%d txn.txn_id:%v dbgInfo:%s rid1:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) } } @@ -394,7 +394,7 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r } }() if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TablePage::ApplyDelete called. pageId:%d txn.txn_id:%v dbgInfo:%s rid:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) + fmt.Printf("TablePage::ApplyDelete called. pageId:%d txn.txn_id:%v dbgInfo:%s rid1:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) } } @@ -465,7 +465,7 @@ func (tp *TablePage) RollbackDelete(rid *page.RID, txn *Transaction, log_manager } }() if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TablePage::RollbackDelete called. pageId:%d txn.txn_id:%v dbgInfo:%s rid:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) + fmt.Printf("TablePage::RollbackDelete called. pageId:%d txn.txn_id:%v dbgInfo:%s rid1:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) } } @@ -610,7 +610,7 @@ func (tp *TablePage) GetTuple(rid *page.RID, log_manager *recovery.LogManager, l } }() if common.ActiveLogKindSetting&common.RDB_OP_FUNC_CALL > 0 { - fmt.Printf("TablePage::GetTuple called. pageId:%d txn.txn_id:%v dbgInfo:%s rid:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) + fmt.Printf("TablePage::GetTuple called. pageId:%d txn.txn_id:%v dbgInfo:%s rid1:%v\n", tp.GetPageId(), txn.txn_id, txn.dbgInfo, *rid) } } @@ -645,7 +645,7 @@ func (tp *TablePage) GetTuple(rid *page.RID, log_manager *recovery.LogManager, l // when Index returned RID of deleted by other txn // in current implementation, this case occurs in not illegal situation - fmt.Printf("TablePage::GetTuple rid of deleted record is passed. rid:%v\n", *rid) + fmt.Printf("TablePage::GetTuple rid1 of deleted record is passed. 
rid1:%v\n", *rid) txn.SetState(ABORTED) return nil, ErrGeneral } @@ -667,7 +667,7 @@ func (tp *TablePage) GetTuple(rid *page.RID, log_manager *recovery.LogManager, l // when RangeSanWithIndexExecutor or PointScanWithIndexExecutor which uses SkipListIterator as RID itrator is called, // the txn enter here. - fmt.Printf("TablePage::GetTuple faced deleted marked record . rid:%v tupleSize:%d tupleOffset:%d\n", *rid, tupleSize, tupleOffset) + fmt.Printf("TablePage::GetTuple faced deleted marked record . rid1:%v tupleSize:%d tupleOffset:%d\n", *rid, tupleSize, tupleOffset) txn.SetState(ABORTED) return nil, ErrGeneral diff --git a/lib/storage/access/transaction.go b/lib/storage/access/transaction.go index a9bd7d53..96d615e0 100644 --- a/lib/storage/access/transaction.go +++ b/lib/storage/access/transaction.go @@ -45,7 +45,8 @@ const ( * WriteRecord tracks information related to a write. */ type WriteRecord struct { - rid *page.RID + rid1 *page.RID + rid2 *page.RID wtype WType /** The tuple1 is used only for the updateoperation. */ tuple1 *tuple.Tuple @@ -55,9 +56,10 @@ type WriteRecord struct { oid uint32 // for rollback of index data } -func NewWriteRecord(rid *page.RID, wtype WType, tuple1 *tuple.Tuple, tuple2 *tuple.Tuple, table *TableHeap, oid uint32) *WriteRecord { +func NewWriteRecord(rid1 *page.RID, rid2 *page.RID, wtype WType, tuple1 *tuple.Tuple, tuple2 *tuple.Tuple, table *TableHeap, oid uint32) *WriteRecord { ret := new(WriteRecord) - ret.rid = rid + ret.rid1 = rid1 + ret.rid2 = rid2 ret.wtype = wtype ret.tuple1 = tuple1 ret.tuple2 = tuple2 @@ -141,14 +143,14 @@ func isContainsRID(list []page.RID, rid page.RID) bool { return false } -/** @return true if rid is shared locked by this transaction */ +/** @return true if rid1 is shared locked by this transaction */ func (txn *Transaction) IsSharedLocked(rid *page.RID) bool { ret := isContainsRID(txn.shared_lock_set, *rid) //fmt.Printf("called IsSharedLocked: %v\n", ret) return ret } -/** @return true if rid is exclusively locked by this transaction */ +/** @return true if rid1 is exclusively locked by this transaction */ func (txn *Transaction) IsExclusiveLocked(rid *page.RID) bool { ret := isContainsRID(txn.exclusive_lock_set, *rid) //fmt.Printf("called IsExclusiveLocked: %v\n", ret) diff --git a/lib/storage/access/transaction_manager.go b/lib/storage/access/transaction_manager.go index 93a19ea0..433e07d1 100644 --- a/lib/storage/access/transaction_manager.go +++ b/lib/storage/access/transaction_manager.go @@ -76,29 +76,45 @@ func (transaction_manager *TransactionManager) Commit(catalog_ catalog_interface for len(write_set) != 0 { item := write_set[len(write_set)-1] table := item.table - rid := item.rid + rid := item.rid1 if item.wtype == DELETE { if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Commit handle DELETE write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, rid) + fmt.Printf("TransactionManager::Commit handle DELETE write log. 
txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, rid) } pageID := rid.GetPageId() tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) tpage.WLatch() tpage.AddWLatchRecord(int32(txn.txn_id)) - tpage.ApplyDelete(item.rid, txn, transaction_manager.log_manager) + tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) table.bpm.UnpinPage(tpage.GetPageId(), true) tpage.RemoveWLatchRecord(int32(txn.txn_id)) tpage.WUnlatch() + } else if item.wtype == UPDATE { + if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { + fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, rid) + } + + if *item.rid1 != *item.rid2 { + // tuple location change occured case + pageID := item.rid1.GetPageId() + tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) + tpage.WLatch() + tpage.AddWLatchRecord(int32(txn.txn_id)) + tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) + table.bpm.UnpinPage(tpage.GetPageId(), true) + tpage.RemoveWLatchRecord(int32(txn.txn_id)) + tpage.WUnlatch() + } } else if item.wtype == RESERVE_SPACE { if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, rid) + fmt.Printf("TransactionManager::Commit handle RESERVE_SPACE write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, rid) } pageID := rid.GetPageId() tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) tpage.WLatch() tpage.AddWLatchRecord(int32(txn.txn_id)) // remove dummy tuple which reserves space for update rollback - tpage.ApplyDelete(item.rid, txn, transaction_manager.log_manager) + tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) table.bpm.UnpinPage(tpage.GetPageId(), true) tpage.RemoveWLatchRecord(int32(txn.txn_id)) tpage.WUnlatch() @@ -163,31 +179,30 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. table := item.table if item.wtype == DELETE { if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Abort handle DELETE write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, item.rid) + fmt.Printf("TransactionManager::Abort handle DELETE write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, item.rid1) } // rollback record data - table.RollbackDelete(item.rid, txn) + table.RollbackDelete(item.rid1, txn) // rollback index data indexes := catalog_.GetRollbackNeededIndexes(indexMap, item.oid) for _, index_ := range indexes { if index_ != nil { - index_.InsertEntry(item.tuple1, *item.rid, txn) + index_.InsertEntry(item.tuple1, *item.rid1, txn) } } } else if item.wtype == INSERT { if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Abort handle INSERT write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, item.rid) + fmt.Printf("TransactionManager::Abort handle INSERT write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, item.rid1) } // rollback record data - rid := item.rid - // Note that this also releases the lock when holding the page latch. 
+ rid := item.rid1 pageID := rid.GetPageId() tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) tpage.WLatch() - tpage.ApplyDelete(item.rid, txn, transaction_manager.log_manager) + tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) table.bpm.UnpinPage(pageID, true) tpage.WUnlatch() @@ -196,31 +211,49 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. indexes := catalog_.GetRollbackNeededIndexes(indexMap, item.oid) for _, index_ := range indexes { if index_ != nil { - index_.DeleteEntry(item.tuple1, *item.rid, txn) + index_.DeleteEntry(item.tuple1, *item.rid1, txn) } } } } else if item.wtype == UPDATE { if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Abort handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid:%v tuple1.Size()=%d \n", txn.txn_id, txn.dbgInfo, item.rid, item.tuple1.Size()) + fmt.Printf("TransactionManager::Abort handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid1:%v tuple1.Size()=%d \n", txn.txn_id, txn.dbgInfo, item.rid1, item.tuple1.Size()) } // rollback record data - rid := item.rid - // Note that this also releases the lock when holding the page latch. - pageID := rid.GetPageId() - tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) - if !isReserveSpaceHandled { + if *item.rid1 != *item.rid2 { + // tuple location change occured case + + // rollback inserted record data + pageID := item.rid2.GetPageId() + tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) tpage.WLatch() + tpage.ApplyDelete(item.rid2, txn, transaction_manager.log_manager) + table.bpm.UnpinPage(pageID, true) + tpage.WUnlatch() + + // rollback deleted record data + table.RollbackDelete(item.rid1, txn) } else { - // getting latch is not needed because it is already latched at RESERVE_SPACE handling - } - isReserveSpaceHandled = false + // normal case + + rid := item.rid1 + // Note that this also releases the lock when holding the page latch. + pageID := rid.GetPageId() + tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) + if !isReserveSpaceHandled { + tpage.WLatch() + } else { + // getting latch is not needed because it is already latched at RESERVE_SPACE handling + } + isReserveSpaceHandled = false - tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager) - table.bpm.UnpinPage(pageID, true) - tpage.WUnlatch() + tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager) + table.bpm.UnpinPage(pageID, true) + tpage.WUnlatch() + } + // rollback index data if catalog_ != nil { indexes := catalog_.GetRollbackNeededIndexes(indexMap, item.oid) @@ -230,8 +263,8 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. colIdx := index_.GetKeyAttrs()[0] bfRlbkKeyVal := catalog_.GetColValFromTupleForRollback(item.tuple2, colIdx, item.oid) rlbkKeyVal := catalog_.GetColValFromTupleForRollback(item.tuple1, colIdx, item.oid) - if !bfRlbkKeyVal.CompareEquals(*rlbkKeyVal) { - index_.UpdateEntry(item.tuple2, *item.rid, item.tuple1, *item.rid, txn) + if !bfRlbkKeyVal.CompareEquals(*rlbkKeyVal) || *item.rid1 != *item.rid2 { + index_.UpdateEntry(item.tuple2, *item.rid2, item.tuple1, *item.rid1, txn) } } } @@ -239,14 +272,14 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. 
} else if item.wtype == RESERVE_SPACE { // TODO: (SDB) critical section for this operation should be concatenated with UPDATE case... if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, item.rid) + fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, item.rid1) } - pageID := item.rid.GetPageId() + pageID := item.rid1.GetPageId() tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) tpage.WLatch() tpage.AddWLatchRecord(int32(txn.txn_id)) // remove dummy tuple which reserves space for update rollback - tpage.ApplyDelete(item.rid, txn, transaction_manager.log_manager) + tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) table.bpm.UnpinPage(tpage.GetPageId(), true) tpage.RemoveWLatchRecord(int32(txn.txn_id)) // does not release the latch because it is needed for rollback of UPDATE diff --git a/lib/testing/testing_tbl_gen/table_generator.go b/lib/testing/testing_tbl_gen/table_generator.go index 496e8ddd..73764e87 100644 --- a/lib/testing/testing_tbl_gen/table_generator.go +++ b/lib/testing/testing_tbl_gen/table_generator.go @@ -143,7 +143,7 @@ func FillTable(info *catalog.TableMetadata, table_meta *TableInsertMeta, txn *ac entry = append(entry, values[idx][i]) } tuple_ := tuple.NewTupleFromSchema(entry, info.Schema()) - rid, err := info.Table().InsertTuple(tuple_, txn, info.OID()) + rid, err := info.Table().InsertTuple(tuple_, txn, info.OID(), false) if rid == nil || err != nil { fmt.Printf("InsertTuple failed on FillTable rid = %v, err = %v", rid, err) panic("InsertTuple failed on FillTable!") From cc210d1d4ed8bae9d7149a93bb4f20ea61272141 Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 14:11:01 +0900 Subject: [PATCH 04/10] fixing dirty read occurence problem at update index: WIP(2). 
--- .../skiplist_index_executor_test.go | 4 +- lib/recovery/log_recovery/log_recovery.go | 4 +- lib/storage/access/table_heap.go | 5 ++- lib/storage/access/table_page.go | 45 ++++++++++--------- lib/storage/access/transaction_manager.go | 5 ++- lib/storage/index/skip_list_index.go | 3 +- 6 files changed, 37 insertions(+), 29 deletions(-) diff --git a/lib/execution/executors/executor_test/skiplist_index_executor_test.go b/lib/execution/executors/executor_test/skiplist_index_executor_test.go index b3933420..7ba51730 100644 --- a/lib/execution/executors/executor_test/skiplist_index_executor_test.go +++ b/lib/execution/executors/executor_test/skiplist_index_executor_test.go @@ -870,8 +870,8 @@ func testParallelTxnsQueryingSkipListIndexUsedColumns[T int32 | float32 | string randomUpdateOpFunc := func() { var updateKeyValBase T isFound, updateKeyValBaseP := getKeyAndMarkItInsValsAndDeletedValsForDelete() - if !isFound || keyType == types.Varchar { - //if !isFound { + //if !isFound || keyType == types.Varchar { + if !isFound { if execType == PARALLEL_EXEC { ch <- 1 } diff --git a/lib/recovery/log_recovery/log_recovery.go b/lib/recovery/log_recovery/log_recovery.go index b36b59d9..b393f8ba 100644 --- a/lib/recovery/log_recovery/log_recovery.go +++ b/lib/recovery/log_recovery/log_recovery.go @@ -159,7 +159,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool) if page_.GetLSN() < log_record.GetLSN() { // UpdateTuple overwrites Old_tuple argument // but it is no problem because log_record is read from log file again in Undo phase - page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager) + page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true) page_.SetLSN(log_record.GetLSN()) isRedoOccured = true } @@ -243,7 +243,7 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool { } else if log_record.Log_record_type == recovery.UPDATE { page_ := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId())) - _, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager) + _, err, _, _, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true) if err != nil { panic(fmt.Sprintln("UpdateTuple at rollback failed! 
err:", err)) } diff --git a/lib/storage/access/table_heap.go b/lib/storage/access/table_heap.go index 64c039d9..d87bed75 100644 --- a/lib/storage/access/table_heap.go +++ b/lib/storage/access/table_heap.go @@ -165,7 +165,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche page_.WLatch() page_.AddWLatchRecord(int32(txn.txn_id)) - is_updated, err, need_follow_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager) + is_updated, err, need_follow_tuple, dummy_rid, dummy_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager, false) t.bpm.UnpinPage(page_.GetPageId(), is_updated) if common.EnableDebug && common.ActiveLogKindSetting&common.PIN_COUNT_ASSERT > 0 { common.SH_Assert(page_.PinCount() == 0, "PinCount is not zero when finish TablePage::UpdateTuple!!!") @@ -220,6 +220,9 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche // reset seek start point of Insert to first page t.lastPageId = t.firstPageId txn.AddIntoWriteSet(NewWriteRecord(&rid, &rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) + if dummy_rid != nil && dummy_tuple != nil { + txn.AddIntoWriteSet(NewWriteRecord(dummy_rid, nil, RESERVE_SPACE, dummy_tuple, nil, t, oid)) + } } } diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index ec8c21fa..5ac8f589 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -145,7 +145,7 @@ func (tp *TablePage) InsertTuple(tuple *tuple.Tuple, log_manager *recovery.LogMa // for ensureing existance of enough space at rollback on abort or undo // return Tuple pointer when updated tuple1 need to be moved new page location and it should be inserted after old data deleted, otherwise returned nil func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, schema_ *schema.Schema, old_tuple *tuple.Tuple, rid *page.RID, txn *Transaction, - lock_manager *LockManager, log_manager *recovery.LogManager) (bool, error, *tuple.Tuple) { + lock_manager *LockManager, log_manager *recovery.LogManager, isForRollbackUndo bool) (bool, error, *tuple.Tuple, *page.RID, *tuple.Tuple) { if common.EnableDebug { defer func() { if common.ActiveLogKindSetting&common.DEBUGGING > 0 { @@ -167,11 +167,11 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, if txn.IsSharedLocked(rid) { if !lock_manager.LockUpgrade(txn, rid) { txn.SetState(ABORTED) - return false, nil, nil + return false, nil, nil, nil, nil } } else if !txn.IsExclusiveLocked(rid) && !lock_manager.LockExclusive(txn, rid) { txn.SetState(ABORTED) - return false, nil, nil + return false, nil, nil, nil, nil } } @@ -181,7 +181,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, if !txn.IsRecoveryPhase() { txn.SetState(ABORTED) } - return false, nil, nil + return false, nil, nil, nil, nil } tuple_size := tp.GetTupleSize(slot_num) // If the tuple1 is deleted, abort the transaction. @@ -189,11 +189,11 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, if !txn.IsRecoveryPhase() { txn.SetState(ABORTED) } - return false, nil, nil + return false, nil, nil, nil, nil } if IsReserved(tuple_size) { // ignore dummy tuple - return false, nil, nil + return false, nil, nil, nil, nil } // Copy out the old value. 
@@ -229,7 +229,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } if tp.getFreeSpaceRemaining()+tuple_size < update_tuple.Size() { - return false, ErrNotEnoughSpace, update_tuple + return false, ErrNotEnoughSpace, update_tuple, nil, nil } if log_manager.IsEnabledLogging() { @@ -239,21 +239,25 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, txn.SetPrevLSN(lsn) } + var dummy_rid *page.RID + var dummy_tuple *tuple.Tuple // whether dummy tuple insertion is needed or not - if update_tuple.Size() < tuple_size { + if update_tuple.Size() < tuple_size && !isForRollbackUndo { reserve_size := tuple_size - update_tuple.Size() // sizeTuple is needed metadata space additonaly - usableSpaceForDummy := tp.getFreeSpaceRemaining() - sizeTuple - if usableSpaceForDummy < reserve_size { - if usableSpaceForDummy < 1 { - // no need to reserve space for rollback + if tp.getFreeSpaceRemaining() >= sizeTuple { + usableSpaceForDummy := tp.getFreeSpaceRemaining() - sizeTuple + if usableSpaceForDummy < reserve_size { + if usableSpaceForDummy < 1 { + // no need to reserve space for rollback + } else { + // add dummy tuple which fill space for rollback of update + dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager) + } } else { - // add dummy tuple which fill space for rollback of update - tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager) + // add dummy tuple which reserves space for rollback of update + dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, reserve_size, txn, log_manager) } - } else { - // add dummy tuple which reserves space for rollback of update - tp.ReserveSpaceForRollbackUpdate(nil, tuple_size-update_tuple.Size(), txn, log_manager) } } @@ -275,11 +279,11 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } } - return true, nil, update_tuple + return true, nil, update_tuple, dummy_rid, dummy_tuple } // rid1 is not null when caller is Redo -func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager) *page.RID { +func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager) (*page.RID, *tuple.Tuple) { maxSlotNum := tp.GetTupleCount() buf := make([]byte, size) @@ -293,7 +297,6 @@ func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, t dummy_tuple := tuple.NewTuple(dummy_rid, size, buf[:size]) tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple) tp.SetFreeSpacePointer(tp.GetFreeSpacePointer() - dummy_tuple.Size()) - tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple) if dummy_rid.GetSlotNum() == tp.GetTupleCount() { tp.SetTupleCount(tp.GetTupleCount() + 1) @@ -310,7 +313,7 @@ func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, t txn.SetPrevLSN(lsn) } - return dummy_rid + return dummy_rid, dummy_tuple } func (tp *TablePage) MarkDelete(rid *page.RID, txn *Transaction, lock_manager *LockManager, log_manager *recovery.LogManager) (isMarked bool, markedTuple *tuple.Tuple) { diff --git a/lib/storage/access/transaction_manager.go b/lib/storage/access/transaction_manager.go index 433e07d1..91fffd24 100644 --- a/lib/storage/access/transaction_manager.go +++ b/lib/storage/access/transaction_manager.go @@ -248,7 +248,10 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. 
} isReserveSpaceHandled = false - tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager) + is_updated, err, _, _, _ := tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager, true) + if !is_updated || err != nil { + panic("rollback of normal UPDATE failed") + } table.bpm.UnpinPage(pageID, true) tpage.WUnlatch() } diff --git a/lib/storage/index/skip_list_index.go b/lib/storage/index/skip_list_index.go index 1e304fff..5e044ce7 100644 --- a/lib/storage/index/skip_list_index.go +++ b/lib/storage/index/skip_list_index.go @@ -1,7 +1,6 @@ package index import ( - "fmt" "github.com/ryogrid/SamehadaDB/lib/container/skip_list" "github.com/ryogrid/SamehadaDB/lib/samehada/samehada_util" "github.com/ryogrid/SamehadaDB/lib/storage/buffer" @@ -68,7 +67,7 @@ func (slidx *SkipListIndex) deleteEntryInner(key *tuple.Tuple, rid page.RID, txn } isSuccess := slidx.container.Remove(convedKeyVal, 0) if isSuccess == false { - panic(fmt.Sprintf("SkipListIndex::deleteEntryInner: %v %v\n", convedKeyVal.ToIFValue(), rid)) + //panic(fmt.Sprintf("SkipListIndex::deleteEntryInner: %v %v\n", convedKeyVal.ToIFValue(), rid)) } } From f8c8892aa4c167c3d5fc78f9985770c092ebbd40 Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 14:33:15 +0900 Subject: [PATCH 05/10] fixing dirty read occurence problem at update index: WIP(3). --- lib/recovery/log_recovery/log_recovery.go | 2 +- lib/storage/access/table_page.go | 15 ++++++++++++--- lib/storage/access/transaction_manager.go | 6 +++++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/lib/recovery/log_recovery/log_recovery.go b/lib/recovery/log_recovery/log_recovery.go index b393f8ba..3b3a6d01 100644 --- a/lib/recovery/log_recovery/log_recovery.go +++ b/lib/recovery/log_recovery/log_recovery.go @@ -182,7 +182,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool) page_ := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId())) if page_.GetLSN() < log_record.GetLSN() { - page_.ReserveSpaceForRollbackUpdate(&log_record.Reserving_rid, log_record.Reserving_tuple.Size(), txn, log_recovery.log_manager) + page_.ReserveSpaceForRollbackUpdate(&log_record.Reserving_rid, log_record.Reserving_tuple.Size(), txn, log_recovery.log_manager, nil) page_.SetLSN(log_record.GetLSN()) isRedoOccured = true } diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 5ac8f589..0f29f580 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -252,11 +252,11 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, // no need to reserve space for rollback } else { // add dummy tuple which fill space for rollback of update - dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager) + dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) } } else { // add dummy tuple which reserves space for rollback of update - dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, reserve_size, txn, log_manager) + dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, reserve_size, txn, log_manager, lock_manager) } } } @@ -283,7 +283,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } // rid1 is not null when caller is Redo -func 
(tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager) (*page.RID, *tuple.Tuple) { +func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager, lock_manager *LockManager) (*page.RID, *tuple.Tuple) { maxSlotNum := tp.GetTupleCount() buf := make([]byte, size) @@ -294,6 +294,15 @@ func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, t } else { dummy_rid = &page.RID{tp.GetPageId(), maxSlotNum} } + + if !txn.IsRecoveryPhase() { + // Acquire an exclusive lock for inserting a dummy tuple + locked := lock_manager.LockExclusive(txn, dummy_rid) + if !locked { + panic("could not acquire an exclusive lock of found slot (=RID)") + } + } + dummy_tuple := tuple.NewTuple(dummy_rid, size, buf[:size]) tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple) tp.SetFreeSpacePointer(tp.GetFreeSpacePointer() - dummy_tuple.Size()) diff --git a/lib/storage/access/transaction_manager.go b/lib/storage/access/transaction_manager.go index 91fffd24..bba49280 100644 --- a/lib/storage/access/transaction_manager.go +++ b/lib/storage/access/transaction_manager.go @@ -245,6 +245,9 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. tpage.WLatch() } else { // getting latch is not needed because it is already latched at RESERVE_SPACE handling + fmt.Println("TransactionManager::Abort getting latch is not needed because it is already latched at RESERVE_SPACE handling") + // unpin the page because it is pinned at RESERVE_SPACE handling + table.bpm.UnpinPage(pageID, true) } isReserveSpaceHandled = false @@ -283,12 +286,13 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. tpage.AddWLatchRecord(int32(txn.txn_id)) // remove dummy tuple which reserves space for update rollback tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) - table.bpm.UnpinPage(tpage.GetPageId(), true) + //table.bpm.UnpinPage(tpage.GetPageId(), true) tpage.RemoveWLatchRecord(int32(txn.txn_id)) // does not release the latch because it is needed for rollback of UPDATE // WriteRecords and logs of RESERVE_SPACE and UPDATE are continuous in single transaction view //tpage.WUnlatch() + isReserveSpaceHandled = true } write_set = write_set[:len(write_set)-1] } From abc5e91e23e7820ce8d08b1750145948ca09c37a Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 20:02:42 +0900 Subject: [PATCH 06/10] fixing dirty read occurence problem at update index: WIP(4). 
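This patch moves the dummy-tuple reservation to after the in-place byte copy and enlarges reserve_size by sizeTuple. The reason is in the diff's own comment: when the dummy tuple is later deleted, its body returns to the free space but its sizeTuple bytes of slot metadata stay consumed, so an exact-size dummy would leave the page sizeTuple bytes short for rollback. A sketch of that accounting, assuming an 8-byte slot header (the real sizeTuple is defined in table_page.go):

    package main

    import "fmt"

    func main() {
    	const sizeTuple uint32 = 8 // assumed slot-header cost per tuple
    	oldSize, newSize := uint32(128), uint32(100)

    	// naive reservation: exactly the bytes the shrinking update gives up
    	naive := oldSize - newSize
    	// this patch: over-provision by one slot header, because deleting the
    	// dummy tuple later does not give those header bytes back
    	reserved := oldSize - newSize + sizeTuple

    	fmt.Println(naive, reserved) // 28 36
    }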
--- lib/storage/access/table_page.go | 80 +++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 0f29f580..c8d26a47 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -229,6 +229,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } if tp.getFreeSpaceRemaining()+tuple_size < update_tuple.Size() { + fmt.Println("getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) return false, ErrNotEnoughSpace, update_tuple, nil, nil } @@ -239,28 +240,6 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, txn.SetPrevLSN(lsn) } - var dummy_rid *page.RID - var dummy_tuple *tuple.Tuple - // whether dummy tuple insertion is needed or not - if update_tuple.Size() < tuple_size && !isForRollbackUndo { - reserve_size := tuple_size - update_tuple.Size() - // sizeTuple is needed metadata space additonaly - if tp.getFreeSpaceRemaining() >= sizeTuple { - usableSpaceForDummy := tp.getFreeSpaceRemaining() - sizeTuple - if usableSpaceForDummy < reserve_size { - if usableSpaceForDummy < 1 { - // no need to reserve space for rollback - } else { - // add dummy tuple which fill space for rollback of update - dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) - } - } else { - // add dummy tuple which reserves space for rollback of update - dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, reserve_size, txn, log_manager, lock_manager) - } - } - } - // Perform the update. free_space_pointer := tp.GetFreeSpacePointer() common.SH_Assert(tuple_offset >= free_space_pointer, "Offset should appear after current free space position.") @@ -279,6 +258,37 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } } + var dummy_rid *page.RID + var dummy_tuple *tuple.Tuple + // whether dummy tuple insertion is needed or not + if update_tuple.Size() < tuple_size && !isForRollbackUndo { + // addition of sizeTuple is needed because sizeTuple bytes is keep used + // after dummy tuple is deleted + reserve_size := tuple_size - update_tuple.Size() + sizeTuple + //reserve_size := tuple_size*2 + sizeTuple + // sizeTuple is needed metadata space additonaly + if tp.getFreeSpaceRemaining() >= sizeTuple+1 { + usableSpaceForDummy := tp.getFreeSpaceRemaining() - sizeTuple + if usableSpaceForDummy < reserve_size { + fmt.Println("fill few rest space for rollback for ", *rid, usableSpaceForDummy) + fmt.Println("new tuple size", update_tuple.Size(), "old tuple size", tuple_size) + dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) + //if usableSpaceForDummy < 1 { + // // no need to reserve space for rollback + //} else { + // // add dummy tuple which fill space for rollback of update + // dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) + //} + } else { + // add dummy tuple which reserves space for rollback of update + dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, reserve_size, txn, log_manager, lock_manager) + } + + //// add dummy tuple which fill rest all space of the page for rollback of update + //dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) + 
} + } + return true, nil, update_tuple, dummy_rid, dummy_tuple } @@ -304,8 +314,8 @@ func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, t } dummy_tuple := tuple.NewTuple(dummy_rid, size, buf[:size]) - tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple) tp.SetFreeSpacePointer(tp.GetFreeSpacePointer() - dummy_tuple.Size()) + tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple) if dummy_rid.GetSlotNum() == tp.GetTupleCount() { tp.SetTupleCount(tp.GetTupleCount() + 1) @@ -410,6 +420,8 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r } } + spaceRemaining := tp.getFreeSpaceRemaining() + slot_num := rid.GetSlotNum() common.SH_Assert(slot_num < tp.GetTupleCount(), "Cannot have more slots than tuples.") @@ -419,12 +431,15 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r if IsDeleted(tuple_size) { tuple_size = UnsetDeletedFlag(tuple_size) } + isReserved := false if IsReserved(tuple_size) { tuple_size = UnsetReservedFlag(tuple_size) + isReserved = true + fmt.Println("ApplyDelete: freed dummy tuple: ", tuple_size) } // Otherwise we are rolling back an insert. - if tuple_size <= 0 { + if tuple_size < 0 { panic("TablePage::ApplyDelete: target tuple size is illegal!!!") } @@ -448,12 +463,23 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r free_space_pointer := tp.GetFreeSpacePointer() common.SH_Assert(tuple_offset >= free_space_pointer, "Free space appears before tuples.") - - copy(tp.Data()[free_space_pointer+tuple_size:], tp.Data()[free_space_pointer:tuple_offset]) - + //if tuple_offset >= free_space_pointer { + // copy(tp.Data()[free_space_pointer+tuple_size:], tp.Data()[free_space_pointer:tuple_offset]) + //} + // note: copy doesn't occur dummy tuple deleted and free space pointer value is not normal position + + //if isReserved && spaceRemaining == 0 { + // // this case is few rest rest space filled dummy tuple is deleted + // // so header info of tuple also should be deleted + // fmt.Println("ApplyDelete: delete dummy tuple (special case): ", tuple_size) + // tp.SetFreeSpacePointer(free_space_pointer + tuple_size + sizeTuple) + // tp.SetTupleCount(tp.GetTupleCount() - 1) + //} else { + // normal case tp.SetFreeSpacePointer(free_space_pointer + tuple_size) tp.SetTupleSize(slot_num, 0) tp.SetTupleOffsetAtSlot(slot_num, 0) + //} // Update all tuple offsets. tuple_count := int(tp.GetTupleCount()) From 914d6a439a6bb1aa4b66b1b135d2765b0b99a0f2 Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 20:10:18 +0900 Subject: [PATCH 07/10] fixing dirty read occurence problem at update index: WIP(5). 
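The ApplyDelete changes above revolve around the page-compaction copy: removing a tuple shifts every byte between the free-space pointer and the deleted tuple's offset up by the tuple's size, after which the remaining slots' offsets are fixed up ("Update all tuple offsets."). That copy was commented out while reserved dummy tuples could leave the free-space pointer at an abnormal position, and PATCH 09 below restores it. A toy version of the compaction follows; the helper name and the hand-built 16-byte page are illustrative only.

    package main

    import "fmt"

    // compactOnDelete mirrors the copy in TablePage::ApplyDelete: tuple bodies
    // grow from the end of the page, so deleting one shifts everything between
    // the free-space pointer and the deleted tuple's offset up by its size.
    func compactOnDelete(data []byte, freeSpacePtr, tupleOffset, tupleSize uint32) uint32 {
    	copy(data[freeSpacePtr+tupleSize:], data[freeSpacePtr:tupleOffset])
    	return freeSpacePtr + tupleSize // new free-space pointer
    }

    func main() {
    	// 16-byte "page": bytes 0..10 free, tuple B at 11..13, deleted tuple A at 14..15
    	data := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'B', 'B', 'B', 'A', 'A'}
    	newFSP := compactOnDelete(data, 11, 14, 2)
    	fmt.Println(newFSP, string(data[newFSP:])) // 13 BBB
    }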
--- lib/storage/access/table_page.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index c8d26a47..51c099a4 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -420,7 +420,7 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r } } - spaceRemaining := tp.getFreeSpaceRemaining() + //spaceRemaining := tp.getFreeSpaceRemaining() slot_num := rid.GetSlotNum() common.SH_Assert(slot_num < tp.GetTupleCount(), "Cannot have more slots than tuples.") @@ -431,10 +431,10 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r if IsDeleted(tuple_size) { tuple_size = UnsetDeletedFlag(tuple_size) } - isReserved := false + //isReserved := false if IsReserved(tuple_size) { tuple_size = UnsetReservedFlag(tuple_size) - isReserved = true + //isReserved = true fmt.Println("ApplyDelete: freed dummy tuple: ", tuple_size) } // Otherwise we are rolling back an insert. From f5b8b13b5ef277180eabd1af885fce89e9f1053a Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Sun, 16 Jun 2024 20:36:22 +0900 Subject: [PATCH 08/10] fixing dirty read occurence problem at update index (2): WIP. --- lib/recovery/log_manager.go | 9 -- lib/recovery/log_record.go | 14 --- lib/recovery/log_recovery/log_recovery.go | 22 +--- lib/storage/access/table_heap.go | 7 +- lib/storage/access/table_page.go | 138 +++------------------- lib/storage/access/transaction_manager.go | 47 +------- 6 files changed, 21 insertions(+), 216 deletions(-) diff --git a/lib/recovery/log_manager.go b/lib/recovery/log_manager.go index 37513d07..ccfd37c7 100644 --- a/lib/recovery/log_manager.go +++ b/lib/recovery/log_manager.go @@ -160,16 +160,7 @@ func (log_manager *LogManager) AppendLogRecord(log_record *LogRecord) types.LSN binary.Write(buf, binary.LittleEndian, log_record.Prev_page_id) pageIdInBytes := buf.Bytes() copy(log_manager.log_buffer[pos:], pageIdInBytes) - } else if log_record.Log_record_type == RESERVE_SPACE { - buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, log_record.Reserving_rid) - ridInBytes := buf.Bytes() - copy(log_manager.log_buffer[pos:], ridInBytes) - pos += uint32(unsafe.Sizeof(log_record.Reserving_rid)) - // we have provided serialize function for tuple class - log_record.Reserving_tuple.SerializeTo(log_manager.log_buffer[pos:]) } - // TODO: (SDB) need to implement serialization of RESERVE_SPACE type log log_manager.latch.WUnlock() return log_record.Lsn diff --git a/lib/recovery/log_record.go b/lib/recovery/log_record.go index af777265..3a4221de 100644 --- a/lib/recovery/log_record.go +++ b/lib/recovery/log_record.go @@ -22,7 +22,6 @@ const ( APPLYDELETE ROLLBACKDELETE UPDATE - RESERVE_SPACE BEGIN COMMIT ABORT @@ -145,19 +144,6 @@ func NewLogRecordNewPage(txn_id types.TxnID, prev_lsn types.LSN, log_record_type return ret } -func NewLogRecordReserveSpace(txn_id types.TxnID, prev_lsn types.LSN, log_record_type LogRecordType, rid page.RID, tuple *tuple.Tuple) *LogRecord { - ret := new(LogRecord) - ret.Size = HEADER_SIZE - ret.Txn_id = txn_id - ret.Prev_lsn = prev_lsn - ret.Log_record_type = log_record_type - ret.Reserving_rid = rid - ret.Reserving_tuple = *tuple - // calculate log record size - ret.Size = HEADER_SIZE + uint32(unsafe.Sizeof(rid)) + tuple.Size() + uint32(unsafe.Sizeof(int32(0))) - return ret -} - func (log_record *LogRecord) GetDeleteRID() page.RID { return log_record.Delete_rid } func (log_record *LogRecord) 
GetInserteTuple() tuple.Tuple { return log_record.Insert_tuple } func (log_record *LogRecord) GetInsertRID() page.RID { return log_record.Insert_rid } diff --git a/lib/recovery/log_recovery/log_recovery.go b/lib/recovery/log_recovery/log_recovery.go index 3b3a6d01..c0c4e84d 100644 --- a/lib/recovery/log_recovery/log_recovery.go +++ b/lib/recovery/log_recovery/log_recovery.go @@ -81,10 +81,6 @@ func (log_recovery *LogRecovery) DeserializeLogRecord(data []byte, log_record *r log_record.New_tuple.DeserializeFrom(data[pos:]) } else if log_record.Log_record_type == recovery.NEWPAGE { binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Prev_page_id) - } else if log_record.Log_record_type == recovery.RESERVE_SPACE { - binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Reserving_rid) - pos += uint32(unsafe.Sizeof(log_record.Reserving_rid)) - log_record.Reserving_tuple.DeserializeFrom(data[pos:]) } //fmt.Println(log_record) @@ -159,7 +155,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool) if page_.GetLSN() < log_record.GetLSN() { // UpdateTuple overwrites Old_tuple argument // but it is no problem because log_record is read from log file again in Undo phase - page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true) + page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager) page_.SetLSN(log_record.GetLSN()) isRedoOccured = true } @@ -178,14 +174,6 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool) new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, txn) //log_recovery.buffer_pool_manager.FlushPage(page_id) log_recovery.buffer_pool_manager.UnpinPage(page_id, true) - } else if log_record.Log_record_type == recovery.RESERVE_SPACE { - page_ := - access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId())) - if page_.GetLSN() < log_record.GetLSN() { - page_.ReserveSpaceForRollbackUpdate(&log_record.Reserving_rid, log_record.Reserving_tuple.Size(), txn, log_recovery.log_manager, nil) - page_.SetLSN(log_record.GetLSN()) - isRedoOccured = true - } } buffer_offset += log_record.Size } @@ -243,18 +231,12 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool { } else if log_record.Log_record_type == recovery.UPDATE { page_ := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId())) - _, err, _, _, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true) + _, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager) if err != nil { panic(fmt.Sprintln("UpdateTuple at rollback failed! 
err:", err)) } log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true) isUndoOccured = true - } else if log_record.Log_record_type == recovery.RESERVE_SPACE { - page_ := - access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Reserving_rid.GetPageId())) - page_.ApplyDelete(&log_record.Reserving_rid, txn, log_recovery.log_manager) - log_recovery.buffer_pool_manager.UnpinPage(log_record.Reserving_rid.GetPageId(), true) - isUndoOccured = true } lsn = log_record.Prev_lsn diff --git a/lib/storage/access/table_heap.go b/lib/storage/access/table_heap.go index d87bed75..b14b32e4 100644 --- a/lib/storage/access/table_heap.go +++ b/lib/storage/access/table_heap.go @@ -165,7 +165,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche page_.WLatch() page_.AddWLatchRecord(int32(txn.txn_id)) - is_updated, err, need_follow_tuple, dummy_rid, dummy_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager, false) + is_updated, err, need_follow_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager) t.bpm.UnpinPage(page_.GetPageId(), is_updated) if common.EnableDebug && common.ActiveLogKindSetting&common.PIN_COUNT_ASSERT > 0 { common.SH_Assert(page_.PinCount() == 0, "PinCount is not zero when finish TablePage::UpdateTuple!!!") @@ -175,7 +175,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche var new_rid *page.RID var isUpdateWithDelInsert = false - if is_updated == false && err == ErrNotEnoughSpace { + if is_updated == false && (err == ErrNotEnoughSpace || err == ErrRollbackDifficult) { // delete old_tuple(rid1) // and insert need_follow_tuple(new_rid) // as updating @@ -220,9 +220,6 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche // reset seek start point of Insert to first page t.lastPageId = t.firstPageId txn.AddIntoWriteSet(NewWriteRecord(&rid, &rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) - if dummy_rid != nil && dummy_tuple != nil { - txn.AddIntoWriteSet(NewWriteRecord(dummy_rid, nil, RESERVE_SPACE, dummy_tuple, nil, t, oid)) - } } } diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 51c099a4..87364918 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -33,6 +33,7 @@ const offsetTupleSize = uint32(28) const ErrEmptyTuple = errors.Error("tuple1 cannot be empty.") const ErrNotEnoughSpace = errors.Error("there is not enough space.") +const ErrRollbackDifficult = errors.Error("rollback is difficult. 
tuple needs to be moved.") const ErrSelfDeletedCase = errors.Error("encont self deleted tuple1.") const ErrGeneral = errors.Error("some error is occured!") @@ -145,7 +146,7 @@ func (tp *TablePage) InsertTuple(tuple *tuple.Tuple, log_manager *recovery.LogMa // for ensureing existance of enough space at rollback on abort or undo // return Tuple pointer when updated tuple1 need to be moved new page location and it should be inserted after old data deleted, otherwise returned nil func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, schema_ *schema.Schema, old_tuple *tuple.Tuple, rid *page.RID, txn *Transaction, - lock_manager *LockManager, log_manager *recovery.LogManager, isForRollbackUndo bool) (bool, error, *tuple.Tuple, *page.RID, *tuple.Tuple) { + lock_manager *LockManager, log_manager *recovery.LogManager) (bool, error, *tuple.Tuple) { if common.EnableDebug { defer func() { if common.ActiveLogKindSetting&common.DEBUGGING > 0 { @@ -167,11 +168,11 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, if txn.IsSharedLocked(rid) { if !lock_manager.LockUpgrade(txn, rid) { txn.SetState(ABORTED) - return false, nil, nil, nil, nil + return false, nil, nil } } else if !txn.IsExclusiveLocked(rid) && !lock_manager.LockExclusive(txn, rid) { txn.SetState(ABORTED) - return false, nil, nil, nil, nil + return false, nil, nil } } @@ -181,7 +182,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, if !txn.IsRecoveryPhase() { txn.SetState(ABORTED) } - return false, nil, nil, nil, nil + return false, nil, nil } tuple_size := tp.GetTupleSize(slot_num) // If the tuple1 is deleted, abort the transaction. @@ -189,11 +190,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, if !txn.IsRecoveryPhase() { txn.SetState(ABORTED) } - return false, nil, nil, nil, nil - } - if IsReserved(tuple_size) { - // ignore dummy tuple - return false, nil, nil, nil, nil + return false, nil, nil } // Copy out the old value. 
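From here the series abandons reservation entirely: the next hunk leaves UpdateTuple with two early returns, and table_heap.go above routes both errors into the same delete-then-insert fallback, whose rollback is simply unmarking the delete and physically removing the insert. A compact mirror of the resulting policy, with plain integers standing in for the page accessors:

    package main

    import (
    	"errors"
    	"fmt"
    )

    var (
    	ErrNotEnoughSpace    = errors.New("there is not enough space.")
    	ErrRollbackDifficult = errors.New("rollback is difficult. tuple needs to be moved.")
    )

    // classifyUpdate mirrors the two early returns this patch leaves in
    // TablePage::UpdateTuple: update in place only if the new tuple fits and
    // does not shrink, so rollback can always rewrite the old bytes in the slot.
    func classifyUpdate(freeSpace, oldSize, newSize uint32) error {
    	if freeSpace+oldSize < newSize {
    		return ErrNotEnoughSpace // new tuple cannot fit in this page at all
    	}
    	if oldSize > newSize {
    		return ErrRollbackDifficult // shrinking: rollback space not guaranteed
    	}
    	return nil // safe to overwrite in place
    }

    func main() {
    	fmt.Println(classifyUpdate(30, 100, 120)) // <nil>: grows but fits
    	fmt.Println(classifyUpdate(10, 100, 80))  // rollback is difficult. ...
    	fmt.Println(classifyUpdate(5, 100, 120))  // there is not enough space.
    }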
@@ -229,8 +226,13 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } if tp.getFreeSpaceRemaining()+tuple_size < update_tuple.Size() { - fmt.Println("getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) - return false, ErrNotEnoughSpace, update_tuple, nil, nil + fmt.Println("ErrNotEnoughSpace: getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) + return false, ErrNotEnoughSpace, update_tuple + } + + if tuple_size > update_tuple.Size() { + fmt.Println("ErrRollbackDifficult: getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) + return false, ErrRollbackDifficult, update_tuple } if log_manager.IsEnabledLogging() { @@ -258,81 +260,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } } - var dummy_rid *page.RID - var dummy_tuple *tuple.Tuple - // whether dummy tuple insertion is needed or not - if update_tuple.Size() < tuple_size && !isForRollbackUndo { - // addition of sizeTuple is needed because sizeTuple bytes is keep used - // after dummy tuple is deleted - reserve_size := tuple_size - update_tuple.Size() + sizeTuple - //reserve_size := tuple_size*2 + sizeTuple - // sizeTuple is needed metadata space additonaly - if tp.getFreeSpaceRemaining() >= sizeTuple+1 { - usableSpaceForDummy := tp.getFreeSpaceRemaining() - sizeTuple - if usableSpaceForDummy < reserve_size { - fmt.Println("fill few rest space for rollback for ", *rid, usableSpaceForDummy) - fmt.Println("new tuple size", update_tuple.Size(), "old tuple size", tuple_size) - dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) - //if usableSpaceForDummy < 1 { - // // no need to reserve space for rollback - //} else { - // // add dummy tuple which fill space for rollback of update - // dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) - //} - } else { - // add dummy tuple which reserves space for rollback of update - dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, reserve_size, txn, log_manager, lock_manager) - } - - //// add dummy tuple which fill rest all space of the page for rollback of update - //dummy_rid, dummy_tuple = tp.ReserveSpaceForRollbackUpdate(nil, usableSpaceForDummy, txn, log_manager, lock_manager) - } - } - - return true, nil, update_tuple, dummy_rid, dummy_tuple -} - -// rid1 is not null when caller is Redo -func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager, lock_manager *LockManager) (*page.RID, *tuple.Tuple) { - maxSlotNum := tp.GetTupleCount() - buf := make([]byte, size) - - // set dummy tuple for rollback - var dummy_rid *page.RID - if rid != nil { - dummy_rid = rid - } else { - dummy_rid = &page.RID{tp.GetPageId(), maxSlotNum} - } - - if !txn.IsRecoveryPhase() { - // Acquire an exclusive lock for inserting a dummy tuple - locked := lock_manager.LockExclusive(txn, dummy_rid) - if !locked { - panic("could not acquire an exclusive lock of found slot (=RID)") - } - } - - dummy_tuple := tuple.NewTuple(dummy_rid, size, buf[:size]) - tp.SetFreeSpacePointer(tp.GetFreeSpacePointer() - dummy_tuple.Size()) - tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple) - - if dummy_rid.GetSlotNum() == 
tp.GetTupleCount() { - tp.SetTupleCount(tp.GetTupleCount() + 1) - } - - if size > 0 { - tp.SetTupleSize(dummy_rid.GetSlotNum(), SetReservedFlag(size)) - } - - if log_manager.IsEnabledLogging() { - log_record := recovery.NewLogRecordReserveSpace(txn.GetTransactionId(), txn.GetPrevLSN(), recovery.RESERVE_SPACE, *dummy_rid, dummy_tuple) - lsn := log_manager.AppendLogRecord(log_record) - tp.SetLSN(lsn) - txn.SetPrevLSN(lsn) - } - - return dummy_rid, dummy_tuple + return true, nil, update_tuple } func (tp *TablePage) MarkDelete(rid *page.RID, txn *Transaction, lock_manager *LockManager, log_manager *recovery.LogManager) (isMarked bool, markedTuple *tuple.Tuple) { @@ -431,12 +359,6 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r if IsDeleted(tuple_size) { tuple_size = UnsetDeletedFlag(tuple_size) } - //isReserved := false - if IsReserved(tuple_size) { - tuple_size = UnsetReservedFlag(tuple_size) - //isReserved = true - fmt.Println("ApplyDelete: freed dummy tuple: ", tuple_size) - } // Otherwise we are rolling back an insert. if tuple_size < 0 { @@ -463,23 +385,10 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r free_space_pointer := tp.GetFreeSpacePointer() common.SH_Assert(tuple_offset >= free_space_pointer, "Free space appears before tuples.") - //if tuple_offset >= free_space_pointer { - // copy(tp.Data()[free_space_pointer+tuple_size:], tp.Data()[free_space_pointer:tuple_offset]) - //} - // note: copy doesn't occur dummy tuple deleted and free space pointer value is not normal position - - //if isReserved && spaceRemaining == 0 { - // // this case is few rest rest space filled dummy tuple is deleted - // // so header info of tuple also should be deleted - // fmt.Println("ApplyDelete: delete dummy tuple (special case): ", tuple_size) - // tp.SetFreeSpacePointer(free_space_pointer + tuple_size + sizeTuple) - // tp.SetTupleCount(tp.GetTupleCount() - 1) - //} else { - // normal case + tp.SetFreeSpacePointer(free_space_pointer + tuple_size) tp.SetTupleSize(slot_num, 0) tp.SetTupleOffsetAtSlot(slot_num, 0) - //} // Update all tuple offsets. 
tuple_count := int(tp.GetTupleCount()) @@ -717,11 +626,6 @@ func (tp *TablePage) GetTuple(rid *page.RID, log_manager *recovery.LogManager, l } } - if IsReserved(tupleSize) { - // handle as same as deleted tuple - return tuple.NewTuple(rid, 0, make([]byte, 0)), ErrSelfDeletedCase - } - tupleData := make([]byte, tupleSize) copy(tupleData, tp.Data()[tupleOffset:]) @@ -775,15 +679,3 @@ func SetDeletedFlag(tuple_size uint32) uint32 { func UnsetDeletedFlag(tuple_size uint32) uint32 { return tuple_size & (^uint32(deleteMask)) } - -func IsReserved(tuple_size uint32) bool { - return tuple_size&uint32(reservedMask) == uint32(reservedMask) || tuple_size == 0 -} - -func SetReservedFlag(tuple_size uint32) uint32 { - return tuple_size | uint32(reservedMask) -} - -func UnsetReservedFlag(tuple_size uint32) uint32 { - return tuple_size & (^uint32(reservedMask)) -} diff --git a/lib/storage/access/transaction_manager.go b/lib/storage/access/transaction_manager.go index bba49280..a118056a 100644 --- a/lib/storage/access/transaction_manager.go +++ b/lib/storage/access/transaction_manager.go @@ -105,19 +105,6 @@ func (transaction_manager *TransactionManager) Commit(catalog_ catalog_interface tpage.RemoveWLatchRecord(int32(txn.txn_id)) tpage.WUnlatch() } - } else if item.wtype == RESERVE_SPACE { - if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Commit handle RESERVE_SPACE write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, rid) - } - pageID := rid.GetPageId() - tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) - tpage.WLatch() - tpage.AddWLatchRecord(int32(txn.txn_id)) - // remove dummy tuple which reserves space for update rollback - tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) - table.bpm.UnpinPage(tpage.GetPageId(), true) - tpage.RemoveWLatchRecord(int32(txn.txn_id)) - tpage.WUnlatch() } write_set = write_set[:len(write_set)-1] } @@ -170,9 +157,6 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. fmt.Printf("TransactionManager::Abort txn.txn_id:%v dbgInfo:%s write_set: %s\n", txn.txn_id, txn.dbgInfo, writeSetStr) } - // flag for making handing RESERVE_SPACE and UPDATE write log atomic - var isReserveSpaceHandled = false - // Rollback before releasing the access. for len(write_set) != 0 { item := write_set[len(write_set)-1] @@ -241,17 +225,8 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. // Note that this also releases the lock when holding the page latch. 
pageID := rid.GetPageId() tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) - if !isReserveSpaceHandled { - tpage.WLatch() - } else { - // getting latch is not needed because it is already latched at RESERVE_SPACE handling - fmt.Println("TransactionManager::Abort getting latch is not needed because it is already latched at RESERVE_SPACE handling") - // unpin the page because it is pinned at RESERVE_SPACE handling - table.bpm.UnpinPage(pageID, true) - } - isReserveSpaceHandled = false - - is_updated, err, _, _, _ := tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager, true) + tpage.WLatch() + is_updated, err, _ := tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager) if !is_updated || err != nil { panic("rollback of normal UPDATE failed") } @@ -275,24 +250,6 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. } } } - } else if item.wtype == RESERVE_SPACE { - // TODO: (SDB) critical section for this operation should be concatenated with UPDATE case... - if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 { - fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid1:%v\n", txn.txn_id, txn.dbgInfo, item.rid1) - } - pageID := item.rid1.GetPageId() - tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) - tpage.WLatch() - tpage.AddWLatchRecord(int32(txn.txn_id)) - // remove dummy tuple which reserves space for update rollback - tpage.ApplyDelete(item.rid1, txn, transaction_manager.log_manager) - //table.bpm.UnpinPage(tpage.GetPageId(), true) - tpage.RemoveWLatchRecord(int32(txn.txn_id)) - // does not release the latch because it is needed for rollback of UPDATE - // WriteRecords and logs of RESERVE_SPACE and UPDATE are continuous in single transaction view - - //tpage.WUnlatch() - isReserveSpaceHandled = true } write_set = write_set[:len(write_set)-1] } From 33114b67a8c267feee36f3e196878462f8d6a0a7 Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Mon, 17 Jun 2024 08:47:32 +0900 Subject: [PATCH 09/10] fixing dirty read occurence problem at update index (2): WIP(2). --- lib/storage/access/table_page.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 87364918..7ba72124 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -385,6 +385,7 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r free_space_pointer := tp.GetFreeSpacePointer() common.SH_Assert(tuple_offset >= free_space_pointer, "Free space appears before tuples.") + copy(tp.Data()[free_space_pointer+tuple_size:], tp.Data()[free_space_pointer:tuple_offset]) tp.SetFreeSpacePointer(free_space_pointer + tuple_size) tp.SetTupleSize(slot_num, 0) From 0998b4accb90cd8c12e5eff99edc4fd021937cce Mon Sep 17 00:00:00 2001 From: Ryo Kanbayashi Date: Mon, 17 Jun 2024 09:00:50 +0900 Subject: [PATCH 10/10] fixing dirty read occurence problem at update index (2): WIP(3). 
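The flag threaded through below exists because rollback inverts the geometry: undoing an update that grew a tuple rewrites the slot with the smaller original bytes, which is exactly the shrinking write that ErrRollbackDifficult now forbids on the normal path. A one-function sketch of the guard as this patch leaves it, with illustrative parameter names:

    package main

    import "fmt"

    // allowInPlace mirrors the check after this patch: currentSize is the tuple
    // already in the slot, rewriteSize the bytes about to replace it.
    func allowInPlace(currentSize, rewriteSize uint32, isRollbackOrUndo bool) bool {
    	if currentSize > rewriteSize && !isRollbackOrUndo {
    		return false // normal shrinking update: fall back to delete + insert
    	}
    	return true
    }

    func main() {
    	fmt.Println(allowInPlace(120, 100, false)) // false: user update that shrinks
    	fmt.Println(allowInPlace(120, 100, true))  // true: abort restoring the smaller old tuple
    }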
--- lib/recovery/log_recovery/log_recovery.go | 4 ++-- lib/storage/access/table_heap.go | 6 +++--- lib/storage/access/table_page.go | 8 ++++---- lib/storage/access/transaction_manager.go | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/recovery/log_recovery/log_recovery.go b/lib/recovery/log_recovery/log_recovery.go index c0c4e84d..2ae36c3d 100644 --- a/lib/recovery/log_recovery/log_recovery.go +++ b/lib/recovery/log_recovery/log_recovery.go @@ -155,7 +155,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool) if page_.GetLSN() < log_record.GetLSN() { // UpdateTuple overwrites Old_tuple argument // but it is no problem because log_record is read from log file again in Undo phase - page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager) + page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, false) page_.SetLSN(log_record.GetLSN()) isRedoOccured = true } @@ -231,7 +231,7 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool { } else if log_record.Log_record_type == recovery.UPDATE { page_ := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId())) - _, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager) + _, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true) if err != nil { panic(fmt.Sprintln("UpdateTuple at rollback failed! err:", err)) } diff --git a/lib/storage/access/table_heap.go b/lib/storage/access/table_heap.go index b14b32e4..e8250f4c 100644 --- a/lib/storage/access/table_heap.go +++ b/lib/storage/access/table_heap.go @@ -165,7 +165,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche page_.WLatch() page_.AddWLatchRecord(int32(txn.txn_id)) - is_updated, err, need_follow_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager) + is_updated, err, need_follow_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager, false) t.bpm.UnpinPage(page_.GetPageId(), is_updated) if common.EnableDebug && common.ActiveLogKindSetting&common.PIN_COUNT_ASSERT > 0 { common.SH_Assert(page_.PinCount() == 0, "PinCount is not zero when finish TablePage::UpdateTuple!!!") @@ -214,11 +214,11 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche // when err == ErrNotEnoughSpace route and old tuple delete is only succeeded, DELETE write set entry is added above (no come here) if is_updated && txn.GetState() != ABORTED { if isUpdateWithDelInsert { - t.lastPageId = t.firstPageId + //t.lastPageId = t.firstPageId txn.AddIntoWriteSet(NewWriteRecord(&rid, new_rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) } else { // reset seek start point of Insert to first page - t.lastPageId = t.firstPageId + //t.lastPageId = t.firstPageId txn.AddIntoWriteSet(NewWriteRecord(&rid, &rid, UPDATE, old_tuple, need_follow_tuple, t, oid)) } } diff --git a/lib/storage/access/table_page.go b/lib/storage/access/table_page.go index 7ba72124..23c1ce14 100644 --- a/lib/storage/access/table_page.go +++ b/lib/storage/access/table_page.go @@ -146,7 +146,7 @@ func (tp *TablePage) InsertTuple(tuple 
*tuple.Tuple, log_manager *recovery.LogMa // for ensureing existance of enough space at rollback on abort or undo // return Tuple pointer when updated tuple1 need to be moved new page location and it should be inserted after old data deleted, otherwise returned nil func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, schema_ *schema.Schema, old_tuple *tuple.Tuple, rid *page.RID, txn *Transaction, - lock_manager *LockManager, log_manager *recovery.LogManager) (bool, error, *tuple.Tuple) { + lock_manager *LockManager, log_manager *recovery.LogManager, isRollbackOrUndo bool) (bool, error, *tuple.Tuple) { if common.EnableDebug { defer func() { if common.ActiveLogKindSetting&common.DEBUGGING > 0 { @@ -226,12 +226,12 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, } if tp.getFreeSpaceRemaining()+tuple_size < update_tuple.Size() { - fmt.Println("ErrNotEnoughSpace: getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) + //fmt.Println("ErrNotEnoughSpace: getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) return false, ErrNotEnoughSpace, update_tuple } - if tuple_size > update_tuple.Size() { - fmt.Println("ErrRollbackDifficult: getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) + if tuple_size > update_tuple.Size() && !isRollbackOrUndo { + //fmt.Println("ErrRollbackDifficult: getFreeSpaceRemaining", tp.getFreeSpaceRemaining(), "tuple_size", tuple_size, "update_tuple.Size()", update_tuple.Size(), "RID", *rid) return false, ErrRollbackDifficult, update_tuple } diff --git a/lib/storage/access/transaction_manager.go b/lib/storage/access/transaction_manager.go index a118056a..edc20527 100644 --- a/lib/storage/access/transaction_manager.go +++ b/lib/storage/access/transaction_manager.go @@ -226,7 +226,7 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface. pageID := rid.GetPageId() tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID)) tpage.WLatch() - is_updated, err, _ := tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager) + is_updated, err, _ := tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager, true) if !is_updated || err != nil { panic("rollback of normal UPDATE failed") }
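Taken together, the series ends with UpdateTuple back at three return values, recovery's Redo passing isRollbackOrUndo=false and Undo passing true, and the abort path calling it with true and panicking if the rewrite fails. A toy of that reverse write-set walk; writeRecord and the map-backed page are made-up stand-ins for the real WriteRecord and TablePage, not the repository's types.

    package main

    import "fmt"

    type writeRecord struct {
    	slot     int
    	oldBytes []byte
    }

    // abortUpdates walks the write set in reverse and rewrites each slot's old
    // bytes, as TransactionManager::Abort does via UpdateTuple(..., true);
    // in the real code a failed rewrite is a panic, since rollback must not fail.
    func abortUpdates(page map[int][]byte, writeSet []writeRecord) {
    	for i := len(writeSet) - 1; i >= 0; i-- {
    		rec := writeSet[i]
    		page[rec.slot] = rec.oldBytes
    	}
    }

    func main() {
    	page := map[int][]byte{1: []byte("NEWVALUE")}
    	ws := []writeRecord{{slot: 1, oldBytes: []byte("OLD")}}
    	abortUpdates(page, ws)
    	fmt.Println(string(page[1])) // OLD
    }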