Skip to content

Commit

Permalink
update step function to support initial onboarding flow
Browse files Browse the repository at this point in the history
  • Loading branch information
jaismith committed May 17, 2024
1 parent b5eec74 commit b884c80
Show file tree
Hide file tree
Showing 4 changed files with 1,103 additions and 29 deletions.
36 changes: 25 additions & 11 deletions backend/src/handlers/export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,31 @@
logging.basicConfig(level=logging.INFO)
client = boto3.client('dynamodb')

def handler(_event, _context):
logging.info(f'requesting export from table {TABLE_ARN} to archive-bucket')
def handler(event, _context):
export_job_arn = event['exportJobArn']

dt = datetime.now()
response = None
if (export_job_arn is None):
logging.info(f'requesting export from table {TABLE_ARN} to archive-bucket')
now = datetime.now()
response = client.export_table_to_point_in_time(
TableArn=TABLE_ARN,
ExportTime=now - timedelta(minutes=5),
S3Bucket=BUCKET_NAME,
S3Prefix=f'{now.timestamp()}',
ExportFormat='DYNAMODB_JSON'
)
export_job_arn = response['ExportDescription']['ExportArn']
else:
response = client.describe_export(
ExportArn=export_job_arn
)

client.export_table_to_point_in_time(
TableArn=TABLE_ARN,
ExportTime=dt - timedelta(minutes=5),
S3Bucket=BUCKET_NAME,
S3Prefix=f'{dt.timestamp()}',
ExportFormat='DYNAMODB_JSON'
)
status = response['ExportDescription']['ExportStatus']

return { 'statusCode': 200 }
if status == 'IN_PROGRESS':
return { 'statusCode': 202, 'exportJobArn': export_job_arn }
elif status == 'FAILED' or status == 'CANCELLED':
return { 'statusCode': 500 }
else:
return { 'statusCode': 200 }
63 changes: 46 additions & 17 deletions infra/lib/flowcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,48 +219,77 @@ export class FlowcastStack extends Stack {
modelBucket.grantReadWrite(trainRole);
modelBucket.grantReadWrite(forecast);

// * export

const exportFunc = new lambda.Function(this, 'export_function', {
code: lambda.Code.fromAsset(path.join(__dirname, '../../backend/src/handlers/export')),
environment: env,
handler: 'export.handler',
runtime: lambda.Runtime.PYTHON_3_10,
architecture: lambda.Architecture.ARM_64,
timeout: cdk.Duration.seconds(30),
memorySize: 768
});
db.grantFullAccess(exportFunc);
archiveBucket.grantReadWrite(exportFunc);

// * sfn

const updateTask = new sfnTasks.LambdaInvoke(this, 'update_task', {
lambdaFunction: update,
resultPath: '$.Result'
});
const wait = new sfn.Wait(this, 'wait', {
time: sfn.WaitTime.duration(cdk.Duration.seconds(10))
});
const forecastTask = new sfnTasks.LambdaInvoke(this, 'forecast_task', {
lambdaFunction: forecast,
resultPath: '$.Result'
});
const exportTask = new sfnTasks.LambdaInvoke(this, 'export_task', {
lambdaFunction: exportFunc,
resultPath: '$.Result'
});
const trainTask = new sfnTasks.BatchSubmitJob(this, 'train_task', {
jobQueueArn: trainJobQueue.jobQueueArn,
jobDefinitionArn: trainJobDefinition.jobDefinitionArn,
jobName: 'site_onboarding_initial_train',
payload: sfn.TaskInput.fromObject({
'usgs_site.$': '$.usgs_site'
}),
resultPath: '$.Result'
});

const wait = new sfn.Wait(this, 'wait', {
time: sfn.WaitTime.duration(cdk.Duration.seconds(30))
});
const onboardCondition = sfn.Condition.booleanEquals('$.is_onboarding', true);
const failCondition = sfn.Condition.not(sfn.Condition.numberEquals('$.Result.Payload.statusCode', 200));

const exportCompleteChoice = new sfn.Choice(this, 'check_export_complete');
exportCompleteChoice
.when(sfn.Condition.numberEquals('$.Result.Payload.statusCode', 200), new sfn.Pass(this, 'export_complete'))
.otherwise(wait.next(new sfnTasks.LambdaInvoke(this, 'poll_export_task', {
lambdaFunction: exportFunc,
resultPath: '$.Result'
})).next(exportCompleteChoice));

const updateAndForecastSfn = new sfn.StateMachine(this, 'update_and_forecast', {
definitionBody: sfn.DefinitionBody.fromChainable(sfn.Chain.start(updateTask)
.next(new sfn.Choice(this, 'verify_update')
.when(failCondition, new sfn.Fail(this, 'update_failed'))
.otherwise(new sfn.Pass(this, 'update_successful'))
.afterwards())
.next(wait)
.next(new sfn.Choice(this, 'check_onboarding')
.when(onboardCondition, exportTask.next(exportCompleteChoice))
.otherwise(new sfn.Pass(this, 'not_onboarding'))
.afterwards())
.next(forecastTask)
.next(new sfn.Choice(this, 'verify_forecast')
.when(failCondition, new sfn.Fail(this, 'forecast_failed'))
.otherwise(new sfn.Pass(this, 'forecast_successful'))
.afterwards())
.next(new sfn.Succeed(this, 'update_and_forecast_successful'))),
timeout: cdk.Duration.minutes(10)
timeout: cdk.Duration.minutes(25)
});

const exportFunc = new lambda.Function(this, 'export_function', {
code: lambda.Code.fromAsset(path.join(__dirname, '../../backend/src/handlers/export')),
environment: env,
handler: 'export.handler',
runtime: lambda.Runtime.PYTHON_3_10,
architecture: lambda.Architecture.ARM_64,
timeout: cdk.Duration.seconds(30),
memorySize: 768
});
db.grantFullAccess(exportFunc);
archiveBucket.grantReadWrite(exportFunc);

// * public access url
new cdk.CfnResource(this, 'public_access_url', {
type: 'AWS::Lambda::Url',
Expand Down
1 change: 1 addition & 0 deletions infra/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
},
"dependencies": {
"@aws-cdk/aws-amplify-alpha": "^2.141.0-alpha.0",
"@aws-sdk/client-cloudfront": "^3.577.0",
"aws-cdk-lib": "^2.141.0",
"cdk-nextjs-standalone": "^4.0.0-beta.29",
"constructs": "^10.0.0",
Expand Down
Loading

0 comments on commit b884c80

Please sign in to comment.