Skip to content

Commit

Permalink
feat(infra): WIP data exports to grants account (#698)
Browse files Browse the repository at this point in the history
* feat(infra): WIP data exports to grants account

* WIP debug message

* add cgrants data exports

* update secrets

* rename data exports
  • Loading branch information
larisa17 authored Oct 21, 2024
1 parent fedac30 commit 736452e
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from data_model.models import Cache
from scorer.export_utils import (
AWSOverrideCredentials,
export_data_for_model,
get_pa_schema,
upload_to_s3,
Expand Down Expand Up @@ -108,6 +109,8 @@ def add_arguments(self, parser):
)

parser.add_argument("--filename", type=str, help="The output filename")
parser.add_argument("--s3-access-key", type=str, default="", help="The S3 access key for dedicated S3 download (like digital ocean)")
parser.add_argument("--s3-secret-access-key", type=str, default="", help="The S3 secret access key for dedicated S3 download (like digital ocean)")
parser.add_argument(
"--s3-extra-args",
type=str,
Expand All @@ -132,6 +135,8 @@ def handle(self, *args, **options):
batch_size = options["batch_size"]
s3_uri = options["s3_uri"]
filename = options["filename"]
s3_access_key = options["s3_access_key"]
s3_secret_access_key = options["s3_secret_access_key"]
format = options["format"]
data_model_names = (
[n.strip() for n in options["data_model"].split(",")]
Expand All @@ -148,6 +153,9 @@ def handle(self, *args, **options):
self.stdout.write(f"EXPORT - batch_size : '{batch_size}'")
self.stdout.write(f"EXPORT - filename : '{filename}'")


self.stdout.write(f"EXPORT - DEBUG LARISA : acc_key : '{s3_access_key}', s3_secret_access_key: '{s3_secret_access_key}'")

parsed_uri = urlparse(s3_uri)
s3_bucket_name = parsed_uri.netloc
s3_folder = parsed_uri.path.strip("/")
Expand All @@ -170,7 +178,17 @@ def handle(self, *args, **options):
self.style.SUCCESS(f"EXPORT - Data exported to '{filename}'")
)

upload_to_s3(filename, s3_folder, s3_bucket_name, extra_args)
if s3_access_key and s3_secret_access_key:
aws_override_credentials = AWSOverrideCredentials(
aws_access_key_id=s3_access_key,
aws_secret_access_key=s3_secret_access_key,
aws_endpoint_url="",
)
upload_to_s3(
filename, s3_folder, s3_bucket_name, extra_args, aws_override_credentials
)
else:
upload_to_s3(filename, s3_folder, s3_bucket_name, extra_args)

if cloudfront_distribution_id:
client = boto3.client("cloudfront")
Expand Down
110 changes: 99 additions & 11 deletions infra/aws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,8 @@ const exportVals = createScoreExportBucketAndDomain(
);
// The following scorer dumps the Allo scorer scores to a public S3 bucket
// for the Allo team to easily pull the data

export const frequentAlloScorerDataDumpTaskDefinition = pulumi
// This will be removed after the confirmation that the new exports are working properly.
const frequentAlloScorerDataDumpTaskDefinition = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand All @@ -1456,8 +1456,6 @@ export const frequentAlloScorerDataDumpTaskDefinition = pulumi
]) +
"'",
`--s3-uri=s3://${publicDataDomain}/passport_scores/`,
// "--summary-extra-args",
// JSON.stringify({ ACL: "public-read" }),
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
Expand All @@ -1471,7 +1469,63 @@ export const frequentAlloScorerDataDumpTaskDefinition = pulumi
});
});

