Skip to content

Commit

Permalink
preid release
Browse files Browse the repository at this point in the history
  • Loading branch information
zxl629 committed Feb 24, 2025
1 parent 677f466 commit 997685b
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push-preid-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
push:
branches:
# Change this to your branch name where "example-preid" corresponds to the preid you want your changes released on
- feat/example-preid-branch/main
- feat/websocket-publish/main

jobs:
e2e:
Expand Down
28 changes: 28 additions & 0 deletions packages/api-graphql/__tests__/events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,34 @@ describe('Events client', () => {
});
});

describe('publish', () => {
test('happy publish', async () => {
const channel = await events.connect('/');

channel.publish({ some: 'data' });
});

describe('auth modes', () => {
let mockProvider: typeof AppSyncEventProvider;

beforeEach(() => {
mockProvider = AppSyncEventProvider;
});

for (const authMode of authModes) {
test(`auth override: ${authMode}`, async () => {
const channel = await events.connect('/');

channel.publish({ some: 'data' });

expect(mockProvider.publish).toHaveBeenCalledWith(
expect.objectContaining({ authenticationType: authMode }),
);
});
}
});
});

describe('post', () => {
let mockReq: typeof appsyncRequest;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { CustomHeaders } from '@aws-amplify/data-schema/runtime';
import { DEFAULT_KEEP_ALIVE_TIMEOUT, MESSAGE_TYPES } from '../constants';
import { AWSWebSocketProvider } from '../AWSWebSocketProvider';
import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders';
import { serializeEvents } from '../../internals/events/utils';

// resolved/actual AuthMode values. identityPool gets resolves to IAM upstream in InternalGraphQLAPI._graphqlSubscribe
type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
Expand Down Expand Up @@ -97,14 +98,14 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
query,
apiKey,
region,
variables,
} = options;

// This will be needed for WS publish
// const data = {
// events: [variables],
// };

const serializedData = JSON.stringify({ channel: query });
const data = {
channel: query,
events: variables !== undefined ? serializeEvents(variables) : undefined,
};
const serializedData = JSON.stringify(data);

const headers = {
...(await awsRealTimeHeaderBasedAuth({
Expand All @@ -121,22 +122,23 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
[USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails),
};

// Commented out code will be needed for WS publish
const subscriptionMessage = {
id: subscriptionId,
channel: query,
// events: [JSON.stringify(variables)],
events: variables !== undefined ? serializeEvents(variables) : undefined,
authorization: {
...headers,
},
// payload: {
// events: serializedData,
// extensions: {
// authorization: {
// ...headers,
// },
// },
// },
payload: {
events:
variables !== undefined ? serializeEvents(variables) : undefined,
channel: query,
extensions: {
authorization: {
...headers,
},
},
},
type: publish
? MESSAGE_TYPES.EVENT_PUBLISH
: MESSAGE_TYPES.EVENT_SUBSCRIBE,
Expand Down
37 changes: 29 additions & 8 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,43 @@ export abstract class AWSWebSocketProvider {
'message',
publishListener,
);

cleanup();
resolve();
}

if (data.erroredEvents && data.erroredEvents.length > 0) {
// TODO: handle errors
if (data.errors && data.errors.length > 0) {
const errorTypes = data.errors.map((error: any) => error.errorType);
cleanup();
reject(new Error(`Publish errors: ${errorTypes.join(', ')}`));
}
};
this.awsRealTimeSocket.addEventListener('message', publishListener);
this.awsRealTimeSocket.addEventListener('close', () => {

const errorListener = (error: Event) => {
cleanup();
reject(new Error(`WebSocket error: ${error}`));
};

const closeListener = () => {
cleanup();
reject(new Error('WebSocket is closed'));
});
//
// this.awsRealTimeSocket.addEventListener('error', publishListener);
};

const cleanup = () => {
this.awsRealTimeSocket?.removeEventListener(
'message',
publishListener,
);
this.awsRealTimeSocket?.removeEventListener('error', errorListener);
this.awsRealTimeSocket?.removeEventListener('close', closeListener);
};

this.awsRealTimeSocket.addEventListener('message', publishListener);
this.awsRealTimeSocket.addEventListener('error', errorListener);
this.awsRealTimeSocket.addEventListener('close', closeListener);

this.awsRealTimeSocket.send(serializedSubscriptionMessage);
} else {
reject(new Error('WebSocket is not connected'));
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/api-graphql/src/internals/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async function connect(
};

// WS publish is not enabled in the service yet. It will be a follow up feature
const _pub = async (
const pub = async (
event: DocumentType,
pubOptions?: EventsOptions,
): Promise<any> => {
Expand All @@ -94,7 +94,7 @@ async function connect(
return {
subscribe: sub,
close,
// publish: pub,
publish: pub,
};
}

Expand Down
28 changes: 27 additions & 1 deletion packages/api-graphql/src/internals/events/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

import { Subscription } from 'rxjs';
import type { GraphQLAuthMode } from '@aws-amplify/core/internals/utils';
import type {
DocumentType,
GraphQLAuthMode,
} from '@aws-amplify/core/internals/utils';

export interface SubscriptionObserver<T> {
next(value: T): void;
Expand Down Expand Up @@ -39,6 +42,29 @@ export interface EventsChannel {
observer: SubscriptionObserver<any>,
subOptions?: EventsOptions,
): Subscription;
/**
* @experimental API may change in future versions
*
* Publish events to a channel via WebSocket
*
* @example
* const channel = await events.connect("default/channel")
*
* await channel.publish({ some: "data" });
*
* @example // event batching
* await channel.publish("default/channel", [{ some: "event" }, { some: "event2" }])
*
* @example // authMode override
* await channel.publish({ some: "data" }, { authMode: "userPool" });
*
* @param event - JSON-serializable value or an array of values
* @param pubOptions - request overrides: `authMode`, `authToken`
*
* @returns void on success
* @throws on error
*/
publish(event: DocumentType, pubOptions?: EventsOptions): Promise<any>;
/**
* @experimental API may change in future versions
*
Expand Down
2 changes: 1 addition & 1 deletion packages/api-graphql/src/internals/events/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const configure = () => {
};

/**
* Event API expects and array of JSON strings
* Event API expects an array of JSON strings
*
* @param events - JSON-serializable value or an array of values
* @returns array of JSON strings
Expand Down

0 comments on commit 997685b

Please sign in to comment.