Skip to content

Commit

Permalink
refactoring write-recording/rollback and rogging/recovery of Update: …
Browse files Browse the repository at this point in the history
…WIP.
  • Loading branch information
ryogrid committed Jun 9, 2024
1 parent d18d33c commit 774a849
Show file tree
Hide file tree
Showing 15 changed files with 95 additions and 213 deletions.
4 changes: 2 additions & 2 deletions lib/catalog/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Catalog) insertTable(tableMetadata *TableMetadata, txn *access.Transact
first_tuple := tuple.NewTupleFromSchema(row, TableCatalogSchema())

// insert entry to TableCatalogPage (PageId = 0)
c.tableHeap.InsertTuple(first_tuple, false, txn, tableMetadata.OID())
c.tableHeap.InsertTuple(first_tuple, txn, tableMetadata.OID())
for _, column_ := range tableMetadata.schema.GetColumns() {
row := make([]types.Value, 0)
row = append(row, types.NewInteger(int32(tableMetadata.oid)))
Expand All @@ -214,7 +214,7 @@ func (c *Catalog) insertTable(tableMetadata *TableMetadata, txn *access.Transact
new_tuple := tuple.NewTupleFromSchema(row, ColumnsCatalogSchema())

// insert entry to ColumnsCatalogPage (PageId = 1)
c.tableIds[ColumnsCatalogOID].Table().InsertTuple(new_tuple, false, txn, ColumnsCatalogOID)
c.tableIds[ColumnsCatalogOID].Table().InsertTuple(new_tuple, txn, ColumnsCatalogOID)
}
// flush a page having table definitions
c.bpm.FlushPage(TableCatalogPageId)
Expand Down
4 changes: 2 additions & 2 deletions lib/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ var LogTimeout time.Duration

const EnableDebug bool = false //true

// use virtual storage or not
const EnableOnMemStorage = false //true
// use on memory virtual storage or not
const EnableOnMemStorage = true

// when this is true, virtual storage use is suppressed
// for test case which can't work with virtual storage
Expand Down
2 changes: 1 addition & 1 deletion lib/execution/executors/delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (e *DeleteExecutor) Next() (*tuple.Tuple, Done, error) {

rid := t.GetRID()
tableMetadata := e.child.GetTableMetaData()
is_marked := tableMetadata.Table().MarkDelete(rid, tableMetadata.OID(), false, e.txn)
is_marked := tableMetadata.Table().MarkDelete(rid, tableMetadata.OID(), e.txn)
if !is_marked {
err := errors.New("marking tuple deleted failed. PageId:SlotNum = " + string(rid.GetPageId()) + ":" + fmt.Sprint(rid.GetSlotNum()))
e.txn.SetState(access.ABORTED)
Expand Down
2 changes: 1 addition & 1 deletion lib/execution/executors/insert_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (e *InsertExecutor) Next() (*tuple.Tuple, Done, error) {
for _, values := range e.plan.GetRawValues() {
tuple_ := tuple.NewTupleFromSchema(values, e.tableMetadata.Schema())
tableHeap := e.tableMetadata.Table()
rid, err := tableHeap.InsertTuple(tuple_, false, e.context.txn, e.tableMetadata.OID())
rid, err := tableHeap.InsertTuple(tuple_, e.context.txn, e.tableMetadata.OID())
if err != nil {
return nil, true, err
}
Expand Down
2 changes: 1 addition & 1 deletion lib/planner/optimizer/optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func SetupTableWithMetadata(exec_ctx *executors.ExecutorContext, tableMeta *Setu
vals = append(vals, types.NewValue(genFunc(ii)))
}
tuple_ := tuple.NewTupleFromSchema(vals, schema_)
rid, _ := tm.Table().InsertTuple(tuple_, false, txn, tm.OID())
rid, _ := tm.Table().InsertTuple(tuple_, txn, tm.OID())
for jj, colMeta := range tableMeta.Columns {
if colMeta.IdxKind != index_constants.INDEX_KIND_INVALID {
tm.GetIndex(jj).InsertEntry(tuple_, *rid, txn)
Expand Down
2 changes: 0 additions & 2 deletions lib/recovery/log_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ func NewLogRecordInsertDelete(txn_id types.TxnID, prev_lsn types.LSN, log_record
ret.Insert_rid = rid
ret.Insert_tuple = *tuple
} else {
// assert(log_record_type == LogRecordType::APPLYDELETE || log_record_type == LogRecordType::MARKDELETE ||
// log_record_type == LogRecordType::ROLLBACKDELETE)
ret.Delete_rid = rid
ret.Delete_tuple = *tuple
}
Expand Down
85 changes: 16 additions & 69 deletions lib/recovery/log_recovery/log_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package log_recovery
import (
"bytes"
"encoding/binary"
"github.com/ryogrid/SamehadaDB/lib/storage/page"
"fmt"
"unsafe"

"github.com/ryogrid/SamehadaDB/lib/common"
Expand Down Expand Up @@ -197,23 +197,6 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool {
var log_record recovery.LogRecord
isUndoOccured := false

// table for RID conversion
// when rid of record is changed at UpdateTuple on this method, conversion is needed for appropriate rollback.
RIDConvMap := make(map[page.RID]*page.RID, 0)
convRID := func(orgRID *page.RID) (convedRID *page.RID) {
if tmpRID, ok := RIDConvMap[*orgRID]; ok {
//fmt.Println("Abort: RID conversion occured.")
return tmpRID
} else {
return orgRID
}
}
updateRIDConvMap := func(orgRID *page.RID, changedRID *page.RID) {
RIDConvMap[*orgRID] = changedRID
}

// fmt.Println(log_recovery.active_txn)
// fmt.Println(log_recovery.lsn_mapping)
for _, lsn := range log_recovery.active_txn {
for lsn != common.InvalidLSN {
//fmt.Printf("lsn at Undo loop top: %d\n", lsn)
Expand All @@ -224,74 +207,38 @@ func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool {
log_recovery.DeserializeLogRecord(log_recovery.log_buffer[:readBytes], &log_record)
if log_record.Log_record_type == recovery.INSERT {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Insert_rid).GetPageId()))
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Insert_rid.GetPageId()))
// fmt.Printf("insert log type, page lsn:%d, log lsn:%d", page.GetLSN(), log_record.GetLSN())
page_.ApplyDelete(&log_record.Insert_rid, txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Insert_rid).GetPageId(), true)
log_recovery.buffer_pool_manager.UnpinPage(log_record.Insert_rid.GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.APPLYDELETE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Delete_rid).GetPageId()))
log_record.Delete_tuple.SetRID(convRID(&log_record.Delete_rid))
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
log_record.Delete_tuple.SetRID(&log_record.Delete_rid)
page_.InsertTuple(&log_record.Delete_tuple, log_recovery.log_manager, nil, txn)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Delete_rid).GetPageId(), true)
log_recovery.buffer_pool_manager.UnpinPage(log_record.Delete_rid.GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.MARKDELETE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Delete_rid).GetPageId()))
page_.RollbackDelete(convRID(&log_record.Delete_rid), txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Delete_rid).GetPageId(), true)
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
page_.RollbackDelete(&log_record.Delete_rid, txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(log_record.Delete_rid.GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.ROLLBACKDELETE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Delete_rid).GetPageId()))
page_.MarkDelete(convRID(&log_record.Delete_rid), txn, nil, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Delete_rid).GetPageId(), true)
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
page_.MarkDelete(&log_record.Delete_rid, txn, nil, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(log_record.Delete_rid.GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.UPDATE {
var org_update_rid page.RID = *convRID(&log_record.Update_rid)
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(org_update_rid.GetPageId()))
is_updated, err, need_follow_tuple := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, convRID(&log_record.Update_rid), txn, nil, log_recovery.log_manager)

if is_updated == false && err == access.ErrNotEnoughSpace {
// when rid is changed case (data is move to new page)

// first, delete original data
page_.ApplyDelete(&org_update_rid, txn, log_recovery.log_manager)

var new_rid *page.RID
var err2 error
// second, insert updated tuple to other page
for {
new_rid, err2 = page_.InsertTuple(need_follow_tuple, log_recovery.log_manager, nil, txn)
if err2 == nil || err2 == access.ErrEmptyTuple {
break
}

// traverse pages until find needed capacity having page or create new page
nextPageId := page_.GetNextPageId()
if nextPageId.IsValid() {
nextPage := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(nextPageId))
log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true)
page_ = nextPage
} else {
p := log_recovery.buffer_pool_manager.NewPage()
newPage := access.CastPageAsTablePage(p)
page_.SetNextPageId(p.GetPageId())
currentPageId := page_.GetPageId()
newPage.Init(p.GetPageId(), currentPageId, log_recovery.log_manager, nil, txn)
log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true)
page_ = newPage
}
}

