Skip to content

Commit

Permalink
drop table: addresses issue tursodatabase#894 which requires DROP TAB…
Browse files Browse the repository at this point in the history
…LE to be implemented

this is the initial commit is for the implementation of DROP TABLE. It adds support for the DROP TABLE instruction and adds a DropBTree instruction. It also implements the btree_drop method in btree.rs which makes use of free_page method which will be implemented via PR tursodatabase#785
  • Loading branch information
redixhumayun committed Feb 15, 2025
1 parent f220f9a commit bd5e658
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 1 deletion.
5 changes: 5 additions & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,10 @@ impl Connection {
pub(crate) fn run_cmd(self: &Rc<Connection>, cmd: Cmd) -> Result<Option<Statement>> {
let db = self.db.clone();
let syms: &SymbolTable = &db.syms.borrow();
println!("the command {:?}", cmd);
match cmd {
Cmd::Stmt(stmt) => {
println!("AST: {:?}", stmt);
let program = Rc::new(translate::translate(
&self.schema.borrow(),
stmt,
Expand All @@ -327,6 +329,7 @@ impl Connection {
syms,
QueryMode::Normal,
)?);
println!("the generated program {:?}", program);
let stmt = Statement::new(program, self.pager.clone());
Ok(Some(stmt))
}
Expand Down Expand Up @@ -367,11 +370,13 @@ impl Connection {
}

pub fn execute(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<()> {
println!("running execute");
let sql = sql.as_ref();
let db = &self.db;
let syms: &SymbolTable = &db.syms.borrow();
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
println!("the command {:?}", cmd);
if let Some(cmd) = cmd {
match cmd {
Cmd::Explain(stmt) => {
Expand Down
106 changes: 106 additions & 0 deletions core/storage/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{LimboError, Result};
use std::cell::{Ref, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;

use super::pager::PageRef;
use super::sqlite3_ondisk::{
Expand Down Expand Up @@ -2234,6 +2235,111 @@ impl BTreeCursor {

Ok(Some(n_overflow))
}

pub fn btree_drop(&mut self) -> Result<CursorResult<()>> {
self.move_to_root();

loop {
let page = self.stack.top();
return_if_locked!(page);

if !page.is_loaded() {
self.pager.load_page(Arc::clone(&page))?;
return Ok(CursorResult::IO);
}

let contents = page.get().contents.as_ref().unwrap();
// TOOD: Uncomment after Krishvishal's PR https://github.com/tursodatabase/limbo/pull/785 merged
// let current_page_id = page.get().id;

if !contents.is_leaf() {
let mut has_unprocessed_children = false;

// Process all the regular cells first
for cell_idx in 0..contents.cell_count() {
let cell = contents.cell_get(
cell_idx,
Rc::clone(&self.pager),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
if let BTreeCell::TableInteriorCell(interior) = cell {
let child_page =
self.pager.read_page(interior._left_child_page as usize)?;
self.stack.push(child_page);
has_unprocessed_children = true;
break;
}
}

if !has_unprocessed_children {
if let Some(rightmost) = contents.rightmost_pointer() {
let rightmost_page = self.pager.read_page(rightmost as usize)?;
self.stack.push(rightmost_page);
continue;
}
}

if has_unprocessed_children {
continue;
}
} else {
for cell_idx in 0..contents.cell_count() {
let cell = contents.cell_get(
cell_idx,
Rc::clone(&self.pager),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
if let BTreeCell::TableLeafCell(TableLeafCell {
_rowid,
_payload,
first_overflow_page: Some(overflow_page_id),
}) = cell
{
let mut current_overflow_id = overflow_page_id;
loop {
let overflow_page =
self.pager.read_page(current_overflow_id as usize)?;
return_if_locked!(overflow_page);

if !overflow_page.is_loaded() {
self.pager.load_page(Arc::clone(&overflow_page))?;
return Ok(CursorResult::IO);
}

let overflow_contents = overflow_page.get().contents.as_ref().unwrap();
let next_overflow_id = u32::from_be_bytes(
overflow_contents.as_ptr()[..4].try_into().unwrap(),
);

// TODO: Uncomment after Krishvishal's PR https://github.com/tursodatabase/limbo/pull/785 is merged
// self.pager
// .free_page(Some(overflow_page), current_overflow_id as usize)?;

if next_overflow_id == 0 {
break;
}
current_overflow_id = next_overflow_id;
}
}
}
}

// All children & overflow pages have been processed
// TODO: Uncomment after Krishvishal's PR https://github.com/tursodatabase/limbo/pull/785 is merged
// self.pager.free_page(Some(page), current_page_id)?;

if self.stack.has_parent() {
self.stack.pop();
} else {
break;
}
}
Ok(CursorResult::Ok(()))
}
}

impl PageStack {
Expand Down
226 changes: 225 additions & 1 deletion core/translate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ pub fn translate(
}
ast::Stmt::Detach(_) => bail_parse_error!("DETACH not supported yet"),
ast::Stmt::DropIndex { .. } => bail_parse_error!("DROP INDEX not supported yet"),
ast::Stmt::DropTable { .. } => bail_parse_error!("DROP TABLE not supported yet"),
// ast::Stmt::DropTable { .. } => bail_parse_error!("DROP TABLE not supported yet"),
ast::Stmt::DropTable {
if_exists,
tbl_name,
} => {
translate_drop_table(&mut program, tbl_name, if_exists, schema)?;
}
ast::Stmt::DropTrigger { .. } => bail_parse_error!("DROP TRIGGER not supported yet"),
ast::Stmt::DropView { .. } => bail_parse_error!("DROP VIEW not supported yet"),
ast::Stmt::Pragma(name, body) => pragma::translate_pragma(
Expand Down Expand Up @@ -515,6 +521,224 @@ fn translate_create_table(
Ok(program)
}

fn translate_drop_table(
program: &mut ProgramBuilder,
tbl_name: ast::QualifiedName,
if_exists: bool,
schema: &Schema,
) -> Result<()> {
let table = schema.get_table(tbl_name.name.0.as_str());
if table.is_none() {
if if_exists {
let init_label = program.emit_init();
let start_offset = program.offset();
program.emit_halt();
program.resolve_label(init_label, program.offset());
program.emit_transaction(true);
program.emit_constant_insns();
program.emit_goto(start_offset);

return Ok(());
}
bail_parse_error!("No such table: {}", tbl_name.name.0.as_str());
}
let table = table.unwrap(); // safe to do since we have a check before this

let init_label = program.emit_init();
let start_offset = program.offset();

// 1. Drop the table BTree
program.emit_insn(Insn::DropBtree {
db: 0,
root: table.root_page,
});

// TODO: Drop indexes?

// 2. Delete table metadata from sqlite_schema
let table_name = "sqlite_schema";
let table = schema.get_table(&table_name).unwrap();
let sqlite_schema_cursor_id = program.alloc_cursor_id(
Some(table_name.to_string()),
CursorType::BTreeTable(table.clone()),
);
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: sqlite_schema_cursor_id,
root_page: 1,
});
program.emit_insn(Insn::OpenWriteAwait {});

// Rewind to the very beginning of the cursor
program.emit_insn(Insn::RewindAsync {
cursor_id: sqlite_schema_cursor_id,
});
let end_metadata_label = program.allocate_label();
program.emit_insn(Insn::RewindAwait {
cursor_id: sqlite_schema_cursor_id,
pc_if_empty: end_metadata_label,
});
let metadata_loop = program.allocate_label();
program.resolve_label(metadata_loop, program.offset());

// Load row details
let tbl_name_reg = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: sqlite_schema_cursor_id,
column: 2,
dest: tbl_name_reg,
});
let string_reg = program.emit_string8_new_reg(tbl_name.name.0.clone());
let next_label = program.allocate_label();
program.emit_insn(Insn::Ne {
lhs: tbl_name_reg,
rhs: string_reg,
target_pc: next_label,
flags: CmpInsFlags::default(),
});

// Delete matching row
program.emit_insn(Insn::DeleteAsync {
cursor_id: sqlite_schema_cursor_id,
});
program.emit_insn(Insn::DeleteAwait {
cursor_id: sqlite_schema_cursor_id,
});

// Move to next row
program.resolve_label(next_label, program.offset());
program.emit_insn(Insn::NextAsync {
cursor_id: sqlite_schema_cursor_id,
});
program.emit_insn(Insn::NextAwait {
cursor_id: sqlite_schema_cursor_id,
pc_if_next: metadata_loop,
});
program.resolve_label(end_metadata_label, program.offset());

// Update schema
let parse_schema_where_clause = format!("tbl_name = {}", tbl_name.name.0);
program.emit_insn(Insn::ParseSchema {
db: 0,
where_clause: parse_schema_where_clause,
});

program.emit_halt();
program.resolve_label(init_label, program.offset());
program.emit_transaction(true);
program.emit_goto(start_offset);

Ok(program)
}

fn translate_drop_table(
program: &mut ProgramBuilder,
tbl_name: ast::QualifiedName,
if_exists: bool,
schema: &Schema,
) -> Result<()> {
let table = schema.get_table(tbl_name.name.0.as_str());
if table.is_none() {
if if_exists {
let init_label = program.emit_init();
let start_offset = program.offset();
program.emit_halt();
program.resolve_label(init_label, program.offset());
program.emit_transaction(true);
program.emit_constant_insns();
program.emit_goto(start_offset);

return Ok(());
}
bail_parse_error!("No such table: {}", tbl_name.name.0.as_str());
}
let table = table.unwrap(); // safe to do since we have a check before this

let init_label = program.emit_init();
let start_offset = program.offset();

// 1. Drop the table BTree
program.emit_insn(Insn::DropBtree {
db: 0,
root: table.root_page,
});

// TODO: Drop indexes?

// 2. Delete table metadata from sqlite_schema
let table_name = "sqlite_schema";
let table = schema.get_table(&table_name).unwrap();
let sqlite_schema_cursor_id = program.alloc_cursor_id(
Some(table_name.to_string()),
CursorType::BTreeTable(table.clone()),
);
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: sqlite_schema_cursor_id,
root_page: 1,
});
program.emit_insn(Insn::OpenWriteAwait {});

// Rewind to the very beginning of the cursor
program.emit_insn(Insn::RewindAsync {
cursor_id: sqlite_schema_cursor_id,
});
let end_metadata_label = program.allocate_label();
program.emit_insn(Insn::RewindAwait {
cursor_id: sqlite_schema_cursor_id,
pc_if_empty: end_metadata_label,
});
let metadata_loop = program.allocate_label();
program.resolve_label(metadata_loop, program.offset());

// Load row details
let tbl_name_reg = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: sqlite_schema_cursor_id,
column: 2,
dest: tbl_name_reg,
});
let string_reg = program.emit_string8_new_reg(tbl_name.name.0.clone());
let next_label = program.allocate_label();
program.emit_insn(Insn::Ne {
lhs: tbl_name_reg,
rhs: string_reg,
target_pc: next_label,
flags: CmpInsFlags::default(),
});

// Delete matching row
program.emit_insn(Insn::DeleteAsync {
cursor_id: sqlite_schema_cursor_id,
});
program.emit_insn(Insn::DeleteAwait {
cursor_id: sqlite_schema_cursor_id,
});

// Move to next row
program.resolve_label(next_label, program.offset());
program.emit_insn(Insn::NextAsync {
cursor_id: sqlite_schema_cursor_id,
});
program.emit_insn(Insn::NextAwait {
cursor_id: sqlite_schema_cursor_id,
pc_if_next: metadata_loop,
});
program.resolve_label(end_metadata_label, program.offset());

// Update schema
let parse_schema_where_clause = format!("tbl_name = {}", tbl_name.name.0);
program.emit_insn(Insn::ParseSchema {
db: 0,
where_clause: parse_schema_where_clause,
});

program.emit_halt();
program.resolve_label(init_label, program.offset());
program.emit_transaction(true);
program.emit_goto(start_offset);

Ok(())
}

enum PrimaryKeyDefinitionType<'a> {
Simple {
typename: Option<&'a str>,
Expand Down
Loading

0 comments on commit bd5e658

Please sign in to comment.