Skip to content

Commit

Permalink
Add transaction signature bytes. Handled corrupted DB. Ensure open da…
Browse files Browse the repository at this point in the history
…tabase on all operations. Improve docs. Add method listKeys.
  • Loading branch information
Hexagon committed May 19, 2024
1 parent fa89838 commit dd9d5ed
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 193 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ bunfig.toml

# Local
lab/
local/
lab.ts
local.ts
mydatabase*/
stress-test/

# Pup data
.pup
89 changes: 51 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ await kvStore.close();
- `async set(key, value)` - Stores a value.
- `async get(key)` - Retrieves a value.
- `async *iterate(query)` - Iterates over entries for a key.
- `listKeys(query)` - List all keys under <query>.
- `async listAll(query)` - Gets all entries for a key as an array.
- `async delete(key)` - Deletes a key-value pair.
- `async sync()` - Synchronizez the ledger with disk.
- `beginTransaction()` - Starts a transaction.
- `async endTransaction()` - Ends a transaction, returns a list of `Errors` if
any occurred.
Expand Down Expand Up @@ -192,54 +194,65 @@ objects like `{ from: 5, to: 20 }` or `{ from: "a", to: "l" }`. An empty range
["products", "book", {}, "author"]
```

## Multi-Process Synchronization
## Concurrency

`cross/kv` has a built in mechanism for synchronizing the in-memory index with
the transaction ledger, allowing for multiple processes to work with the same
database simultanously. Due to the append-only design of the ledger, each
process can update it's internal state by reading everything after the last
processed transaction. An internal watchdog actively checks for new transactions
and updates the in-memory index accordingly. The synchnization frequency can be
controlled by the option `syncIntervalMs`, which defaults to `1000` (1 second).
`cross/kv` has a built-in mechanism for synchronizing the in-memory index with
the transaction ledger, allowing multiple processes to work with the same
database simultaneously.

In single process scenarios, the watchdog can be disabled by setting the
`autoSync` option to `false`.
Due to the append-only design of the ledger, each process can update its
internal state by reading all new transactions appended since the last processed
transaction.

Subscribe to the `sync` event to receive notifications about synchronization
results and potential errors.
### Single-Process Synchronization

In single-process scenarios, explicit synchronization is unnecessary. You can
disable automatic synchronization by setting the `autoSync` option to false, and
do not have to care about running `.sync()`. This can improve performance when
only one process is accessing the database.

### Multi-Process Synchronisation

In multi-process scenarios, synchronization is crucial to ensure data
consistency across different processes. `cross/kv` manages synchronization in
the following ways:

- **Automatic Index Synchronization:** The index is automatically synchronized
at a set interval (default: 1000ms), ensuring that changes made by other
processes are reflected in all instances within a maximum of `syncIntervalMs`
milliseconds. You can adjust this interval using the `syncIntervalMs` option.

- **Manual Synchronization for Reads:** When reading data, you have two options:

- **Accept Potential Inconsistency:** By default, reads do not trigger an
immediate synchronization, which can lead to a small window of inconsistency
if another process has recently written to the database. This is generally
acceptable for most use cases.

- **Force Synchronization:** For strict consistency, you can manually trigger
synchronization before reading using the `.sync()` method:

```ts
await kv.sync(); // Ensure the most up-to-date data
const result = await kv.get(["my", "key"]); // Now read with confidence
```

### Monitoring Synchronization Events

You can subscribe to the `sync` event to receive notifications about
synchronization results and potential errors:

```typescript
const kvStore = new KV();
await kvStore.open("./mydatabase/");

