Skip to content

Commit

Permalink
feat(transport): add the ability to switch transport API
Browse files Browse the repository at this point in the history
For the browser version of PubNub SDK, add the ability to switch between `fetch` and `xhr` APIs
(`transport` configuration option).

fix(event-engine): handshake/receive requests timeout

Fix issue because of which, in Event Engine mode, wrong timeout values have been set for requests
which create long-poll request.

refactor(request): make sure request cancels on timeout

Refactor `timeout` implementation for `fetch` transport to properly cancel request when the
timeout timer will fire.
  • Loading branch information
parfeon committed Feb 4, 2025
1 parent a204205 commit a2bd6ea
Show file tree
Hide file tree
Showing 20 changed files with 924 additions and 443 deletions.
689 changes: 393 additions & 296 deletions dist/web/pubnub.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dist/web/pubnub.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lib/core/components/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class AbstractRequest {
path: this.path,
queryParameters: this.queryParameters,
cancellable: (_d = (_c = this.params) === null || _c === void 0 ? void 0 : _c.cancellable) !== null && _d !== void 0 ? _d : false,
timeout: 10000,
timeout: 10,
identifier: this.requestIdentifier,
};
// Attach headers (if required).
Expand Down
12 changes: 7 additions & 5 deletions lib/core/pubnub-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -550,21 +550,23 @@ class PubNubCore {
}
// Complete request configuration.
const transportRequest = request.request();
const operation = request.operation();
if ((transportRequest.formData && transportRequest.formData.length > 0) ||
request.operation() === operations_1.default.PNDownloadFileOperation) {
operation === operations_1.default.PNDownloadFileOperation) {
// Set file upload / download request delay.
transportRequest.timeout = this._configuration.getFileTimeout();
}
else {
if (request.operation() === operations_1.default.PNSubscribeOperation)
if (operation === operations_1.default.PNSubscribeOperation ||
operation === operations_1.default.PNReceiveMessagesOperation)
transportRequest.timeout = this._configuration.getSubscribeTimeout();
else
transportRequest.timeout = this._configuration.getTransactionTimeout();
}
// API request processing status.
const status = {
error: false,
operation: request.operation(),
operation,
category: categories_1.default.PNAcknowledgmentCategory,
statusCode: 0,
};
Expand Down Expand Up @@ -600,8 +602,8 @@ class PubNubCore {
const apiError = !(error instanceof pubnub_api_error_1.PubNubAPIError) ? pubnub_api_error_1.PubNubAPIError.create(error) : error;
// Notify callback (if possible).
if (callback)
return callback(apiError.toStatus(request.operation()), null);
throw apiError.toPubNubError(request.operation(), 'REST API request processing error, check status for details');
return callback(apiError.toStatus(operation), null);
throw apiError.toPubNubError(operation, 'REST API request processing error, check status for details');
});
});
}
Expand Down
4 changes: 2 additions & 2 deletions lib/errors/pubnub-api-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ class PubNubAPIError extends Error {
category = categories_1.default.PNCancelledCategory;
message = 'Request cancelled';
}
else if (message.indexOf('timeout') !== -1) {
else if (message.toLowerCase().indexOf('timeout') !== -1) {
category = categories_1.default.PNTimeoutCategory;
message = 'Request timeout';
}
else if (message.indexOf('network') !== -1) {
else if (message.toLowerCase().indexOf('network') !== -1) {
category = categories_1.default.PNNetworkIssuesCategory;
message = 'Network issues';
}
Expand Down
4 changes: 2 additions & 2 deletions lib/react_native/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ const text_encoding_1 = require("text-encoding");
require("react-native-url-polyfill/auto");
const cbor_js_1 = __importDefault(require("cbor-js"));
const buffer_1 = require("buffer");
const web_react_native_transport_1 = require("../transport/web-react-native-transport");
const stringify_buffer_keys_1 = require("../core/components/stringify_buffer_keys");
const react_native_transport_1 = require("../transport/react-native-transport");
const configuration_1 = require("../core/components/configuration");
const token_manager_1 = require("../core/components/token_manager");
const middleware_1 = require("../transport/middleware");
Expand Down Expand Up @@ -62,7 +62,7 @@ class PubNub extends pubnub_common_1.PubNubCore {
const transportMiddleware = new middleware_1.PubNubMiddleware({
clientConfiguration,
tokenManager,
transport: new web_react_native_transport_1.WebReactNativeTransport(fetch, clientConfiguration.keepAlive, clientConfiguration.logVerbosity),
transport: new react_native_transport_1.ReactNativeTransport(clientConfiguration.keepAlive, clientConfiguration.logVerbosity),
});
super({
configuration: clientConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,34 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge
});
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.WebReactNativeTransport = void 0;
exports.ReactNativeTransport = void 0;
const pubnub_api_error_1 = require("../errors/pubnub-api-error");
const utils_1 = require("../core/utils");
/**
* Class representing a `fetch`-based browser and React Native transport provider.
* Class representing a React Native transport provider.
*
* @internal
*/
class WebReactNativeTransport {
class ReactNativeTransport {
/**
* Create and configure transport provider for Web and Rect environments.
*
* @param originalFetch - Pointer to the original (not monkey patched) `fetch` implementation.
* @param [keepAlive] - Whether client should try to keep connections open for reuse or not.
* @param logVerbosity - Whether verbose logs should be printed or not.
*
* @internal
*/
constructor(originalFetch, keepAlive = false, logVerbosity = false) {
constructor(keepAlive = false, logVerbosity = false) {
this.keepAlive = keepAlive;
this.logVerbosity = logVerbosity;
WebReactNativeTransport.originalFetch = originalFetch;
// Check whether `fetch` has been monkey patched or not.
if (logVerbosity && this.isFetchMonkeyPatched()) {
console.warn("[PubNub] Native Web Fetch API 'fetch' function monkey patched.");
if (!this.isFetchMonkeyPatched(WebReactNativeTransport.originalFetch))
console.info("[PubNub] Use native Web Fetch API 'fetch' implementation from iframe as APM workaround.");
else
console.warn('[PubNub] Unable receive native Web Fetch API. There can be issues with subscribe long-poll cancellation');
}
}
makeSendable(req) {
let controller;
let abortController;
if (req.cancellable) {
abortController = new AbortController();
controller = {
// Storing controller inside to prolong object lifetime.
abortController,
abort: () => abortController === null || abortController === void 0 ? void 0 : abortController.abort(),
};
}
const abortController = new AbortController();
const controller = {
// Storing controller inside to prolong object lifetime.
abortController,
abort: () => !abortController.signal.aborted && abortController.abort(),
};
return [
this.requestFromTransportRequest(req).then((request) => {
const start = new Date().getTime();
Expand All @@ -65,21 +51,27 @@ class WebReactNativeTransport {
*
* **Note:** Native Fetch API doesn't support `timeout` out-of-box.
*/
let timeoutId;
const requestTimeout = new Promise((_, reject) => {
const timeoutId = setTimeout(() => {
// Clean up.
timeoutId = setTimeout(() => {
clearTimeout(timeoutId);
reject(new Error('Request timeout'));
controller.abort();
}, req.timeout * 1000);
});
return Promise.race([
WebReactNativeTransport.originalFetch(request, {
signal: abortController === null || abortController === void 0 ? void 0 : abortController.signal,
fetch(request, {
signal: abortController.signal,
credentials: 'omit',
cache: 'no-cache',
}),
requestTimeout,
])
.then((response) => {
if (timeoutId)
clearTimeout(timeoutId);
return response;
})
.then((response) => response.arrayBuffer().then((arrayBuffer) => [response, arrayBuffer]))
.then((response) => {
const responseBody = response[1].byteLength > 0 ? response[1] : undefined;
Expand Down Expand Up @@ -177,7 +169,7 @@ class WebReactNativeTransport {
if (typeof requestBody === 'string')
outgoing += `\n\n${requestBody}`;
else
outgoing += `\n\n${WebReactNativeTransport.decoder.decode(requestBody)}`;
outgoing += `\n\n${ReactNativeTransport.decoder.decode(requestBody)}`;
}
console.log(`<<<<<`);
console.log(outgoing);
Expand All @@ -186,28 +178,17 @@ class WebReactNativeTransport {
else {
let outgoing = `[${timestamp} / ${elapsed}]\n${protocol}//${host}${pathname}\n${search}`;
if (body)
outgoing += `\n\n${WebReactNativeTransport.decoder.decode(body)}`;
outgoing += `\n\n${ReactNativeTransport.decoder.decode(body)}`;
console.log('>>>>>>');
console.log(outgoing);
console.log('-----');
}
}
/**
* Check whether original `fetch` has been monkey patched or not.
*
* @returns `true` if original `fetch` has been patched.
*
* @internal
*/
isFetchMonkeyPatched(oFetch) {
const fetchString = (oFetch !== null && oFetch !== void 0 ? oFetch : fetch).toString();
return !fetchString.includes('[native code]') && fetch.name !== 'fetch';
}
}
exports.WebReactNativeTransport = WebReactNativeTransport;
exports.ReactNativeTransport = ReactNativeTransport;
/**
* Service {@link ArrayBuffer} response decoder.
*
* @internal
*/
WebReactNativeTransport.decoder = new TextDecoder();
ReactNativeTransport.decoder = new TextDecoder();
23 changes: 22 additions & 1 deletion lib/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2753,14 +2753,35 @@ declare namespace PubNub {
validate(): void;
};

/**
* Policy, which uses linear formula to calculate next request retry attempt time.
*/
export type LinearRetryPolicyConfiguration = {
/**
* Delay between retry attempt (in seconds).
*/
delay: number;
/**
* Maximum number of retry attempts.
*/
maximumRetry: number;
};

/**
* Policy, which uses exponential formula to calculate next request retry attempt time.
*/
export type ExponentialRetryPolicyConfiguration = {
/**
* Minimum delay between retry attempts (in seconds).
*/
minimumDelay: number;
/**
* Maximum delay between retry attempts (in seconds).
*/
maximumDelay: number;
/**
* Maximum number of retry attempts.
*/
maximumRetry: number;
};

Expand Down Expand Up @@ -2915,7 +2936,7 @@ declare namespace PubNub {
*/
body?: ArrayBuffer | PubNubFileInterface | string;
/**
* For how long request should wait response from the server.
* For how long (in seconds) request should wait response from the server.
*
* @default `10` seconds.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/core/components/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export abstract class AbstractRequest<ResponseType> implements Request<ResponseT
path: this.path,
queryParameters: this.queryParameters,
cancellable: this.params?.cancellable ?? false,
timeout: 10000,
timeout: 10,
identifier: this.requestIdentifier,
};

Expand Down
17 changes: 9 additions & 8 deletions src/core/pubnub-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -806,22 +806,26 @@ export class PubNubCore<

// Complete request configuration.
const transportRequest = request.request();
const operation = request.operation();
if (
(transportRequest.formData && transportRequest.formData.length > 0) ||
request.operation() === RequestOperation.PNDownloadFileOperation
operation === RequestOperation.PNDownloadFileOperation
) {
// Set file upload / download request delay.
transportRequest.timeout = this._configuration.getFileTimeout();
} else {
if (request.operation() === RequestOperation.PNSubscribeOperation)
if (
operation === RequestOperation.PNSubscribeOperation ||
operation === RequestOperation.PNReceiveMessagesOperation
)
transportRequest.timeout = this._configuration.getSubscribeTimeout();
else transportRequest.timeout = this._configuration.getTransactionTimeout();
}

// API request processing status.
const status: Status = {
error: false,
operation: request.operation(),
operation,
category: StatusCategory.PNAcknowledgmentCategory,
statusCode: 0,
};
Expand Down Expand Up @@ -862,12 +866,9 @@ export class PubNubCore<
const apiError = !(error instanceof PubNubAPIError) ? PubNubAPIError.create(error) : error;

// Notify callback (if possible).
if (callback) return callback(apiError.toStatus(request.operation()), null);
if (callback) return callback(apiError.toStatus(operation), null);

throw apiError.toPubNubError(
request.operation(),
'REST API request processing error, check status for details',
);
throw apiError.toPubNubError(operation, 'REST API request processing error, check status for details');
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/types/transport-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export type TransportRequest = {
body?: ArrayBuffer | PubNubFileInterface | string;

/**
* For how long request should wait response from the server.
* For how long (in seconds) request should wait response from the server.
*
* @default `10` seconds.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/errors/pubnub-api-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ export class PubNubAPIError extends Error {
if (errorName === 'AbortError' || message.indexOf('Aborted') !== -1) {
category = StatusCategory.PNCancelledCategory;
message = 'Request cancelled';
} else if (message.indexOf('timeout') !== -1) {
} else if (message.toLowerCase().indexOf('timeout') !== -1) {
category = StatusCategory.PNTimeoutCategory;
message = 'Request timeout';
} else if (message.indexOf('network') !== -1) {
} else if (message.toLowerCase().indexOf('network') !== -1) {
category = StatusCategory.PNNetworkIssuesCategory;
message = 'Network issues';
} else if (errorName === 'TypeError') {
Expand Down
24 changes: 24 additions & 0 deletions src/event-engine/core/retryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,37 @@ export type RequestRetryPolicy = {
validate(): void;
};

/**
* Policy, which uses linear formula to calculate next request retry attempt time.
*/
export type LinearRetryPolicyConfiguration = {
/**
* Delay between retry attempt (in seconds).
*/
delay: number;

/**
* Maximum number of retry attempts.
*/
maximumRetry: number;
};

/**
* Policy, which uses exponential formula to calculate next request retry attempt time.
*/
export type ExponentialRetryPolicyConfiguration = {
/**
* Minimum delay between retry attempts (in seconds).
*/
minimumDelay: number;

/**
* Maximum delay between retry attempts (in seconds).
*/
maximumDelay: number;

/**
* Maximum number of retry attempts.
*/
maximumRetry: number;
};
4 changes: 2 additions & 2 deletions src/react_native/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import CborReader from 'cbor-js';
import { Buffer } from 'buffer';

import { ExtendedConfiguration, PlatformConfiguration } from '../core/interfaces/configuration';
import { WebReactNativeTransport } from '../transport/web-react-native-transport';
import { stringifyBufferKeys } from '../core/components/stringify_buffer_keys';
import { ReactNativeTransport } from '../transport/react-native-transport';
import { makeConfiguration } from '../core/components/configuration';
import { PubNubFileParameters } from '../file/modules/react-native';
import { TokenManager } from '../core/components/token_manager';
Expand Down Expand Up @@ -72,7 +72,7 @@ export default class PubNub extends PubNubCore<null, PubNubFileParameters> {
const transportMiddleware = new PubNubMiddleware({
clientConfiguration,
tokenManager,
transport: new WebReactNativeTransport(fetch, clientConfiguration.keepAlive, clientConfiguration.logVerbosity!),
transport: new ReactNativeTransport(clientConfiguration.keepAlive, clientConfiguration.logVerbosity!),
});

super({
Expand Down
Loading

0 comments on commit a2bd6ea

Please sign in to comment.