nicolas2bert committed Dec 26, 2023
1 parent 8242a1d commit 35133d5
Showing 1 changed file with 143 additions and 79 deletions.
222 changes: 143 additions & 79 deletions crrExistingObjects.js
@@ -1,42 +1,52 @@
const {
doWhilst, eachSeries, eachLimit, waterfall, series,
} = require('async');
const werelogs = require('werelogs');
const http = require('http');

const AWS = require('aws-sdk');
const { doWhilst, eachSeries, eachLimit, waterfall } = require('async');

const { Logger } = require('werelogs');
const { ObjectMD } = require('arsenal').models;
const metadataUtil = require('./CrrExistingObjects/metadataUtils');

const logLevel = Number.parseInt(process.env.DEBUG, 10) === 1
? 'debug' : 'info';
const loggerConfig = {
level: logLevel,
dump: 'error',
};
werelogs.configure(loggerConfig);
const log = new werelogs.Logger('s3utils::crrExistingObjects');
const BackbeatClient = require('./BackbeatClient');

const log = new Logger('s3utils::crrExistingObjects');
const BUCKETS = process.argv[2] ? process.argv[2].split(',') : null;
const { SITE_NAME } = process.env;
let { STORAGE_TYPE } = process.env;
let { TARGET_REPLICATION_STATUS } = process.env;
const { TARGET_PREFIX } = process.env;
const WORKERS = (process.env.WORKERS
&& Number.parseInt(process.env.WORKERS, 10)) || 10;
const MAX_UPDATES = (process.env.MAX_UPDATES
&& Number.parseInt(process.env.MAX_UPDATES, 10));
const MAX_SCANNED = (process.env.MAX_SCANNED
&& Number.parseInt(process.env.MAX_SCANNED, 10));
let { KEY_MARKER } = process.env;
let { VERSION_ID_MARKER } = process.env;
const ACCESS_KEY = process.env.ACCESS_KEY;
const SECRET_KEY = process.env.SECRET_KEY;
const ENDPOINT = process.env.ENDPOINT;
const SITE_NAME = process.env.SITE_NAME;
let STORAGE_TYPE = process.env.STORAGE_TYPE;
let TARGET_REPLICATION_STATUS = process.env.TARGET_REPLICATION_STATUS;
const TARGET_PREFIX = process.env.TARGET_PREFIX;
const WORKERS = (process.env.WORKERS &&
Number.parseInt(process.env.WORKERS, 10)) || 10;
const MAX_UPDATES = (process.env.MAX_UPDATES &&
Number.parseInt(process.env.MAX_UPDATES, 10));
const MAX_SCANNED = (process.env.MAX_SCANNED &&
Number.parseInt(process.env.MAX_SCANNED, 10));
let KEY_MARKER = process.env.KEY_MARKER;
let VERSION_ID_MARKER = process.env.VERSION_ID_MARKER;
const { GENERATE_INTERNAL_VERSION_ID } = process.env;

const LISTING_LIMIT = (process.env.LISTING_LIMIT
&& Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000;

const LOG_PROGRESS_INTERVAL_MS = 10000;
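// Retry/backoff tuning for the AWS S3 and Backbeat clients created below.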
const AWS_SDK_REQUEST_RETRIES = 100;
const AWS_SDK_REQUEST_DELAY_MS = 30;

if (!BUCKETS || BUCKETS.length === 0) {
log.fatal('No buckets given as input! Please provide '
+ 'a comma-separated list of buckets');
log.fatal('No buckets given as input! Please provide ' +
'a comma-separated list of buckets');
process.exit(1);
}
if (!ENDPOINT) {
log.fatal('ENDPOINT not defined!');
process.exit(1);
}
if (!ACCESS_KEY) {
log.fatal('ACCESS_KEY not defined');
process.exit(1);
}
if (!SECRET_KEY) {
log.fatal('SECRET_KEY not defined');
process.exit(1);
}
if (!STORAGE_TYPE) {
@@ -59,6 +69,48 @@ log.info('Objects with replication status '
+ `${replicationStatusToProcess.join(' or ')} `
+ 'will be reset to PENDING to trigger CRR');

// Configuration for AWS S3
const awsConfig = {
accessKeyId: ACCESS_KEY,
secretAccessKey: SECRET_KEY,
endpoint: ENDPOINT,
region: 'us-east-1',
sslEnabled: false,
s3ForcePathStyle: true,
apiVersions: { s3: '2006-03-01' },
signatureVersion: 'v4',
signatureCache: false,
httpOptions: {
timeout: 0,
agent: new http.Agent({ keepAlive: true }),
},
};

/**
* Custom backoff strategy for AWS SDK requests.
* @param {number} retryCount - The current retry attempt.
* @param {Error} error - The error that caused the retry.
* @returns {number} The delay in milliseconds before the next retry.
*/
function customBackoffStrategy(retryCount, error) {
log.error('aws sdk request error', { error, retryCount });
// The delay is not truly exponential; it resets to the minimum after
// every 10 retries, capping at roughly 15 seconds.
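// For example: retry 0 waits 30 ms, retry 5 waits 960 ms, retry 9 waits
// 15,360 ms, and retry 10 drops back to 30 ms.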
return AWS_SDK_REQUEST_DELAY_MS * Math.pow(2, retryCount % 10);
}

// Specific options for S3 requests
const s3SpecificOptions = {
maxRetries: AWS_SDK_REQUEST_RETRIES,
customBackoff: customBackoffStrategy,
};

// Create an S3 client instance
const s3 = new AWS.S3({ ...awsConfig, ...s3SpecificOptions });

// Create a BackbeatClient instance
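// BackbeatClient provides the getMetadata/putMetadata routes used below to
// read and write raw object metadata blobs, replacing the previous
// metadataUtil helpers.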
const bb = new BackbeatClient(awsConfig);

let nProcessed = 0;
let nSkipped = 0;
let nUpdated = 0;
@@ -83,34 +135,43 @@ const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS);
function _objectShouldBeUpdated(objMD) {
return replicationStatusToProcess.some(filter => {
if (filter === 'NEW') {
return (!objMD.getReplicationInfo()
|| objMD.getReplicationInfo().status === '');
return (!objMD.getReplicationInfo() ||
objMD.getReplicationInfo().status === '');
}
return (objMD.getReplicationInfo()
&& objMD.getReplicationInfo().status === filter);
return (objMD.getReplicationInfo() &&
objMD.getReplicationInfo().status === filter);
});
}

function _markObjectPending(
bucket,
key,
versionId,
storageClass,
repConfig,
cb,
) {
function _markObjectPending(bucket, key, versionId, storageClass,
repConfig, cb) {
let objMD;
let skip = false;
return waterfall([
// get object blob
next => metadataUtil.getMetadata({
next => bb.getMetadata({
Bucket: bucket,
Key: key,
VersionId: versionId,
}, log, next),
}, next),
(mdRes, next) => {
objMD = new ObjectMD(mdRes);
const md = objMD.getValue();
// The Arsenal Object Metadata schema version 8.1 is being used for both Ring S3C and Artesca,
// and this is acceptable because the 8.1 schema only adds extra properties to the 7.10 schema.
// This is beneficial because:
// - Forward compatibility: Having the 8.1 properties in place now ensures that
// S3C is compatible with the 8.1 schema, which could be useful if we plan to upgrade
// from 7.10 to 8.1 in the future.
// - Minimal impact on current functionality: The extra properties from the 8.1
// schema do not interfere with the current functionalities of the 7.10 environment,
// so there is no harm in keeping them. S3C should ignore them without causing any issues.
// - Simplified codebase: Not having to remove these properties simplifies the codebase of s3utils.
// This avoids the added complexity and potential errors associated with conditionally removing
// or altering metadata properties based on the version.
// - Single schema approach: Maintaining a single, unified schema approach in s3utils can make the
// codebase easier to maintain and upgrade, as opposed to having multiple branches or versions of
// the code for different schema versions.
objMD = new ObjectMD(JSON.parse(mdRes.Body));
console.log('GET METADATA!!!', objMD);
if (!_objectShouldBeUpdated(objMD)) {
skip = true;
return next();
@@ -133,19 +194,23 @@ function _markObjectPending(
}
// The object does not have an *internal* versionId, as it
// was put on a nonversioned bucket: do a first metadata
// update to generate one, just passing on the existing metadata
// blob. Note that the resulting key will still be nonversioned,
// but the following update will be able to create a versioned key
// for this object, so that replication can happen. The externally
// visible version will stay "null".
return metadataUtil.putMetadata({
// update to let cloudserver generate one, just passing on
// the existing metadata blob. Note that the resulting key
// will still be nonversioned, but the following update
// will be able to create a versioned key for this object,
// so that replication can happen. The externally visible
// version will stay "null".
return bb.putMetadata({
Bucket: bucket,
Key: key,
Body: md,
}, log, (err, putRes) => {
VersionId: versionId,
ContentLength: Buffer.byteLength(mdRes.Body),
Body: mdRes.Body,
}, (err, putRes) => {
if (err) {
return next(err);
}
console.log('PUT METADATA >>>>>>>>');
// No need to fetch the whole metadata again, simply
// update the one we have with the generated versionId.
objMD.setVersionId(putRes.versionId);
@@ -157,8 +222,9 @@ function _markObjectPending(
if (skip) {
return next();
}

// Initialize replication info, if missing
// This is particularly important if the object was created before
// enabling replication on the bucket.
if (!objMD.getReplicationInfo()
|| !objMD.getReplicationSiteStatus(storageClass)) {
const { Rules, Role } = repConfig;
@@ -185,13 +251,17 @@ function _markObjectPending(

objMD.setReplicationSiteStatus(storageClass, 'PENDING');
objMD.setReplicationStatus('PENDING');
objMD.updateMicroVersionId();
const md = objMD.getValue();
return metadataUtil.putMetadata({
objMD.updateMicroVersionId?.();
console.log('objMD!!!', objMD);
const md = objMD.getSerialized();
console.log('SERIALIZED objMD!!!', md);
return bb.putMetadata({
Bucket: bucket,
Key: key,
VersionId: versionId,
ContentLength: Buffer.byteLength(md),
Body: md,
}, log, next);
}, next);
},
], err => {
++nProcessed;
@@ -213,19 +283,19 @@ function _markObjectPending(

// list object versions
function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
return metadataUtil.listObjectVersions({
return s3.listObjectVersions({
Bucket: bucket,
MaxKeys: LISTING_LIMIT,
Prefix: TARGET_PREFIX,
VersionIdMarker,
KeyMarker,
}, log, cb);
}, cb);
}

function _markPending(bucket, versions, cb) {
const options = { Bucket: bucket };
waterfall([
next => metadataUtil.getBucketReplication(options, log, (err, res) => {
next => s3.getBucketReplication(options, (err, res) => {
if (err) {
log.error('error getting bucket replication', { error: err });
return next(err);
@@ -272,18 +342,16 @@ function triggerCRROnBucket(bucketName, cb) {
log.error('error listing object versions', { error: err });
return done(err);
}
const versions = data.DeleteMarkers
? data.Versions.concat(data.DeleteMarkers) : data.Versions;
return _markPending(bucket, versions, err => {
if (err) {
return done(err);
}
VersionIdMarker = data.NextVersionIdMarker;
KeyMarker = data.NextKeyMarker;
return done();
});
},
),
return _markPending(
bucket, data.Versions.concat(data.DeleteMarkers), err => {
if (err) {
return done(err);
}
VersionIdMarker = data.NextVersionIdMarker;
KeyMarker = data.NextKeyMarker;
return done();
});
}),
() => {
if (nUpdated >= MAX_UPDATES || nProcessed >= MAX_SCANNED) {
_logProgress();
@@ -326,16 +394,12 @@ function triggerCRROnBucket(bucketName, cb) {
_logProgress();
log.info(`completed task for bucket: ${bucket}`);
return cb();
},
}
);
}

// trigger the calls to list objects and mark them for crr
series([
next => metadataUtil.metadataClient.setup(next),
next => eachSeries(BUCKETS, triggerCRROnBucket, next),
next => metadataUtil.metadataClient.close(next),
], err => {
eachSeries(BUCKETS, triggerCRROnBucket, err => {
clearInterval(logProgressInterval);
if (err) {
return log.error('error during task execution', { error: err });
