Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zxl629 committed Feb 11, 2025
1 parent f512b65 commit a29d4a1
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push-preid-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
push:
branches:
# Change this to your branch name where "example-preid" corresponds to the preid you want your changes released on
- feat/example-preid-branch/main
- feat/websocket-event/main

jobs:
e2e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,16 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
query,
apiKey,
region,
variables,
} = options;

// This will be needed for WS publish
// const data = {
// events: [variables],
// };
const data = {
channel: query,
events: [variables],
};

const serializedData = JSON.stringify({ channel: query });
const serializedData = JSON.stringify(data);

const headers = {
...(await awsRealTimeHeaderBasedAuth({
Expand All @@ -125,18 +127,18 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
const subscriptionMessage = {
id: subscriptionId,
channel: query,
// events: [JSON.stringify(variables)],
events: [JSON.stringify(variables)],
authorization: {
...headers,
},
// payload: {
// events: serializedData,
// extensions: {
// authorization: {
// ...headers,
// },
// },
// },
payload: {
events: serializedData,
extensions: {
authorization: {
...headers,
},
},
},
type: publish
? MESSAGE_TYPES.EVENT_PUBLISH
: MESSAGE_TYPES.EVENT_SUBSCRIBE,
Expand Down
42 changes: 36 additions & 6 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ export abstract class AWSWebSocketProvider {

return new Promise((resolve, reject) => {
if (this.awsRealTimeSocket) {
const timeoutId = setTimeout(() => {
cleanup();
reject(new Error('Publish operation timed out'));
}, 30000); // 30 seconds timeout

const publishListener = (event: MessageEvent) => {
const data = JSON.parse(event.data);
if (data.id === subscriptionId && data.type === 'publish_success') {
Expand All @@ -263,22 +268,47 @@ export abstract class AWSWebSocketProvider {
'message',
publishListener,
);

cleanup();
resolve();
}

if (data.erroredEvents && data.erroredEvents.length > 0) {
// TODO: handle errors
const errors = data.erroredEvents.map(
(errorEvent: any) => errorEvent.error,
);
cleanup();
reject(new Error(`Publish errors: ${errors.join(', ')}`));
}
};
this.awsRealTimeSocket.addEventListener('message', publishListener);
this.awsRealTimeSocket.addEventListener('close', () => {

const errorListener = (error: Event) => {
cleanup();
reject(new Error(`WebSocket error: ${error}`));
};

const closeListener = () => {
cleanup();
reject(new Error('WebSocket is closed'));
});
//
// this.awsRealTimeSocket.addEventListener('error', publishListener);
};

const cleanup = () => {
clearTimeout(timeoutId);
this.awsRealTimeSocket?.removeEventListener(
'message',
publishListener,
);
this.awsRealTimeSocket?.removeEventListener('error', errorListener);
this.awsRealTimeSocket?.removeEventListener('close', closeListener);
};

this.awsRealTimeSocket.addEventListener('message', publishListener);
this.awsRealTimeSocket.addEventListener('error', errorListener);
this.awsRealTimeSocket.addEventListener('close', closeListener);

this.awsRealTimeSocket.send(serializedSubscriptionMessage);
} else {
reject(new Error('WebSocket is not connected'));
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/api-graphql/src/internals/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async function connect(
};

// WS publish is not enabled in the service yet. It will be a follow up feature
const _pub = async (
const pub = async (
event: DocumentType,
pubOptions?: EventsOptions,
): Promise<any> => {
Expand All @@ -94,7 +94,7 @@ async function connect(
return {
subscribe: sub,
close,
// publish: pub,
publish: pub,
};
}

Expand Down
10 changes: 10 additions & 0 deletions packages/api-graphql/src/internals/events/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Subscription } from 'rxjs';
import type { GraphQLAuthMode } from '@aws-amplify/core/internals/utils';
import { DocumentType } from '@aws-amplify/core/internals/utils';

export interface SubscriptionObserver<T> {
next(value: T): void;
Expand Down Expand Up @@ -51,6 +52,15 @@ export interface EventsChannel {
*
*/
close(): void;
/**
*
* @param event
* @param pubOptions
*
* @example
* await channel.publish({ some: "data" });
*/
publish(event: DocumentType, pubOptions?: EventsOptions): Promise<any>;
}

export type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
Expand Down

0 comments on commit a29d4a1

Please sign in to comment.