Skip to content

Commit

Permalink
alright, find is now abortable
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Jan 4, 2025
1 parent 4be34ba commit 5fa277f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 35 deletions.
78 changes: 43 additions & 35 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import type {
DocumentId,
PeerId,
} from "./types.js"
import { abortable, AbortOptions } from "./helpers/abortable.js"

function randomPeerId() {
return ("peer-" + Math.random().toString(36).slice(4)) as PeerId
Expand Down Expand Up @@ -413,61 +414,64 @@ export class Repo extends EventEmitter<RepoEvents> {
}

/**
* Retrieves a document by id. It gets data from the local system, but also emits a `document`
* event to advertise interest in the document.
* Loads a document without waiting for ready state
*/
async find<T>(
/** The url or documentId of the handle to retrieve */
id: AnyDocumentId,
{ skipReady = false }: { skipReady?: boolean } = {}
): Promise<DocHandle<T>> {
const documentId = interpretAsDocumentId(id)

async #loadDocument<T>(documentId: DocumentId): Promise<DocHandle<T>> {
// If we have the handle cached, return it
if (this.#handleCache[documentId]) {
if (skipReady) {
return this.#handleCache[documentId]
}
await this.#handleCache[documentId].whenReady([READY])

return this.#handleCache[documentId]
}

// If we don't already have the handle, make an empty one and try loading it
const handle = this.#getHandle<T>({
documentId,
}) as DocHandle<T>

// Loading & network is going to be asynchronous no matter what,
// but we want to return the handle immediately.
const attemptLoad = this.storageSubsystem
const handle = this.#getHandle<T>({ documentId })
const loadedDoc = await (this.storageSubsystem
? this.storageSubsystem.loadDoc(handle.documentId)
: Promise.resolve(null)

const loadedDoc = await attemptLoad
: Promise.resolve(null))

if (loadedDoc) {
// uhhhh, sorry if you're reading this because we were lying to the type system
// We need to cast this to <T> because loadDoc operates in <unknowns>.
// This is really where we ought to be validating the input matches <T>.
handle.update(() => loadedDoc as Automerge.Doc<T>)
handle.doneLoading()
} else {
// we want to wait for the network subsystem to be ready before
// we request the document. this prevents entering unavailable during initialization.
// Because the network subsystem might still be booting up, we wait
// here so that we don't immediately give up loading because we're still
// making our initial connection to a sync server.
await this.networkSubsystem.whenReady()
handle.request()
}
this.#registerHandleWithSubsystems(handle)
if (skipReady) {
return handle
}
await handle.whenReady([READY, UNAVAILABLE])

if (handle.state === UNAVAILABLE) {
throw new Error(`Document ${id} is unavailable`)
}
this.#registerHandleWithSubsystems(handle)
return handle
}

/**
* Retrieves a document by id. It gets data from the local system, but also emits a `document`
* event to advertise interest in the document.
*/
async find<T>(
/** The url or documentId of the handle to retrieve */
id: AnyDocumentId,
options: RepoFindOptions & AbortOptions = {}
): Promise<DocHandle<T>> {
const documentId = interpretAsDocumentId(id)
const { skipReady, signal } = options

return Promise.race([
(async () => {
const handle = await this.#loadDocument<T>(documentId)
if (!skipReady) {
await handle.whenReady([READY, UNAVAILABLE])
if (handle.state === UNAVAILABLE) {
throw new Error(`Document ${id} is unavailable`)
}
}
return handle
})(),
abortable(signal),
])
}

delete(
/** The url or documentId of the handle to delete */
id: AnyDocumentId
Expand Down Expand Up @@ -658,6 +662,10 @@ export interface RepoEvents {
"doc-metrics": (arg: DocMetrics) => void
}

export interface RepoFindOptions {
skipReady?: boolean
}

export interface DocumentPayload {
handle: DocHandle<any>
}
Expand Down
6 changes: 6 additions & 0 deletions packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,8 @@ describe("Repo", () => {

await eventPromise(aliceRepo.networkSubsystem, "peer")

// Not sure why we need this pause here, but... we do.
await pause(100)
const handle = await charlieRepo.find<TestDoc>(url)
const doc = await handle.doc()
assert.deepStrictEqual(doc, { foo: "baz" })
Expand Down Expand Up @@ -1021,6 +1023,10 @@ describe("Repo", () => {
network: [new MessageChannelNetworkAdapter(ba)],
})

// We need a proper peer status API so we can tell when the
// peer is connected. For now we just wait a bit.
await pause(50)

// The empty repo should be notified of the new peer, send it a request
// and eventually resolve the handle to "READY"
const handle = await a.find<TestDoc>(url)
Expand Down

0 comments on commit 5fa277f

Please sign in to comment.