Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer IDBMirrorVFS and OPFSPermutedVFS broadcasts. #197

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 18 additions & 38 deletions src/examples/IDBMirrorVFS.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,20 +437,18 @@ export class IDBMirrorVFS extends FacadeVFS {
}

// In order to write, our view of the database must be up to date.
// This is tricky because transactions are published in two ways:
// via BroadcastChannel and written to IndexedDB. We must handle
// the rare cases where a transaction is in one but not the other
// because of latency or crash.
//
// First fetch all transactions in IndexedDB equal to or greater
// than our view.
// To check this, first fetch all transactions in IndexedDB equal to
// or greater than our view.
const idbTx = this.#idb.transaction(['blocks', 'tx']);
const range = IDBKeyRange.bound(
[file.path, file.viewTx.txId],
[file.path, Infinity]);

/** @type {Transaction[]} */
const entries = await idbX(idbTx.objectStore('tx').getAll(range));

// Ideally the fetched list of transactions should contain one
// entry matching our view. If not then our view is out of date.
if (entries.length && entries.at(-1).txId > file.viewTx.txId) {
// There are newer transactions in IndexedDB that we haven't
// seen via broadcast. Ensure that they are incorporated on unlock,
Expand All @@ -469,29 +467,7 @@ export class IDBMirrorVFS extends FacadeVFS {
return VFS.SQLITE_BUSY
}

if (entries[0]?.txId !== file.viewTx.txId) {
// IndexedDB doesn't contain our current view transaction. This
// could happen if the connection that wrote the transaction
// crashed before it committed to IndexedDB. To fix this, add
// the transaction to IndexedDB ourselves.
if (file.viewTx.txId) {
console.warn(`adding missing tx ${file.viewTx.txId} to IndexedDB`);
const tx = this.#idb.transaction(['blocks', 'tx'], 'readwrite');
const txComplete = new Promise((resolve, reject) => {
tx.oncomplete = resolve;
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
});
tx.objectStore('tx').put(file.viewTx);
for (const [offset, data] of file.viewTx.blocks) {
tx.objectStore('blocks').put({ path: file.path, offset, data });
}
tx.commit();
await txComplete;
}
}
console.assert(entries[0]?.txId === file.viewTx.txId || !file.viewTx.txId);
break;
case VFS.SQLITE_LOCK_EXCLUSIVE:
await this.#lock(file, 'write');
Expand Down Expand Up @@ -713,18 +689,22 @@ export class IDBMirrorVFS extends FacadeVFS {
const txSansData = Object.assign({}, file.txActive);
txSansData.blocks = new Map(Array.from(file.txActive.blocks, ([k]) => [k, null]));
idbTx.objectStore('tx').put(txSansData);
idbTx.commit();

// Broadcast transaction once it commits.
const complete = new Promise((resolve, reject) => {
const message = file.txActive;
idbTx.oncomplete = () => {
file.broadcastChannel.postMessage(message);
resolve();
};
idbTx.onabort = () => reject(idbTx.error);
idbTx.commit();
});

if (file.synchronous === 'full') {
await new Promise((resolve, reject) => {
idbTx.oncomplete = resolve;
idbTx.onabort = () => reject(idbTx.error);
})
await complete;
}

// Broadcast the transaction.
file.broadcastChannel.postMessage(file.txActive);

file.txActive = null;
file.txWriteHint = false;
}
Expand Down
55 changes: 18 additions & 37 deletions src/examples/OPFSPermutedVFS.js
Original file line number Diff line number Diff line change
Expand Up @@ -565,18 +565,16 @@ export class OPFSPermutedVFS extends FacadeVFS {
}

// In order to write, our view of the database must be up to date.
// This is tricky because transactions are published in two ways:
// via BroadcastChannel and written to IndexedDB. We must handle
// the rare cases where a transaction is in one but not the other
// because of latency or crash.
//
// First fetch all transactions in IndexedDB equal to or greater
// than our view.
// To check this, first fetch all transactions in IndexedDB equal to
// or greater than our view.
const tx = file.idb.transaction(['pending']);
const range = IDBKeyRange.lowerBound(file.viewTx.txId);

/** @type {Transaction[]} */
const entries = await idbX(tx.objectStore('pending').getAll(range));

// Ideally the fetched list of transactions should contain one
// entry matching our view. If not then our view is out of date.
if (entries.length && entries.at(-1).txId > file.viewTx.txId) {
// There are newer transactions in IndexedDB that we haven't
// seen via broadcast. Ensure that they are incorporated on unlock,
Expand All @@ -585,27 +583,6 @@ export class OPFSPermutedVFS extends FacadeVFS {
file.locks.reserved();
return VFS.SQLITE_BUSY
}

if (entries[0]?.txId !== file.viewTx.txId) {
// IndexedDB doesn't contain our current view transaction. This
// could happen if the connection that wrote the transaction
// crashed before it committed to IndexedDB. To fix this, add
// the transaction to IndexedDB ourselves.
if (file.viewTx.txId) {
console.warn(`adding missing tx ${file.viewTx.txId} to IndexedDB`);
const tx = file.idb.transaction('pending', 'readwrite');
const txComplete = new Promise((resolve, reject) => {
tx.oncomplete = resolve;
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
});
tx.objectStore('pending').put(file.viewTx);
tx.commit();
await txComplete;
}
}
break;
case VFS.SQLITE_LOCK_EXCLUSIVE:
await this.#lock(file, 'write');
Expand Down Expand Up @@ -983,13 +960,6 @@ export class OPFSPermutedVFS extends FacadeVFS {
['pages', 'pending'],
'readwrite',
{ durability: file.synchronous === 'full' ? 'strict' : 'relaxed'});
const txComplete = new Promise((resolve, reject) => {
tx.oncomplete = resolve;
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
});

if (file.txActive.oldestTxId) {
// Ensure that all pending data is safely on storage.
Expand All @@ -1016,9 +986,20 @@ export class OPFSPermutedVFS extends FacadeVFS {

// Publish the transaction via broadcast and IndexedDB.
this.log?.(`commit transaction ${file.txActive.txId}`);
file.broadcastChannel.postMessage(file.txActive);
tx.objectStore('pending').put(file.txActive);
tx.commit();

const txComplete = new Promise((resolve, reject) => {
const message = file.txActive;
tx.oncomplete = () => {
file.broadcastChannel.postMessage(message);
resolve();
};
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
tx.commit();
});

if (file.synchronous === 'full') {
await txComplete;
Expand Down
Loading