Skip to content

Commit

Permalink
refactoring write-recording/rollback and rogging/recovery of Update: …
Browse files Browse the repository at this point in the history
…WIP (6).
  • Loading branch information
ryogrid committed Jun 11, 2024
1 parent 2e8c144 commit c31efba
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 28 deletions.
28 changes: 16 additions & 12 deletions lib/recovery/log_recovery/log_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool)
if page_.GetLSN() < log_record.GetLSN() {
// UpdateTuple overwrites Old_tuple argument
// but it is no problem because log_record is read from log file again in Undo phase
page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, false)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
log_recovery.buffer_pool_manager.UnpinPage(log_record.Update_rid.GetPageId(), true)
} else if log_record.Log_record_type == recovery.FINALIZE_UPDATE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
page_.FinalizeUpdateTuple(&log_record.Update_rid, &log_record.Old_tuple, &log_record.New_tuple, txn, log_recovery.log_manager)
page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -184,6 +175,14 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool)
new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, txn)
//log_recovery.buffer_pool_manager.FlushPage(page_id)
log_recovery.buffer_pool_manager.UnpinPage(page_id, true)
} else if log_record.Log_record_type == recovery.RESERVE_SPACE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
page_.ReserveSpaceForRollbackUpdate(&log_record.Reserving_rid, log_record.Reserving_tuple.Size(), txn, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
}
buffer_offset += log_record.Size
}
Expand Down Expand Up @@ -241,14 +240,19 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool {
} else if log_record.Log_record_type == recovery.UPDATE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId()))
_, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager, true)
_, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager)
if err != nil {
panic(fmt.Sprintln("UpdateTuple at rollback failed! err:", err))
}
log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.RESERVE_SPACE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Reserving_rid.GetPageId()))
page_.ApplyDelete(&log_record.Reserving_rid, txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(log_record.Delete_rid.GetPageId(), true)
isUndoOccured = true
}
// FINALIZE_UPDATE is not considered because it is not necessary to undo

lsn = log_record.Prev_lsn
// fmt.Printf("lsn at Undo loop bottom: %d\n", lsn)
Expand Down
2 changes: 1 addition & 1 deletion lib/storage/access/table_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (t *TableHeap) UpdateTuple(tuple_ *tuple.Tuple, update_col_idxs []int, sche

page_.WLatch()
page_.AddWLatchRecord(int32(txn.txn_id))
is_updated, err, need_follow_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager, isRollback)
is_updated, err, need_follow_tuple := page_.UpdateTuple(tuple_, update_col_idxs, schema_, old_tuple, &rid, txn, t.lock_manager, t.log_manager)
t.bpm.UnpinPage(page_.GetPageId(), is_updated)
if common.EnableDebug && common.ActiveLogKindSetting&common.PIN_COUNT_ASSERT > 0 {
common.SH_Assert(page_.PinCount() == 0, "PinCount is not zero when finish TablePage::UpdateTuple!!!")
Expand Down
49 changes: 42 additions & 7 deletions lib/storage/access/table_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/ryogrid/SamehadaDB/lib/types"
)

// static constexpr uint64_t DELETE_MASK = (1U << (8 * sizeof(uint32_t) - 1));
const deleteMask = uint32(1 << ((8 * 4) - 1))
const reservedMask = uint32(1 << ((8 * 4) - 2))

