Skip to content

Commit

Permalink
multipart 시, metadata 버그 수정 (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
HeoKyungseok committed Apr 11, 2022
1 parent 8177419 commit 921cb45
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 73 deletions.
28 changes: 20 additions & 8 deletions src/main/java/ifs_mover/ObjectMover.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
package ifs_mover;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -472,13 +475,13 @@ private boolean moveObject(String path, boolean isDelete, boolean isFile, String

try {
if (isDelete) {
targetRepository.deleteObject(targetBucket, targetPath, versionId);
targetRepository.deleteObject(targetBucket, targetPath, null);
logger.info("delete success : {}", sourcePath);
} else {
if (isFile) {
if (multipartInfo != null) {
// for swift large file (more than 5G)
String uploadId = targetRepository.startMultipart(targetBucket, targetPath);
String uploadId = targetRepository.startMultipart(targetBucket, targetPath, null);
List<PartETag> partList = new ArrayList<PartETag>();
String[] multiPath = multipartInfo.split("/", 2);
int partNumber = 0;
Expand All @@ -499,30 +502,39 @@ private boolean moveObject(String path, boolean isDelete, boolean isFile, String
targetRepository.setTagging(targetBucket, targetPath, tagSet);
}
logger.info("move success : {}", path);
} else if ((!type.equalsIgnoreCase(Repository.IFS_FILE) && moveSize != 0 && size > moveSize)
|| (type.equalsIgnoreCase(Repository.SWIFT) && size > GIGA_BYTES)) {
} else if ((!type.equalsIgnoreCase(Repository.IFS_FILE) && size > GIGA_BYTES)
|| (!type.equalsIgnoreCase(Repository.IFS_FILE) && (moveSize != 0 && size > moveSize))) {
// send multipart
if (versionId != null && !versionId.isEmpty()) {
logger.debug("send multipart : {}:{}, size {}", path, versionId, size);
} else {
logger.debug("send multipart : {}, size {}", path, size);
}
long limitSize = 0L;
if (moveSize == 0) {
limitSize = 100 * MEGA_BYTES;
} else {
limitSize = moveSize;
}
String uploadId = targetRepository.startMultipart(targetBucket, targetPath);
ObjectData objData = sourceRepository.getObject(sourceBucket, sourcePath, versionId);
String uploadId = targetRepository.startMultipart(targetBucket, targetPath, objData.getMetadata());
List<PartETag> partList = new ArrayList<PartETag>();
int partNumber = 1;
ObjectData data = null;

for (long i = 0; i < size; i += limitSize, partNumber++) {
long start = i;
long end = i + limitSize - 1;
if (end >= size) {
end = size - 1;
}
data = sourceRepository.getObject(sourceBucket, sourcePath, null, start, end);

ObjectData data = sourceRepository.getObject(sourceBucket, sourcePath, versionId, start, end);

if (data != null) {
String partETag = targetRepository.uploadPart(targetBucket, targetPath, uploadId, data.getInputStream(), partNumber, data.getSize());
partList.add(new PartETag(partNumber, partETag));
data.getInputStream().close();
logger.debug("{} - move part : {}, size : {}", path, partNumber, data.getSize());
}
}
targetRepository.completeMultipart(targetBucket, targetPath, uploadId, partList);
Expand Down Expand Up @@ -557,7 +569,7 @@ private boolean moveObject(String path, boolean isDelete, boolean isFile, String
}

if (!type.equalsIgnoreCase(Repository.IFS_FILE) && isFile && size > 0) {
if (etag.equals(s3ETag)) {
if (etag.equals(s3ETag) || etag.contains("-")) {
if (versionId != null && !versionId.isEmpty()) {
logger.info("move success : {}:{}", path, versionId);
} else {
Expand Down
149 changes: 85 additions & 64 deletions src/main/java/ifs_mover/repository/IfsS3.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,25 @@ public class IfsS3 implements Repository, S3 {
private final String BUCKET_ALREADY_EXISTS = "BucketAlreadyExists";
private final String INVALID_ACCESS_KEY_ID = "InvalidAccessKeyId";
private final String SIGNATURE_DOES_NOT_MATCH = "SignatureDoesNotMatch";


private final int MILLISECONDS = 1000;
private final int TIMEOUT = 300;
private final int RETRY_COUNT = 2;

private final String LOG_SOURCE_INVALID_ACCESS = "source - The access key is invalid.";
private final String LOG_SOURCE_INVALID_SECRET = "source - The secret key is invalid.";
private final String LOG_TARGET_INVALID_ACCESS = "target - The access key is invalid.";
private final String LOG_TARGET_INVALID_SECRET = "target - The secret key is invalid.";
private final String LOG_SOURCE_ENDPOINT_NULL = "source - endpoint is null";
private final String LOG_TARGET_ENDPOINT_NULL = "target - endpoint is null";
private final String LOG_SOURCE_BUCKET_NULL = "source - bucket is null";
private final String LOG_TARGET_BUCKET_NULL = "target - bucket is null";
private final String LOG_SOURCE_BUCKET_NOT_EXIST = "source - bucket is not exist";
private final String LOG_SOURCE_NOT_REGION = "source - unable to find region.";
private final String LOG_TARGET_NOT_REGION = "target - unable to find region.";
private final String LOG_SOURCE_INVALID_ENDPOINT = "source - endpoint is invalid.";
private final String LOG_TARGET_INVALID_ENDPOINT = "target - endpoint is invalid.";

IfsS3(String jobId) {
this.jobId = jobId;
}
Expand All @@ -94,23 +112,23 @@ public void setConfig(Config config, boolean isSource) {
public int check(String type) {
if (config.getEndPoint() == null || config.getEndPoint().isEmpty()) {
if (isSource) {
logger.error("source - endpoint is null");
errMessage = "source - endpoint is null";
logger.error(LOG_SOURCE_ENDPOINT_NULL);
errMessage = LOG_SOURCE_ENDPOINT_NULL;
} else {
logger.error("target - endpoint is null");
errMessage = "target - endpoint is null";
logger.error(LOG_TARGET_ENDPOINT_NULL);
errMessage = LOG_TARGET_ENDPOINT_NULL;
}
return ENDPOINT_IS_NULL;
}

if (!type.equalsIgnoreCase(Repository.SWIFT)) {
if (config.getBucket() == null || config.getBucket().isEmpty()) {
if (isSource) {
logger.error("source - bucket is null");
errMessage = "source - bucket is null";
logger.error(LOG_SOURCE_BUCKET_NULL);
errMessage = LOG_SOURCE_BUCKET_NULL;
} else {
logger.error("target - bucket is null");
errMessage = "target - bucket is null";
logger.error(LOG_TARGET_BUCKET_NULL);
errMessage = LOG_TARGET_BUCKET_NULL;
}
return BUCKET_IS_NULL;
}
Expand All @@ -134,8 +152,8 @@ public int check(String type) {
result = existBucket(true, config.getBucket());
if (result == BUCKET_NO_EXIST) {
if (isSource) {
logger.error("source - bucket is not exist");
errMessage = "source - bucket is not exist";
logger.error(LOG_SOURCE_BUCKET_NOT_EXIST);
errMessage = LOG_SOURCE_BUCKET_NOT_EXIST;
return BUCKET_NO_EXIST;
} else {
result = createBucket(true);
Expand All @@ -155,21 +173,21 @@ public int getClient() {
client = createClient(isAWS, isSecure, config.getEndPoint(), config.getAccessKey(), config.getSecretKey());
} catch (SdkClientException e) {
if (isSource) {
logger.error("source - unable to find region.");
errMessage = "source - unable to find region";
logger.error(LOG_SOURCE_NOT_REGION);
errMessage = LOG_SOURCE_NOT_REGION;
} else {
logger.error("target - unable to find region.");
errMessage = "target - unable to find region";
logger.error(LOG_TARGET_NOT_REGION);
errMessage = LOG_TARGET_NOT_REGION;
}

return UNABLE_FIND_REGION;
} catch (IllegalArgumentException e) {
if (isSource) {
logger.error("source - endpoint is invalid.");
errMessage = "source - endpoint is invalid.";
logger.error(LOG_SOURCE_INVALID_ENDPOINT);
errMessage = LOG_SOURCE_INVALID_ENDPOINT;
} else {
logger.error("target - endpoint is invalid.");
errMessage = "target - endpoint is invalid.";
logger.error(LOG_TARGET_INVALID_ENDPOINT);
errMessage = LOG_TARGET_INVALID_ENDPOINT;
}
return INVALID_ENDPOINT;
}
Expand All @@ -187,6 +205,9 @@ private AmazonS3 createClient(boolean isAWS, boolean isSecure, String URL, Strin
}

config.setSignerOverride(AWS_S3_V4_SIGNER_TYPE);
config.setMaxErrorRetry(RETRY_COUNT);
config.setConnectionTimeout(TIMEOUT * MILLISECONDS);
config.setSocketTimeout(TIMEOUT * MILLISECONDS);
AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard();

clientBuilder.setCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(AccessKey, SecretKey)));
Expand Down Expand Up @@ -217,19 +238,19 @@ private int existBucket(boolean isCheck, String bucket) {
case INVALID_ACCESS_KEY_ID:
if (isCheck) {
if (isSource) {
logger.error("source - The access key is invalid.");
errMessage = "source - The access key is invalid.";
logger.error(LOG_SOURCE_INVALID_ACCESS);
errMessage = LOG_SOURCE_INVALID_ACCESS;
} else {
logger.error("target - The access key is invalid.");
errMessage = "target - The access key is invalid.";
logger.error(LOG_TARGET_INVALID_ACCESS);
errMessage = LOG_TARGET_INVALID_ACCESS;
}
} else {
if (isSource) {
logger.error("source - The access key is invalid.");
errMessage = "source - The access key is invalid.";
logger.error(LOG_SOURCE_INVALID_ACCESS);
errMessage = LOG_SOURCE_INVALID_ACCESS;
} else {
logger.error("target - The access key is invalid.");
errMessage = "target - The access key is invalid.";
logger.error(LOG_TARGET_INVALID_ACCESS);
errMessage = LOG_TARGET_INVALID_ACCESS;
}
}
result = INVALID_ACCESS_KEY;
Expand All @@ -238,19 +259,19 @@ private int existBucket(boolean isCheck, String bucket) {
case SIGNATURE_DOES_NOT_MATCH:
if (isCheck) {
if (isSource) {
logger.error("source - The secret key is invalid.");
errMessage = "source - The secret key is invalid.";
logger.error(LOG_SOURCE_INVALID_SECRET);
errMessage = LOG_SOURCE_INVALID_SECRET;
} else {
logger.error("target - The secret key is invalid.");
errMessage = "target The secret key is invalid.";
logger.error(LOG_TARGET_INVALID_SECRET);
errMessage = LOG_TARGET_INVALID_SECRET;
}
} else {
if (isSource) {
logger.error("source - The secret key is invalid.");
errMessage = "source - The secret key is invalid.";
logger.error(LOG_SOURCE_INVALID_SECRET);
errMessage = LOG_SOURCE_INVALID_SECRET;
} else {
logger.error("target - The secret key is invalid.");
errMessage = "target - The secret key is invalid.";
logger.error(LOG_TARGET_INVALID_SECRET);
errMessage = LOG_TARGET_INVALID_SECRET;
}
}
result = INVALID_SECRET_KEY;
Expand Down Expand Up @@ -291,39 +312,39 @@ private int createBucket(boolean isCheck) {
case INVALID_ACCESS_KEY_ID:
if (isCheck) {
if (isSource) {
logger.error("source - The access key is invalid.");
errMessage = "source - The access key is invalid.";
logger.error(LOG_SOURCE_INVALID_ACCESS);
errMessage = LOG_SOURCE_INVALID_ACCESS;
} else {
logger.error("target - The access key is invalid.");
errMessage = "target - The access key is invalid.";
logger.error(LOG_TARGET_INVALID_ACCESS);
errMessage = LOG_TARGET_INVALID_ACCESS;
}
} else {
if (isSource) {
logger.error("source - The access key is invalid.");
errMessage = "source - The access key is invalid.";
logger.error(LOG_SOURCE_INVALID_ACCESS);
errMessage = LOG_SOURCE_INVALID_ACCESS;
} else {
logger.error("target - The access key is invalid.");
errMessage = "target - The access key is invalid.";
logger.error(LOG_TARGET_INVALID_ACCESS);
errMessage = LOG_TARGET_INVALID_ACCESS;
}
}
return INVALID_ACCESS_KEY;

case SIGNATURE_DOES_NOT_MATCH:
if (isCheck) {
if (isSource) {
logger.error("source - The secret key is invalid.");
errMessage = "source - The secret key is invalid.";
logger.error(LOG_SOURCE_INVALID_SECRET);
errMessage = LOG_SOURCE_INVALID_SECRET;
} else {
logger.error("target - The secret key is invalid.");
errMessage = "target The secret key is invalid.";
logger.error(LOG_TARGET_INVALID_SECRET);
errMessage = LOG_TARGET_INVALID_SECRET;
}
} else {
if (isSource) {
logger.error("source - The secret key is invalid.");
errMessage = "source - The secret key is invalid.";
logger.error(LOG_SOURCE_INVALID_SECRET);
errMessage = LOG_SOURCE_INVALID_SECRET;
} else {
logger.error("target - The secret key is invalid.");
errMessage = "target - The secret key is invalid.";
logger.error(LOG_TARGET_INVALID_SECRET);
errMessage = LOG_TARGET_INVALID_SECRET;
}
}
return INVALID_SECRET_KEY;
Expand Down Expand Up @@ -356,11 +377,11 @@ private boolean createBucket(String bucket) {
public int init(String type) {
if (config.getEndPoint() == null || config.getEndPoint().isEmpty()) {
if (isSource) {
logger.error("source - endpoint is null");
errMessage = "source - endpoint is null";
logger.error(LOG_SOURCE_ENDPOINT_NULL);
errMessage = LOG_SOURCE_ENDPOINT_NULL;
} else {
logger.error("target - endpoint is null");
errMessage = "target - endpoint is null";
logger.error(LOG_TARGET_ENDPOINT_NULL);
errMessage = LOG_TARGET_ENDPOINT_NULL;
}
DBManager.insertErrorJob(jobId, errMessage);
return ENDPOINT_IS_NULL;
Expand All @@ -369,11 +390,11 @@ public int init(String type) {
if (!type.equalsIgnoreCase(Repository.SWIFT)) {
if (config.getBucket() == null || config.getBucket().isEmpty()) {
if (isSource) {
logger.error("source - bucket is null");
errMessage = "source - bucket is null";
logger.error(LOG_SOURCE_BUCKET_NULL);
errMessage = LOG_SOURCE_BUCKET_NULL;
} else {
logger.error("target - bucket is null");
errMessage = "target - bucket is null";
logger.error(LOG_TARGET_BUCKET_NULL);
errMessage = LOG_TARGET_BUCKET_NULL;
}
DBManager.insertErrorJob(jobId, errMessage);
return BUCKET_IS_NULL;
Expand All @@ -399,8 +420,8 @@ public int init(String type) {
result = existBucket(true, config.getBucket());
if (result == BUCKET_NO_EXIST) {
if (isSource) {
logger.error("source - bucket is not exist");
errMessage = "source - bucket is not exist";
logger.error(LOG_SOURCE_BUCKET_NOT_EXIST);
errMessage = LOG_SOURCE_BUCKET_NOT_EXIST;
return BUCKET_NO_EXIST;
} else {
result = createBucket(false);
Expand Down Expand Up @@ -782,8 +803,8 @@ public ObjectData getObject(String bucket, String key, String versionId, long st
}

@Override
public String startMultipart(String bucket, String key) {
InitiateMultipartUploadResult initMultipart = client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key));
public String startMultipart(String bucket, String key, ObjectMetadata objectMetadata) {
InitiateMultipartUploadResult initMultipart = client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key, objectMetadata));
return initMultipart.getUploadId();
}

Expand Down Expand Up @@ -818,7 +839,7 @@ public String putObject(boolean isFile, String bucket, String key, ObjectData da
}
putObjectRequest = new PutObjectRequest(bucket, key, data.getInputStream(), data.getMetadata());
}
putObjectRequest.getRequestClientOptions().setReadLimit(1024 * 1024 * 1024);
// putObjectRequest.getRequestClientOptions().setReadLimit(512 * 1024 * 1024);
return client.putObject(putObjectRequest).getETag();
} else {
ObjectMetadata meta = new ObjectMetadata();
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/ifs_mover/repository/S3.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import java.io.InputStream;
import java.util.List;

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.Tag;

public interface S3 {
void setVersioning();
void setBucketVersioning(String status);
String startMultipart(String bucket, String key);
String startMultipart(String bucket, String key, ObjectMetadata objectMetadata);
String uploadPart(String bucket, String key, String uploadId, InputStream is, int partNumber, long partSize);
String completeMultipart(String bucket, String key, String uploadId, List<PartETag> list);
void setTagging(String bucket, String key, List<Tag> tagSet);
Expand Down

0 comments on commit 921cb45

Please sign in to comment.