export const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
// Only for production
if (stack === "production") {
const frequentAlloScorerDataDumpTaskDefinitionDigitalOcean = pulumi
.all([exportVals, apiSecrets])
.apply(([_exportedVals, _apiSecrets]) => {
const digitalOceanAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_ACCESS_KEY")?.valueFrom;
const digitalOceanSecretAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_SECRET_ACCESS_KEY")?.valueFrom;
const digitalOceanS3Endpoint = op.read.parse(
`op://DevOps/passport-scorer-${stack}-env/api/GRANTS_DIGITAL_OCEAN_S3_ENDPOINT`
);
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
createScheduledTask({
name: "frequent-allo-scorer-data-dump-grants",
config: {
...baseScorerServiceConfig,
securityGroup: secgrp,
command: [
"python",
"manage.py",
"scorer_dump_data",
"--batch-size=1000",
"--database=read_replica_analytics",
"--config",
"'" +
JSON.stringify([
{
name: "registry.Score",
filter: { passport__community_id: 335 },
select_related: ["passport"],
},
]) +
"'",
`--s3-uri=s3://${digitalOceanS3Endpoint}`,
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
},
environment:apiEnvironment,

secrets: _apiSecrets.map(secret => {
if (secret.name === "S3_DATA_AWS_SECRET_ACCESS_KEY") {
return { ...secret, valueFrom: digitalOceanAccessKey}; // Replace for data dump with digital ocean credentials
}
if (secret.name === "S3_DATA_AWS_SECRET_KEY_ID") {
return { ...secret, valueFrom: digitalOceanSecretAccessKey }; // Replace for data dump with digital ocean credentials
}
return secret;
}) as secretsManager.SecretRef[],
alarmPeriodSeconds: 3600, // 1h in seconds
enableInvocationAlerts: true,
scorerSecretManagerArn: scorerSecret.arn,
});
});
});
}

const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand All @@ -1498,8 +1552,6 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
]) +
"'",
`--s3-uri=s3://${publicDataDomain}/passport_scores/335/`,
// "--summary-extra-args",
// JSON.stringify({ ACL: "public-read" }),
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
Expand All @@ -1513,7 +1565,7 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
});
});

export const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand All @@ -1540,8 +1592,6 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
]) +
"'",
`--s3-uri=s3://${publicDataDomain}/passport_scores/6608/`,
// "--summary-extra-args",
// JSON.stringify({ ACL: "public-read" }),
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
Expand All @@ -1558,7 +1608,8 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
/*
* Dump data for the eth-model V2
*/
export const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = pulumi
// this for sure
const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand Down Expand Up @@ -1588,6 +1639,43 @@ export const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = pulumi
});
});

if (stack === "production") {
const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorerDigitalOcean = pulumi
.all([exportVals, apiSecrets])
.apply(([_exportedVals, _apiSecrets]) => {
// const digitalOceanAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_ACCESS_KEY")?.valueFrom;
// const digitalOceanSecretAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_SECRET_ACCESS_KEY")?.valueFrom;
const digitalOceanS3Endpoint = op.read.parse(
`op://DevOps/passport-scorer-${stack}-env/api/GRANTS_DIGITAL_OCEAN_S3_ENDPOINT`
);
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
createScheduledTask({
name: "frequent-eth-model-v2-dump-grants",
config: {
...baseScorerServiceConfig,
securityGroup: secgrp,
command: [
"python",
"manage.py",
"scorer_dump_data_model_score",
`--s3-uri=s3://${digitalOceanS3Endpoint}`,
`--s3-access-key=$GRANTS_DIGITAL_OCEAN_ACCESS_KEY`,
`--s3-secret-access-key=$GRANTS_DIGITAL_OCEAN_SECRET_ACCESS_KEY`,
"--filename=model_scores.parquet",
"--format=parquet",
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
},
environment: apiEnvironment,
secrets: _apiSecrets,
alarmPeriodSeconds: 3600, // 1h in seconds
enableInvocationAlerts: true,
scorerSecretManagerArn: scorerSecret.arn,
});
});
});
}
export const coinbaseRevocationCheck = createScheduledTask({
name: "coinbase-revocation-check",
config: {
Expand Down
4 changes: 2 additions & 2 deletions infra/lib/scorer/scheduledTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export function createTask({
name: string;
config: ScheduledTaskConfig;
environment: secretsManager.EnvironmentVar[];
secrets: pulumi.Output<secretsManager.SecretRef[]>;
secrets: pulumi.Output<secretsManager.SecretRef[]> | secretsManager.SecretRef[];
scorerSecretManagerArn: Input<string>;
}) {
const {
Expand Down Expand Up @@ -225,7 +225,7 @@ export function createScheduledTask({
name: string;
config: ScheduledTaskConfig;
environment: secretsManager.EnvironmentVar[];
secrets: pulumi.Output<secretsManager.SecretRef[]>;
secrets: pulumi.Output<secretsManager.SecretRef[]> | secretsManager.SecretRef[];
alarmPeriodSeconds?: number;
enableInvocationAlerts?: boolean;
scorerSecretManagerArn: Input<string>;
Expand Down

0 comments on commit 736452e

Please sign in to comment.