const sizeTablePageHeader = uint32(24)
const sizeTuple = uint32(8)
Expand Down Expand Up @@ -148,7 +148,7 @@ func (tp *TablePage) InsertTuple(tuple *tuple.Tuple, log_manager *recovery.LogMa
// for ensureing existance of enough space at rollback on abort or undo
// return Tuple pointer when updated tuple1 need to be moved new page location and it should be inserted after old data deleted, otherwise returned nil
func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int, schema_ *schema.Schema, old_tuple *tuple.Tuple, rid *page.RID, txn *Transaction,
lock_manager *LockManager, log_manager *recovery.LogManager, isForRollbackOrUndo bool) (bool, error, *tuple.Tuple) {
lock_manager *LockManager, log_manager *recovery.LogManager) (bool, error, *tuple.Tuple) {
if common.EnableDebug {
defer func() {
if common.ActiveLogKindSetting&common.DEBUGGING > 0 {
Expand Down Expand Up @@ -194,6 +194,10 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int,
}
return false, nil, nil
}
if IsReserved(tuple_size) {
// ignore dummy tuple
return false, nil, nil
}

// Copy out the old value.
tuple_offset := tp.GetTupleOffsetAtSlot(slot_num)
Expand Down Expand Up @@ -240,7 +244,7 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int,

if new_tuple.Size() <= tuple_size {
// add dummy tuple which reserves space for update is aborted
tp.ReserveSpaceForRollbackUpdate(tuple_size-new_tuple.Size(), txn, log_manager)
tp.ReserveSpaceForRollbackUpdate(nil, tuple_size-new_tuple.Size(), txn, log_manager)
}

// Perform the update.
Expand All @@ -261,26 +265,37 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int,
tp.SetTupleOffsetAtSlot(uint32(ii), tuple_offset_i+tuple_size-update_tuple.Size())
}
}
// when other condition update, tuples offset updating is not done here (done at commit or redo)

return true, nil, update_tuple
}

