Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: dirty read occurence problem at update index and Update operation rollback fails problem #41

Merged
Merged
4 changes: 2 additions & 2 deletions lib/catalog/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Catalog) insertTable(tableMetadata *TableMetadata, txn *access.Transact
first_tuple := tuple.NewTupleFromSchema(row, TableCatalogSchema())

// insert entry to TableCatalogPage (PageId = 0)
c.tableHeap.InsertTuple(first_tuple, txn, tableMetadata.OID())
c.tableHeap.InsertTuple(first_tuple, txn, tableMetadata.OID(), false)
for _, column_ := range tableMetadata.schema.GetColumns() {
row := make([]types.Value, 0)
row = append(row, types.NewInteger(int32(tableMetadata.oid)))
Expand All @@ -214,7 +214,7 @@ func (c *Catalog) insertTable(tableMetadata *TableMetadata, txn *access.Transact
new_tuple := tuple.NewTupleFromSchema(row, ColumnsCatalogSchema())

// insert entry to ColumnsCatalogPage (PageId = 1)
c.tableIds[ColumnsCatalogOID].Table().InsertTuple(new_tuple, txn, ColumnsCatalogOID)
c.tableIds[ColumnsCatalogOID].Table().InsertTuple(new_tuple, txn, ColumnsCatalogOID, false)
}
// flush a page having table definitions
c.bpm.FlushPage(TableCatalogPageId)
Expand Down
2 changes: 1 addition & 1 deletion lib/execution/executors/delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (e *DeleteExecutor) Next() (*tuple.Tuple, Done, error) {

rid := t.GetRID()
tableMetadata := e.child.GetTableMetaData()
is_marked := tableMetadata.Table().MarkDelete(rid, tableMetadata.OID(), e.txn)
is_marked := tableMetadata.Table().MarkDelete(rid, tableMetadata.OID(), e.txn, false)
if !is_marked {
err := errors.New("marking tuple deleted failed. PageId:SlotNum = " + string(rid.GetPageId()) + ":" + fmt.Sprint(rid.GetSlotNum()))
e.txn.SetState(access.ABORTED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,8 @@ func testParallelTxnsQueryingSkipListIndexUsedColumns[T int32 | float32 | string
randomUpdateOpFunc := func() {
var updateKeyValBase T
isFound, updateKeyValBaseP := getKeyAndMarkItInsValsAndDeletedValsForDelete()
if !isFound || keyType == types.Varchar {
//if !isFound || keyType == types.Varchar {
if !isFound {
if execType == PARALLEL_EXEC {
ch <- 1
}
Expand Down
2 changes: 1 addition & 1 deletion lib/execution/executors/insert_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (e *InsertExecutor) Next() (*tuple.Tuple, Done, error) {
for _, values := range e.plan.GetRawValues() {
tuple_ := tuple.NewTupleFromSchema(values, e.tableMetadata.Schema())
tableHeap := e.tableMetadata.Table()
rid, err := tableHeap.InsertTuple(tuple_, e.context.txn, e.tableMetadata.OID())
rid, err := tableHeap.InsertTuple(tuple_, e.context.txn, e.tableMetadata.OID(), false)
if err != nil {
return nil, true, err
}
Expand Down
20 changes: 3 additions & 17 deletions lib/execution/executors/update_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (e *UpdateExecutor) Next() (*tuple.Tuple, Done, error) {
common.NewRIDAtNormal = true
}

if !is_updated && updateErr != access.ErrPartialUpdate {
if !is_updated || updateErr != nil {
err_ := errors.New("tuple update failed. PageId:SlotNum = " + string(rid.GetPageId()) + ":" + fmt.Sprint(rid.GetSlotNum()))
e.txn.SetState(access.ABORTED)
return nil, false, err_
Expand All @@ -86,27 +86,13 @@ func (e *UpdateExecutor) Next() (*tuple.Tuple, Done, error) {
} else {
index_ := ret
if updateIdxs == nil || samehada_util.IsContainList[int](updateIdxs, ii) {
if updateErr == access.ErrPartialUpdate {
// when tuple is moved page location on update, RID is changed to new value
// removing index entry is done at commit phase because delete operation uses marking technique

fmt.Println("DeleteEntry due to ErrPartialUpdate occured.")
index_.DeleteEntry(t, *rid, e.txn)

// do nothing
} else if new_rid != nil {
if new_rid != nil {
index_.UpdateEntry(t, *rid, updateTuple, *new_rid, e.txn)
} else {
index_.UpdateEntry(t, *rid, updateTuple, *rid, e.txn)
}
} else {
if updateErr == access.ErrPartialUpdate {
// when tuple is moved page location on update, RID is changed to new value
// removing index entry is done at commit phase because delete operation uses marking technique

fmt.Println("DeleteEntry due to ErrPartialUpdate occured.")
index_.DeleteEntry(t, *rid, e.txn)
} else if new_rid != nil {
if new_rid != nil {
index_.UpdateEntry(t, *rid, updateTuple, *new_rid, e.txn)
} else {
// do nothing
Expand Down
2 changes: 1 addition & 1 deletion lib/planner/optimizer/optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func SetupTableWithMetadata(exec_ctx *executors.ExecutorContext, tableMeta *Setu
vals = append(vals, types.NewValue(genFunc(ii)))
}
tuple_ := tuple.NewTupleFromSchema(vals, schema_)
rid, _ := tm.Table().InsertTuple(tuple_, txn, tm.OID())
rid, _ := tm.Table().InsertTuple(tuple_, txn, tm.OID(), false)
for jj, colMeta := range tableMeta.Columns {
if colMeta.IdxKind != index_constants.INDEX_KIND_INVALID {
tm.GetIndex(jj).InsertEntry(tuple_, *rid, txn)
Expand Down
9 changes: 0 additions & 9 deletions lib/recovery/log_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,7 @@ func (log_manager *LogManager) AppendLogRecord(log_record *LogRecord) types.LSN
binary.Write(buf, binary.LittleEndian, log_record.Prev_page_id)
pageIdInBytes := buf.Bytes()
copy(log_manager.log_buffer[pos:], pageIdInBytes)
} else if log_record.Log_record_type == RESERVE_SPACE {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, log_record.Reserving_rid)
ridInBytes := buf.Bytes()
copy(log_manager.log_buffer[pos:], ridInBytes)
pos += uint32(unsafe.Sizeof(log_record.Reserving_rid))
// we have provided serialize function for tuple class
log_record.Reserving_tuple.SerializeTo(log_manager.log_buffer[pos:])
}
// TODO: (SDB) need to implement serialization of RESERVE_SPACE type log

log_manager.latch.WUnlock()
return log_record.Lsn
Expand Down
14 changes: 0 additions & 14 deletions lib/recovery/log_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const (
APPLYDELETE
ROLLBACKDELETE
UPDATE
RESERVE_SPACE
BEGIN
COMMIT
ABORT
Expand Down Expand Up @@ -145,19 +144,6 @@ func NewLogRecordNewPage(txn_id types.TxnID, prev_lsn types.LSN, log_record_type
return ret
}

func NewLogRecordReserveSpace(txn_id types.TxnID, prev_lsn types.LSN, log_record_type LogRecordType, rid page.RID, tuple *tuple.Tuple) *LogRecord {
ret := new(LogRecord)
ret.Size = HEADER_SIZE
ret.Txn_id = txn_id
ret.Prev_lsn = prev_lsn
ret.Log_record_type = log_record_type
ret.Reserving_rid = rid
ret.Reserving_tuple = *tuple
// calculate log record size
ret.Size = HEADER_SIZE + uint32(unsafe.Sizeof(rid)) + tuple.Size() + uint32(unsafe.Sizeof(int32(0)))
return ret
}

func (log_record *LogRecord) GetDeleteRID() page.RID { return log_record.Delete_rid }
func (log_record *LogRecord) GetInserteTuple() tuple.Tuple { return log_record.Insert_tuple }
func (log_record *LogRecord) GetInsertRID() page.RID { return log_record.Insert_rid }
Expand Down
22 changes: 2 additions & 20 deletions lib/recovery/log_recovery/log_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ func (log_recovery *LogRecovery) DeserializeLogRecord(data []byte, log_record *r
log_record.New_tuple.DeserializeFrom(data[pos:])
} else if log_record.Log_record_type == recovery.NEWPAGE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Prev_page_id)
} else if log_record.Log_record_type == recovery.RESERVE_SPACE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Reserving_rid)
pos += uint32(unsafe.Sizeof(log_record.Reserving_rid))
log_record.Reserving_tuple.DeserializeFrom(data[pos:])
}

//fmt.Println(log_record)
Expand Down Expand Up @@ -159,7 +155,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool)
if page_.GetLSN() < log_record.GetLSN() {
// UpdateTuple overwrites Old_tuple argument
// but it is no problem because log_record is read from log file again in Undo phase
page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager)
page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, false)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -178,14 +174,6 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool)
new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, txn)
//log_recovery.buffer_pool_manager.FlushPage(page_id)
log_recovery.buffer_pool_manager.UnpinPage(page_id, true)
} else if log_record.Log_record_type == recovery.RESERVE_SPACE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
page_.ReserveSpaceForRollbackUpdate(&log_record.Reserving_rid, log_record.Reserving_tuple.Size(), txn, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
}
buffer_offset += log_record.Size
}
Expand Down Expand Up @@ -243,18 +231,12 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool {
} else if log_record.Log_record_type == recovery.UPDATE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId()))
_, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager)
_, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true)
if err != nil {
panic(fmt.Sprintln("UpdateTuple at rollback failed! err:", err))
}
log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.RESERVE_SPACE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Reserving_rid.GetPageId()))
page_.ApplyDelete(&log_record.Reserving_rid, txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(log_record.Reserving_rid.GetPageId(), true)
isUndoOccured = true
}

lsn = log_record.Prev_lsn
Expand Down
14 changes: 7 additions & 7 deletions lib/recovery/recovery_test/log_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func TestRedo(t *testing.T) {
val1_1 := tuple1_.GetValue(schema_, 1)
val1_0 := tuple1_.GetValue(schema_, 0)

rid, _ = test_table.InsertTuple(tuple_, txn, math.MaxUint32)
rid, _ = test_table.InsertTuple(tuple_, txn, math.MaxUint32, false)
// TODO: (SDB) insert index entry if needed
testingpkg.Assert(t, rid != nil, "")
rid1, _ = test_table.InsertTuple(tuple1_, txn, math.MaxUint32)
rid1, _ = test_table.InsertTuple(tuple1_, txn, math.MaxUint32, false)
// TODO: (SDB) insert index entry if needed
testingpkg.Assert(t, rid != nil, "")

Expand Down Expand Up @@ -172,15 +172,15 @@ func TestUndo(t *testing.T) {
val1_1 := tuple1.GetValue(schema_, 1)
var rid1 *page.RID
fmt.Println("tuple1: ", tuple1.Data())
rid1, _ = test_table.InsertTuple(tuple1, txn, math.MaxUint32)
rid1, _ = test_table.InsertTuple(tuple1, txn, math.MaxUint32, false)
testingpkg.Assert(t, rid1 != nil, "")

tuple2 := ConstructTuple(schema_)
val2_0 := tuple2.GetValue(schema_, 0)
val2_1 := tuple2.GetValue(schema_, 1)

var rid2 *page.RID
rid2, _ = test_table.InsertTuple(tuple2, txn, math.MaxUint32)
rid2, _ = test_table.InsertTuple(tuple2, txn, math.MaxUint32, false)
testingpkg.Assert(t, rid2 != nil, "")

bf_commit_tuple2, _ := test_table.GetTuple(rid2, txn)
Expand All @@ -197,7 +197,7 @@ func TestUndo(t *testing.T) {
fmt.Println("bf_commit_tuple2_: ", bf_commit_tuple2__.Data())

// tuple deletion (rid1)
test_table.MarkDelete(rid1, math.MaxUint32, txn)
test_table.MarkDelete(rid1, math.MaxUint32, txn, false)

bf_commit_tuple2___, _ := test_table.GetTuple(rid2, txn)
fmt.Println("bf_commit_tuple2_: ", bf_commit_tuple2___.Data())
Expand All @@ -215,7 +215,7 @@ func TestUndo(t *testing.T) {
// tuple insertion (rid3)
tuple3 := ConstructTuple(schema_)
var rid3 *page.RID
rid3, _ = test_table.InsertTuple(tuple3, txn, math.MaxUint32)
rid3, _ = test_table.InsertTuple(tuple3, txn, math.MaxUint32, false)
// TODO: (SDB) insert index entry if needed
testingpkg.Assert(t, rid3 != nil, "")

Expand Down Expand Up @@ -345,7 +345,7 @@ func TestCheckpoint(t *testing.T) {
// insert a ton of tuples
txn1 := samehada_instance.GetTransactionManager().Begin(nil)
for i := 0; i < 1000; i++ {
rid, err := test_table.InsertTuple(tuple_, txn1, math.MaxUint32)
rid, err := test_table.InsertTuple(tuple_, txn1, math.MaxUint32, false)
if err != nil {
fmt.Println(err)
}
Expand Down
14 changes: 7 additions & 7 deletions lib/storage/access/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ func isContainTxnID(list []types.TxnID, txnID types.TxnID) bool {
/**
* Acquire a lock on RID in shared mode. See [LOCK_NOTE] in header file.
* @param txn the transaction requesting the shared lock
* @param rid the RID to be locked in shared mode
* @param rid1 the RID to be locked in shared mode
* @return true if the lock is granted, false otherwise
*/
func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) bool {
//fmt.Printf("called LockShared, %v\n", rid)
//fmt.Printf("called LockShared, %v\n", rid1)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

Expand Down Expand Up @@ -170,11 +170,11 @@ func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) boo
/**
* Acquire a lock on RID in exclusive mode. See [LOCK_NOTE] in header file.
* @param txn the transaction requesting the exclusive lock
* @param rid the RID to be locked in exclusive mode
* @param rid1 the RID to be locked in exclusive mode
* @return true if the lock is granted, false otherwise
*/
func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID) bool {
//fmt.Printf("called LockExclusive, %v\n", rid)
//fmt.Printf("called LockExclusive, %v\n", rid1)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

Expand Down Expand Up @@ -207,11 +207,11 @@ func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID)
/**
* Upgrade a lock from a shared lock to an exclusive access.
* @param txn the transaction requesting the lock upgrade
* @param rid the RID that should already be locked in shared mode by the requesting transaction
* @param rid1 the RID that should already be locked in shared mode by the requesting transaction
* @return true if the upgrade is successful, false otherwise
*/
func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bool {
//fmt.Printf("called LockUpgrade %v\n", rid)
//fmt.Printf("called LockUpgrade %v\n", rid1)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

Expand Down Expand Up @@ -249,7 +249,7 @@ func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bo
/**
* Release the lock held by the access.
* @param txn the transaction releasing the lock, it should actually hold the lock
* @param rid the RID that is locked by the transaction
* @param rid1 the RID that is locked by the transaction
* @return true if the unlock is successful, false otherwise
*/
func (lock_manager *LockManager) Unlock(txn *Transaction, rid_list []page.RID) bool {
Expand Down
Loading
Loading