diff --git a/src/main/java/ifs_mover/ObjectMover.java b/src/main/java/ifs_mover/ObjectMover.java index 06aa574..c8a1240 100644 --- a/src/main/java/ifs_mover/ObjectMover.java +++ b/src/main/java/ifs_mover/ObjectMover.java @@ -12,6 +12,9 @@ package ifs_mover; import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -472,13 +475,13 @@ private boolean moveObject(String path, boolean isDelete, boolean isFile, String try { if (isDelete) { - targetRepository.deleteObject(targetBucket, targetPath, versionId); + targetRepository.deleteObject(targetBucket, targetPath, null); logger.info("delete success : {}", sourcePath); } else { if (isFile) { if (multipartInfo != null) { // for swift large file (more than 5G) - String uploadId = targetRepository.startMultipart(targetBucket, targetPath); + String uploadId = targetRepository.startMultipart(targetBucket, targetPath, null); List partList = new ArrayList(); String[] multiPath = multipartInfo.split("/", 2); int partNumber = 0; @@ -499,30 +502,39 @@ private boolean moveObject(String path, boolean isDelete, boolean isFile, String targetRepository.setTagging(targetBucket, targetPath, tagSet); } logger.info("move success : {}", path); - } else if ((!type.equalsIgnoreCase(Repository.IFS_FILE) && moveSize != 0 && size > moveSize) - || (type.equalsIgnoreCase(Repository.SWIFT) && size > GIGA_BYTES)) { + } else if ((!type.equalsIgnoreCase(Repository.IFS_FILE) && size > GIGA_BYTES) + || (!type.equalsIgnoreCase(Repository.IFS_FILE) && (moveSize != 0 && size > moveSize))) { // send multipart + if (versionId != null && !versionId.isEmpty()) { + logger.debug("send multipart : {}:{}, size {}", path, versionId, size); + } else { + logger.debug("send multipart : {}, size {}", path, size); + } long limitSize = 0L; if (moveSize == 0) { limitSize = 100 * MEGA_BYTES; } else { limitSize = moveSize; } - String uploadId = targetRepository.startMultipart(targetBucket, targetPath); + ObjectData objData = sourceRepository.getObject(sourceBucket, sourcePath, versionId); + String uploadId = targetRepository.startMultipart(targetBucket, targetPath, objData.getMetadata()); List partList = new ArrayList(); int partNumber = 1; - ObjectData data = null; + for (long i = 0; i < size; i += limitSize, partNumber++) { long start = i; long end = i + limitSize - 1; if (end >= size) { end = size - 1; } - data = sourceRepository.getObject(sourceBucket, sourcePath, null, start, end); + + ObjectData data = sourceRepository.getObject(sourceBucket, sourcePath, versionId, start, end); + if (data != null) { String partETag = targetRepository.uploadPart(targetBucket, targetPath, uploadId, data.getInputStream(), partNumber, data.getSize()); partList.add(new PartETag(partNumber, partETag)); data.getInputStream().close(); + logger.debug("{} - move part : {}, size : {}", path, partNumber, data.getSize()); } } targetRepository.completeMultipart(targetBucket, targetPath, uploadId, partList); @@ -557,7 +569,7 @@ private boolean moveObject(String path, boolean isDelete, boolean isFile, String } if (!type.equalsIgnoreCase(Repository.IFS_FILE) && isFile && size > 0) { - if (etag.equals(s3ETag)) { + if (etag.equals(s3ETag) || etag.contains("-")) { if (versionId != null && !versionId.isEmpty()) { logger.info("move success : {}:{}", path, versionId); } else { diff --git a/src/main/java/ifs_mover/repository/IfsS3.java b/src/main/java/ifs_mover/repository/IfsS3.java index b3a2cd0..84cbba2 100644 --- a/src/main/java/ifs_mover/repository/IfsS3.java +++ b/src/main/java/ifs_mover/repository/IfsS3.java @@ -77,7 +77,25 @@ public class IfsS3 implements Repository, S3 { private final String BUCKET_ALREADY_EXISTS = "BucketAlreadyExists"; private final String INVALID_ACCESS_KEY_ID = "InvalidAccessKeyId"; private final String SIGNATURE_DOES_NOT_MATCH = "SignatureDoesNotMatch"; - + + private final int MILLISECONDS = 1000; + private final int TIMEOUT = 300; + private final int RETRY_COUNT = 2; + + private final String LOG_SOURCE_INVALID_ACCESS = "source - The access key is invalid."; + private final String LOG_SOURCE_INVALID_SECRET = "source - The secret key is invalid."; + private final String LOG_TARGET_INVALID_ACCESS = "target - The access key is invalid."; + private final String LOG_TARGET_INVALID_SECRET = "target - The secret key is invalid."; + private final String LOG_SOURCE_ENDPOINT_NULL = "source - endpoint is null"; + private final String LOG_TARGET_ENDPOINT_NULL = "target - endpoint is null"; + private final String LOG_SOURCE_BUCKET_NULL = "source - bucket is null"; + private final String LOG_TARGET_BUCKET_NULL = "target - bucket is null"; + private final String LOG_SOURCE_BUCKET_NOT_EXIST = "source - bucket is not exist"; + private final String LOG_SOURCE_NOT_REGION = "source - unable to find region."; + private final String LOG_TARGET_NOT_REGION = "target - unable to find region."; + private final String LOG_SOURCE_INVALID_ENDPOINT = "source - endpoint is invalid."; + private final String LOG_TARGET_INVALID_ENDPOINT = "target - endpoint is invalid."; + IfsS3(String jobId) { this.jobId = jobId; } @@ -94,11 +112,11 @@ public void setConfig(Config config, boolean isSource) { public int check(String type) { if (config.getEndPoint() == null || config.getEndPoint().isEmpty()) { if (isSource) { - logger.error("source - endpoint is null"); - errMessage = "source - endpoint is null"; + logger.error(LOG_SOURCE_ENDPOINT_NULL); + errMessage = LOG_SOURCE_ENDPOINT_NULL; } else { - logger.error("target - endpoint is null"); - errMessage = "target - endpoint is null"; + logger.error(LOG_TARGET_ENDPOINT_NULL); + errMessage = LOG_TARGET_ENDPOINT_NULL; } return ENDPOINT_IS_NULL; } @@ -106,11 +124,11 @@ public int check(String type) { if (!type.equalsIgnoreCase(Repository.SWIFT)) { if (config.getBucket() == null || config.getBucket().isEmpty()) { if (isSource) { - logger.error("source - bucket is null"); - errMessage = "source - bucket is null"; + logger.error(LOG_SOURCE_BUCKET_NULL); + errMessage = LOG_SOURCE_BUCKET_NULL; } else { - logger.error("target - bucket is null"); - errMessage = "target - bucket is null"; + logger.error(LOG_TARGET_BUCKET_NULL); + errMessage = LOG_TARGET_BUCKET_NULL; } return BUCKET_IS_NULL; } @@ -134,8 +152,8 @@ public int check(String type) { result = existBucket(true, config.getBucket()); if (result == BUCKET_NO_EXIST) { if (isSource) { - logger.error("source - bucket is not exist"); - errMessage = "source - bucket is not exist"; + logger.error(LOG_SOURCE_BUCKET_NOT_EXIST); + errMessage = LOG_SOURCE_BUCKET_NOT_EXIST; return BUCKET_NO_EXIST; } else { result = createBucket(true); @@ -155,21 +173,21 @@ public int getClient() { client = createClient(isAWS, isSecure, config.getEndPoint(), config.getAccessKey(), config.getSecretKey()); } catch (SdkClientException e) { if (isSource) { - logger.error("source - unable to find region."); - errMessage = "source - unable to find region"; + logger.error(LOG_SOURCE_NOT_REGION); + errMessage = LOG_SOURCE_NOT_REGION; } else { - logger.error("target - unable to find region."); - errMessage = "target - unable to find region"; + logger.error(LOG_TARGET_NOT_REGION); + errMessage = LOG_TARGET_NOT_REGION; } return UNABLE_FIND_REGION; } catch (IllegalArgumentException e) { if (isSource) { - logger.error("source - endpoint is invalid."); - errMessage = "source - endpoint is invalid."; + logger.error(LOG_SOURCE_INVALID_ENDPOINT); + errMessage = LOG_SOURCE_INVALID_ENDPOINT; } else { - logger.error("target - endpoint is invalid."); - errMessage = "target - endpoint is invalid."; + logger.error(LOG_TARGET_INVALID_ENDPOINT); + errMessage = LOG_TARGET_INVALID_ENDPOINT; } return INVALID_ENDPOINT; } @@ -187,6 +205,9 @@ private AmazonS3 createClient(boolean isAWS, boolean isSecure, String URL, Strin } config.setSignerOverride(AWS_S3_V4_SIGNER_TYPE); + config.setMaxErrorRetry(RETRY_COUNT); + config.setConnectionTimeout(TIMEOUT * MILLISECONDS); + config.setSocketTimeout(TIMEOUT * MILLISECONDS); AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard(); clientBuilder.setCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(AccessKey, SecretKey))); @@ -217,19 +238,19 @@ private int existBucket(boolean isCheck, String bucket) { case INVALID_ACCESS_KEY_ID: if (isCheck) { if (isSource) { - logger.error("source - The access key is invalid."); - errMessage = "source - The access key is invalid."; + logger.error(LOG_SOURCE_INVALID_ACCESS); + errMessage = LOG_SOURCE_INVALID_ACCESS; } else { - logger.error("target - The access key is invalid."); - errMessage = "target - The access key is invalid."; + logger.error(LOG_TARGET_INVALID_ACCESS); + errMessage = LOG_TARGET_INVALID_ACCESS; } } else { if (isSource) { - logger.error("source - The access key is invalid."); - errMessage = "source - The access key is invalid."; + logger.error(LOG_SOURCE_INVALID_ACCESS); + errMessage = LOG_SOURCE_INVALID_ACCESS; } else { - logger.error("target - The access key is invalid."); - errMessage = "target - The access key is invalid."; + logger.error(LOG_TARGET_INVALID_ACCESS); + errMessage = LOG_TARGET_INVALID_ACCESS; } } result = INVALID_ACCESS_KEY; @@ -238,19 +259,19 @@ private int existBucket(boolean isCheck, String bucket) { case SIGNATURE_DOES_NOT_MATCH: if (isCheck) { if (isSource) { - logger.error("source - The secret key is invalid."); - errMessage = "source - The secret key is invalid."; + logger.error(LOG_SOURCE_INVALID_SECRET); + errMessage = LOG_SOURCE_INVALID_SECRET; } else { - logger.error("target - The secret key is invalid."); - errMessage = "target The secret key is invalid."; + logger.error(LOG_TARGET_INVALID_SECRET); + errMessage = LOG_TARGET_INVALID_SECRET; } } else { if (isSource) { - logger.error("source - The secret key is invalid."); - errMessage = "source - The secret key is invalid."; + logger.error(LOG_SOURCE_INVALID_SECRET); + errMessage = LOG_SOURCE_INVALID_SECRET; } else { - logger.error("target - The secret key is invalid."); - errMessage = "target - The secret key is invalid."; + logger.error(LOG_TARGET_INVALID_SECRET); + errMessage = LOG_TARGET_INVALID_SECRET; } } result = INVALID_SECRET_KEY; @@ -291,19 +312,19 @@ private int createBucket(boolean isCheck) { case INVALID_ACCESS_KEY_ID: if (isCheck) { if (isSource) { - logger.error("source - The access key is invalid."); - errMessage = "source - The access key is invalid."; + logger.error(LOG_SOURCE_INVALID_ACCESS); + errMessage = LOG_SOURCE_INVALID_ACCESS; } else { - logger.error("target - The access key is invalid."); - errMessage = "target - The access key is invalid."; + logger.error(LOG_TARGET_INVALID_ACCESS); + errMessage = LOG_TARGET_INVALID_ACCESS; } } else { if (isSource) { - logger.error("source - The access key is invalid."); - errMessage = "source - The access key is invalid."; + logger.error(LOG_SOURCE_INVALID_ACCESS); + errMessage = LOG_SOURCE_INVALID_ACCESS; } else { - logger.error("target - The access key is invalid."); - errMessage = "target - The access key is invalid."; + logger.error(LOG_TARGET_INVALID_ACCESS); + errMessage = LOG_TARGET_INVALID_ACCESS; } } return INVALID_ACCESS_KEY; @@ -311,19 +332,19 @@ private int createBucket(boolean isCheck) { case SIGNATURE_DOES_NOT_MATCH: if (isCheck) { if (isSource) { - logger.error("source - The secret key is invalid."); - errMessage = "source - The secret key is invalid."; + logger.error(LOG_SOURCE_INVALID_SECRET); + errMessage = LOG_SOURCE_INVALID_SECRET; } else { - logger.error("target - The secret key is invalid."); - errMessage = "target The secret key is invalid."; + logger.error(LOG_TARGET_INVALID_SECRET); + errMessage = LOG_TARGET_INVALID_SECRET; } } else { if (isSource) { - logger.error("source - The secret key is invalid."); - errMessage = "source - The secret key is invalid."; + logger.error(LOG_SOURCE_INVALID_SECRET); + errMessage = LOG_SOURCE_INVALID_SECRET; } else { - logger.error("target - The secret key is invalid."); - errMessage = "target - The secret key is invalid."; + logger.error(LOG_TARGET_INVALID_SECRET); + errMessage = LOG_TARGET_INVALID_SECRET; } } return INVALID_SECRET_KEY; @@ -356,11 +377,11 @@ private boolean createBucket(String bucket) { public int init(String type) { if (config.getEndPoint() == null || config.getEndPoint().isEmpty()) { if (isSource) { - logger.error("source - endpoint is null"); - errMessage = "source - endpoint is null"; + logger.error(LOG_SOURCE_ENDPOINT_NULL); + errMessage = LOG_SOURCE_ENDPOINT_NULL; } else { - logger.error("target - endpoint is null"); - errMessage = "target - endpoint is null"; + logger.error(LOG_TARGET_ENDPOINT_NULL); + errMessage = LOG_TARGET_ENDPOINT_NULL; } DBManager.insertErrorJob(jobId, errMessage); return ENDPOINT_IS_NULL; @@ -369,11 +390,11 @@ public int init(String type) { if (!type.equalsIgnoreCase(Repository.SWIFT)) { if (config.getBucket() == null || config.getBucket().isEmpty()) { if (isSource) { - logger.error("source - bucket is null"); - errMessage = "source - bucket is null"; + logger.error(LOG_SOURCE_BUCKET_NULL); + errMessage = LOG_SOURCE_BUCKET_NULL; } else { - logger.error("target - bucket is null"); - errMessage = "target - bucket is null"; + logger.error(LOG_TARGET_BUCKET_NULL); + errMessage = LOG_TARGET_BUCKET_NULL; } DBManager.insertErrorJob(jobId, errMessage); return BUCKET_IS_NULL; @@ -399,8 +420,8 @@ public int init(String type) { result = existBucket(true, config.getBucket()); if (result == BUCKET_NO_EXIST) { if (isSource) { - logger.error("source - bucket is not exist"); - errMessage = "source - bucket is not exist"; + logger.error(LOG_SOURCE_BUCKET_NOT_EXIST); + errMessage = LOG_SOURCE_BUCKET_NOT_EXIST; return BUCKET_NO_EXIST; } else { result = createBucket(false); @@ -782,8 +803,8 @@ public ObjectData getObject(String bucket, String key, String versionId, long st } @Override - public String startMultipart(String bucket, String key) { - InitiateMultipartUploadResult initMultipart = client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)); + public String startMultipart(String bucket, String key, ObjectMetadata objectMetadata) { + InitiateMultipartUploadResult initMultipart = client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key, objectMetadata)); return initMultipart.getUploadId(); } @@ -818,7 +839,7 @@ public String putObject(boolean isFile, String bucket, String key, ObjectData da } putObjectRequest = new PutObjectRequest(bucket, key, data.getInputStream(), data.getMetadata()); } - putObjectRequest.getRequestClientOptions().setReadLimit(1024 * 1024 * 1024); + // putObjectRequest.getRequestClientOptions().setReadLimit(512 * 1024 * 1024); return client.putObject(putObjectRequest).getETag(); } else { ObjectMetadata meta = new ObjectMetadata(); diff --git a/src/main/java/ifs_mover/repository/S3.java b/src/main/java/ifs_mover/repository/S3.java index 7ba2de8..4142ba5 100644 --- a/src/main/java/ifs_mover/repository/S3.java +++ b/src/main/java/ifs_mover/repository/S3.java @@ -13,13 +13,14 @@ import java.io.InputStream; import java.util.List; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.Tag; public interface S3 { void setVersioning(); void setBucketVersioning(String status); - String startMultipart(String bucket, String key); + String startMultipart(String bucket, String key, ObjectMetadata objectMetadata); String uploadPart(String bucket, String key, String uploadId, InputStream is, int partNumber, long partSize); String completeMultipart(String bucket, String key, String uploadId, List list); void setTagging(String bucket, String key, List tagSet);