Skip to content

Commit

Permalink
Merge pull request #794 from terascope/s3-sender-failure-tests
Browse files Browse the repository at this point in the history
Update s3 client to v3
  • Loading branch information
jsnoble authored Apr 26, 2023
2 parents a453c54 + 6df0381 commit 0ecf441
Show file tree
Hide file tree
Showing 30 changed files with 2,420 additions and 1,902 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ language: node_js
node_js:
- '14.21.3'
- '16.19.1'
- '18.16.0'
os:
- linux
- osx
osx_image: xcode12
branches:
only:
- master
Expand Down Expand Up @@ -39,6 +41,7 @@ script:
- teraslice-cli -v
- teraslice-cli assets build --bundle-target node14
- teraslice-cli assets build --bundle-target node16
- teraslice-cli assets build --bundle-target node18
after_success:
- bash <(curl -s https://codecov.io/bash)
before_deploy:
Expand Down
4 changes: 2 additions & 2 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "file",
"version": "2.2.2",
"version": "2.3.0",
"description": "A set of processors for working with files",
"private": true,
"workspaces": {
Expand All @@ -14,7 +14,7 @@
"build:watch": "yarn build --watch"
},
"dependencies": {
"@terascope/file-asset-apis": "^0.6.2",
"@terascope/file-asset-apis": "^0.7.0",
"@terascope/job-components": "^0.58.5",
"csvtojson": "^2.0.10",
"fs-extra": "^11.1.1",
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "file-assets-bundle",
"version": "2.2.2",
"version": "2.3.0",
"description": "A set of processors for working with files",
"repository": "https://github.com/terascope/file-assets.git",
"author": "Terascope, LLC <[email protected]>",
Expand Down Expand Up @@ -28,15 +28,14 @@
"dependencies": {},
"devDependencies": {
"@terascope/eslint-config": "^0.7.1",
"@terascope/file-asset-apis": "^0.6.2",
"@terascope/file-asset-apis": "^0.7.0",
"@terascope/job-components": "^0.58.5",
"@terascope/scripts": "^0.50.7",
"@types/fs-extra": "^11.0.1",
"@types/jest": "^29.5.0",
"@types/json2csv": "^5.0.3",
"@types/node": "^18.15.11",
"@types/node-gzip": "^1.1.0",
"aws-sdk": "^2.1351.0",
"eslint": "^8.35.0",
"fs-extra": "^11.1.1",
"jest": "^29.5.0",
Expand Down
5 changes: 3 additions & 2 deletions packages/file-asset-apis/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/file-asset-apis",
"version": "0.6.2",
"version": "0.7.0",
"description": "file reader and sender apis",
"publishConfig": {
"access": "public"
Expand All @@ -23,6 +23,8 @@
"author": "Terascope, LLC <[email protected]>",
"license": "MIT",
"dependencies": {
"@aws-sdk/client-s3": "^3.315.0",
"@aws-sdk/node-http-handler": "^3.310.0",
"@terascope/utils": "^0.45.5",
"csvtojson": "^2.0.10",
"fs-extra": "^11.1.1",
Expand All @@ -34,7 +36,6 @@
"devDependencies": {
"@terascope/scripts": "^0.50.7",
"@types/jest": "^29.5.0",
"aws-sdk": "^2.1351.0",
"jest": "^29.5.0",
"jest-extended": "^3.2.4",
"jest-fixtures": "^0.6.0",
Expand Down
56 changes: 30 additions & 26 deletions packages/file-asset-apis/src/s3/MultiPartUploader.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { EventEmitter, once } from 'events';
import type S3 from 'aws-sdk/clients/s3';
import {
Logger, pDelay, sortBy, toHumanTime
} from '@terascope/utils';
import type { S3Client, S3ClientResponse } from './client-types';
import {
createS3MultipartUpload,
uploadS3ObjectPart,
Expand Down Expand Up @@ -40,7 +40,7 @@ export class MultiPartUploader {
* These are the completed responses from the upload
* part requests
*/
private parts: S3.CompletedPart[] = [];
private parts: S3ClientResponse.CompletedPart[] = [];

/**
* This is a way of tracking the number of pending
Expand All @@ -64,13 +64,15 @@ export class MultiPartUploader {
private finished = false;

private readonly events: EventEmitter;
private readonly client: S3Client;

constructor(
readonly client: S3,
client: S3Client,
readonly bucket: string,
readonly key: string,
readonly logger: Logger
) {
this.client = client;
this.events = new EventEmitter();
// just so we don't get warnings set this to a higher number
this.events.setMaxListeners(1000);
Expand All @@ -85,16 +87,18 @@ export class MultiPartUploader {
const start = Date.now();

this.started = true;
createS3MultipartUpload(
this.client, this.bucket, this.key
).then((uploadId) => {
try {
const uploadId = await createS3MultipartUpload(
this.client, this.bucket, this.key
);

this.uploadId = uploadId;
this.logger.debug(`s3 multipart upload ${uploadId} started, took ${toHumanTime(Date.now() - start)}`);
}).catch((err) => {
} catch (err) {
this.startError = err;
}).finally(() => {
} finally {
this.events.emit(Events.StartDone);
});
}

// adding this here will ensure that
// we give the event loop some time to
Expand All @@ -106,14 +110,14 @@ export class MultiPartUploader {
* Used to wait until the background request for start is finished.
* If that failed, this should throw
*/
private _waitForStart(ctx: string): Promise<void>|void {
private async _waitForStart(ctx: string): Promise<void> {
if (!this.started) {
throw Error('Expected MultiPartUploader->start to have been finished');
}

if (this.uploadId == null) {
this.logger.debug(`${ctx} waiting for upload to start`);
return once(this.events, Events.StartDone).then(() => undefined);
await once(this.events, Events.StartDone);
}

if (this.startError != null) {
Expand All @@ -137,17 +141,16 @@ export class MultiPartUploader {

this.pendingParts++;

// run this in the background
Promise.resolve()
.then(() => this._waitForStart(`part #${partNumber}`))
.then(() => this._uploadPart(body, partNumber))
.catch((err) => {
this.partUploadErrors.set(String(err), err);
})
.finally(() => {
this.pendingParts--;
this.events.emit(Events.PartDone);
});
await this._waitForStart(`part #${partNumber}`);

try {
await this._uploadPart(body, partNumber);
} catch (err) {
this.partUploadErrors.set(String(err), err);
} finally {
this.pendingParts--;
this.events.emit(Events.PartDone);
}

if (this.pendingParts > 0 || !this.uploadId) {
// adding this here will ensure that
Expand All @@ -168,14 +171,15 @@ export class MultiPartUploader {
if (this.partUploadErrors.size) {
await this._throwPartUploadError();
}

this.parts.push(await uploadS3ObjectPart(this.client, {
const { ETag } = await uploadS3ObjectPart(this.client, {
Bucket: this.bucket,
Key: this.key,
Body: body,
Body: body as any,
UploadId: this.uploadId,
PartNumber: partNumber
}));
});

this.parts.push({ PartNumber: partNumber, ETag });
}

/**
Expand Down
13 changes: 13 additions & 0 deletions packages/file-asset-apis/src/s3/client-types/client-params.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Re-exports of S3 request/parameter names from `@aws-sdk/client-s3`.
// The sibling index module exposes everything here under the
// `S3ClientParams` namespace.
// NOTE(review): `AbortMultipartUploadCommand` is named like a command class
// rather than a request shape — confirm it is meant to live in this list.
export {
GetObjectRequest,
ListObjectsRequest,
PutObjectRequest,
DeleteObjectRequest,
DeleteBucketRequest,
HeadBucketRequest,
CreateBucketRequest,
UploadPartRequest,
CompleteMultipartUploadRequest,
AbortMultipartUploadRequest,
AbortMultipartUploadCommand
} from '@aws-sdk/client-s3';
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Re-exports of S3 response/output names from `@aws-sdk/client-s3`.
// NOTE(review): presumably this is the `./client-response` module that the
// index file exposes as the `S3ClientResponse` namespace — confirm.
export {
GetObjectOutput,
ListObjectsOutput,
PutObjectOutput,
DeleteObjectOutput,
ListBucketsOutput,
CreateBucketOutput,
CompletedPart,
} from '@aws-sdk/client-s3';
3 changes: 3 additions & 0 deletions packages/file-asset-apis/src/s3/client-types/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Type-only re-export of the SDK client class, so consumers can annotate
// with `S3Client` without a runtime import of `@aws-sdk/client-s3`.
export type { S3Client } from '@aws-sdk/client-s3';
// Request names, grouped under the `S3ClientParams` namespace.
export * as S3ClientParams from './client-params';
// Response names, grouped under the `S3ClientResponse` namespace.
export * as S3ClientResponse from './client-response';
61 changes: 61 additions & 0 deletions packages/file-asset-apis/src/s3/createS3Client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import fs from 'fs-extra';
import { Agent } from 'https';
import { S3Client as BaseClient } from '@aws-sdk/client-s3';
import { NodeHttpHandler } from '@aws-sdk/node-http-handler';
import type { S3ClientConfig as baseConfig } from '@aws-sdk/client-s3';
import { debugLogger, has } from '@terascope/utils';
import type { S3Client } from './client-types';

/**
 * Settings accepted by `createS3Client`: the SDK's own client config plus a
 * handful of extra top-level options that `createS3Client` interprets itself.
 */
export interface S3ClientConfig extends baseConfig {
    /**
     * Legacy top-level access key id. When `credentials` is not provided and
     * both legacy keys are, `createS3Client` moves them into `credentials`.
     */
    accessKeyId?: string;
    /** Legacy top-level secret access key, see `accessKeyId`. */
    secretAccessKey?: string;
    /**
     * When truthy, `createS3Client` loads a CA bundle from disk and installs
     * an https agent that uses it.
     */
    sslEnabled?: boolean;
    /**
     * Path of the CA bundle to read when `sslEnabled` is set; when omitted
     * `/etc/ssl/certs/ca-certificates.crt` is used.
     */
    certLocation?: string;
    /** Extra options merged into the https agent built when `sslEnabled` is set. */
    httpOptions?: object;
}

/**
 * Builds an AWS SDK v3 S3 client from the given settings.
 *
 * The caller's `config` object is never modified: a shallow copy is adjusted
 * and handed to the SDK client constructor.
 *
 * @param config client settings; besides the SDK options this understands
 * `sslEnabled`, `certLocation`, `httpOptions` and the legacy top-level
 * `accessKeyId` / `secretAccessKey` pair
 * @param logger logger handed to the SDK client, defaults to a debug logger
 * named "s3-client"
 * @returns the constructed S3 client
 * @throws Error when `sslEnabled` is set and the certificate bundle cannot be
 * found at `certLocation` (or at the default location)
 */
export async function createS3Client(
    config: S3ClientConfig,
    logger = debugLogger('s3-client')
): Promise<S3Client> {
    // Work on a shallow copy so the object owned by the caller is left untouched.
    const clientConfig: S3ClientConfig = { ...config, logger };

    logger.info(`Using S3 endpoint: ${clientConfig.endpoint}`);

    // In the v3 SDK a custom CA bundle is supplied through the request handler,
    // so it can be set on the config before the client is constructed.
    if (clientConfig.sslEnabled) {
        const certPath = clientConfig.certLocation ?? '/etc/ssl/certs/ca-certificates.crt';
        const pathFound = await fs.pathExists(certPath);

        if (!pathFound) {
            throw new Error(
                `No cert path was found in config.certLocation: "${config.certLocation}" or in default "/etc/ssl/certs/ca-certificates.crt" location`
            );
        }

        // Assumes all certs needed are in a single bundle
        const certs = await fs.readFile(certPath);

        // Caller supplied options may override `rejectUnauthorized`,
        // but the loaded bundle always wins for `ca`.
        const httpOptions = {
            rejectUnauthorized: true,
            ...clientConfig.httpOptions,
            ca: [certs]
        };

        clientConfig.requestHandler = new NodeHttpHandler({
            httpsAgent: new Agent(httpOptions)
        });
    }

    // Config specified old style: move the top level values into credentials.
    // Both values must actually be set — a key that is present but undefined
    // would otherwise produce unusable credentials.
    const { accessKeyId, secretAccessKey } = clientConfig;

    if (!has(config, 'credentials') && accessKeyId != null && secretAccessKey != null) {
        clientConfig.credentials = { accessKeyId, secretAccessKey };
    }

    return new BaseClient(clientConfig);
}
2 changes: 2 additions & 0 deletions packages/file-asset-apis/src/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ export * from './s3-fetcher';
export * from './s3-sender';
export * from './s3-slicer';
export * from './s3-reader-api';
export * from './createS3Client';
export * from './client-types';
18 changes: 9 additions & 9 deletions packages/file-asset-apis/src/s3/s3-fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Logger } from '@terascope/utils';
import type S3 from 'aws-sdk/clients/s3';
import type { S3Client } from './client-types';
import { FileSlice, ReaderConfig } from '../interfaces';
import { ChunkedFileReader, parsePath } from '../base';
import { getS3Object } from './s3-helpers';

export class S3Fetcher extends ChunkedFileReader {
protected client: S3;
protected client: S3Client;
protected readonly bucket: string;

constructor(client: S3, config: Omit<ReaderConfig, 'size'>, logger: Logger) {
constructor(client: S3Client, config: Omit<ReaderConfig, 'size'>, logger: Logger) {
super(config, logger);
const { path } = config;
const { bucket } = parsePath(path);
Expand All @@ -35,14 +35,14 @@ export class S3Fetcher extends ChunkedFileReader {
Range: `bytes=${offset}-${offset + length - 1}`
});

if (!results.Body) {
const body = results.Body;

if (body === undefined) {
throw new Error('Missing body from s3 get object request');
}
// @ts-expect-error, their types do not list added apis
const data = await body.transformToByteArray();

return this.compressor.decompress(
Buffer.isBuffer(results.Body)
? results.Body
: Buffer.from(results.Body as any)
);
return this.compressor.decompress(Buffer.from(data));
}
}
Loading

0 comments on commit 0ecf441

Please sign in to comment.