Skip to content

Commit

Permalink
refactoring write-recording/rollback and rogging/recovery of Update: …
Browse files Browse the repository at this point in the history
…WIP (5).
  • Loading branch information
ryogrid committed Jun 11, 2024
1 parent 012d19c commit 2e8c144
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 68 deletions.
1 change: 1 addition & 0 deletions lib/recovery/log_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (log_manager *LogManager) AppendLogRecord(log_record *LogRecord) types.LSN
pageIdInBytes := buf.Bytes()
copy(log_manager.log_buffer[pos:], pageIdInBytes)
}
// TODO: (SDB) need to implement serialization of RESERVE_SPACE type log

log_manager.latch.WUnlock()
return log_record.Lsn
Expand Down
23 changes: 22 additions & 1 deletion lib/recovery/log_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
APPLYDELETE
ROLLBACKDELETE
UPDATE
FINALIZE_UPDATE
RESERVE_SPACE
BEGIN
COMMIT
ABORT
Expand Down Expand Up @@ -53,6 +53,10 @@ const (
*--------------------------
* | HEADER | prev_page_id |
*--------------------------
* For reserve space type log record
*--------------------------
* | HEADER | tuple_rid |
*--------------------------
*/

type LogRecord struct {
Expand All @@ -79,6 +83,10 @@ type LogRecord struct {

// case4: for new page opeartion
Prev_page_id types.PageID //INVALID_PAGE_ID

// case5: for reserve space
Reserving_rid page.RID
Reserving_tuple tuple.Tuple
}

// constructor for Transaction type(BEGIN/COMMIT/ABORT)
Expand Down Expand Up @@ -137,6 +145,19 @@ func NewLogRecordNewPage(txn_id types.TxnID, prev_lsn types.LSN, log_record_type
return ret
}

func NewLogRecordReserveSpace(txn_id types.TxnID, prev_lsn types.LSN, log_record_type LogRecordType, rid page.RID, tuple *tuple.Tuple) *LogRecord {
ret := new(LogRecord)
ret.Size = HEADER_SIZE
ret.Txn_id = txn_id
ret.Prev_lsn = prev_lsn
ret.Log_record_type = log_record_type
ret.Reserving_rid = rid
ret.Reserving_tuple = *tuple
// calculate log record size
ret.Size = HEADER_SIZE + uint32(unsafe.Sizeof(rid)) + tuple.Size() + uint32(unsafe.Sizeof(int32(0)))
return ret
}

func (log_record *LogRecord) GetDeleteRID() page.RID { return log_record.Delete_rid }
func (log_record *LogRecord) GetInserteTuple() tuple.Tuple { return log_record.Insert_tuple }
func (log_record *LogRecord) GetInsertRID() page.RID { return log_record.Insert_rid }
Expand Down
7 changes: 1 addition & 6 deletions lib/recovery/log_recovery/log_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,10 @@ func (log_recovery *LogRecovery) DeserializeLogRecord(data []byte, log_record *r
log_record.Old_tuple.DeserializeFrom(data[pos:])
pos += log_record.Old_tuple.Size() + uint32(tuple.TupleSizeOffsetInLogrecord)
log_record.New_tuple.DeserializeFrom(data[pos:])
} else if log_record.Log_record_type == recovery.FINALIZE_UPDATE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Update_rid)
pos += uint32(unsafe.Sizeof(log_record.Update_rid))
log_record.Old_tuple.DeserializeFrom(data[pos:])
pos += log_record.Old_tuple.Size() + uint32(tuple.TupleSizeOffsetInLogrecord)
log_record.New_tuple.DeserializeFrom(data[pos:])
} else if log_record.Log_record_type == recovery.NEWPAGE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Prev_page_id)
}
// TODO: (SDB) need to implement deserialization of RESERVE_SPACE type log

//fmt.Println(log_record)

Expand Down
131 changes: 70 additions & 61 deletions lib/storage/access/table_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,87 +238,96 @@ func (tp *TablePage) UpdateTuple(new_tuple *tuple.Tuple, update_col_idxs []int,
txn.SetPrevLSN(lsn)
}

