Skip to content

Commit

Permalink
Mostly-working async find implementation, a few failing tests here.
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Dec 30, 2024
1 parent 5a4c87e commit 50cdcf0
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 185 deletions.
7 changes: 0 additions & 7 deletions packages/automerge-repo/src/DocHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
this.emit("delete", { handle: this })
return { doc: A.init() }
}),
onUnload: assign(() => {
return { doc: A.init() }
}),
onUnavailable: () => {
this.emit("unavailable", { handle: this })
},
},
}).createMachine({
/** @xstate-layout N4IgpgJg5mDOIC5QAoC2BDAxgCwJYDswBKAYgFUAFAEQEEAVAUQG0AGAXUVAAcB7WXAC64e+TiAAeiAOwAOAKwA6ACxSAzKqks1ATjlTdAGhABPRAFolAJksKN2y1KtKAbFLla5AX09G0WPISkVAwAMgyMrBxIILz8QiJikggAjCzOijKqLEqqybJyLizaRqYIFpbJtro5Uo7J2o5S3r4YOATECrgQADZgJADCAEoM9MzsYrGCwqLRSeoyCtra8pa5adquySXmDjY5ac7JljLJeepKzSB+bYGdPX0AYgCSAHJUkRN8UwmziM7HCgqyVcUnqcmScmcMm2ZV2yiyzkOx1OalUFx8V1aAQ63R46AgBCgJGGAEUyAwAMp0D7RSbxGagJKHFgKOSWJTJGRSCosCpKaEmRCqbQKU5yXINeTaer6LwY67YogKXH4wkkKgAeX6AH1hjQqABNGncL70xKIJQ5RY5BHOJag6wwpRyEWImQVeT1aWrVSXBXtJUqgn4Ik0ADqNCedG1L3CYY1gwA0saYqbpuaEG4pKLksKpFDgcsCjDhTnxTKpTLdH6sQGFOgAO7oKYhl5gAQNngAJwA1iRY3R40ndSNDSm6enfpm5BkWAVkvy7bpuTCKq7ndZnfVeSwuTX-HWu2AAI4AVzgQhD6q12rILxoADVIyEaAAhMLjtM-RmIE4LVSQi4nLLDIGzOCWwLKA0cgyLBoFWNy+43B0R5nheaqajqepjuMtJfgyEh-FoixqMCoKqOyhzgYKCDOq6UIeuCSxHOoSGKgop74OgABuzbdOgABGvTXlho5GrhJpxJOP4pLulT6KoMhpJY2hzsWNF0QobqMV6LG+pc+A8BAcBiP6gSfFJ36EQgKksksKxrHamwwmY7gLKB85QjBzoAWxdZdL0FnfARST8ooLC7qoTnWBU4pyC5ViVMKBQaHUDQuM4fm3EGhJBWaU7-CysEAUp3LpEpWw0WYRw2LmqzgqciIsCxWUdI2zaXlAbYdt2PZ5dJ1n5jY2iJY1ikOIcMJHCyUWHC62hRZkUVNPKta3Kh56wJ1-VWUyzhFc64JWJCtQNBBzhQW4cHwbsrVKpxPF8YJgV4ZZIWIKkiKiiNSkqZYWjzCWaQ5hFh0AcCuR3QoR74qUknBRmzholpv3OkpRQNNRpTzaKTWKbIWR5FDxm9AIkA7e9skUYCWayLILBZGoLkUSKbIyIdpxHPoyTeN4QA */
Expand Down Expand Up @@ -588,7 +582,6 @@ export interface DocHandleEvents<T> {
"heads-changed": (payload: DocHandleEncodedChangePayload<T>) => void
change: (payload: DocHandleChangePayload<T>) => void
delete: (payload: DocHandleDeletePayload<T>) => void
unavailable: (payload: DocHandleUnavailablePayload<T>) => void
"ephemeral-message": (payload: DocHandleEphemeralMessagePayload<T>) => void
"ephemeral-message-outbound": (
payload: DocHandleOutboundEphemeralMessagePayload<T>
Expand Down
70 changes: 32 additions & 38 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,18 +254,8 @@ export class Repo extends EventEmitter<RepoEvents> {
handle.on("heads-changed", throttle(saveFn, this.saveDebounceRate))
}

handle.on("unavailable", () => {
this.#log("document unavailable", { documentId: handle.documentId })
this.emit("unavailable-document", {
documentId: handle.documentId,
})
})

// Register the document with the synchronizer. This advertises our interest in the document.
this.synchronizer.addDocument(handle.documentId)

// Preserve the old event in case anyone was using it.
this.emit("document", { handle })
void this.synchronizer.addDocument(handle)
}

#receiveMessage(message: RepoMessage) {
Expand Down Expand Up @@ -426,22 +416,20 @@ export class Repo extends EventEmitter<RepoEvents> {
* Retrieves a document by id. It gets data from the local system, but also emits a `document`
* event to advertise interest in the document.
*/
find<T>(
async find<T>(
/** The url or documentId of the handle to retrieve */
id: AnyDocumentId
): DocHandle<T> {
id: AnyDocumentId,
{ skipReady = false }: { skipReady?: boolean } = {}
): Promise<DocHandle<T>> {
const documentId = interpretAsDocumentId(id)

// If we have the handle cached, return it
if (this.#handleCache[documentId]) {
if (this.#handleCache[documentId].isUnavailable()) {
// this ensures that the event fires after the handle has been returned
setTimeout(() => {
this.#handleCache[documentId].emit("unavailable", {
handle: this.#handleCache[documentId],
})
})
if (skipReady) {
return this.#handleCache[documentId]
}
await this.#handleCache[documentId].whenReady([READY])

return this.#handleCache[documentId]
}

Expand All @@ -456,23 +444,29 @@ export class Repo extends EventEmitter<RepoEvents> {
? this.storageSubsystem.loadDoc(handle.documentId)
: Promise.resolve(null)

attemptLoad
.then(async loadedDoc => {
if (loadedDoc) {
// uhhhh, sorry if you're reading this because we were lying to the type system
handle.update(() => loadedDoc as Automerge.Doc<T>)
handle.doneLoading()
} else {
// we want to wait for the network subsystem to be ready before
// we request the document. this prevents entering unavailable during initialization.
await this.networkSubsystem.whenReady()
handle.request()
}
this.#registerHandleWithSubsystems(handle)
})
.catch(err => {
this.#log("error waiting for network", { err })
})
const loadedDoc = await attemptLoad

if (loadedDoc) {
// uhhhh, sorry if you're reading this because we were lying to the type system
handle.update(() => loadedDoc as Automerge.Doc<T>)
handle.doneLoading()
} else {
// we want to wait for the network subsystem to be ready before
// we request the document. this prevents entering unavailable during initialization.
await this.networkSubsystem.whenReady()
handle.request()
}
this.#registerHandleWithSubsystems(handle)
if (skipReady) {
console.log("Skipping ready for sync messages")
return handle
}
await handle.whenReady([READY, UNAVAILABLE])

console.log("handle state", handle.state)
if (handle.state === UNAVAILABLE) {
throw new Error(`Document ${id} is unavailable`)
}
return handle
}

Expand Down
23 changes: 12 additions & 11 deletions packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import debug from "debug"
import { DocHandle } from "../DocHandle.js"
import { parseAutomergeUrl, stringifyAutomergeUrl } from "../AutomergeUrl.js"
import { parseAutomergeUrl } from "../AutomergeUrl.js"
import { Repo } from "../Repo.js"
import { DocMessage } from "../network/messages.js"
import { AutomergeUrl, DocumentId, PeerId } from "../types.js"
Expand Down Expand Up @@ -29,12 +29,12 @@ export class CollectionSynchronizer extends Synchronizer {
}

/** Returns a synchronizer for the given document, creating one if it doesn't already exist. */
#fetchDocSynchronizer(documentId: DocumentId) {
if (!this.docSynchronizers[documentId]) {
const handle = this.repo.find(stringifyAutomergeUrl({ documentId }))
this.docSynchronizers[documentId] = this.#initDocSynchronizer(handle)
async #fetchDocSynchronizer(handle: DocHandle<unknown>) {
if (!this.docSynchronizers[handle.documentId]) {
this.docSynchronizers[handle.documentId] =
this.#initDocSynchronizer(handle)
}
return this.docSynchronizers[documentId]
return this.docSynchronizers[handle.documentId]
}

/** Creates a new docSynchronizer and sets it up to propagate messages */
Expand Down Expand Up @@ -109,7 +109,8 @@ export class CollectionSynchronizer extends Synchronizer {

this.#docSetUp[documentId] = true

const docSynchronizer = this.#fetchDocSynchronizer(documentId)
const handle = await this.repo.find(documentId, { skipReady: true })
const docSynchronizer = await this.#fetchDocSynchronizer(handle)

docSynchronizer.receiveMessage(message)

Expand All @@ -123,13 +124,13 @@ export class CollectionSynchronizer extends Synchronizer {
/**
* Starts synchronizing the given document with all peers that we share it generously with.
*/
addDocument(documentId: DocumentId) {
async addDocument(handle: DocHandle<unknown>) {
// HACK: this is a hack to prevent us from adding the same document twice
if (this.#docSetUp[documentId]) {
if (this.#docSetUp[handle.documentId]) {
return
}
const docSynchronizer = this.#fetchDocSynchronizer(documentId)
void this.#documentGenerousPeers(documentId).then(peers => {
const docSynchronizer = await this.#fetchDocSynchronizer(handle)
void this.#documentGenerousPeers(handle.documentId).then(peers => {
docSynchronizer.beginSync(peers)
})
}
Expand Down
6 changes: 6 additions & 0 deletions packages/automerge-repo/src/synchronizer/DocSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,13 @@ export class DocSynchronizer extends Synchronizer {

this.#withSyncState(message.senderId, syncState => {
this.#handle.update(doc => {
console.log(
"received sync message in state",
this.#handle.documentId,
this.#handle.state
)
const start = performance.now()

const [newDoc, newSyncState] = A.receiveSyncMessage(
doc,
syncState,
Expand Down
Loading

0 comments on commit 50cdcf0

Please sign in to comment.