From 35133d54a2ca6210b455a0c431ab62c0324538db Mon Sep 17 00:00:00 2001 From: Nicolas Humbert Date: Tue, 26 Dec 2023 10:58:15 +0100 Subject: [PATCH] ++ --- crrExistingObjects.js | 222 +++++++++++++++++++++++++++--------------- 1 file changed, 143 insertions(+), 79 deletions(-) diff --git a/crrExistingObjects.js b/crrExistingObjects.js index 06312574..83667117 100644 --- a/crrExistingObjects.js +++ b/crrExistingObjects.js @@ -1,42 +1,52 @@ -const { - doWhilst, eachSeries, eachLimit, waterfall, series, -} = require('async'); -const werelogs = require('werelogs'); +const http = require('http'); + +const AWS = require('aws-sdk'); +const { doWhilst, eachSeries, eachLimit, waterfall } = require('async'); + +const { Logger } = require('werelogs'); const { ObjectMD } = require('arsenal').models; -const metadataUtil = require('./CrrExistingObjects/metadataUtils'); -const logLevel = Number.parseInt(process.env.DEBUG, 10) === 1 - ? 'debug' : 'info'; -const loggerConfig = { - level: logLevel, - dump: 'error', -}; -werelogs.configure(loggerConfig); -const log = new werelogs.Logger('s3utils::crrExistingObjects'); +const BackbeatClient = require('./BackbeatClient'); +const log = new Logger('s3utils::crrExistingObjects'); const BUCKETS = process.argv[2] ? process.argv[2].split(',') : null; -const { SITE_NAME } = process.env; -let { STORAGE_TYPE } = process.env; -let { TARGET_REPLICATION_STATUS } = process.env; -const { TARGET_PREFIX } = process.env; -const WORKERS = (process.env.WORKERS - && Number.parseInt(process.env.WORKERS, 10)) || 10; -const MAX_UPDATES = (process.env.MAX_UPDATES - && Number.parseInt(process.env.MAX_UPDATES, 10)); -const MAX_SCANNED = (process.env.MAX_SCANNED - && Number.parseInt(process.env.MAX_SCANNED, 10)); -let { KEY_MARKER } = process.env; -let { VERSION_ID_MARKER } = process.env; +const ACCESS_KEY = process.env.ACCESS_KEY; +const SECRET_KEY = process.env.SECRET_KEY; +const ENDPOINT = process.env.ENDPOINT; +const SITE_NAME = process.env.SITE_NAME; +let STORAGE_TYPE = process.env.STORAGE_TYPE; +let TARGET_REPLICATION_STATUS = process.env.TARGET_REPLICATION_STATUS; +const TARGET_PREFIX = process.env.TARGET_PREFIX; +const WORKERS = (process.env.WORKERS && + Number.parseInt(process.env.WORKERS, 10)) || 10; +const MAX_UPDATES = (process.env.MAX_UPDATES && + Number.parseInt(process.env.MAX_UPDATES, 10)); +const MAX_SCANNED = (process.env.MAX_SCANNED && + Number.parseInt(process.env.MAX_SCANNED, 10)); +let KEY_MARKER = process.env.KEY_MARKER; +let VERSION_ID_MARKER = process.env.VERSION_ID_MARKER; const { GENERATE_INTERNAL_VERSION_ID } = process.env; - const LISTING_LIMIT = (process.env.LISTING_LIMIT && Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000; - const LOG_PROGRESS_INTERVAL_MS = 10000; +const AWS_SDK_REQUEST_RETRIES = 100; +const AWS_SDK_REQUEST_DELAY_MS = 30; if (!BUCKETS || BUCKETS.length === 0) { - log.fatal('No buckets given as input! Please provide ' - + 'a comma-separated list of buckets'); + log.fatal('No buckets given as input! Please provide ' + + 'a comma-separated list of buckets'); + process.exit(1); +} +if (!ENDPOINT) { + log.fatal('ENDPOINT not defined!'); + process.exit(1); +} +if (!ACCESS_KEY) { + log.fatal('ACCESS_KEY not defined'); + process.exit(1); +} +if (!SECRET_KEY) { + log.fatal('SECRET_KEY not defined'); process.exit(1); } if (!STORAGE_TYPE) { @@ -59,6 +69,48 @@ log.info('Objects with replication status ' + `${replicationStatusToProcess.join(' or ')} ` + 'will be reset to PENDING to trigger CRR'); +// Configuration for AWS S3 +const awsConfig = { + accessKeyId: ACCESS_KEY, + secretAccessKey: SECRET_KEY, + endpoint: ENDPOINT, + region: 'us-east-1', + sslEnabled: false, + s3ForcePathStyle: true, + apiVersions: { s3: '2006-03-01' }, + signatureVersion: 'v4', + signatureCache: false, + httpOptions: { + timeout: 0, + agent: new http.Agent({ keepAlive: true }), + }, +}; + +/** + * Custom backoff strategy for AWS SDK requests. + * @param {number} retryCount - The current retry attempt. + * @param {Error} error - The error that caused the retry. + * @returns {number} The delay in milliseconds before the next retry. + */ +function customBackoffStrategy(retryCount, error) { + log.error('aws sdk request error', { error, retryCount }); + // The delay is not truly exponential; it resets to the minimum after every 10 calls, + // with a maximum delay of 15 seconds. + return AWS_SDK_REQUEST_DELAY_MS * Math.pow(2, retryCount % 10); +} + +// Specific options for S3 requests +const s3SpecificOptions = { + maxRetries: AWS_SDK_REQUEST_RETRIES, + customBackoff: customBackoffStrategy, +}; + +// Create an S3 client instance +const s3 = new AWS.S3({ ...awsConfig, ...s3SpecificOptions }); + +// Create a BackbeatClient instance +const bb = new BackbeatClient(awsConfig); + let nProcessed = 0; let nSkipped = 0; let nUpdated = 0; @@ -83,34 +135,43 @@ const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS); function _objectShouldBeUpdated(objMD) { return replicationStatusToProcess.some(filter => { if (filter === 'NEW') { - return (!objMD.getReplicationInfo() - || objMD.getReplicationInfo().status === ''); + return (!objMD.getReplicationInfo() || + objMD.getReplicationInfo().status === ''); } - return (objMD.getReplicationInfo() - && objMD.getReplicationInfo().status === filter); + return (objMD.getReplicationInfo() && + objMD.getReplicationInfo().status === filter); }); } -function _markObjectPending( - bucket, - key, - versionId, - storageClass, - repConfig, - cb, -) { +function _markObjectPending(bucket, key, versionId, storageClass, + repConfig, cb) { let objMD; let skip = false; return waterfall([ // get object blob - next => metadataUtil.getMetadata({ + next => bb.getMetadata({ Bucket: bucket, Key: key, VersionId: versionId, - }, log, next), + }, next), (mdRes, next) => { - objMD = new ObjectMD(mdRes); - const md = objMD.getValue(); + // The Arsenal Object Metadata schema version 8.1 is being used for both Ring S3C and Artesca, + // and this is acceptable because the 8.1 schema only adds extra properties to the 7.10 schema. + // This is beneficial because: + // - Forward compatibility: Having the 8.1 properties in place now ensures that + // S3C is compatible with the 8.1 schema, which could be useful if we plan to upgrade + // from 7.10 to 8.1 in the future. + // - Minimal impact on current functionality: The extra properties from the 8.1 + // schema do not interfere with the current functionalities of the 7.10 environment, + // so there is no harm in keeping them. S3C should ignore them without causing any issues. + // - Simplified codebase: Not having to remove these properties simplifies the codebase of s3utils. + // This avoids the added complexity and potential errors associated with conditionally removing + // or altering metadata properties based on the version. + // - Single schema approach: Maintaining a single, unified schema approach in s3utils can make the + // codebase easier to maintain and upgrade, as opposed to having multiple branches or versions of + // the code for different schema versions. + objMD = new ObjectMD(JSON.parse(mdRes.Body)); + console.log('GET METADATA!!!', objMD); if (!_objectShouldBeUpdated(objMD)) { skip = true; return next(); @@ -133,19 +194,23 @@ function _markObjectPending( } // The object does not have an *internal* versionId, as it // was put on a nonversioned bucket: do a first metadata - // update to generate one, just passing on the existing metadata - // blob. Note that the resulting key will still be nonversioned, - // but the following update will be able to create a versioned key - // for this object, so that replication can happen. The externally - // visible version will stay "null". - return metadataUtil.putMetadata({ + // update to let cloudserver generate one, just passing on + // the existing metadata blob. Note that the resulting key + // will still be nonversioned, but the following update + // will be able to create a versioned key for this object, + // so that replication can happen. The externally visible + // version will stay "null". + return bb.putMetadata({ Bucket: bucket, Key: key, - Body: md, - }, log, (err, putRes) => { + VersionId: versionId, + ContentLength: Buffer.byteLength(mdRes.Body), + Body: mdRes.Body, + }, (err, putRes) => { if (err) { return next(err); } + console.log('PUT METADATA >>>>>>>>'); // No need to fetch the whole metadata again, simply // update the one we have with the generated versionId. objMD.setVersionId(putRes.versionId); @@ -157,8 +222,9 @@ function _markObjectPending( if (skip) { return next(); } - // Initialize replication info, if missing + // This is particularly important if the object was created before + // enabling replication on the bucket. if (!objMD.getReplicationInfo() || !objMD.getReplicationSiteStatus(storageClass)) { const { Rules, Role } = repConfig; @@ -185,13 +251,17 @@ function _markObjectPending( objMD.setReplicationSiteStatus(storageClass, 'PENDING'); objMD.setReplicationStatus('PENDING'); - objMD.updateMicroVersionId(); - const md = objMD.getValue(); - return metadataUtil.putMetadata({ + objMD.updateMicroVersionId?.(); + console.log('objMD!!!', objMD); + const md = objMD.getSerialized(); + console.log('SERIALIZED objMD!!!', md); + return bb.putMetadata({ Bucket: bucket, Key: key, + VersionId: versionId, + ContentLength: Buffer.byteLength(md), Body: md, - }, log, next); + }, next); }, ], err => { ++nProcessed; @@ -213,19 +283,19 @@ function _markObjectPending( // list object versions function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) { - return metadataUtil.listObjectVersions({ + return s3.listObjectVersions({ Bucket: bucket, MaxKeys: LISTING_LIMIT, Prefix: TARGET_PREFIX, VersionIdMarker, KeyMarker, - }, log, cb); + }, cb); } function _markPending(bucket, versions, cb) { const options = { Bucket: bucket }; waterfall([ - next => metadataUtil.getBucketReplication(options, log, (err, res) => { + next => s3.getBucketReplication(options, (err, res) => { if (err) { log.error('error getting bucket replication', { error: err }); return next(err); @@ -272,18 +342,16 @@ function triggerCRROnBucket(bucketName, cb) { log.error('error listing object versions', { error: err }); return done(err); } - const versions = data.DeleteMarkers - ? data.Versions.concat(data.DeleteMarkers) : data.Versions; - return _markPending(bucket, versions, err => { - if (err) { - return done(err); - } - VersionIdMarker = data.NextVersionIdMarker; - KeyMarker = data.NextKeyMarker; - return done(); - }); - }, - ), + return _markPending( + bucket, data.Versions.concat(data.DeleteMarkers), err => { + if (err) { + return done(err); + } + VersionIdMarker = data.NextVersionIdMarker; + KeyMarker = data.NextKeyMarker; + return done(); + }); + }), () => { if (nUpdated >= MAX_UPDATES || nProcessed >= MAX_SCANNED) { _logProgress(); @@ -326,16 +394,12 @@ function triggerCRROnBucket(bucketName, cb) { _logProgress(); log.info(`completed task for bucket: ${bucket}`); return cb(); - }, + } ); } // trigger the calls to list objects and mark them for crr -series([ - next => metadataUtil.metadataClient.setup(next), - next => eachSeries(BUCKETS, triggerCRROnBucket, next), - next => metadataUtil.metadataClient.close(next), -], err => { +eachSeries(BUCKETS, triggerCRROnBucket, err => { clearInterval(logProgressInterval); if (err) { return log.error('error during task execution', { error: err });