if new_tuple.Size() <= tuple_size {
// add dummy tuple which reserves space for update is aborted
tp.ReserveSpaceForRollbackUpdate(tuple_size-new_tuple.Size(), txn, log_manager)
}

// Perform the update.
free_space_pointer := tp.GetFreeSpacePointer()
common.SH_Assert(tuple_offset >= free_space_pointer, "Offset should appear after current free space position.")

if isForRollbackOrUndo || update_tuple.Size() >= old_tuple.Size() {
// occupy correct memory space for new_tuple
// and move other tuples data
copy(tp.GetData()[free_space_pointer+tuple_size-update_tuple.Size():], tp.GetData()[free_space_pointer:tuple_offset])
tp.SetFreeSpacePointer(free_space_pointer + tuple_size - update_tuple.Size())
copy(tp.GetData()[tuple_offset+tuple_size-update_tuple.Size():], update_tuple.Data()[:update_tuple.Size()])
tp.SetTupleSize(slot_num, update_tuple.Size())
tp.SetTupleOffsetAtSlot(rid.GetSlotNum(), tuple_offset+(tuple_size-update_tuple.Size()))
} else {
// occupy the same memory space as the old_tuple until transaction finish

// no move of other tuples and no update of free space pointer here (done at commit or redo)
copy(tp.GetData()[free_space_pointer+tuple_size-update_tuple.Size():], tp.GetData()[free_space_pointer:tuple_offset])
tp.SetFreeSpacePointer(free_space_pointer + tuple_size - update_tuple.Size())
copy(tp.GetData()[tuple_offset+tuple_size-update_tuple.Size():], update_tuple.Data()[:update_tuple.Size()])
tp.SetTupleSize(slot_num, update_tuple.Size())
tp.SetTupleOffsetAtSlot(rid.GetSlotNum(), tuple_offset+(tuple_size-update_tuple.Size()))

copy(tp.GetData()[tuple_offset:], update_tuple.Data()[:update_tuple.Size()])
tp.SetTupleSize(slot_num, update_tuple.Size())
}

