Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds long PAN functionality to send-pan task #3885

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packages/api/lib/pdrHelpers.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const pvl = require('@cumulus/pvl');
const { getExecution } = require('@cumulus/api-client/executions');

/**
* Generate Short PAN message
Expand All @@ -17,6 +18,38 @@ function generateShortPAN(disposition) {
);
}

async function getGranuleFromExecution(executionArn) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this function for getting some of the granule information from the execution. There could be a better alternative I'm not familiar with

const excObj = await getExecution({
prefix: process.env.stackName,
arn: executionArn,
});
return excObj.originalPayload.granules[0];
}

/**
* Generate Long PAN message
*
* @param {Object|string[]} executions - List of workflow executions
* @returns {string} the PAN message
*/
async function generateLongPAN(executions) {
const timeStamp = new Date();

const longPan = new pvl.models.PVLRoot()
.add('MESSAGE_TYPE', new pvl.models.PVLTextString('LONGPAN'))
.add('NO_OF_FILES', new pvl.models.PVLNumeric(executions.length));
/* eslint-disable no-await-in-loop */
for (const exc of executions) {
const granule = await getGranuleFromExecution(exc.arn || exc);
longPan.add('FILE_DIRECTORY', new pvl.models.PVLTextString(granule.files[0].path));
longPan.add('FILE_NAME', new pvl.models.PVLTextString(granule.granuleId));
longPan.add('DISPOSITION', new pvl.models.PVLTextString(exc.reason || 'SUCCESSFUL'));
longPan.add('TIME_STAMP', new pvl.models.PVLDateTime(timeStamp));
}
/* eslint-enable no-await-in-loop */
return pvl.jsToPVL(longPan);
}