// Subscribe to sync events for monitoring
kvStore.on("sync", (eventData) => {
switch (eventData.result) {
case "ready":
console.log("Everything is up to date.");
break;
case "blocked":
console.warn(
"Synchronization is temporarily blocked (e.g., during vacuum).",
);
break;
case "success":
console.log(
"Synchronization completed successfully, new transactions added to the index.",
);
break;
case "ledgerInvalidated":
console.warn(
"Ledger invalidated! The database hash been reopened and the index resynchronized to maintain consistency.",
);
break;
case "error":
// Error Handling
console.error("Synchronization error:", eventData.error);
// Log the error, report it, or take appropriate action.
break;
default:
console.warn("Unknown sync result:", eventData.result);
case "ready": // No new updates
case "blocked": // Synchronization temporarily blocked (e.g., during vacuum)
case "success": // Synchronization successful, new transactions added
case "ledgerInvalidated": // Ledger recreated, database reopened and index resynchronized
case "error": // An error occurred during synchronization
}
});
```
Expand Down
3 changes: 1 addition & 2 deletions deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cross/kv",
"version": "0.9.0",
"version": "0.9.1",
"exports": {
".": "./mod.ts"
},
Expand All @@ -18,7 +18,6 @@
"tasks": {
"check": "deno fmt --check && deno lint && deno check mod.ts && deno doc --lint mod.ts && deno test --allow-read --allow-write --allow-env --allow-net --allow-sys --allow-run --unstable-kv --coverage=cov_profile && echo \"Generating coverage\" && deno coverage cov_profile --exclude=test/ --lcov --output=cov_profile.lcov",
"check-coverage": "deno task check && genhtml cov_profile.lcov --output-directory cov_profile/html && lcov --list cov_profile.lcov && deno run --allow-net --allow-read https://deno.land/std/http/file_server.ts cov_profile/html",

"bench": "deno bench -A --unstable-kv",
"check-deps": "deno run -rA jsr:@check/deps"
}
Expand Down
7 changes: 6 additions & 1 deletion mod.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export { KV, type KVOptions } from "./src/kv.ts";
export {
KV,
type KVOptions,
type KVSyncResult,
type KVSyncResultStatus,
} from "./src/kv.ts";
export type { KVKey, KVQuery, KVQueryRange } from "./src/key.ts";
export type { KVOperation, KVTransactionResult } from "./src/transaction.ts";
7 changes: 4 additions & 3 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
// Configurable
export const LOCK_DEFAULT_MAX_RETRIES = 32;
export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 20; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total.
export const LOCK_STALE_TIMEOUT_S = 60_000;
export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 30; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total.
export const LOCK_STALE_TIMEOUT_MS = 60_000;
export const LEDGER_CURRENT_VERSION: string = "BETA";
export const SUPPORTED_LEDGER_VERSIONS: string[] = [
"ALPH",
LEDGER_CURRENT_VERSION,
];
export const LEDGER_MAX_READ_FAILURES = 10;
export const LEDGER_PREFETCH_BYTES = 2_048;
export const SYNC_INTERVAL_MS = 1_000; // Overridable with instance configuration
export const SYNC_INTERVAL_MS = 2_500; // Overridable with instance configuration

// Extremely constant
export const LEDGER_BASE_OFFSET = 1_024; // DO NOT CHANGE!
export const KV_KEY_ALLOWED_CHARS = /^[a-zA-Z0-9\-_@]+$/;
export const LEDGER_FILE_ID: string = "CKVD"; // Cross/KV Database
export const TRANSACTION_SIGNATURE: string = "CKT"; // Cross/Kv Transaction
26 changes: 25 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { KVKeyInstance, KVQueryRange } from "./key.ts";
/**
* Represents content of a node within the KVIndex tree.
*/
interface KVIndexContent {
export interface KVIndexContent {
/**
* Holds references to child nodes in the index.
*/
Expand Down Expand Up @@ -145,4 +145,28 @@ export class KVIndex {

return resultSet;
}

/**
* Retrieves the child keys of a given key.
*
* @param key - The key (or null for root level).
* @returns An array of child keys at the next level.
*/
public getChildKeys(key: KVKeyInstance | null): string[] {
let currentNode: KVIndexContent | undefined = this.index;

// Navigate to the node of the provided key (or root)
if (key !== null) {
const keyParts = key.get();
for (const part of keyParts) {
currentNode = currentNode.children.get(part as (string | number));
if (!currentNode) {
return []; // Key not found, no children
}
}
}

// Return the keys at the next level
return Array.from(currentNode.children.keys()).map(String);
}
}
Loading

0 comments on commit dd9d5ed

Please sign in to comment.