diff --git a/CHANGELOG.md b/CHANGELOG.md index 6784652..23d74e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +## 0.15.5 + +## Additions + +- Add argument `reverse` to `iterate` and `listAll` to allow almost instant + queries for "last x transactions matching query y". +- Reduce lock time on writing new transactions. +- Optimize the logic of writing new transactions. + +## Fixes + +- Make sure the results of `iterate` and `listAll` are returned in insertion + order, or reverse insertion order if `reverse` is set. + ## 0.15.4 - Fix for `prompt` totally blocking the event loop in the cli tool, preventing diff --git a/README.md b/README.md index 4732cb7..f490fd7 100644 --- a/README.md +++ b/README.md @@ -111,9 +111,11 @@ deno install -frA --name ckv jsr:@cross/kv/cli `createIfMissing` defaults to true. - `async set(key, value)` - Stores a value. - `async get(key)` - Retrieves a value. - - `async *iterate(query)` - Iterates over entries for a key. + - `async *iterate(query, limit, reverse)` - Iterates over entries for a + key. Limit and reverse are optional. + - `async listAll(query, limit, reverse)` - Gets all entries for a key as an + array. Limit and reverse are optional. - `listKeys(query)` - List all keys under . - - `async listAll(query)` - Gets all entries for a key as an array. - `async delete(key)` - Deletes a key-value pair. - `async sync()` - Synchronizez the ledger with disk. - `watch(query, callback, recursive): void` - Registers a callback to be diff --git a/src/lib/index.ts b/src/lib/index.ts index be816fb..47594f2 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -120,13 +120,11 @@ export class KVIndex { * @param key - The key to search for (can include ranges) * @returns An array of data row references. */ - get(key: KVKeyInstance, limit?: number): number[] { + get(key: KVKeyInstance, limit?: number, reverse: boolean = false): number[] { const resultSet: number[] = []; const keyLength = key.get().length; function recurse(node: KVIndexContent, keyIndex: number): void { - if (limit !== undefined && resultSet.length >= limit) return; // Stop recursion early if limit reached - if (keyIndex >= keyLength) { // We've reached the end of the key if (node.reference !== undefined) { @@ -176,6 +174,15 @@ export class KVIndex { // Start recursion from the root of the tree recurse(this.index, 0); + // Sort the array by transaction offset, to give results sorted in insertion order + resultSet.sort(); + + // Reverse if requested, after sorting + if (reverse) resultSet.reverse(); + + // Limit if requested, after sorting and reversing + if (limit !== undefined) resultSet.splice(limit); + return resultSet; } diff --git a/src/lib/kv.ts b/src/lib/kv.ts index 2a0acbb..7f559fc 100644 --- a/src/lib/kv.ts +++ b/src/lib/kv.ts @@ -512,6 +512,7 @@ export class KV extends EventEmitter { * @param key - Representation of the key to search for. * @param limit - (Optional) Maximum number of entries to yield. If not provided, all * entries associated with the key will be yielded. + * @param reverse - (Optional) Return the results in reverse insertion order, most recent first. Defaulting to false - oldest first. * @yields An object containing the `ts` (timestamp) and `data` for each matching entry. * * @example @@ -527,12 +528,13 @@ export class KV extends EventEmitter { public async *iterate( key: KVQuery, limit?: number, + reverse: boolean = false, ): AsyncGenerator> { // Throw if database isn't open this.ensureOpen(); this.ensureIndex(); const validatedKey = new KVKeyInstance(key, true); - const offsets = this.index!.get(validatedKey, limit)!; + const offsets = this.index!.get(validatedKey, limit, reverse)!; if (offsets === null || offsets.length === 0) { return; // No results to yield @@ -556,17 +558,22 @@ export class KV extends EventEmitter { * all yielded entries into an array. * * @param key - Representation of the key to query. + * @param limit - (Optional) Maximum number of entries to return. If not provided, all + * entries associated with the key will be yielded. + * @param reverse - (Optional) Return the results in reverse insertion order, most recent first. Defaulting to false - oldest first. * @returns A Promise that resolves to an array of all matching data entries. */ public async listAll( key: KVQuery, + limit?: number, + reverse: boolean = false, ): Promise[]> { // Throw if database isn't open this.ensureOpen(); this.ensureIndex(); const entries: KVTransactionResult[] = []; - for await (const entry of this.iterate(key)) { + for await (const entry of this.iterate(key, limit, reverse)) { entries.push(entry); } return entries; @@ -690,17 +697,8 @@ export class KV extends EventEmitter { currentOffset += transactionData.length; } - // Convert buffered transactions to Uint8Array[] - const transactionsData = new Uint8Array(currentOffset); - currentOffset = 0; - for (const transaction of bufferedTransactions) { - transactionsData.set( - transaction.transactionData, - transaction.relativeOffset, - ); - } - await this.ledger!.lock(); + let unlocked = false; try { // Sync before writing the transactions const syncResult = await this.sync(false, false); @@ -708,13 +706,12 @@ export class KV extends EventEmitter { throw syncResult.error; } - // Convert buffered transactions to Uint8Array[] - const transactionsData = bufferedTransactions.map(({ transactionData }) => - transactionData - ); - // Write all buffered transactions at once and get the base offset - const baseOffset = await this.ledger!.add(transactionsData); + const baseOffset = await this.ledger!.add(bufferedTransactions); + + // Unlock early if everying successed + await this.ledger!.unlock(); + unlocked = true; // Update the index and check for errors for ( @@ -745,7 +742,8 @@ export class KV extends EventEmitter { } } } finally { - await this.ledger!.unlock(); + // Back-up unlock + if (!unlocked) await this.ledger!.unlock(); this.pendingTransactions = []; // Clear pending transactions this.isInTransaction = false; } diff --git a/src/lib/ledger.ts b/src/lib/ledger.ts index 793fad7..d8ceb8c 100644 --- a/src/lib/ledger.ts +++ b/src/lib/ledger.ts @@ -287,7 +287,9 @@ export class KVLedger { * @param transactionsData An array of raw transaction data as Uint8Arrays. * @returns The base offset where the transactions were written. */ - public async add(transactionsData: Uint8Array[]): Promise { + public async add(transactionsData: { + transactionData: Uint8Array; + }[]): Promise { this.ensureOpen(); // Used to return the first offset of the series @@ -300,7 +302,7 @@ export class KVLedger { let fd; try { fd = await rawOpen(this.dataPath, true); - for (const transactionData of transactionsData) { + for (const { transactionData } of transactionsData) { // Append each transaction data await writeAtPosition(fd, transactionData, currentOffset); @@ -454,7 +456,9 @@ export class KVLedger { validTransaction.offset, true, ); - await tempLedger.add([transaction.transaction.toUint8Array()]); + await tempLedger.add([{ + transactionData: transaction.transaction.toUint8Array(), + }]); } this.header.currentOffset = tempLedger.header.currentOffset; diff --git a/test/kv.test.ts b/test/kv.test.ts index e1fa264..8eb74e9 100644 --- a/test/kv.test.ts +++ b/test/kv.test.ts @@ -620,3 +620,85 @@ test("KV: list keys after deletion", async () => { await kvStore.close(); }); + +test("KV: iterate in forward order with limit", async () => { + const tempFilePrefix = await tempfile(); + const kvStore = new KV(); + await kvStore.open(tempFilePrefix); + + for (let i = 1; i <= 5; i++) { + await kvStore.set(["data", i], `Value ${i}`); + } + + const limit = 3; + const expectedValues = ["Value 1", "Value 2", "Value 3"]; // Expected in reverse order + const results = []; + + for await (const entry of kvStore.iterate(["data"], limit, false)) { // true for reverse + results.push(entry.data); + } + + assertEquals(results, expectedValues); // Check if values match and are in the correct order + await kvStore.close(); +}); + +test("KV: listAll in forward order with limit", async () => { + const tempFilePrefix = await tempfile(); + const kvStore = new KV(); + await kvStore.open(tempFilePrefix); + + for (let i = 1; i <= 5; i++) { + await kvStore.set(["data", i], `Value ${i}`); + } + + const limit = 3; + const expectedValues = ["Value 1", "Value 2", "Value 3"]; + + const results = (await kvStore.listAll(["data"], limit, false)).map((entry) => + entry.data + ); + + assertEquals(results, expectedValues); + await kvStore.close(); +}); + +test("KV: iterate in reverse order with limit", async () => { + const tempFilePrefix = await tempfile(); + const kvStore = new KV(); + await kvStore.open(tempFilePrefix); + + for (let i = 1; i <= 5; i++) { + await kvStore.set(["data", i], `Value ${i}`); + } + + const limit = 3; + const expectedValues = ["Value 5", "Value 4", "Value 3"]; // Expected in reverse order + const results = []; + + for await (const entry of kvStore.iterate(["data"], limit, true)) { // true for reverse + results.push(entry.data); + } + + assertEquals(results, expectedValues); // Check if values match and are in the correct order + await kvStore.close(); +}); + +test("KV: listAll in reverse order with limit", async () => { + const tempFilePrefix = await tempfile(); + const kvStore = new KV(); + await kvStore.open(tempFilePrefix); + + for (let i = 1; i <= 5; i++) { + await kvStore.set(["data", i], `Value ${i}`); + } + + const limit = 3; + const expectedValues = ["Value 5", "Value 4", "Value 3"]; + + const results = (await kvStore.listAll(["data"], limit, true)).map((entry) => + entry.data + ); + + assertEquals(results, expectedValues); + await kvStore.close(); +});