Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SSE-to-Matrix spike #2031

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/host/app/resources/card-resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ export class CardResource extends Resource<Args> {
unsubscribe: this.messageService.subscribe(
realmURL.href,
({ type, data: dataStr }) => {
console.log('received message event', type, dataStr);
if (type !== 'index') {
return;
}
Expand Down
11 changes: 11 additions & 0 deletions packages/host/app/services/matrix-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import type CommandService from './command-service';
import type LoaderService from './loader-service';
import type MatrixSDKLoader from './matrix-sdk-loader';
import type { ExtendedClient, ExtendedMatrixSDK } from './matrix-sdk-loader';
import type MessageService from './message-service';
import type RealmService from './realm';
import type RealmServerService from './realm-server';
import type ResetService from './reset';
Expand All @@ -108,6 +109,7 @@ export default class MatrixService extends Service {
@service private declare commandService: CommandService;
@service private declare realm: RealmService;
@service private declare matrixSdkLoader: MatrixSDKLoader;
@service private declare messageService: MessageService;
@service private declare realmServer: RealmServerService;
@service private declare router: RouterService;
@service private declare reset: ResetService;
Expand Down Expand Up @@ -1312,6 +1314,15 @@ export default class MatrixService extends Service {
event.content?.msgtype === APP_BOXEL_REALM_SERVER_EVENT_MSGTYPE
) {
await this.realmServer.handleEvent(event);
} else if (
event.type === 'm.room.message' &&
event.content?.msgtype === 'app.boxel.sse'
) {
// FIXME provenance should be checked
console.log('received sse event', event);
let parsedEventContent = JSON.parse(event.content.body);
console.log('relayMatrixSSE', parsedEventContent);
this.messageService.relayMatrixSSE(parsedEventContent);
}
await this.addRoomEvent(event, oldEventId);

Expand Down
22 changes: 22 additions & 0 deletions packages/host/app/services/message-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import window from 'ember-window-mock';

import qs from 'qs';

import type { RealmURLWrappedServerEvent } from '@cardstack/runtime-common/realm';

import { SessionLocalStorageKey } from '../utils/local-storage-keys';

import type NetworkService from './network';

export default class MessageService extends Service {
@tracked subscriptions: Map<string, EventSource> = new Map();
@tracked listenerCallbacks: Map<string, ((ev: ServerEvents) => void)[]> =
new Map();
@service private declare network: NetworkService;

register() {
Expand Down Expand Up @@ -48,13 +52,31 @@ export default class MessageService extends Service {
// TODO might want to consider making separate subscription methods so that
// you can subscribe to a specific type of events instead of all of the
// events...

if (!this.listenerCallbacks.has(realmURL)) {
this.listenerCallbacks.set(realmURL, []);
}
this.listenerCallbacks.get(realmURL)?.push(cb);

eventSource.addEventListener('update', cb);
eventSource.addEventListener('index', cb);
return () => {
eventSource.removeEventListener('update', cb);
eventSource.removeEventListener('index', cb);
};
}

relayMatrixSSE(realmWrappedEvent: RealmURLWrappedServerEvent) {
console.log('relaying matrix sse event', realmWrappedEvent);
this.listenerCallbacks.get(realmWrappedEvent.realmURL)?.forEach((cb) => {
console.log('callback', cb);
let eventWithStringData = {
type: realmWrappedEvent.event.type,
data: JSON.stringify(realmWrappedEvent.event.data),
};
cb(eventWithStringData);
});
}
}

function getPersistedTokenForRealm(realmURL: string) {
Expand Down
1 change: 1 addition & 0 deletions packages/runtime-common/realm-auth-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export class RealmAuthClient {
options?: Options,
) {
this.isRealmServerAuth = Boolean(options?.authWithRealmServer);
console.log('i exist', realmURL, options);
}

get jwt(): string | undefined {
Expand Down
52 changes: 52 additions & 0 deletions packages/runtime-common/realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ interface UpdateItem {

type ServerEvents = UpdateEvent | IndexEvent | MessageEvent;

type RealmURLWrappedServerEvent = {
realmURL: string;
event: ServerEvents;
};

interface UpdateEvent {
type: 'update';
data: UpdateEventData;
Expand Down Expand Up @@ -1863,6 +1868,7 @@ export class Realm {
}

private listeningClients: WritableStream[] = [];
private listeningUsers: string[] = [];

private async subscribe(
request: Request,
Expand Down Expand Up @@ -1914,6 +1920,26 @@ export class Realm {
});
}

let authorizationString = request.headers.get('Authorization');

if (!authorizationString) {
throw new AuthenticationError(
AuthenticationErrorMessages.MissingAuthHeader,
);
}
let tokenString = authorizationString.replace('Bearer ', ''); // Parse the JWT

let token: TokenClaims;

try {
token = this.#adapter.verifyJWT(tokenString, this.#realmSecretSeed);
this.listeningUsers.push(token.user);
} catch (e) {
console.log('error storing listening user', e);
}

console.log('listeningUsers now', this.listeningUsers);

this.listeningClients.push(writable);
this.sendServerEvent({
type: 'message',
Expand Down Expand Up @@ -1968,6 +1994,32 @@ export class Realm {
}

private async sendServerEvent(event: ServerEvents): Promise<void> {
if (['update', 'index'].includes(event.type)) {
console.log(
`skipping sending ${event.type} event, sending via Matrix instead`,
event,
);

// FIXME duplicated in multiple places
let dmRooms =
(await this.#matrixClient.getAccountData<Record<string, string>>(
'boxel.session-rooms',
)) ?? {};

for (let user of this.listeningUsers) {
let roomId = dmRooms[user];
await this.#matrixClient.sendEvent(roomId, 'm.room.message', {
body: JSON.stringify({
realmURL: this.url,
event,
}),
msgtype: 'app.boxel.sse',
format: 'app.boxel.sse-format',
});
}

return;
}
this.#log.debug(
`sending updates to ${this.listeningClients.length} clients`,
);
Expand Down
Loading