Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQLite: Cache prepared statements behind sql.exec(). #2970

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/workerd/api/sql-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ async function test(state) {
assert.equal(resultNumberRaw[0].length, 1);
assert.equal(resultNumberRaw[0][0], 123);

sql.exec('SELECT 123');
sql.exec('SELECT 123');
sql.exec('SELECT 123');

// Test string results
const resultStr = [...sql.exec("SELECT 'hello'")];
assert.equal(resultStr.length, 1);
Expand Down Expand Up @@ -294,14 +298,12 @@ async function test(state) {
assert.equal(resultPrepared.length, 1);
assert.equal(resultPrepared[0]['789'], 789);

// Running the same query twice invalidates the previous cursor.
// Running the same query twice, overlapping, works just fine.
let result1 = prepared();
let result2 = prepared();
// Iterate result2 before result1.
assert.equal([...result2][0]['789'], 789);
assert.throws(
() => [...result1],
'SQL cursor was closed because the same statement was executed again.'
);
assert.equal([...result1][0]['789'], 789);

// That said if a cursor was already done before the statement was re-run, it's not considered
// canceled.
Expand Down Expand Up @@ -331,9 +333,7 @@ async function test(state) {
}

// Prepared statement with multiple statements
assert.throws(() => {
sql.prepare('SELECT 1; SELECT 2;');
}, /A prepared SQL statement must contain only one statement./);
assert.deepEqual([...sql.prepare('SELECT 1; SELECT 2;')()], [{ 2: 2 }]);

// Accessing a hidden _cf_ table
assert.throws(
Expand Down
111 changes: 68 additions & 43 deletions src/workerd/api/sql.c++
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,60 @@

namespace workerd::api {

SqlStorage::SqlStorage(jsg::Ref<DurableObjectStorage> storage): storage(kj::mv(storage)) {}
// Maximum total size of all cached statements (measured in size of the SQL code). If cached
// statements exceed this, we remove the LRU statement(s).
//
// Hopefully most apps don't ever hit this, but it's important to have a limit in case of
// queries containing dynamic content or excessively large one-off queries.
static constexpr uint SQL_STATEMENT_CACHE_MAX_SIZE = 1024 * 1024;

SqlStorage::SqlStorage(jsg::Ref<DurableObjectStorage> storage)
: storage(kj::mv(storage)),
statementCache(IoContext::current().addObject(kj::heap<StatementCache>())) {}

SqlStorage::~SqlStorage() {}

jsg::Ref<SqlStorage::Cursor> SqlStorage::exec(
jsg::Lock& js, kj::String querySql, jsg::Arguments<BindingValue> bindings) {
SqliteDatabase::Regulator& regulator = *this;
return jsg::alloc<Cursor>(getDb(js), regulator, querySql, kj::mv(bindings));
jsg::Lock& js, jsg::JsString querySql, jsg::Arguments<BindingValue> bindings) {
auto& db = getDb(js);
auto& statementCache = *this->statementCache;

kj::Rc<CachedStatement>& slot = statementCache.map.findOrCreate(querySql, [&]() {
auto result = kj::rc<CachedStatement>(js, *this, db, querySql, js.toString(querySql));
statementCache.totalSize += result->statementSize;
return result;
});

// Move cached statement to end of LRU queue.
if (slot->lruLink.isLinked()) {
statementCache.lru.remove(*slot.get());
}
statementCache.lru.add(*slot.get());

if (slot->isShared()) {
// Oops, this CachedStatement is currently in-use (presumably by a Cursor).
//
// SQLite only allows one instance of a statement to run at a time, so we will have to compile
// the statement again as a one-off.
//
// In theory we could try to cache multiple copies of the statement, but as this is probably
// exceedingly rare, it is not worth the added code complexity.
SqliteDatabase::Regulator& regulator = *this;
return jsg::alloc<Cursor>(db, regulator, js.toString(querySql), kj::mv(bindings));
}

auto result = jsg::alloc<Cursor>(slot.addRef(), kj::mv(bindings));

// If the statement cache grew too big, drop the least-recently-used entry.
while (statementCache.totalSize > SQL_STATEMENT_CACHE_MAX_SIZE) {
auto& toRemove = *statementCache.lru.begin();
auto oldQuery = jsg::JsString(toRemove.query.getHandle(js));
statementCache.totalSize -= toRemove.statementSize;
statementCache.lru.remove(toRemove);
KJ_ASSERT(statementCache.map.eraseMatch(oldQuery));
}

return result;
}

SqlStorage::IngestResult SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
Expand All @@ -27,8 +73,8 @@ SqlStorage::IngestResult SqlStorage::ingest(jsg::Lock& js, kj::String querySql)
kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount);
}

jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, kj::String query) {
return jsg::alloc<Statement>(getDb(js).prepare(*this, query));
jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, jsg::JsString query) {
return jsg::alloc<Statement>(js, JSG_THIS, query);
}

double SqlStorage::getDatabaseSize(jsg::Lock& js) {
Expand Down Expand Up @@ -60,6 +106,12 @@ bool SqlStorage::allowTransactions() const {
"write coalescing.");
}

SqlStorage::StatementCache::~StatementCache() noexcept(false) {
for (auto& entry: lru) {
lru.remove(entry);
}
}

jsg::JsValue SqlStorage::wrapSqlValue(jsg::Lock& js, SqlValue value) {
KJ_IF_SOME(v, value) {
KJ_SWITCH_ONEOF(v) {
Expand Down Expand Up @@ -99,19 +151,19 @@ jsg::JsArray SqlStorage::wrapSqlRowRaw(jsg::Lock& js, kj::Array<SqlValue> row) {
}));
}

SqlStorage::Cursor::State::State(kj::RefcountedWrapper<SqliteDatabase::Statement>& statement,
kj::Array<BindingValue> bindingsParam)
: dependency(statement.addWrappedRef()),
bindings(kj::mv(bindingsParam)),
query(statement.getWrapped().run(mapBindings(bindings).asPtr())) {}

SqlStorage::Cursor::State::State(SqliteDatabase& db,
SqliteDatabase::Regulator& regulator,
kj::StringPtr sqlCode,
kj::Array<BindingValue> bindingsParam)
: bindings(kj::mv(bindingsParam)),
query(db.run(regulator, sqlCode, mapBindings(bindings).asPtr())) {}

SqlStorage::Cursor::State::State(
kj::Rc<CachedStatement> cachedStatementParam, kj::Array<BindingValue> bindingsParam)
: bindings(kj::mv(bindingsParam)),
query(cachedStatement.emplace(kj::mv(cachedStatementParam))
->statement.run(mapBindings(bindings).asPtr())) {}

SqlStorage::Cursor::~Cursor() noexcept(false) {
// If this Cursor was created from a Statement, clear the Statement's currentCursor weak ref.
KJ_IF_SOME(s, selfRef) {
Expand All @@ -123,7 +175,7 @@ SqlStorage::Cursor::~Cursor() noexcept(false) {
}
}

void SqlStorage::Cursor::CachedColumnNames::ensureInitialized(
void SqlStorage::CachedColumnNames::ensureInitialized(
jsg::Lock& js, SqliteDatabase::Query& source) {
if (names == kj::none) {
js.withinHandleScope([&] {
Expand Down Expand Up @@ -320,10 +372,6 @@ auto SqlStorage::Cursor::iteratorImpl(jsg::Lock& js, jsg::Ref<Cursor>& obj, Func
return results.finish();
}

SqlStorage::Statement::Statement(SqliteDatabase::Statement&& statement)
: statement(IoContext::current().addObject(
kj::refcountedWrapper<SqliteDatabase::Statement>(kj::mv(statement)))) {}

kj::Array<const SqliteDatabase::Query::ValuePtr> SqlStorage::Cursor::mapBindings(
kj::ArrayPtr<BindingValue> values) {
return KJ_MAP(value, values) -> SqliteDatabase::Query::ValuePtr {
Expand All @@ -346,32 +394,9 @@ kj::Array<const SqliteDatabase::Query::ValuePtr> SqlStorage::Cursor::mapBindings
};
}

jsg::Ref<SqlStorage::Cursor> SqlStorage::Statement::run(jsg::Arguments<BindingValue> bindings) {
auto& statementRef = *statement; // validate we're in the right IoContext

KJ_IF_SOME(c, currentCursor) {
// Invalidate previous cursor if it's still running. We have to do this because SQLite only
// allows one execution of a statement at a time.
//
// If this is a problem, we could consider a scheme where we dynamically instantiate copies of
// the statement as needed. However, that risks wasting memory if the app commonly leaves
// cursors open and the GC doesn't run proactively enough.
KJ_IF_SOME(s, c.state) {
c.canceled = !s->query.isDone();
c.state = kj::none;
}
c.selfRef = kj::none;
c.statement = kj::none;
currentCursor = kj::none;
}

auto result = jsg::alloc<Cursor>(cachedColumnNames, statementRef, kj::mv(bindings));
result->statement = JSG_THIS;

result->selfRef = currentCursor;
currentCursor = *result;

return result;
jsg::Ref<SqlStorage::Cursor> SqlStorage::Statement::run(
jsg::Lock& js, jsg::Arguments<BindingValue> bindings) {
return sqlStorage->exec(js, jsg::JsString(query.getHandle(js)), kj::mv(bindings));
}

void SqlStorage::visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
Loading