Skip to content

Commit

Permalink
Add argument reverse to iterate and listAll. Optimizations.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed May 31, 2024
1 parent e577ed5 commit c77e02c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 27 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
## 0.15.5

## Additions

- Add argument `reverse` to `iterate` and `listAll` to allow almost instant
queries for "last x transactions matching query y".
- Reduce lock time on writing new transactions.
- Optimize the logic of writing new transactions.

## Fixes

- Make sure the results of `iterate` and `listAll` are returned in insertion
order, or reverse insertion order if `reverse` is set.

## 0.15.4

- Fix for `prompt` totally blocking the event loop in the cli tool, preventing
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ deno install -frA --name ckv jsr:@cross/kv/cli
`createIfMissing` defaults to true.
- `async set<T>(key, value)` - Stores a value.
- `async get<T>(key)` - Retrieves a value.
- `async *iterate<T>(query)` - Iterates over entries for a key.
- `async *iterate<T>(query, limit, reverse)` - Iterates over entries for a
key. Limit and reverse are optional.
- `async listAll<T>(query, limit, reverse)` - Gets all entries for a key as an
array. Limit and reverse are optional.
- `listKeys(query)` - List all keys under <query>.
- `async listAll<T>(query)` - Gets all entries for a key as an array.
- `async delete(key)` - Deletes a key-value pair.
- `async sync()` - Synchronizez the ledger with disk.
- `watch<T>(query, callback, recursive): void` - Registers a callback to be
Expand Down
13 changes: 10 additions & 3 deletions src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,11 @@ export class KVIndex {
* @param key - The key to search for (can include ranges)
* @returns An array of data row references.
*/
get(key: KVKeyInstance, limit?: number): number[] {
get(key: KVKeyInstance, limit?: number, reverse: boolean = false): number[] {
const resultSet: number[] = [];
const keyLength = key.get().length;

function recurse(node: KVIndexContent, keyIndex: number): void {
if (limit !== undefined && resultSet.length >= limit) return; // Stop recursion early if limit reached

if (keyIndex >= keyLength) {
// We've reached the end of the key
if (node.reference !== undefined) {
Expand Down Expand Up @@ -176,6 +174,15 @@ export class KVIndex {
// Start recursion from the root of the tree
recurse(this.index, 0);

// Sort the array by transaction offset, to give results sorted in insertion order
resultSet.sort();

// Reverse if requested, after sorting
if (reverse) resultSet.reverse();

// Limit if requested, after sorting and reversing
if (limit !== undefined) resultSet.splice(limit);

return resultSet;
}

Expand Down
36 changes: 17 additions & 19 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ export class KV extends EventEmitter {
* @param key - Representation of the key to search for.
* @param limit - (Optional) Maximum number of entries to yield. If not provided, all
* entries associated with the key will be yielded.
* @param reverse - (Optional) Return the results in reverse insertion order, most recent first. Defaulting to false - oldest first.
* @yields An object containing the `ts` (timestamp) and `data` for each matching entry.
*
* @example
Expand All @@ -527,12 +528,13 @@ export class KV extends EventEmitter {
public async *iterate<T = unknown>(
key: KVQuery,
limit?: number,
reverse: boolean = false,
): AsyncGenerator<KVTransactionResult<T>> {
// Throw if database isn't open
this.ensureOpen();
this.ensureIndex();
const validatedKey = new KVKeyInstance(key, true);
const offsets = this.index!.get(validatedKey, limit)!;
const offsets = this.index!.get(validatedKey, limit, reverse)!;

if (offsets === null || offsets.length === 0) {
return; // No results to yield
Expand All @@ -556,17 +558,22 @@ export class KV extends EventEmitter {
* all yielded entries into an array.
*
* @param key - Representation of the key to query.
* @param limit - (Optional) Maximum number of entries to return. If not provided, all
* entries associated with the key will be yielded.
* @param reverse - (Optional) Return the results in reverse insertion order, most recent first. Defaulting to false - oldest first.
* @returns A Promise that resolves to an array of all matching data entries.
*/
public async listAll<T = unknown>(
key: KVQuery,
limit?: number,
reverse: boolean = false,
): Promise<KVTransactionResult<T>[]> {
// Throw if database isn't open
this.ensureOpen();
this.ensureIndex();

const entries: KVTransactionResult<T>[] = [];
for await (const entry of this.iterate<T>(key)) {
for await (const entry of this.iterate<T>(key, limit, reverse)) {
entries.push(entry);
}
return entries;
Expand Down Expand Up @@ -690,31 +697,21 @@ export class KV extends EventEmitter {
currentOffset += transactionData.length;
}

// Convert buffered transactions to Uint8Array[]
const transactionsData = new Uint8Array(currentOffset);
currentOffset = 0;
for (const transaction of bufferedTransactions) {
transactionsData.set(
transaction.transactionData,
transaction.relativeOffset,
);
}

await this.ledger!.lock();
let unlocked = false;
try {
// Sync before writing the transactions
const syncResult = await this.sync(false, false);
if (syncResult.error) {
throw syncResult.error;
}

// Convert buffered transactions to Uint8Array[]
const transactionsData = bufferedTransactions.map(({ transactionData }) =>
transactionData
);

// Write all buffered transactions at once and get the base offset
const baseOffset = await this.ledger!.add(transactionsData);
const baseOffset = await this.ledger!.add(bufferedTransactions);

// Unlock early if everying successed
await this.ledger!.unlock();
unlocked = true;

// Update the index and check for errors
for (
Expand Down Expand Up @@ -745,7 +742,8 @@ export class KV extends EventEmitter {
}
}
} finally {
await this.ledger!.unlock();
// Back-up unlock
if (!unlocked) await this.ledger!.unlock();
this.pendingTransactions = []; // Clear pending transactions
this.isInTransaction = false;
}
Expand Down
10 changes: 7 additions & 3 deletions src/lib/ledger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ export class KVLedger {
* @param transactionsData An array of raw transaction data as Uint8Arrays.
* @returns The base offset where the transactions were written.
*/
public async add(transactionsData: Uint8Array[]): Promise<number> {
public async add(transactionsData: {
transactionData: Uint8Array;
}[]): Promise<number> {
this.ensureOpen();

// Used to return the first offset of the series
Expand All @@ -300,7 +302,7 @@ export class KVLedger {
let fd;
try {
fd = await rawOpen(this.dataPath, true);
for (const transactionData of transactionsData) {
for (const { transactionData } of transactionsData) {
// Append each transaction data
await writeAtPosition(fd, transactionData, currentOffset);

Expand Down Expand Up @@ -454,7 +456,9 @@ export class KVLedger {
validTransaction.offset,
true,
);
await tempLedger.add([transaction.transaction.toUint8Array()]);
await tempLedger.add([{
transactionData: transaction.transaction.toUint8Array(),
}]);
}
this.header.currentOffset = tempLedger.header.currentOffset;

Expand Down
82 changes: 82 additions & 0 deletions test/kv.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,85 @@ test("KV: list keys after deletion", async () => {

await kvStore.close();
});

test("KV: iterate in forward order with limit", async () => {
const tempFilePrefix = await tempfile();
const kvStore = new KV();
await kvStore.open(tempFilePrefix);

for (let i = 1; i <= 5; i++) {
await kvStore.set(["data", i], `Value ${i}`);
}

const limit = 3;
const expectedValues = ["Value 1", "Value 2", "Value 3"]; // Expected in reverse order
const results = [];

for await (const entry of kvStore.iterate(["data"], limit, false)) { // true for reverse
results.push(entry.data);
}

assertEquals(results, expectedValues); // Check if values match and are in the correct order
await kvStore.close();
});

test("KV: listAll in forward order with limit", async () => {
const tempFilePrefix = await tempfile();
const kvStore = new KV();
await kvStore.open(tempFilePrefix);

for (let i = 1; i <= 5; i++) {
await kvStore.set(["data", i], `Value ${i}`);
}

const limit = 3;
const expectedValues = ["Value 1", "Value 2", "Value 3"];

const results = (await kvStore.listAll(["data"], limit, false)).map((entry) =>
entry.data
);

assertEquals(results, expectedValues);
await kvStore.close();
});

test("KV: iterate in reverse order with limit", async () => {
const tempFilePrefix = await tempfile();
const kvStore = new KV();
await kvStore.open(tempFilePrefix);

for (let i = 1; i <= 5; i++) {
await kvStore.set(["data", i], `Value ${i}`);
}

const limit = 3;
const expectedValues = ["Value 5", "Value 4", "Value 3"]; // Expected in reverse order
const results = [];

for await (const entry of kvStore.iterate(["data"], limit, true)) { // true for reverse
results.push(entry.data);
}

assertEquals(results, expectedValues); // Check if values match and are in the correct order
await kvStore.close();
});

test("KV: listAll in reverse order with limit", async () => {
const tempFilePrefix = await tempfile();
const kvStore = new KV();
await kvStore.open(tempFilePrefix);

for (let i = 1; i <= 5; i++) {
await kvStore.set(["data", i], `Value ${i}`);
}

const limit = 3;
const expectedValues = ["Value 5", "Value 4", "Value 3"];

const results = (await kvStore.listAll(["data"], limit, true)).map((entry) =>
entry.data
);

assertEquals(results, expectedValues);
await kvStore.close();
});

0 comments on commit c77e02c

Please sign in to comment.