/**
* Generate a PDRD message with a given err
*
Expand All @@ -33,5 +66,7 @@ function generatePDRD(err) {

module.exports = {
generateShortPAN,
generateLongPAN,
generatePDRD,
getGranuleFromExecution,
};
63 changes: 62 additions & 1 deletion packages/api/tests/lib/test-pdrHelpers.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,61 @@
'use strict';

const test = require('ava');
const pdrHelpers = require('../../lib/pdrHelpers');
const proxyquire = require('proxyquire');

const fakeExecutionModule = {
getExecution: () => Promise.resolve({
originalPayload: {
granules: [
{
files: [
{
name: 'test_id.nc',
path: 'test',
},
],
granuleId: 'test_id',
},
],
},
}),
};

const pdrHelpers = proxyquire(
'../../lib/pdrHelpers',
{
'@cumulus/api-client/executions': fakeExecutionModule,
}
);

// eslint-disable-next-line max-len
const regex = /MESSAGE_TYPE = "SHORTPAN";\nDISPOSITION = "SUCCESSFUL";\nTIME_STAMP = \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z;\n/;
// eslint-disable-next-line max-len
const emptyRegex = /MESSAGE_TYPE = "SHORTPAN";\nDISPOSITION = "";\nTIME_STAMP = \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z;\n/;
const longPanRegex = new RegExp(
'MESSAGE_TYPE = "LONGPAN";\\n' +
'NO_OF_FILES = 5;\\n' +
'FILE_DIRECTORY = "test";\\n' +
'FILE_NAME = "test_id";\\n' +
'DISPOSITION = "FAILED A";\\n' +
'TIME_STAMP = \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z;\\n' +
'FILE_DIRECTORY = "test";\\n' +
'FILE_NAME = "test_id";\\n' +
'DISPOSITION = "FAILED B";\\n' +
'TIME_STAMP = \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z;\\n' +
'FILE_DIRECTORY = "test";\\n' +
'FILE_NAME = "test_id";\\n' +
'DISPOSITION = "FAILED C";\\n' +
'TIME_STAMP = \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z;\\n' +
'FILE_DIRECTORY = "test";\\n' +
'FILE_NAME = "test_id";\\n' +
'DISPOSITION = "SUCCESSFUL";\\n' +
'TIME_STAMP = \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z;\\n' +
'FILE_DIRECTORY = "test";\\n' +
'FILE_NAME = "test_id";\\n' +
'DISPOSITION = "SUCCESSFUL";\\n' +
'TIME_STAMP = \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z;\\n'
);

test('generateShortPAN with a disposition', (t) => {
const disposition = 'SUCCESSFUL';
Expand All @@ -19,3 +68,15 @@ test('generateShortPAN with an empty disposition', (t) => {
const pan = pdrHelpers.generateShortPAN(disposition);
t.regex(pan, emptyRegex);
});

test('generateLongPAN', async (t) => {
const executions = [
{ arn: 'arn:failed:execution', reason: 'FAILED A' },
{ arn: 'arn:failed:execution', reason: 'FAILED B' },
{ arn: 'arn:failed:execution', reason: 'FAILED C' },
'arn:completed:execution',
'arn:completed:execution',
];
const pan = await pdrHelpers.generateLongPAN(executions);
t.regex(pan, longPanRegex);
});
9 changes: 9 additions & 0 deletions tasks/pdr-status-check/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
Lambda function handler for checking the status of a workflow (step function) execution. Expects a payload object which includes the name of a PDR.
The concurrency of SFN API calls is set to 10 by default, and it's configurable by setting the Lambda environment variable CONCURRENCY.

Make sure the line: `"ErrorPath": "$.exception.Cause"` is added to your workflow failed task inside your ingest granule workflow to ensure the error is properly propagated to this task
```json
"WorkflowFailed": {
"Type": "Fail",
"Cause": "Workflow failed",
"ErrorPath": "$.exception.Cause"
},
```

## About Cumulus

Cumulus is a cloud-based data ingest, archive, distribution and management prototype for NASA's future Earth science data streams.
Expand Down
2 changes: 1 addition & 1 deletion tasks/pdr-status-check/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ function buildOutput(event, groupedExecutions) {

const parseFailedExecution = (execution) => {
let reason = 'Workflow Failed';
if (execution.output) reason = JSON.parse(execution.output).exception;
if (execution.error) reason = execution.error;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execution.error gets the error from the failed ingest granule workflow. The line "ErrorPath": "$.exception.Cause" has to be added into the WorkflowFailed step definition for the actual error to propagate. I added it to the README.

https://docs.aws.amazon.com/step-functions/latest/dg/state-fail.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I used ErrorPath instead of CausePath since I wasn't 100 percent sure if Cause was being used for something else

return { arn: execution.executionArn, reason };
};

Expand Down
5 changes: 5 additions & 0 deletions tasks/send-pan/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
"type": ["string", "null"],
"description": "The path in the provider to upload the file to.",
"default": "pans"
},
"panType": {
"type": ["string", "null"],
"description": "Determines which pan type to create: (shortPan, longPan, or longPanAlways)",
"default": "shortPan"
}
}
}
32 changes: 28 additions & 4 deletions tasks/send-pan/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,40 @@ async function sendPAN(event: HandlerEvent): Promise<HandlerOutput> {
const { config, input } = event;
const provider = config.provider;
const remoteDir = config.remoteDir || 'pans';
const panType = config.panType || 'shortPan';

const panName = input.pdr.name.replace(/\.pdr/gi, '.pan');
const panName = input.pdr.name.replace(/\.pdr/gi, '.PAN');
const uploadPath = path.join(remoteDir, panName);

if (input.running.length !== 0) {
throw new Error('Executions still running');
}

const disposition = (input.failed.length > 0) ? 'FAILED' : 'SUCCESSFUL';
const pan = pdrHelpers.generateShortPAN(disposition);
let pan;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided on 3 pan types:

  • longPan: Only creates long PANs when there's more than one granule, else just a short
  • shortPan: Always creates short PANs
  • longPanAlways: Always creates long PANs

switch (panType) {
case 'longPanAlways':
pan = await pdrHelpers.generateLongPAN([...input.completed, ...input.failed]);
log.debug('Created long PAN');
break;
case 'shortPan': {
const disposition = (input.failed.length > 0) ? 'FAILED' : 'SUCCESSFUL';
pan = pdrHelpers.generateShortPAN(disposition);
log.debug('Created short PAN');
break;
}
case 'longPan': {
if (input.failed.length + input.completed.length <= 1) {
const disposition = (input.failed.length > 0) ? 'FAILED' : 'SUCCESSFUL';
pan = pdrHelpers.generateShortPAN(disposition);
log.debug('Created short PAN');
} else {
pan = await pdrHelpers.generateLongPAN([...input.completed, ...input.failed]);
log.debug('Created long PAN');
}
break;
}
default:
throw new Error(`Unknown panType: ${panType}, must be shortPan, longPan, or longPanAlways`);
}

const localPath = path.join(tmpdir(), panName);
fs.writeFileSync(localPath, pan);
Expand Down
1 change: 1 addition & 0 deletions tasks/send-pan/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export type HandlerEvent = {
host: string,
},
remoteDir: string | null,
panType: string | null
},
input: HandlerInput,
};
8 changes: 4 additions & 4 deletions tasks/send-pan/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ test('SendPan task calls upload', async (t) => {
};

const url = `http://${event.config.provider.host}:${port}`;
const remotePath = path.join(event.config.remoteDir, `${fileNameBase}.pan`);
const remotePath = path.join(event.config.remoteDir, `${fileNameBase}.PAN`);
// Message should look like this:
// MESSAGE_TYPE = "SHORTPAN";
// DISPOSITION = "SUCCESSFUL";
Expand Down Expand Up @@ -103,7 +103,7 @@ test('SendPan task sends PAN to HTTP server', async (t) => {
test('SendPan task sends PAN to s3', async (t) => {
const remoteDir = 'pan/remote-dir';
const fileNameBase = 'test-send-s3-pdr';
const uploadPath = path.join(remoteDir, `${fileNameBase}.pan`);
const uploadPath = path.join(remoteDir, `${fileNameBase}.PAN`);
const event = {
config: {
provider: {
Expand Down Expand Up @@ -175,7 +175,7 @@ test('SendPan task throws error when provider protocol is not supported', async

test('SendPan task sends PAN to default location when remoteDir is null', async (t) => {
const fileNameBase = 'test-default-pan-path-pdr';
const uploadPath = `pans/${fileNameBase}.pan`;
const uploadPath = `pans/${fileNameBase}.PAN`;
const event = {
config: {
provider: {
Expand Down Expand Up @@ -248,7 +248,7 @@ test('SendPan task fails with executions still running', async (t) => {

test('SendPan task sends failed PAN to s3', async (t) => {
const fileNameBase = 'test-failed-pan-path-pdr';
const uploadPath = `pans/${fileNameBase}.pan`;
const uploadPath = `pans/${fileNameBase}.PAN`;
const event = {
config: {
provider: {
Expand Down