Skip to content

Commit

Permalink
Merge pull request #197 from rhashimoto/sync-broadcast
Browse files Browse the repository at this point in the history
Defer IDBMirrorVFS and OPFSPermutedVFS broadcasts.
  • Loading branch information
rhashimoto authored Jul 26, 2024
2 parents f1ab3d5 + bfd23c5 commit 7722688
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 75 deletions.
56 changes: 18 additions & 38 deletions src/examples/IDBMirrorVFS.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,20 +437,18 @@ export class IDBMirrorVFS extends FacadeVFS {
}

// In order to write, our view of the database must be up to date.
// This is tricky because transactions are published in two ways:
// via BroadcastChannel and written to IndexedDB. We must handle
// the rare cases where a transaction is in one but not the other
// because of latency or crash.
//
// First fetch all transactions in IndexedDB equal to or greater
// than our view.
// To check this, first fetch all transactions in IndexedDB equal to
// or greater than our view.
const idbTx = this.#idb.transaction(['blocks', 'tx']);
const range = IDBKeyRange.bound(
[file.path, file.viewTx.txId],
[file.path, Infinity]);

/** @type {Transaction[]} */
const entries = await idbX(idbTx.objectStore('tx').getAll(range));

// Ideally the fetched list of transactions should contain one
// entry matching our view. If not then our view is out of date.
if (entries.length && entries.at(-1).txId > file.viewTx.txId) {
// There are newer transactions in IndexedDB that we haven't
// seen via broadcast. Ensure that they are incorporated on unlock,
Expand All @@ -469,29 +467,7 @@ export class IDBMirrorVFS extends FacadeVFS {
return VFS.SQLITE_BUSY
}

if (entries[0]?.txId !== file.viewTx.txId) {
// IndexedDB doesn't contain our current view transaction. This
// could happen if the connection that wrote the transaction
// crashed before it committed to IndexedDB. To fix this, add
// the transaction to IndexedDB ourselves.
if (file.viewTx.txId) {
console.warn(`adding missing tx ${file.viewTx.txId} to IndexedDB`);
const tx = this.#idb.transaction(['blocks', 'tx'], 'readwrite');
const txComplete = new Promise((resolve, reject) => {
tx.oncomplete = resolve;
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
});
tx.objectStore('tx').put(file.viewTx);
for (const [offset, data] of file.viewTx.blocks) {
tx.objectStore('blocks').put({ path: file.path, offset, data });
}
tx.commit();
await txComplete;
}
}
console.assert(entries[0]?.txId === file.viewTx.txId || !file.viewTx.txId);
break;
case VFS.SQLITE_LOCK_EXCLUSIVE:
await this.#lock(file, 'write');
Expand Down Expand Up @@ -713,18 +689,22 @@ export class IDBMirrorVFS extends FacadeVFS {
const txSansData = Object.assign({}, file.txActive);
txSansData.blocks = new Map(Array.from(file.txActive.blocks, ([k]) => [k, null]));
idbTx.objectStore('tx').put(txSansData);
idbTx.commit();

// Broadcast transaction once it commits.
const complete = new Promise((resolve, reject) => {
const message = file.txActive;
idbTx.oncomplete = () => {
file.broadcastChannel.postMessage(message);
resolve();
};
idbTx.onabort = () => reject(idbTx.error);
idbTx.commit();
});

if (file.synchronous === 'full') {
await new Promise((resolve, reject) => {
idbTx.oncomplete = resolve;
idbTx.onabort = () => reject(idbTx.error);
})
await complete;
}

// Broadcast the transaction.
file.broadcastChannel.postMessage(file.txActive);

file.txActive = null;
file.txWriteHint = false;
}
Expand Down
55 changes: 18 additions & 37 deletions src/examples/OPFSPermutedVFS.js
Original file line number Diff line number Diff line change
Expand Up @@ -565,18 +565,16 @@ export class OPFSPermutedVFS extends FacadeVFS {
}

// In order to write, our view of the database must be up to date.
// This is tricky because transactions are published in two ways:
// via BroadcastChannel and written to IndexedDB. We must handle
// the rare cases where a transaction is in one but not the other
// because of latency or crash.
//
// First fetch all transactions in IndexedDB equal to or greater
// than our view.
// To check this, first fetch all transactions in IndexedDB equal to
// or greater than our view.
const tx = file.idb.transaction(['pending']);
const range = IDBKeyRange.lowerBound(file.viewTx.txId);

/** @type {Transaction[]} */
const entries = await idbX(tx.objectStore('pending').getAll(range));

// Ideally the fetched list of transactions should contain one
// entry matching our view. If not then our view is out of date.
if (entries.length && entries.at(-1).txId > file.viewTx.txId) {
// There are newer transactions in IndexedDB that we haven't
// seen via broadcast. Ensure that they are incorporated on unlock,
Expand All @@ -585,27 +583,6 @@ export class OPFSPermutedVFS extends FacadeVFS {
file.locks.reserved();
return VFS.SQLITE_BUSY
}

if (entries[0]?.txId !== file.viewTx.txId) {
// IndexedDB doesn't contain our current view transaction. This
// could happen if the connection that wrote the transaction
// crashed before it committed to IndexedDB. To fix this, add
// the transaction to IndexedDB ourselves.
if (file.viewTx.txId) {
console.warn(`adding missing tx ${file.viewTx.txId} to IndexedDB`);
const tx = file.idb.transaction('pending', 'readwrite');
const txComplete = new Promise((resolve, reject) => {
tx.oncomplete = resolve;
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
});
tx.objectStore('pending').put(file.viewTx);
tx.commit();
await txComplete;
}
}
break;
case VFS.SQLITE_LOCK_EXCLUSIVE:
await this.#lock(file, 'write');
Expand Down Expand Up @@ -983,13 +960,6 @@ export class OPFSPermutedVFS extends FacadeVFS {
['pages', 'pending'],
'readwrite',
{ durability: file.synchronous === 'full' ? 'strict' : 'relaxed'});
const txComplete = new Promise((resolve, reject) => {
tx.oncomplete = resolve;
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
});

if (file.txActive.oldestTxId) {
// Ensure that all pending data is safely on storage.
Expand All @@ -1016,9 +986,20 @@ export class OPFSPermutedVFS extends FacadeVFS {

// Publish the transaction via broadcast and IndexedDB.
this.log?.(`commit transaction ${file.txActive.txId}`);
file.broadcastChannel.postMessage(file.txActive);
tx.objectStore('pending').put(file.txActive);
tx.commit();

const txComplete = new Promise((resolve, reject) => {
const message = file.txActive;
tx.oncomplete = () => {
file.broadcastChannel.postMessage(message);
resolve();
};
tx.onabort = () => {
file.abortController.abort();
reject(tx.error);
};
tx.commit();
});

if (file.synchronous === 'full') {
await txComplete;
Expand Down

0 comments on commit 7722688

Please sign in to comment.