From b132a83577194441d283a8d79bac44ac1db543ee Mon Sep 17 00:00:00 2001 From: Hexagon Date: Wed, 15 May 2024 00:06:45 +0200 Subject: [PATCH] Concurrency fixes. Docs. --- .gitignore | 6 +- README.md | 75 +++++++++++-- deno.json | 2 +- src/constants.ts | 8 +- src/kv.test.ts | 69 +++++++++++- src/kv.ts | 238 +++++++++++++++++++++++++++++++++------- src/ledger.ts | 78 ++++++++----- src/transaction.test.ts | 11 +- src/transaction.ts | 21 ++-- src/utils/file.ts | 1 - src/utils/hash.ts | 7 +- stress-test/kvtest.ts | 26 +++++ stress-test/pup.json | 12 ++ 13 files changed, 454 insertions(+), 100 deletions(-) create mode 100644 stress-test/kvtest.ts create mode 100644 stress-test/pup.json diff --git a/.gitignore b/.gitignore index cb3d905..df506c2 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,8 @@ bunfig.toml # Local lab/ -lab.ts \ No newline at end of file +lab.ts +mydatabase*/ + +# Pup data +.pup \ No newline at end of file diff --git a/README.md b/README.md index 211688d..132be47 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,10 @@ A cross-platform, in-memory indexed and file based Key/Value database for JavaScript and TypeScript, designed for seamless multi-process access and compatibility across Node.js, Deno, and Bun. -_Please note that `cross/kv` is still under development. The API and features -are starting to stabilize, but are still subject to change._ +_Please note that `cross/kv` is currently in **beta**. The API and features are +starting to stabilize, but are still subject to change._ -## **Features** +## Features - **Indexed Key/Value Storage**: Store and retrieve data easily using hierarchical keys, with an in-memory index to provide fast lookups of large @@ -24,7 +24,7 @@ are starting to stabilize, but are still subject to change._ - **Key Ranges:** Retrieve ranges of data efficiently directly from the index using key ranges. -## **Installation** +## Installation Full installation instructions available at @@ -39,12 +39,13 @@ deno add @cross/kv bunx jsr add @cross/kv ``` -## **Simple Usage** +## Simple Usage ```typescript import { KV } from "@cross/kv"; const kvStore = new KV(); + await kvStore.open("./mydatabase/"); // Path where data files will be stored // Set a value @@ -61,7 +62,7 @@ await kvStore.delete(["data", "username"]); await kvStore.close(); ``` -## **Advanced Usage** +## Advanced Usage ```typescript import { KV } from "@cross/kv"; @@ -118,11 +119,12 @@ console.log("Ben: ", ben); // Outputs the object of Ben await kvStore.close(); ``` -## **API Documentation** +## API Documentation ### Methods -- `KV` class +- `KV(options)` - Main class. Options such as `autoSync` and `syncIntervalMs` + are optional. - `async open(filepath)` - Opens the KV store. - `async set(key, value)` - Stores a value. - `async get(key)` - Retrieves a value. @@ -130,8 +132,11 @@ await kvStore.close(); - `async listAll(query)` - Gets all entries for a key as an array. - `delete(key)` - Deletes a key-value pair. - `beginTransaction()` - Starts a transaction. - - `async endTransaction()` - Ends a transaction. + - `async endTransaction()` - Ends a transaction, returns a list of `Errors` if + any occurred. - `async vacuum()` - Reclaims storage space. + - `on(eventName, eventData)` - Listen for events such as `sync`, + `watchdogError` or `closing`. - `close()` - Closes the KV store. ### Keys @@ -185,6 +190,58 @@ objects like `{ from, to }`. An empty range (`{}`) means any document. ["products", "book", {}, "author"] ``` +## Multi-Process Synchronization + +`cross/kv` has a built in mechanism for synchronizing the in-memory index with +the transaction ledger, allowing for multiple processes to work with the same +database simultanously. Due to the append-only design of the ledger, each +process can update it's internal state by reading everything after the last +processed transaction. An internal watchdog actively checks for new transactions +and updates the in-memory index accordingly. The synchnization frequency can be +controlled by the option `syncIntervalMs`, which defaults to `1000` (1 second). + +In single process scenarios, the watchdog can be disabled by setting the +`autoSync` option to `false`. + +Subscribe to the `sync` event to receive notifications about synchronization +results and potential errors. + +```typescript +const kvStore = new KV(); +await kvStore.open("./mydatabase/"); + +// Subscribe to sync events for monitoring +kvStore.on("sync", (eventData) => { + switch (eventData.result) { + case "ready": + console.log("Everything is up to date."); + break; + case "blocked": + console.warn( + "Synchronization is temporarily blocked (e.g., during vacuum).", + ); + break; + case "success": + console.log( + "Synchronization completed successfully, new transactions added to the index.", + ); + break; + case "ledgerInvalidated": + console.warn( + "Ledger invalidated! The database hash been reopened and the index resynchronized to maintain consistency.", + ); + break; + case "error": + // Error Handling + console.error("Synchronization error:", eventData.error); + // Log the error, report it, or take appropriate action. + break; + default: + console.warn("Unknown sync result:", eventData.result); + } +}); +``` + ## **Contributing** Contributions are welcome! Feel free to open issues or submit pull requests. diff --git a/deno.json b/deno.json index cdaf28d..9812867 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@cross/kv", - "version": "0.0.11", + "version": "0.0.12", "exports": { ".": "./mod.ts" }, diff --git a/src/constants.ts b/src/constants.ts index 0a4afd6..855d0da 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1,11 +1,13 @@ -export const LOCK_DEFAULT_MAX_RETRIES = 32; -export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 20; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total. +export const LOCK_DEFAULT_MAX_RETRIES = 15; +export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 100; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total. export const LOCK_STALE_TIMEOUT_S = 60_000; export const SUPPORTED_LEDGER_VERSIONS = ["ALPH"]; export const LEDGER_BASE_OFFSET = 1_024; -export const LEDGER_PREFETCH_BYTES = 1_024; +export const LEDGER_MAX_READ_FAILURES = 10; + +export const LEDGER_PREFETCH_BYTES = 16000; export const SYNC_INTERVAL_MS = 1_000; diff --git a/src/kv.test.ts b/src/kv.test.ts index db3cf99..5551cc5 100644 --- a/src/kv.test.ts +++ b/src/kv.test.ts @@ -1,7 +1,8 @@ -import { assertEquals, assertRejects } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { test } from "@cross/test"; -import { KV, type KVDataEntry } from "./kv.ts"; +import { KV, type KVDataEntry, KVOptions } from "./kv.ts"; import { tempfile } from "@cross/fs"; +import { SYNC_INTERVAL_MS } from "./constants.ts"; test("KV: set, get and delete (numbers and strings)", async () => { const tempFilePrefix = await tempfile(); @@ -289,3 +290,67 @@ test("KV: vacuum", async () => { kvStore.close(); }); + +test("KV Options: defaults work correctly", () => { + const kv = new KV(); // No options provided + assertEquals(kv.autoSync, true); + assertEquals(kv.syncIntervalMs, SYNC_INTERVAL_MS); + kv.close(); +}); + +test("KV Options: custom options are applied", () => { + const options: KVOptions = { + autoSync: false, + syncIntervalMs: 5000, + }; + const kv = new KV(options); + assertEquals(kv.autoSync, false); + assertEquals(kv.syncIntervalMs, 5000); + kv.close(); +}); + +test("KV Options: throws on invalid autoSync type", () => { + const options: KVOptions = { + // @ts-expect-error Test + autoSync: "not a boolean", // Incorrect type + }; + assertThrows( + () => new KV(options), + TypeError, + "Invalid option: autoSync must be a boolean", + ); +}); + +test("KV Options: throws on invalid syncIntervalMs type", () => { + const options: KVOptions = { + // @ts-expect-error Test + syncIntervalMs: "not a number", // Incorrect type + }; + assertThrows( + () => new KV(options), + TypeError, + "Invalid option: syncIntervalMs must be a positive integer", + ); +}); + +test("KV Options: throws on negative syncIntervalMs", () => { + const options: KVOptions = { + syncIntervalMs: -1000, // Negative value + }; + assertThrows( + () => new KV(options), + TypeError, + "Invalid option: syncIntervalMs must be a positive integer", + ); +}); + +test("KV Options: throws on zero syncIntervalMs", () => { + const options: KVOptions = { + syncIntervalMs: 0, // Zero value + }; + assertThrows( + () => new KV(options), + TypeError, + "Invalid option: syncIntervalMs must be a positive integer", + ); +}); diff --git a/src/kv.ts b/src/kv.ts index 3b6d098..dcbe057 100644 --- a/src/kv.ts +++ b/src/kv.ts @@ -1,10 +1,15 @@ // deno-lint-ignore-file no-explicit-any + +// Internal dependencies import { KVIndex } from "./index.ts"; import { type KVKey, KVKeyInstance, type KVQuery } from "./key.ts"; import { KVOperation, KVTransaction } from "./transaction.ts"; import { KVLedger } from "./ledger.ts"; import { SYNC_INTERVAL_MS } from "./constants.ts"; +// External dependencies +import { EventEmitter } from "node:events"; + /** * Represents a single data entry returned after querying the Key/Value store. */ @@ -21,25 +26,82 @@ export interface KVDataEntry { } /** - * Cross-platform Key-Value store implementation backed by file storage. + * Options for configuring the behavior of the KV store. */ -export class KV { - private index: KVIndex = new KVIndex(); +export interface KVOptions { + /** + * Enables or disables automatic synchronization of the in-memory index with the on-disk ledger. + * + * When enabled (default), a background process will periodically sync the index to ensure consistency + * across multiple processes. Disabling this may improve performance in single-process scenarios, + * but you'll need to manually call `kvStore.sync()` to keep the index up-to-date. + * + * @defaultValue `true` + */ + autoSync?: boolean; - private pendingTransactions: KVTransaction[] = []; - private isInTransaction: boolean = false; + /** + * The time interval (in milliseconds) between automatic synchronization operations. + * + * This value controls how frequently the in-memory index is updated with changes from the on-disk ledger. + * A shorter interval provides more up-to-date data at the cost of potentially higher overhead. + * A longer interval reduces overhead but may result in stale data. + * + * @defaultValue `1000` + */ + syncIntervalMs?: number; +} +/** + * A cross-platform key-value store backed by file storage. + * + * Provides a persistent and reliable storage mechanism for key-value pairs, + * using an on-disk ledger for data integrity and an in-memory index for efficient retrieval. + */ +export class KV extends EventEmitter { + // Storage + private index: KVIndex = new KVIndex(); private ledger?: KVLedger; + private pendingTransactions: KVTransaction[] = []; + + // Configuration private ledgerPath?: string; - private watchdogTimer?: number; + public autoSync: boolean = true; // Public only to allow testing + public syncIntervalMs: number = SYNC_INTERVAL_MS; // Public only to allow testing + // States private blockSync: boolean = false; // Syncing can be blocked during vacuum - private aborted: boolean = false; + private isInTransaction: boolean = false; + private watchdogTimer?: number; // Undefined if not scheduled or currently running + private watchdogPromise?: Promise; + + constructor(options: KVOptions = {}) { + super(); + + // Validate and set options + // - autoSync + if ( + options.autoSync !== undefined && typeof options.autoSync !== "boolean" + ) { + throw new TypeError("Invalid option: autoSync must be a boolean"); + } + this.autoSync = options.autoSync ?? true; + // - syncIntervalMs + if ( + options.syncIntervalMs !== undefined && + (!Number.isInteger(options.syncIntervalMs) || options.syncIntervalMs <= 0) + ) { + throw new TypeError( + "Invalid option: syncIntervalMs must be a positive integer", + ); + } + this.syncIntervalMs = options.syncIntervalMs ?? SYNC_INTERVAL_MS; - constructor() { + if (this.autoSync) { + this.watchdogPromise = this.watchdog(); + } } - /** * Opens the Key-Value store based on a provided file path. * Initializes the index and data files. @@ -50,46 +112,121 @@ export class KV { public async open( filePath: string, createIfMissing: boolean = true, - forceSync: boolean = false, ) { + // Do not allow re-opening a closed database + if (this.aborted) { + throw new Error("Could not open, database already closed."); + } + // If there is an existing ledger, close it and clear the index if (this.ledger) { this.ledger?.close(); this.index.clear(); - // ToDo: Is an abort signal needed to prevent a current watchdog to recurse? - clearTimeout(this.watchdogTimer!); // Clear the timer if it exists } // Open the ledger, and start a new watchdog this.ledger = new KVLedger(filePath); this.ledgerPath = filePath; await this.ledger.open(createIfMissing); - await this.watchdog(forceSync); + + // Do the initial synchronization + // - If `this.autoSync` is enabled, additional synchronizations will be carried out every `this.syncIntervalMs` + await this.sync(true); } /** - * Starts a watchdog function that periodically syncs the ledger with disk. + * Starts a background process to periodically synchronize the in-memory index with the on-disk ledger. + * + * This function is crucial for maintaining consistency between the index and the ledger when the database is + * accessed by multiple processes or consumers. + * + * It is automatically invoked if `autoSync` is enabled during construction. + * + * @emits sync - Emits an event if anything goes wrong, containing the following information: + * - `result`: "error" + * - `error`: Error object + * + * @remarks + * - The synchronization interval is controlled by the `syncIntervalMs` property. + * - If the ledger is not open or is in the process of closing, the synchronization will not occur. + * - Errors during synchronization are emitted as `sync` events with the error in the payload. */ - private async watchdog(forceSync: boolean = false) { + private async watchdog() { if (this.aborted) return; - await this.sync(forceSync); + + // Wrap all operations in try/catch + try { + await this.sync(); + } catch (watchdogError) { + // Use the same event reporting format as the sync() method + const syncResult = "error"; + const errorDetails = new Error( + "Error in watchdog: " + watchdogError.message, + { cause: watchdogError }, + ); + // @ts-ignore .emit does indeed exist + this.emit("sync", { result: syncResult, error: errorDetails }); + } // Reschedule - this.watchdogTimer = setTimeout(() => this.watchdog(), SYNC_INTERVAL_MS); + this.watchdogTimer = setTimeout( + async () => { + // Make sure current run is done + await this.watchdogPromise; + + // Initiate a new run + this.watchdogPromise = this.watchdog(); + }, + this.syncIntervalMs, + ); } + /** + * Synchronizes the in-memory index with the on-disk ledger. + * + * This method fetches new transactions from the ledger and applies them to the index. + * If the ledger is invalidated, it automatically re-opens the database. + * + * @param force - If true, forces synchronization even if it's currently blocked (e.g., during a vacuum). + * + * @emits sync - Emits an event with the synchronization result. The event detail object has the following structure: + * - `result`: | "error" + * - `error`: Error object (if an error occurred) or null + * + * @throws {Error} If an unexpected error occurs during synchronization. + */ private async sync(force: boolean = false) { + // Early returns + if (!this.ledger || this.ledger?.isClosing()) return; if (this.aborted) return; - if (this.blockSync && !force) return; + if (this.blockSync && !force) { + // @ts-ignore .emit does indeed exist + this.emit("sync", { result: "blocked", error: errorDetails }); + return; + } + + let syncResult: + | "ready" + | "blocked" + | "success" + | "ledgerInvalidated" + | "error" = "ready"; + let errorDetails: Error | null = null; + try { const newTransactions = await this.ledger?.sync(); // If sync() do return null the ledger is invalidated // - Return without rescheduling the watchdog, and open the new ledger if (newTransactions === null) { - return this.open(this.ledgerPath!, false); - } - - if (newTransactions) { + // @ts-ignore .emit does indeed exist + syncResult = "ledgerInvalidated"; + await this.open(this.ledgerPath!, false); + } else if (newTransactions) { + // Change status to success if there are new transactions + if (newTransactions.length > 0) { + syncResult = "success"; + } + // Handle each new transactionx for (const entry of newTransactions) { try { // Apply transaction to the index @@ -101,26 +238,50 @@ export class KV { this.index.delete(entry.key); break; } - } catch (_e) { - console.error(_e); - throw new Error("Error while encoding data"); + } catch (transactionError) { + // Change result to error + syncResult = "error"; + errorDetails = new Error( + "Error processing transaction: " + transactionError.message, + { cause: transactionError }, + ); + // @ts-ignore .emit does indeed exist + this.emit("sync", { result: syncResult, error: errorDetails }); } } + } else { + throw new Error("Undefined error during ledger sync"); } - } catch (error) { - console.error("Error in watchdog sync:", error); + } catch (syncError) { + syncResult = "error"; + errorDetails = new Error( + "Error during ledger sync: " + syncError.message, + { cause: syncError }, + ); + } finally { + // @ts-ignore .emit does indeed exist + this.emit("sync", { result: syncResult, error: errorDetails }); } } /** - * Performs a vacuum operation on the underlying ledger to reclaim space. + * Performs a vacuum operation to reclaim space in the underlying ledger. + * + * This operation is essential for maintaining performance as the database grows over time. + * It involves rewriting the ledger to remove deleted entries, potentially reducing its size. + * + * @remarks + * - Vacuuming temporarily blocks regular synchronization (`blockSync` is set to `true`). + * - The database is automatically re-opened after the vacuum is complete to ensure consistency. + * + * @async */ public async vacuum(): Promise { this.blockSync = true; await this.ledger?.vacuum(); // Force re-opening the database - await this.open(this.ledgerPath!, false, true); + await this.open(this.ledgerPath!, false); this.blockSync = false; } @@ -224,7 +385,7 @@ export class KV { if (result?.transaction) { yield { ts: result?.transaction.timestamp!, - data: await result?.transaction.validateAndGetData(), + data: result?.transaction.getData(), }; count++; } @@ -327,12 +488,11 @@ export class KV { } /** - * Processes a single transaction and makes necessary updates to the index and - * data files. - * - * File locks should be handled outside this function. + * Processes a single transaction and updates the index and data files. * * @param pendingTransaction - The transaction to execute. + * + * @throws {Error} If the transaction fails or if there's an issue updating the index or data files. */ async runTransaction( pendingTransaction: KVTransaction, @@ -362,13 +522,13 @@ export class KV { } } - public close() { + public async close() { + // First await current watchdog run + await this.watchdogPromise; + // @ts-ignore Closing ledger + this.emit("closing"); this.aborted = true; clearTimeout(this.watchdogTimer!); // Clear the timer if it exists this.ledger?.close(); } - - public unsafeGetIndex(): KVIndex { - return this.index; - } } diff --git a/src/ledger.ts b/src/ledger.ts index 0cac0ff..9148fd0 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -8,6 +8,7 @@ import { import { lock, unlock } from "./utils/file.ts"; import { LEDGER_BASE_OFFSET, + LEDGER_MAX_READ_FAILURES, LEDGER_PREFETCH_BYTES, SUPPORTED_LEDGER_VERSIONS, } from "./constants.ts"; @@ -99,39 +100,50 @@ export class KVLedger { // Update offset await lock(this.dataPath); - await this.readHeader(false); - - // If the ledger is re-created (by vacuum or overwriting), there will be a time in the cached header - // and there will be a different time after reading the header - if (currentCreated !== 0 && currentCreated !== this.header.created) { - await unlock(this.dataPath); + let reusableFd; + try { + await this.readHeader(false); - // Return 0 to invalidate this ledger - return null; - } + // If the ledger is re-created (by vacuum or overwriting), there will be a time in the cached header + // and there will be a different time after reading the header + if (currentCreated !== 0 && currentCreated !== this.header.created) { + await unlock(this.dataPath); - // If there is new transactions - const reusableFd = await rawOpen(this.dataPath, false); - while (currentOffset < this.header.currentOffset) { - const result = await this.rawGetTransaction( - currentOffset, - false, - false, - reusableFd, - ); - newTransactions.push({ - key: result.transaction.key!, - operation: result.transaction.operation!, - offset: currentOffset, - }); // Add the transaction - currentOffset += result.length; // Advance the offset - } - reusableFd.close(); + // Return 0 to invalidate this ledger + return null; + } - // Update the cached header's currentOffset - this.header.currentOffset = currentOffset; + // If there is new transactions + reusableFd = await rawOpen(this.dataPath, false); + let failures = 0; + while (currentOffset < this.header.currentOffset) { + if (failures > LEDGER_MAX_READ_FAILURES) { + throw new Error("Internal sync error: Read attempts exceeded"); + } + try { + const result = await this.rawGetTransaction( + currentOffset, + false, + false, + reusableFd, + ); + newTransactions.push({ + key: result.transaction.key!, + operation: result.transaction.operation!, + offset: currentOffset, + }); // Add the transaction + currentOffset += result.length; // Advance the offset + } catch (_e) { + failures++; + } + } - await unlock(this.dataPath); + // Update the cached header's currentOffset + this.header.currentOffset = currentOffset; + } finally { + if (reusableFd) reusableFd.close(); + await unlock(this.dataPath); + } return newTransactions; } @@ -199,7 +211,6 @@ export class KVLedger { // Set numeric fields headerView.setFloat64(8, this.header.created, false); // false for little-endian headerView.setUint32(16, this.header.currentOffset, false); - // Write the header data await writeAtPosition(fd, new Uint8Array(headerBuffer), 0); } finally { @@ -214,6 +225,7 @@ export class KVLedger { ): Promise { const offset = this.header.currentOffset; if (doLock) await lock(this.dataPath); + await this.readHeader(false); let fd; try { fd = await rawOpen(this.dataPath, true); @@ -268,10 +280,12 @@ export class KVLedger { 4, false, ); + const transaction = new KVTransaction(); // Read transaction header let transactionHeaderData; + // - directly from file if (headerLength + 8 > LEDGER_PREFETCH_BYTES) { transactionHeaderData = await readAtPosition( @@ -386,4 +400,8 @@ export class KVLedger { await unlock(this.dataPath); } } + + public isClosing() { + return this.aborted; + } } diff --git a/src/transaction.test.ts b/src/transaction.test.ts index 8aa8d96..009f119 100644 --- a/src/transaction.test.ts +++ b/src/transaction.test.ts @@ -5,7 +5,7 @@ import { KVOperation, KVTransaction } from "./transaction.ts"; test("KVTransaction: create and toUint8Array", async () => { const key = new KVKeyInstance(["testKey"]); - const value = { name: "Alice", age: 30 }; + const value = { test: "data" }; const timestamp = Date.now(); const transaction = new KVTransaction(); @@ -13,8 +13,11 @@ test("KVTransaction: create and toUint8Array", async () => { const uint8Array = transaction.toUint8Array(); const decodedTransaction = new KVTransaction(); - decodedTransaction.headerFromUint8Array(uint8Array.slice(8)); // Skip the initial 8 bytes (header and data lengths) - decodedTransaction.dataFromUint8Array(transaction.data!); + const headerLength = new DataView(uint8Array.buffer).getUint32(0); + decodedTransaction.headerFromUint8Array( + uint8Array.slice(8, 8 + headerLength), + ); // Skip the initial 8 bytes (header and data lengths) + await decodedTransaction.dataFromUint8Array(transaction.data!); assertEquals( decodedTransaction.key?.getKeyRepresentation(), key.getKeyRepresentation(), @@ -22,7 +25,7 @@ test("KVTransaction: create and toUint8Array", async () => { assertEquals(decodedTransaction.operation, transaction.operation); assertEquals(decodedTransaction.timestamp, transaction.timestamp); - const decodedData = await decodedTransaction.validateAndGetData(); + const decodedData = decodedTransaction.getData(); assertEquals(decodedData, value); }); diff --git a/src/transaction.ts b/src/transaction.ts index ea567be..67872f9 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -129,9 +129,18 @@ export class KVTransaction { throw new Error("Invalid data: Hash data truncated"); } this.hash = data.subarray(offset, offset + hashLength); + offset += hashLength; + + // Do not allow extra data + if (offset !== data.byteLength) { + throw new Error("Invalid data: Extra data in transaction header"); + } } - public dataFromUint8Array(data: Uint8Array) { + public async dataFromUint8Array(data: Uint8Array) { + if (!compareHash(await sha1(data), this.hash!)) { + throw new Error("Invalid data: Read data not matching hash"); + } this.data = data; } @@ -181,14 +190,8 @@ export class KVTransaction { return fullData; } - public async validateAndGetData(): Promise { - // Validate Transaction Header - const expectedHash = await sha1(this.data!); - if (!compareHash(this.hash!, expectedHash)) { - throw new Error("Invalid data: Transaction header hash mismatch"); - } - - // Return validated data + public getData(): unknown | null { + // Return data, should be validated through create or fromUint8Array if (this.data) { return decode(this.data); } else { diff --git a/src/utils/file.ts b/src/utils/file.ts index 11c1784..d576f57 100644 --- a/src/utils/file.ts +++ b/src/utils/file.ts @@ -101,7 +101,6 @@ export async function lock(filePath: string): Promise { } else { // Runtime.Node await writeFile(lockFile, "", { flag: "wx" }); // 'wx' for exclusive creation } - // Lock acquired! return true; } catch (error) { diff --git a/src/utils/hash.ts b/src/utils/hash.ts index cebce0c..397d74d 100644 --- a/src/utils/hash.ts +++ b/src/utils/hash.ts @@ -1,5 +1,10 @@ +/** + * node:crypto is used instead of global crypto.subtle to get Node/Bun-support without polyfilling + */ +import { subtle } from "node:crypto"; + export async function sha1(data: Uint8Array): Promise { - return new Uint8Array(await crypto.subtle.digest("SHA-1", data)); + return new Uint8Array(await subtle.digest("SHA-1", data)); } export function compareHash(arr1: Uint8Array, arr2: Uint8Array): boolean { diff --git a/stress-test/kvtest.ts b/stress-test/kvtest.ts new file mode 100644 index 0000000..87f2be3 --- /dev/null +++ b/stress-test/kvtest.ts @@ -0,0 +1,26 @@ +import { KV } from "../mod.ts"; +import { getEnv } from "jsr:@cross/env"; +import { PupTelemetry } from "jsr:@pup/telemetry"; +const telemetry = new PupTelemetry(); // Initializes telemetry +const db = "./mydatabase18/"; +const kvStore = new KV(); +await kvStore.open(db); // Path where data files will be stored +const inst = getEnv("PUP_CLUSTER_INSTANCE"); +console.log(`Instance ${inst} starting...`); +const recurser = async () => { + const randomValue = Math.ceil(Math.random() * 1000); + console.log( + `Instance ${inst} writing ${randomValue} to ["values",${randomValue}]`, + ); + await kvStore.set(["values", randomValue], randomValue); + console.log( + `Instance ${inst} reading ${ + (await kvStore.get(["values", randomValue]))?.data + } from ["values",${randomValue}]`, + ); + setTimeout(() => recurser(), Math.random() * 10000); +}; +recurser(); +kvStore.on("sync", (d) => { + console.log(`Instance ${inst} synced with result ${d.error}`); +}); diff --git a/stress-test/pup.json b/stress-test/pup.json new file mode 100644 index 0000000..1270f0b --- /dev/null +++ b/stress-test/pup.json @@ -0,0 +1,12 @@ +{ + "processes": [ + { + "id": "my-scalable-app", + "cmd": "deno run -A kvtest.ts", + "autostart": true, + "cluster": { + "instances": 20 + } + } + ] +}