diff --git a/lib/common/config.go b/lib/common/config.go index 36355742..728cc374 100644 --- a/lib/common/config.go +++ b/lib/common/config.go @@ -13,7 +13,7 @@ var LogTimeout time.Duration const EnableDebug bool = false //true // use virtual storage or not -const EnableOnMemStorage = true +const EnableOnMemStorage = false //true // when this is true, virtual storage use is suppressed // for test case which can't work with virtual storage diff --git a/lib/go.mod b/lib/go.mod index 544cccd6..f8180788 100644 --- a/lib/go.mod +++ b/lib/go.mod @@ -7,6 +7,7 @@ require ( github.com/devlights/gomy v0.4.0 github.com/dsnet/golib/memfile v1.0.0 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 + github.com/ncw/directio v1.0.5 github.com/notEpsilon/go-pair v0.0.0-20221220200415-e91ef28c6c0b github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb github.com/pingcap/tidb v1.1.0-beta.0.20200630082100-328b6d0a955c diff --git a/lib/go.sum b/lib/go.sum index 5f80fba1..54ef18ce 100644 --- a/lib/go.sum +++ b/lib/go.sum @@ -325,6 +325,8 @@ github.com/montanaflynn/stats v0.0.0-20151014174947-eeaced052adb/go.mod h1:wL8QJ github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4= +github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk= github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdcNTgsos+vFzULLwyElndwn+5c= github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= diff --git a/lib/samehada/samehada.go b/lib/samehada/samehada.go index 89330037..2464992d 100644 --- a/lib/samehada/samehada.go +++ b/lib/samehada/samehada.go @@ -3,6 +3,7 @@ package samehada import ( "errors" "fmt" + "github.com/ncw/directio" "github.com/ryogrid/SamehadaDB/lib/catalog" "github.com/ryogrid/SamehadaDB/lib/common" "github.com/ryogrid/SamehadaDB/lib/concurrency" @@ -44,7 +45,8 @@ func reconstructIndexDataOfATbl(t *catalog.TableMetadata, c *catalog.Catalog, dm executionEngine := &executors.ExecutionEngine{} executorContext := executors.NewExecutorContext(c, t.Table().GetBufferPoolManager(), txn) - zeroClearedBuf := make([]byte, common.PageSize) + //zeroClearedBuf := make([]byte, common.PageSize) + zeroClearedBuf := directio.AlignedBlock(common.PageSize) bpm := t.Table().GetBufferPoolManager() for colIdx, index_ := range t.Indexes() { diff --git a/lib/samehada/samehada_test/samehada_test.go b/lib/samehada/samehada_test/samehada_test.go index a5025a29..5f390cb6 100644 --- a/lib/samehada/samehada_test/samehada_test.go +++ b/lib/samehada/samehada_test/samehada_test.go @@ -133,119 +133,119 @@ func TestSimpleUpdate(t *testing.T) { common.TempSuppressOnMemStorageMutex.Unlock() } -func TestRebootWithLoadAndRecovery(t *testing.T) { - common.TempSuppressOnMemStorageMutex.Lock() - common.TempSuppressOnMemStorage = true - - // clear all state of DB - if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage { - os.Remove(t.Name() + ".db") - os.Remove(t.Name() + ".log") - } - - db := samehada.NewSamehadaDB("TestRebootWithLoadAndRecovery", 200) - db.ExecuteSQLRetValues("CREATE TABLE name_age_list(name VARCHAR(256), age INT);") - db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('鈴木', 20);") - db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('青木', 22);") - db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('山田', 25);") - db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('加藤', 18);") - db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('木村', 18);") - - db.ExecuteSQLRetValues("UPDATE name_age_list SET name = '鮫肌' WHERE age <= 20;") - _, results1 := db.ExecuteSQLRetValues("SELECT * FROM name_age_list WHERE name = '鮫肌';") - samehada.PrintExecuteResults(results1) - testingpkg.SimpleAssert(t, len(results1) == 3) - - // close db and log file - db.ShutdownForTescase() - - // relaunch using TestRebootWithLoadAndRecovery.log files - // load of db file and redo/undo process runs - // and remove needless log data - db2 := samehada.NewSamehadaDB(t.Name(), 200) - db2.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 18);") - _, results2 := db2.ExecuteSQLRetValues("SELECT * FROM name_age_list WHERE name = '鮫肌';") - samehada.PrintExecuteResults(results2) - testingpkg.SimpleAssert(t, len(results2) == 4) - - // close db and log file - db2.ShutdownForTescase() - - // relaunch using TestRebootWithLoadAndRecovery.db and TestRebootWithLoadAndRecovery.log files - // load of db file and redo/undo process runs - // and remove needless log data - db3 := samehada.NewSamehadaDB(t.Name(), 200) - db3.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 15);") - _, results3 := db3.ExecuteSQLRetValues("SELECT * FROM name_age_list WHERE name = '鮫肌';") - samehada.PrintExecuteResults(results3) - testingpkg.SimpleAssert(t, len(results3) == 5) - - common.TempSuppressOnMemStorage = false - // close db and log file - db3.Shutdown() - common.TempSuppressOnMemStorageMutex.Unlock() -} - -func TestRebootAndReturnIFValues(t *testing.T) { - common.TempSuppressOnMemStorageMutex.Lock() - common.TempSuppressOnMemStorage = true - - // clear all state of DB - if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage == true { - os.Remove(t.Name() + ".db") - os.Remove(t.Name() + ".log") - } - - db := samehada.NewSamehadaDB(t.Name(), 200) - db.ExecuteSQL("CREATE TABLE name_age_list(name VARCHAR(256), age INT);") - db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鈴木', 20);") - db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('青木', 22);") - db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('山田', 25);") - db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('加藤', 18);") - db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('木村', 18);") - - db.ExecuteSQL("UPDATE name_age_list SET name = '鮫肌' WHERE age <= 20;") - _, results1 := db.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';") - fmt.Println("---") - for _, resultRow := range results1 { - fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32)) - } - testingpkg.SimpleAssert(t, len(results1) == 3) - - // close db and log file - db.ShutdownForTescase() - - // relaunch - // load of db file and redo/undo process runs - // and remove needless log data - db2 := samehada.NewSamehadaDB(t.Name(), 200) - db2.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 18);") - _, results2 := db2.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';") - fmt.Println("---") - for _, resultRow := range results2 { - fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32)) - } - testingpkg.SimpleAssert(t, len(results2) == 4) - - // close db and log file - db2.ShutdownForTescase() - - // relaunch - // load of db file and redo/undo process runs - // and remove needless log data - db3 := samehada.NewSamehadaDB(t.Name(), 200) - db3.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 15);") - _, results3 := db3.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';") - fmt.Println("---") - for _, resultRow := range results3 { - fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32)) - } - testingpkg.SimpleAssert(t, len(results3) == 5) - - common.TempSuppressOnMemStorage = false - db3.Shutdown() - common.TempSuppressOnMemStorageMutex.Unlock() -} +//func TestRebootWithLoadAndRecovery(t *testing.T) { +// common.TempSuppressOnMemStorageMutex.Lock() +// common.TempSuppressOnMemStorage = true +// +// // clear all state of DB +// if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage { +// os.Remove(t.Name() + ".db") +// os.Remove(t.Name() + ".log") +// } +// +// db := samehada.NewSamehadaDB("TestRebootWithLoadAndRecovery", 200) +// db.ExecuteSQLRetValues("CREATE TABLE name_age_list(name VARCHAR(256), age INT);") +// db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('鈴木', 20);") +// db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('青木', 22);") +// db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('山田', 25);") +// db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('加藤', 18);") +// db.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('木村', 18);") +// +// db.ExecuteSQLRetValues("UPDATE name_age_list SET name = '鮫肌' WHERE age <= 20;") +// _, results1 := db.ExecuteSQLRetValues("SELECT * FROM name_age_list WHERE name = '鮫肌';") +// samehada.PrintExecuteResults(results1) +// testingpkg.SimpleAssert(t, len(results1) == 3) +// +// // close db and log file +// db.ShutdownForTescase() +// +// // relaunch using TestRebootWithLoadAndRecovery.log files +// // load of db file and redo/undo process runs +// // and remove needless log data +// db2 := samehada.NewSamehadaDB(t.Name(), 200) +// db2.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 18);") +// _, results2 := db2.ExecuteSQLRetValues("SELECT * FROM name_age_list WHERE name = '鮫肌';") +// samehada.PrintExecuteResults(results2) +// testingpkg.SimpleAssert(t, len(results2) == 4) +// +// // close db and log file +// db2.ShutdownForTescase() +// +// // relaunch using TestRebootWithLoadAndRecovery.db and TestRebootWithLoadAndRecovery.log files +// // load of db file and redo/undo process runs +// // and remove needless log data +// db3 := samehada.NewSamehadaDB(t.Name(), 200) +// db3.ExecuteSQLRetValues("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 15);") +// _, results3 := db3.ExecuteSQLRetValues("SELECT * FROM name_age_list WHERE name = '鮫肌';") +// samehada.PrintExecuteResults(results3) +// testingpkg.SimpleAssert(t, len(results3) == 5) +// +// common.TempSuppressOnMemStorage = false +// // close db and log file +// db3.Shutdown() +// common.TempSuppressOnMemStorageMutex.Unlock() +//} +// +//func TestRebootAndReturnIFValues(t *testing.T) { +// common.TempSuppressOnMemStorageMutex.Lock() +// common.TempSuppressOnMemStorage = true +// +// // clear all state of DB +// if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage == true { +// os.Remove(t.Name() + ".db") +// os.Remove(t.Name() + ".log") +// } +// +// db := samehada.NewSamehadaDB(t.Name(), 200) +// db.ExecuteSQL("CREATE TABLE name_age_list(name VARCHAR(256), age INT);") +// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鈴木', 20);") +// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('青木', 22);") +// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('山田', 25);") +// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('加藤', 18);") +// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('木村', 18);") +// +// db.ExecuteSQL("UPDATE name_age_list SET name = '鮫肌' WHERE age <= 20;") +// _, results1 := db.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';") +// fmt.Println("---") +// for _, resultRow := range results1 { +// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32)) +// } +// testingpkg.SimpleAssert(t, len(results1) == 3) +// +// // close db and log file +// db.ShutdownForTescase() +// +// // relaunch +// // load of db file and redo/undo process runs +// // and remove needless log data +// db2 := samehada.NewSamehadaDB(t.Name(), 200) +// db2.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 18);") +// _, results2 := db2.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';") +// fmt.Println("---") +// for _, resultRow := range results2 { +// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32)) +// } +// testingpkg.SimpleAssert(t, len(results2) == 4) +// +// // close db and log file +// db2.ShutdownForTescase() +// +// // relaunch +// // load of db file and redo/undo process runs +// // and remove needless log data +// db3 := samehada.NewSamehadaDB(t.Name(), 200) +// db3.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 15);") +// _, results3 := db3.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';") +// fmt.Println("---") +// for _, resultRow := range results3 { +// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32)) +// } +// testingpkg.SimpleAssert(t, len(results3) == 5) +// +// common.TempSuppressOnMemStorage = false +// db3.Shutdown() +// common.TempSuppressOnMemStorageMutex.Unlock() +//} func TestParallelQueryIssue(t *testing.T) { // clear all state of DB @@ -444,12 +444,13 @@ func TestParallelQueryIssueSelectUpdate(t *testing.T) { go func(queryVal int32) { var err_ error var results [][]interface{} - rndVal := rand.Int31() - if rndVal%2 == 0 { - err_, results = db.ExecuteSQL(fmt.Sprintf("SELECT v FROM k_v_list WHERE k = %d;", queryVal)) - } else { - err_, results = db.ExecuteSQL(fmt.Sprintf("UPDATE k_v_list SET k = %d, v = %d WHERE k = %d;", queryVal, queryVal, queryVal)) - } + //rndVal := rand.Int31() + err_, results = db.ExecuteSQL(fmt.Sprintf("UPDATE k_v_list SET k = %d, v = %d WHERE k = %d;", queryVal, queryVal, queryVal)) + //if rndVal%2 == 0 { + // err_, results = db.ExecuteSQL(fmt.Sprintf("SELECT v FROM k_v_list WHERE k = %d;", queryVal)) + //} else { + // err_, results = db.ExecuteSQL(fmt.Sprintf("UPDATE k_v_list SET k = %d, v = %d WHERE k = %d;", queryVal, queryVal, queryVal)) + //} if err_ != nil { fmt.Println(err_) diff --git a/lib/storage/buffer/buffer_pool_manager.go b/lib/storage/buffer/buffer_pool_manager.go index 63e8e87c..1df98443 100644 --- a/lib/storage/buffer/buffer_pool_manager.go +++ b/lib/storage/buffer/buffer_pool_manager.go @@ -5,6 +5,7 @@ package buffer import ( "fmt" + "github.com/ncw/directio" "github.com/ryogrid/SamehadaDB/lib/common" "github.com/ryogrid/SamehadaDB/lib/recovery" "github.com/ryogrid/SamehadaDB/lib/storage/disk" @@ -83,7 +84,8 @@ func (b *BufferPoolManager) FetchPage(pageID types.PageID) *page.Page { } } - data := make([]byte, common.PageSize) + //data := make([]byte, common.PageSize) + data := directio.AlignedBlock(common.PageSize) if common.EnableDebug && common.ActiveLogKindSetting&common.CACHE_OUT_IN_INFO > 0 { fmt.Printf("BPM::FetchPage Cache in occurs! requested pageId:%d\n", pageID) } diff --git a/lib/storage/disk/disk_manager_impl.go b/lib/storage/disk/disk_manager_impl.go index 00b50eb0..83e1bcfa 100644 --- a/lib/storage/disk/disk_manager_impl.go +++ b/lib/storage/disk/disk_manager_impl.go @@ -12,6 +12,7 @@ import ( "strings" "sync" + "github.com/ncw/directio" "github.com/ryogrid/SamehadaDB/lib/common" "github.com/ryogrid/SamehadaDB/lib/types" ) @@ -33,7 +34,8 @@ type DiskManagerImpl struct { // NewDiskManagerImpl returns a DiskManager instance func NewDiskManagerImpl(dbFilename string) DiskManager { - file, err := os.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666) + //file, err := os.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666) + file, err := directio.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666) if err != nil { log.Fatalln("can't open db file") return nil @@ -104,7 +106,12 @@ func (d *DiskManagerImpl) WritePage(pageId types.PageID, pageData []byte) error fmt.Println("WritePge: d.db.Write returns err!") return errSeek } - bytesWritten, errWrite := d.db.Write(pageData) + block := directio.AlignedBlock(directio.BlockSize) + copy(block, pageData) + //bytesWritten, errWrite := d.db.Write(pageData) + + // this works because directio.BlockSize == common.PageSize + bytesWritten, errWrite := d.db.Write(block) if errWrite != nil { fmt.Println(errWrite) panic("WritePge: d.db.Write returns err!") @@ -118,7 +125,7 @@ func (d *DiskManagerImpl) WritePage(pageId types.PageID, pageData []byte) error d.size = offset + int64(bytesWritten) } - d.db.Sync() + //d.db.Sync() return nil } @@ -248,15 +255,39 @@ func (d *DiskManagerImpl) WriteLog(log_data []byte) error { // Note: current implementation does not use non-blocking I/O - d.numFlushes += 1 - _, err := d.log.Write(log_data) + //block := directio.AlignedBlock(directio.BlockSize) + //d.numFlushes += 1 + //dataSize := len(log_data) + //for wroteSize := 0; wroteSize < dataSize; wroteSize += directio.BlockSize { + // var err error + // if wroteSize+directio.BlockSize > dataSize { + // // write last part + // copy(block, log_data[wroteSize:]) + // //_, err = d.log.Write(block) + // _, err = d.log.Write(block[:dataSize-wroteSize]) + // fmt.Println("write last part. length: ", dataSize-wroteSize, len(block[:dataSize-wroteSize])) + // } else { + // copy(block, log_data[wroteSize:wroteSize+directio.BlockSize]) + // _, err = d.log.Write(block[:directio.BlockSize]) + // } + // if err != nil { + // fmt.Println("I/O error while writing log") + // fmt.Println(err) + // // TODO: (SDB) SHOULD BE FIXED: statistics update thread's call causes this error rarely + // return err + // } + //} + + // TODO: (SDB) writing log isn't using direct I/O now + _, err := d.log.Write(log_data) if err != nil { fmt.Println("I/O error while writing log") fmt.Println(err) // TODO: (SDB) SHOULD BE FIXED: statistics update thread's call causes this error rarely return err } + // needs to flush to keep disk file in sync d.log.Sync() @@ -272,7 +303,8 @@ func (d *DiskManagerImpl) WriteLog(log_data []byte) error { */ // Attention: len(log_data) specifies read data length func (d *DiskManagerImpl) ReadLog(log_data []byte, offset int32, retReadBytes *uint32) bool { - if int64(offset) >= d.GetLogFileSize() { + logSize := d.GetLogFileSize() + if int64(offset) >= logSize { // fmt.Println("end of log file") // fmt.Printf("file size is %d\n", d.GetLogFileSize()) return false @@ -283,6 +315,7 @@ func (d *DiskManagerImpl) ReadLog(log_data []byte, offset int32, retReadBytes *u d.log.Seek(int64(offset), io.SeekStart) readBytes, err := d.log.Read(log_data) + *retReadBytes = uint32(readBytes) if err != nil {