if new_rid != nil {
updateRIDConvMap(&org_update_rid, new_rid)
}
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Update_rid.GetPageId()))
_, err, _ := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager)
if err != nil {
panic(fmt.Sprintln("UpdateTuple at rollback failed! err:", err))
}
log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true)

isUndoOccured = true
}
lsn = log_record.Prev_lsn
Expand Down
14 changes: 7 additions & 7 deletions lib/recovery/recovery_test/log_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func TestRedo(t *testing.T) {
val1_1 := tuple1_.GetValue(schema_, 1)
val1_0 := tuple1_.GetValue(schema_, 0)

rid, _ = test_table.InsertTuple(tuple_, false, txn, math.MaxUint32)
rid, _ = test_table.InsertTuple(tuple_, txn, math.MaxUint32)
// TODO: (SDB) insert index entry if needed
testingpkg.Assert(t, rid != nil, "")
rid1, _ = test_table.InsertTuple(tuple1_, false, txn, math.MaxUint32)
rid1, _ = test_table.InsertTuple(tuple1_, txn, math.MaxUint32)
// TODO: (SDB) insert index entry if needed
testingpkg.Assert(t, rid != nil, "")

Expand Down Expand Up @@ -171,15 +171,15 @@ func TestUndo(t *testing.T) {
val1_0 := tuple1.GetValue(schema_, 0)
val1_1 := tuple1.GetValue(schema_, 1)
var rid1 *page.RID
rid1, _ = test_table.InsertTuple(tuple1, false, txn, math.MaxUint32)
rid1, _ = test_table.InsertTuple(tuple1, txn, math.MaxUint32)
testingpkg.Assert(t, rid1 != nil, "")

tuple2 := ConstructTuple(schema_)
val2_0 := tuple2.GetValue(schema_, 0)
val2_1 := tuple2.GetValue(schema_, 1)

var rid2 *page.RID
rid2, _ = test_table.InsertTuple(tuple2, false, txn, math.MaxUint32)
rid2, _ = test_table.InsertTuple(tuple2, txn, math.MaxUint32)
testingpkg.Assert(t, rid2 != nil, "")

fmt.Println("Log page content is written to disk")
Expand All @@ -190,7 +190,7 @@ func TestUndo(t *testing.T) {
txn = samehada_instance.GetTransactionManager().Begin(nil)

// tuple deletion (rid1)
test_table.MarkDelete(rid1, math.MaxUint32, false, txn)
test_table.MarkDelete(rid1, math.MaxUint32, txn)

// tuple updating (rid2)
row1 := make([]types.Value, 0)
Expand All @@ -201,7 +201,7 @@ func TestUndo(t *testing.T) {
// tuple insertion (rid3)
tuple3 := ConstructTuple(schema_)
var rid3 *page.RID
rid3, _ = test_table.InsertTuple(tuple3, false, txn, math.MaxUint32)
rid3, _ = test_table.InsertTuple(tuple3, txn, math.MaxUint32)
// TODO: (SDB) insert index entry if needed
testingpkg.Assert(t, rid3 != nil, "")

Expand Down Expand Up @@ -326,7 +326,7 @@ func TestCheckpoint(t *testing.T) {
// insert a ton of tuples
txn1 := samehada_instance.GetTransactionManager().Begin(nil)
for i := 0; i < 1000; i++ {
rid, err := test_table.InsertTuple(tuple_, false, txn1, math.MaxUint32)
rid, err := test_table.InsertTuple(tuple_, txn1, math.MaxUint32)
if err != nil {
fmt.Println(err)
}
Expand Down
14 changes: 7 additions & 7 deletions lib/storage/access/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ func isContainTxnID(list []types.TxnID, txnID types.TxnID) bool {
/**
* Acquire a lock on RID in shared mode. See [LOCK_NOTE] in header file.
* @param txn the transaction requesting the shared lock
* @param rid1 the RID to be locked in shared mode
* @param rid the RID to be locked in shared mode
* @return true if the lock is granted, false otherwise
*/
func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) bool {
//fmt.Printf("called LockShared, %v\n", rid1)
//fmt.Printf("called LockShared, %v\n", rid)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

Expand Down Expand Up @@ -170,11 +170,11 @@ func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) boo
/**
* Acquire a lock on RID in exclusive mode. See [LOCK_NOTE] in header file.
* @param txn the transaction requesting the exclusive lock
* @param rid1 the RID to be locked in exclusive mode
* @param rid the RID to be locked in exclusive mode
* @return true if the lock is granted, false otherwise
*/
func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID) bool {
//fmt.Printf("called LockExclusive, %v\n", rid1)
//fmt.Printf("called LockExclusive, %v\n", rid)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

Expand Down Expand Up @@ -207,11 +207,11 @@ func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID)
/**
* Upgrade a lock from a shared lock to an exclusive access.
* @param txn the transaction requesting the lock upgrade
* @param rid1 the RID that should already be locked in shared mode by the requesting transaction
* @param rid the RID that should already be locked in shared mode by the requesting transaction
* @return true if the upgrade is successful, false otherwise
*/
func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bool {
//fmt.Printf("called LockUpgrade %v\n", rid1)
//fmt.Printf("called LockUpgrade %v\n", rid)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

Expand Down Expand Up @@ -249,7 +249,7 @@ func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bo
/**
* Release the lock held by the access.
* @param txn the transaction releasing the lock, it should actually hold the lock
* @param rid1 the RID that is locked by the transaction
* @param rid the RID that is locked by the transaction
* @return true if the unlock is successful, false otherwise
*/
func (lock_manager *LockManager) Unlock(txn *Transaction, rid_list []page.RID) bool {
Expand Down
Loading

0 comments on commit 774a849

Please sign in to comment.