if isForRollbackOrUndo || update_tuple.Size() > old_tuple.Size() {
// Update all tuples offsets.
tuple_cnt := int(tp.GetTupleCount())
for ii := 0; ii < tuple_cnt; ii++ {
tuple_offset_i := tp.GetTupleOffsetAtSlot(uint32(ii))
if tp.GetTupleSize(uint32(ii)) > 0 && tuple_offset_i < tuple_offset+tuple_size {
tp.SetTupleOffsetAtSlot(uint32(ii), tuple_offset_i+tuple_size-update_tuple.Size())
}
// Update all tuples offsets.
tuple_cnt := int(tp.GetTupleCount())
for ii := 0; ii < tuple_cnt; ii++ {
tuple_offset_i := tp.GetTupleOffsetAtSlot(uint32(ii))
if tp.GetTupleSize(uint32(ii)) > 0 && tuple_offset_i < tuple_offset+tuple_size {
tp.SetTupleOffsetAtSlot(uint32(ii), tuple_offset_i+tuple_size-update_tuple.Size())
}
}
// when other condition update, tuples offset updating is not done here (done at commit or redo)

return true, nil, update_tuple
}

// called only at commit or redo
func (tp *TablePage) FinalizeUpdateTuple(rid *page.RID, old_tuple *tuple.Tuple, update_tuple *tuple.Tuple, txn *Transaction, log_manager *recovery.LogManager) {
// needless
if update_tuple.Size() >= old_tuple.Size() {
// finalize is not needed
return
}
func (tp *TablePage) ReserveSpaceForRollbackUpdate(size uint32, txn *Transaction, log_manager *recovery.LogManager) *page.RID {
maxSlotNum := tp.GetTupleCount()
buf := make([]byte, size)

// set dummy tuple for rollback
dummy_rid := &page.RID{tp.GetPageId(), maxSlotNum + 1}
dummy_tuple := tuple.NewTuple(dummy_rid, size, buf[:size])
tp.setTuple(maxSlotNum+1, dummy_tuple)

if log_manager.IsEnabledLogging() {
log_record := recovery.NewLogRecordUpdate(txn.GetTransactionId(), txn.GetPrevLSN(), recovery.FINALIZE_UPDATE, *rid, *old_tuple, *update_tuple)
log_record := recovery.NewLogRecordReserveSpace(txn.GetTransactionId(), txn.GetPrevLSN(), recovery.RESERVE_SPACE, *dummy_rid, dummy_tuple)
lsn := log_manager.AppendLogRecord(log_record)
tp.SetLSN(lsn)
txn.SetPrevLSN(lsn)
}

slot_size := tp.GetTupleSize(rid.GetSlotNum())
slot_offset := tp.GetTupleOffsetAtSlot(rid.GetSlotNum())
tuple_size := update_tuple.Size()
// If the tuple is deleted, do nothing
if IsDeleted(tuple_size) {
return
}

// update slot_offset to collect location
slide_size := slot_size - tuple_size
new_slot_offset := slot_offset + slide_size
tp.SetTupleSize(rid.GetSlotNum(), tuple_size)
tp.SetTupleOffsetAtSlot(rid.GetSlotNum(), new_slot_offset)

free_space_pointer := tp.GetFreeSpacePointer()
tp.SetFreeSpacePointer(free_space_pointer + slide_size)

// move tuple data to collect location
copy(tp.GetData()[new_slot_offset:], update_tuple.Data())

// Update all tuples offsets
tuple_cnt := int(tp.GetTupleCount())
for ii := 0; ii < tuple_cnt; ii++ {
tuple_offset_i := tp.GetTupleOffsetAtSlot(uint32(ii))
if tp.GetTupleSize(uint32(ii)) > 0 && tuple_offset_i < new_slot_offset {
tp.SetTupleOffsetAtSlot(uint32(ii), tuple_offset_i+slide_size)
}
}
}

//// called only at commit or redo
//func (tp *TablePage) FinalizeUpdateTuple(rid *page.RID, old_tuple *tuple.Tuple, update_tuple *tuple.Tuple, txn *Transaction, log_manager *recovery.LogManager) {
// // needless
// if update_tuple.Size() >= old_tuple.Size() {
// // finalize is not needed
// return
// }
//
// if log_manager.IsEnabledLogging() {
// log_record := recovery.NewLogRecordUpdate(txn.GetTransactionId(), txn.GetPrevLSN(), recovery.FINALIZE_UPDATE, *rid, *old_tuple, *update_tuple)
// lsn := log_manager.AppendLogRecord(log_record)
// tp.SetLSN(lsn)
// txn.SetPrevLSN(lsn)
// }
//
// slot_size := tp.GetTupleSize(rid.GetSlotNum())
// slot_offset := tp.GetTupleOffsetAtSlot(rid.GetSlotNum())
// tuple_size := update_tuple.Size()
// // If the tuple is deleted, do nothing
// if IsDeleted(tuple_size) {
// return
// }
//
// // update slot_offset to collect location
// slide_size := slot_size - tuple_size
// new_slot_offset := slot_offset + slide_size
// tp.SetTupleSize(rid.GetSlotNum(), tuple_size)
// tp.SetTupleOffsetAtSlot(rid.GetSlotNum(), new_slot_offset)
//
// free_space_pointer := tp.GetFreeSpacePointer()
// tp.SetFreeSpacePointer(free_space_pointer + slide_size)
//
// // move tuple data to collect location
// copy(tp.GetData()[new_slot_offset:], update_tuple.Data())
//
// // Update all tuples offsets
// tuple_cnt := int(tp.GetTupleCount())
// for ii := 0; ii < tuple_cnt; ii++ {
// tuple_offset_i := tp.GetTupleOffsetAtSlot(uint32(ii))
// if tp.GetTupleSize(uint32(ii)) > 0 && tuple_offset_i < new_slot_offset {
// tp.SetTupleOffsetAtSlot(uint32(ii), tuple_offset_i+slide_size)
// }
// }
//}

func (tp *TablePage) MarkDelete(rid *page.RID, txn *Transaction, lock_manager *LockManager, log_manager *recovery.LogManager) (isMarked bool, markedTuple *tuple.Tuple) {
if common.EnableDebug {
defer func() {
Expand Down

0 comments on commit 2e8c144

Please sign in to comment.