Skip to content

Commit

Permalink
Add .watch + various fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed May 20, 2024
1 parent f85cbae commit 4587511
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 65 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## 0.11.0

- Update ledger version `BETA`->`B011` "Beta 0.11"
- Use the supplied database path without adding `.data`
- Reduce header length from 1024 to 256 bytes
- Add feature `.watch(query, callback, recursive)`
- Change lockfile name from `path/to/db.lock` to `path/to/db-lock`
- Change temporary name from `path/to/db.tmp` to `path/to/db-tmp`
- Allow all unicode letters and numbers in string key parts
- Add `CHANGELOG.md`
- Code cleanup
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import { KV } from "@cross/kv";

const kvStore = new KV();

await kvStore.open("./mydatabase/"); // Path where data files will be stored
// Open the database, path and database is created if it does not exist
await kvStore.open("data/mydatabase.db");

// Set a value
await kvStore.set(["data", "username"], "Alice");
Expand All @@ -71,7 +72,7 @@ import { KV } from "@cross/kv";
const kvStore = new KV();

// Open the database
await kvStore.open("./mydatabase/");
await kvStore.open("data/mydatabase.db");

// Store some values/documents indexed by users.by_id.<id>
await kvStore.set(["users", "by_id", 1], {
Expand Down Expand Up @@ -125,14 +126,20 @@ await kvStore.close();

- `KV(options)` - Main class. Options such as `autoSync` and `syncIntervalMs`
are optional.
- `async open(filepath)` - Opens the KV store.
- `async open(filepath, createIfMissing)` - Opens the KV store.
`createIfMissing` defaults to true.
- `async set(key, value)` - Stores a value.
- `async get(key)` - Retrieves a value.
- `async *iterate(query)` - Iterates over entries for a key.
- `listKeys(query)` - List all keys under <query>.
- `async listAll(query)` - Gets all entries for a key as an array.
- `async delete(key)` - Deletes a key-value pair.
- `async sync()` - Synchronizez the ledger with disk.
- `watch(query, callback, recursive): void` - Registers a callback to be
called whenever a new transaction matching the given query is added to the
database.
- `unwatch(query, callback): void` - Unregisters a previously registered watch
handler.
- `beginTransaction()` - Starts a transaction.
- `async endTransaction()` - Ends a transaction, returns a list of `Errors` if
any occurred.
Expand Down Expand Up @@ -244,7 +251,7 @@ synchronization results and potential errors:

```typescript
const kvStore = new KV();
await kvStore.open("./mydatabase/");
await kvStore.open("db/mydatabase.db");

kvStore.on("sync", (eventData) => {
switch (eventData.result) {
Expand All @@ -253,9 +260,10 @@ kvStore.on("sync", (eventData) => {
case "success": // Synchronization successful, new transactions added
case "ledgerInvalidated": // Ledger recreated, database reopened and index resynchronized
case "error": // An error occurred during synchronization
default:
// Handle unexpected eventData.result values if needed
}
});
```

## Contributing

Expand All @@ -268,3 +276,4 @@ package in most distributions.
## **License**

MIT License
```
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cross/kv",
"version": "0.10.0",
"version": "0.11.0",
"exports": {
".": "./mod.ts"
},
Expand Down
9 changes: 4 additions & 5 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
export const LOCK_DEFAULT_MAX_RETRIES = 32;
export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 30; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total.
export const LOCK_STALE_TIMEOUT_MS = 60_000;
export const LEDGER_CURRENT_VERSION: string = "BETA";
export const LEDGER_CURRENT_VERSION: string = "B011";
export const SUPPORTED_LEDGER_VERSIONS: string[] = [
"ALPH",
LEDGER_CURRENT_VERSION,
];
export const LEDGER_MAX_READ_FAILURES = 10;
export const LEDGER_PREFETCH_BYTES = 2_048;
export const SYNC_INTERVAL_MS = 2_500; // Overridable with instance configuration

// Extremely constant
export const LEDGER_BASE_OFFSET = 1_024; // DO NOT CHANGE!
export const KV_KEY_ALLOWED_CHARS = /^[a-zA-Z0-9\-_@]+$/;
export const LEDGER_BASE_OFFSET = 256; // DO NOT CHANGE!
export const KV_KEY_ALLOWED_CHARS = /^[@\p{L}\p{N}_-]+$/u; // Unicode letters and numbers, undescore, hyphen and at
export const LEDGER_FILE_ID: string = "CKVD"; // Cross/KV Database
export const TRANSACTION_SIGNATURE: string = "CKT"; // Cross/Kv Transaction
export const TRANSACTION_SIGNATURE: string = "T;"; // Cross/Kv Transaction
72 changes: 71 additions & 1 deletion src/key.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ export class KVKeyInstance {

if (typeof element === "string" && !KV_KEY_ALLOWED_CHARS.test(element)) {
throw new TypeError(
"String elements in the key can only contain a-zA-Z, 0-9, '-', and '_'",
"String elements in the key can only contain unicode letters, numbers, '@', '-', and '_'",
);
}
}
Expand All @@ -229,4 +229,74 @@ export class KVKeyInstance {

return this.key.join(".");
}

/**
* Checks if this key instance matches a given query, optionally including descendants.
*
* This implementation performs strict type matching, ensuring that number elements in the query only match number elements in the key, and likewise for strings.
*
* @param query The query to match against.
* @param recursive If true, the match includes descendant keys; if false, only the exact key matches.
* @returns `true` if the key matches the query (and optionally its descendants), `false` otherwise.
*/
public matchesQuery(query: KVQuery, recursive: boolean = false): boolean {
const thisKey = this.get() as KVKey;

if (!recursive && thisKey.length < query.length) {
return false;
}

if (thisKey.length > query.length && !recursive) {
return false;
}

for (let i = 0; i < query.length; i++) {
const queryElement = query[i];
const keyElement = thisKey[i];
if (typeof queryElement === "string") {
if (typeof keyElement !== "string" || queryElement !== keyElement) {
return false;
}
} else if (typeof queryElement === "number") {
if (typeof keyElement !== "number" || queryElement !== keyElement) {
return false;
}
} else if (typeof queryElement === "object") {
if (
// String comparison
(typeof keyElement === "string" &&
(queryElement.from === undefined ||
keyElement >= (queryElement.from as string)) &&
(queryElement.to === undefined ||
keyElement <= (queryElement.to as string))) ||
// Number comparison
(typeof keyElement === "number" &&
(queryElement.from === undefined ||
keyElement >= (queryElement.from as number)) &&
(queryElement.to === undefined ||
keyElement <= (queryElement.to as number)))
) {
/* Ok */
} else {
return false;
}
} else {
throw new Error(`Invalid query element type at index ${i}`);
}

// Recursively check descendants if needed
if (recursive && thisKey.length > i + 1) {
const subquery = query.slice(i + 1);
const subkey = thisKey.slice(i + 1);

if (
!new KVKeyInstance(subkey, true).matchesQuery(subquery, recursive)
) {
return false;
}
}
}

return true; // All elements match
}
}
75 changes: 69 additions & 6 deletions src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import { EventEmitter } from "node:events";
* Represents the status of a synchronization operation between the in-memory index and the on-disk ledger.
*/
export type KVSyncResultStatus =
| "noop" // No operation was performed (e.g., ledger not open)
| "ready" // The database is ready, no new data
| "blocked" // Synchronization is blocked (e.g., during a vacuum)
| "success" // The database is ready, new data were synchronized
| "ledgerInvalidated" // The ledger was invalidated and needs to be reopened
| "error"; // An error occurred during synchronization, check .error for details
| "noop" /** No operation was performed (e.g., ledger not open). */
| "ready" /** The database is ready, no new data to synchronize. */
| "blocked" /** Synchronization is temporarily blocked (e.g., during a vacuum). */
| "success" /** Synchronization completed successfully, new data was added. */
| "ledgerInvalidated" /** The ledger was invalidated and needs to be reopened. */
| "error"; /** An error occurred during synchronization. Check the `error` property for details. */

/**
* The result of a synchronization operation between the in-memory index and the on-disk ledger.
Expand All @@ -39,6 +39,24 @@ export interface KVSyncResult {
error: Error | null;
}

/**
* A function that is called when a watched transaction occurs.
*/
export interface WatchHandler {
/**
* The query used to filter the transactions.
*/
query: KVQuery;
/**
* The callback function that will be called when a transaction matches the query.
*/
callback: (transaction: KVTransactionResult) => void;
/**
* Whether to include child keys
*/
recursive: boolean;
}

/**
* Options for configuring the behavior of the KV store.
*/
Expand Down Expand Up @@ -77,6 +95,7 @@ export class KV extends EventEmitter {
private index: KVIndex = new KVIndex();
private ledger?: KVLedger;
private pendingTransactions: KVTransaction[] = [];
private watchHandlers: WatchHandler[] = [];

// Configuration
private ledgerPath?: string;
Expand Down Expand Up @@ -297,6 +316,14 @@ export class KV extends EventEmitter {
// Throw if database isn't open
this.ensureOpen();

// Check for matches in watch handlers
for (const handler of this.watchHandlers) {
if (transaction.key!.matchesQuery(handler.query, handler.recursive)) {
handler.callback(transaction.asResult());
}
}

// Add the transaction to index
switch (transaction.operation) {
case KVOperation.SET:
this.index.add(transaction.key!, offset);
Expand Down Expand Up @@ -610,6 +637,42 @@ export class KV extends EventEmitter {
);
}

/**
* Registers a callback function to be called whenever a new transaction matching the given query is added to the database.
*
* @param query - The query to match against new transactions.
* @param callback - The callback function to be called when a match is found. The callback will receive the matching transaction as its argument.
*/
public watch(
query: KVQuery,
callback: (transaction: KVTransactionResult) => void,
recursive: boolean = false,
) {
this.watchHandlers.push({ query, callback, recursive });
}

/**
* Unregisters a previously registered watch handler.
*
* Both query and callback must be a reference to the original values passed to `.watch()`
*
* @param query - The original query or handlerused to register the watch handler.
* @param callback - The callback function used to register the watch handler.
*
* @returns True on success
*/
public unwatch(
query: KVQuery,
callback: (transaction: KVTransactionResult) => void,
): boolean {
const newWatchHandlers = this.watchHandlers.filter(
(handler) => handler.query !== query || handler.callback !== callback,
);
const result = newWatchHandlers.length !== this.watchHandlers.length;
this.watchHandlers = newWatchHandlers;
return result;
}

/**
* Closes the database gracefully.
*
Expand Down
Loading

0 comments on commit 4587511

Please sign in to comment.