func (tp *TablePage) ReserveSpaceForRollbackUpdate(size uint32, txn *Transaction, log_manager *recovery.LogManager) *page.RID {
// rid is not null when caller is Redo
func (tp *TablePage) ReserveSpaceForRollbackUpdate(rid *page.RID, size uint32, txn *Transaction, log_manager *recovery.LogManager) *page.RID {
maxSlotNum := tp.GetTupleCount()
buf := make([]byte, size)

// set dummy tuple for rollback
dummy_rid := &page.RID{tp.GetPageId(), maxSlotNum + 1}
var dummy_rid *page.RID
if rid != nil {
dummy_rid = rid
} else {
dummy_rid = &page.RID{tp.GetPageId(), maxSlotNum + 1}
}
dummy_tuple := tuple.NewTuple(dummy_rid, size, buf[:size])
tp.setTuple(maxSlotNum+1, dummy_tuple)
tp.setTuple(dummy_rid.GetSlotNum(), dummy_tuple)

if size > 0 {
tp.SetTupleSize(dummy_rid.GetSlotNum(), SetDeletedFlag(size))
}

if log_manager.IsEnabledLogging() {
log_record := recovery.NewLogRecordReserveSpace(txn.GetTransactionId(), txn.GetPrevLSN(), recovery.RESERVE_SPACE, *dummy_rid, dummy_tuple)
lsn := log_manager.AppendLogRecord(log_record)
tp.SetLSN(lsn)
txn.SetPrevLSN(lsn)
}

return dummy_rid
}

//// called only at commit or redo
Expand Down Expand Up @@ -422,6 +437,9 @@ func (tp *TablePage) ApplyDelete(rid *page.RID, txn *Transaction, log_manager *r
if IsDeleted(tuple_size) {
tuple_size = UnsetDeletedFlag(tuple_size)
}
if IsReserved(tuple_size) {
tuple_size = UnsetReservedFlag(tuple_size)
}
// Otherwise we are rolling back an insert.

if tuple_size <= 0 {
Expand Down Expand Up @@ -691,6 +709,11 @@ func (tp *TablePage) GetTuple(rid *page.RID, log_manager *recovery.LogManager, l
}
}

if IsReserved(tupleSize) {
// handle as same as deleted tuple
return tuple.NewTuple(rid, 0, make([]byte, 0)), ErrSelfDeletedCase
}

tupleData := make([]byte, tupleSize)
copy(tupleData, tp.Data()[tupleOffset:])

Expand Down Expand Up @@ -744,3 +767,15 @@ func SetDeletedFlag(tuple_size uint32) uint32 {
func UnsetDeletedFlag(tuple_size uint32) uint32 {
return tuple_size & (^uint32(deleteMask))
}

func IsReserved(tuple_size uint32) bool {
return tuple_size&uint32(reservedMask) == uint32(reservedMask) || tuple_size == 0
}

func SetReservedFlag(tuple_size uint32) uint32 {
return tuple_size | uint32(reservedMask)
}

func UnsetReservedFlag(tuple_size uint32) uint32 {
return tuple_size & (^uint32(reservedMask))
}
1 change: 1 addition & 0 deletions lib/storage/access/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
INSERT WType = iota
DELETE
UPDATE
RESERVE_SPACE
)

/**
Expand Down
39 changes: 31 additions & 8 deletions lib/storage/access/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ func (transaction_manager *TransactionManager) Commit(catalog_ catalog_interface
table.bpm.UnpinPage(tpage.GetPageId(), true)
tpage.RemoveWLatchRecord(int32(txn.txn_id))
tpage.WUnlatch()
} else if item.wtype == UPDATE {
} else if item.wtype == RESERVE_SPACE {
if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 {
fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, rid)
}
pageID := rid.GetPageId()
tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID))
tpage.WLatch()
tpage.AddWLatchRecord(int32(txn.txn_id))
// move updated tuple to collect position on the page
tpage.FinalizeUpdateTuple(rid, item.tuple1, item.tuple2, txn, transaction_manager.log_manager)
// remove dummy tuple which reserves space for update rollback
tpage.ApplyDelete(item.rid, txn, transaction_manager.log_manager)
table.bpm.UnpinPage(tpage.GetPageId(), true)
tpage.RemoveWLatchRecord(int32(txn.txn_id))
tpage.WUnlatch()
Expand Down Expand Up @@ -201,11 +201,21 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface.
fmt.Printf("TransactionManager::Abort handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid:%v tuple1.Size()=%d \n", txn.txn_id, txn.dbgInfo, item.rid, item.tuple1.Size())
}

var is_updated = false
is_updated, _, _, _, _ = table.UpdateTuple(item.tuple1, nil, nil, item.oid, *item.rid, txn, true)
if !is_updated {
panic("UpdateTuple at rollback failed!")
}
// rollback record data
rid := item.rid
// Note that this also releases the lock when holding the page latch.
pageID := rid.GetPageId()
tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID))
tpage.WLatch()
tpage.UpdateTuple(item.tuple1, nil, nil, item.tuple2, rid, txn, transaction_manager.lock_manager, transaction_manager.log_manager)
table.bpm.UnpinPage(pageID, true)
tpage.WUnlatch()

//var is_updated = false
//is_updated, _, _, _, _ = table.UpdateTuple(item.tuple1, nil, nil, item.oid, *item.rid, txn, true)
//if !is_updated {
// panic("UpdateTuple at rollback failed!")
//}

// rollback is not needed at update
// (RID chaned case is handled at DELETE and INSERT write log handling)
Expand All @@ -226,6 +236,19 @@ func (transaction_manager *TransactionManager) Abort(catalog_ catalog_interface.
}
}
}
} else if item.wtype == RESERVE_SPACE {
if common.EnableDebug && common.ActiveLogKindSetting&common.COMMIT_ABORT_HANDLE_INFO > 0 {
fmt.Printf("TransactionManager::Commit handle UPDATE write log. txn.txn_id:%v dbgInfo:%s rid:%v\n", txn.txn_id, txn.dbgInfo, item.rid)
}
pageID := item.rid.GetPageId()
tpage := CastPageAsTablePage(table.bpm.FetchPage(pageID))
tpage.WLatch()
tpage.AddWLatchRecord(int32(txn.txn_id))
// remove dummy tuple which reserves space for update rollback
tpage.ApplyDelete(item.rid, txn, transaction_manager.log_manager)
table.bpm.UnpinPage(tpage.GetPageId(), true)
tpage.RemoveWLatchRecord(int32(txn.txn_id))
tpage.WUnlatch()
}
write_set = write_set[:len(write_set)-1]
}
Expand Down

0 comments on commit c31efba

Please sign in to comment.