Skip to content

Commit

Permalink
CloudFileUploader - GCS (facebookresearch#249)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookresearch#249

Diff for supporting multipart upload in GCP.

Interestingly, the GCP client library doesn't support multipart upload (source: https://cloud.google.com/storage/docs/uploads-downloads#support_per_tool). A Stack Overflow answer (https://stackoverflow.com/a/71698884) notes that "You can do Multipart upload to GCS using the S3 library, as the GCS XML API is compatible with this. There is a guide to using the S3 library with GCS here." So I'm following that approach here.

Differential Revision: D37490665

fbshipit-source-id: fd23d17eaf02fb4d67d93c818f77eef17e868505
  • Loading branch information
wenhaizhumeta authored and facebook-github-bot committed Jun 29, 2022
1 parent d18f8ed commit 2a52574
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 8 deletions.
6 changes: 6 additions & 0 deletions fbpcf/aws/S3Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ std::unique_ptr<Aws::S3::S3Client> createS3Client(
const S3ClientOption& option) {
Aws::Client::ClientConfiguration config;

if (option.endpointOverride.has_value()) {
config.endpointOverride = option.endpointOverride.value();
} else if (std::getenv("AWS_ENDPOINT_OVERRIDE")) {
config.endpointOverride = std::getenv("AWS_ENDPOINT_OVERRIDE");
}

if (option.region.has_value()) {
config.region = option.region.value();
} else if (std::getenv("AWS_DEFAULT_REGION")) {
Expand Down
2 changes: 2 additions & 0 deletions fbpcf/aws/S3Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace fbpcf::aws {
// reference for the environment variables:
// https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
struct S3ClientOption {
// AWS_ENDPOINT_OVERRIDE
std::optional<std::string> endpointOverride;
// AWS_DEFAULT_REGION
std::optional<std::string> region;
// AWS_ACCESS_KEY_ID
Expand Down
6 changes: 5 additions & 1 deletion fbpcf/gcp/GCSUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
namespace fbpcf::gcp {
// Format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. gs://bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
GCSObjectReference uriToObjectReference(std::string url) {
std::string bucket;
std::string key;
Expand All @@ -37,6 +39,8 @@ GCSObjectReference uriToObjectReference(std::string url) {

if (boost::iequals(scheme, "gs")) {
bucket = host;
} else if (host.find(".storage.googleapis.com") != std::string::npos) {
bucket = host.substr(0, host.find_first_of("."));
} else {
// Remove the first character '/' in path
path = path.substr(1);
Expand Down
16 changes: 16 additions & 0 deletions fbpcf/gcp/test/GCSUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ TEST(GCSUtil, uriToObjectReference) {
EXPECT_EQ("key", ref.key);
}

// Virtual-hosted-style URL: the bucket is the first label of the host.
TEST(GCSUtil, uriToObjectReference_virtualHostStyle) {
  const std::string uri =
      "https://bucket-name.storage.googleapis.com/key-name";

  const auto objectRef = fbpcf::gcp::uriToObjectReference(uri);

  EXPECT_EQ("bucket-name", objectRef.bucket);
  EXPECT_EQ("key-name", objectRef.key);
}

// Path-style URL: the bucket is the first path segment after the host.
TEST(GCSUtil, uriToObjectReference_pathStyle) {
  const std::string uri =
      "https://storage.googleapis.com/bucket-name/key-name";

  const auto objectRef = fbpcf::gcp::uriToObjectReference(uri);

  EXPECT_EQ("bucket-name", objectRef.bucket);
  EXPECT_EQ("key-name", objectRef.key);
}

TEST(GCSUtil, uriToObjectReference_Subfolder) {
auto uri = "https://storage.cloud.google.com/bucket/folder/key";
auto ref = fbpcf::gcp::uriToObjectReference(uri);
Expand Down
30 changes: 24 additions & 6 deletions fbpcf/io/cloud_util/CloudFileUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
*/

#include "fbpcf/io/cloud_util/CloudFileUtil.h"
#include <aws/s3/S3Client.h>
#include <re2/re2.h>
#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/PcfException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include "fbpcf/io/cloud_util/S3Client.h"
#include "fbpcf/io/cloud_util/S3FileReader.h"
#include "fbpcf/io/cloud_util/S3FileUploader.h"
Expand All @@ -17,12 +20,14 @@ namespace fbpcf::cloudio {

CloudFileType getCloudFileType(const std::string& filePath) {
// S3 file format:
// 1. https://bucket-name.s3.Region.amazonaws.com/key-name
// 2. https://bucket-name.s3-Region.amazonaws.com/key-name
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
// 3. s3://bucket-name/key-name
// GCS file format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. gs://bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
static const re2::RE2 s3Regex1(
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
static const re2::RE2 s3Regex2(
Expand All @@ -34,9 +39,14 @@ CloudFileType getCloudFileType(const std::string& filePath) {
return CloudFileType::S3;
}

static const re2::RE2 gcsRegex("https://storage.cloud.google.com/.*");
bool isGCSFile =
re2::RE2::FullMatch(filePath, gcsRegex) || filePath.find("gs://", 0) == 0;
static const re2::RE2 gcsRegex1("https://storage.cloud.google.com/.*");
static const re2::RE2 gcsRegex2(
"https://[a-z0-9.-]+.storage.googleapis.com/.+");
static const re2::RE2 gcsRegex3("https://storage.googleapis.com/.*");
bool isGCSFile = re2::RE2::FullMatch(filePath, gcsRegex1) ||
re2::RE2::FullMatch(filePath, gcsRegex2) ||
re2::RE2::FullMatch(filePath, gcsRegex3) ||
filePath.find("gs://", 0) == 0;
if (isGCSFile) {
return CloudFileType::GCS;
}
Expand Down Expand Up @@ -64,6 +74,14 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
fbpcf::aws::S3ClientOption{.region = ref.region})
.getS3Client(),
filePath);
} else if (fileType == CloudFileType::GCS) {
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath);
return std::make_unique<GCSFileUploader>(
fbpcf::cloudio::S3Client::getInstance(
fbpcf::aws::S3ClientOption{
.endpointOverride = "https://storage.googleapis.com/"})
.getS3Client(),
filePath);
} else {
throw fbpcf::PcfException("Not supported yet.");
}
Expand Down
125 changes: 125 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <folly/logging/xlog.h>
#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/AwsException.h"
#include "fbpcf/gcp/GCSUtil.h"

namespace fbpcf::cloudio {

static const std::string FILE_TYPE = "text/csv";
static const int MAX_RETRY_COUNT = 3;

void GCSFileUploader::init() {
XLOG(INFO) << "Start multipart upload initialization. ";
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
bucket_ = ref.bucket;
key_ = ref.key;
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetContentType(FILE_TYPE);

XLOG(INFO) << "Bucket: " << bucket_ << ", Key: " << key_;

auto createMultipartUploadOutcome =
gcsClient_->CreateMultipartUpload(request);

if (createMultipartUploadOutcome.IsSuccess()) {
uploadId_ = createMultipartUploadOutcome.GetResult().GetUploadId();
XLOG(INFO) << "Multipart upload initialization succeed. Upload id is: "
<< uploadId_;
} else {
XLOG(ERR) << createMultipartUploadOutcome.GetError();
throw AwsException{
"Multipart upload initialization failed: " +
createMultipartUploadOutcome.GetError().GetMessage()};
}
}

int GCSFileUploader::upload(std::vector<char>& buf) {
XLOG(INFO) << "Start uploading part:"
<< "Part number: " << partNumber_ << "\nBucket: " << bucket_
<< "\nKey: " << key_;
Aws::S3::Model::UploadPartRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetUploadId(uploadId_);
request.SetPartNumber(partNumber_);
request.SetContentLength(buf.size());

Aws::String str(buf.begin(), buf.end());
auto inputData = Aws::MakeShared<Aws::StringStream>("UploadPartStream", str);
request.SetBody(inputData);
XLOG(INFO) << "Upload stream size: " << str.size();

auto uploadPartResult = gcsClient_->UploadPart(request);
int retryCount = 0;
while (!uploadPartResult.IsSuccess() && retryCount < MAX_RETRY_COUNT) {
XLOG(INFO) << "Upload part " << partNumber_ << " failed. Retrying...";
uploadPartResult = gcsClient_->UploadPart(request);
retryCount++;
}

if (uploadPartResult.IsSuccess()) {
XLOG(INFO) << "Upload part " << partNumber_ << " succeeed.";
Aws::S3::Model::CompletedPart part;
part.SetPartNumber(request.GetPartNumber());
part.SetETag(uploadPartResult.GetResult().GetETag());
completedParts_.push_back(part);
partNumber_++;
return str.size();
} else {
XLOG(INFO) << "Upload part " << partNumber_ << " failed. Aborting...";
abortUpload();
return 0;
}
}

int GCSFileUploader::complete() {
Aws::S3::Model::CompleteMultipartUploadRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetUploadId(uploadId_);
request.SetMultipartUpload(
Aws::S3::Model::CompletedMultipartUpload().WithParts(completedParts_));

auto completeMultipartUploadResult =
gcsClient_->CompleteMultipartUpload(request);
if (completeMultipartUploadResult.IsSuccess()) {
XLOG(INFO) << "File " << filePath_ << " uploaded successfully.";
return 0;
} else {
XLOG(ERR) << "File " << filePath_ << " failed to upload.";
XLOG(ERR) << "Error: " << completeMultipartUploadResult.GetError();
abortUpload();
return -1;
}
}

// Aborts the in-flight multipart upload identified by uploadId_ so the
// service can discard the parts uploaded so far; otherwise orphaned parts
// would keep accruing storage. Best-effort: failures are logged, not thrown.
void GCSFileUploader::abortUpload() {
  Aws::S3::Model::AbortMultipartUploadRequest abortRequest;
  abortRequest.SetBucket(bucket_);
  abortRequest.SetKey(key_);
  abortRequest.SetUploadId(uploadId_);
  auto abortMultipartUploadResult =
      gcsClient_->AbortMultipartUpload(abortRequest);
  if (abortMultipartUploadResult.IsSuccess()) {
    XLOG(INFO) << "Abort upload succeeded. ";
  } else {
    // Include the service error message so a failed abort is diagnosable.
    XLOG(ERR) << "Abort upload failed. Upload ID: " << uploadId_
              << ". Error: "
              << abortMultipartUploadResult.GetError().GetMessage();
  }
}

} // namespace fbpcf::cloudio
37 changes: 37 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CompletedPart.h>
#include <memory>
#include "fbpcf/io/cloud_util/IFileUploader.h"

namespace fbpcf::cloudio {
// Uploads a file to Google Cloud Storage using the S3 multipart-upload API
// (the GCS XML API is S3-compatible), driven through an Aws::S3::S3Client
// that the caller has pointed at the GCS endpoint.
class GCSFileUploader : public IFileUploader {
 public:
  // Starts the multipart upload immediately via init().
  // NOTE(review): init() is virtual and is invoked from the constructor, so
  // the call is statically bound to GCSFileUploader::init(); a subclass
  // override would not run here — confirm this is intentional.
  explicit GCSFileUploader(
      std::shared_ptr<Aws::S3::S3Client> client,
      const std::string& filePath)
      : gcsClient_{std::move(client)}, filePath_{filePath} {
    init();
  }
  // Uploads buf as the next part; returns bytes uploaded, or 0 on failure.
  int upload(std::vector<char>& buf) override;
  // Finalizes the multipart upload; returns 0 on success, -1 on failure.
  int complete() override;

 private:
  // Creates the multipart upload and records uploadId_, bucket_, and key_.
  void init() override;
  // Aborts the in-flight multipart upload identified by uploadId_.
  void abortUpload();
  std::shared_ptr<Aws::S3::S3Client> gcsClient_;
  const std::string filePath_;  // original URI passed by the caller
  std::string bucket_;  // parsed from filePath_ in init()
  std::string key_;  // parsed from filePath_ in init()
  std::string uploadId_;  // multipart upload id returned by the service
  std::size_t partNumber_ = 1;  // S3 part numbers start at 1
  Aws::Vector<Aws::S3::Model::CompletedPart> completedParts_;
};
} // namespace fbpcf::cloudio
10 changes: 9 additions & 1 deletion fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ TEST(FileManagerUtilTest, TestGetCloudFileType) {
getCloudFileType("https://storage.cloud.google.com/bucket-name/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType1);

auto gcsType2 = getCloudFileType("gs://bucket-name/key-name");
auto gcsType2 =
getCloudFileType("https://bucket-name.storage.googleapis.com/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType2);

auto gcsType3 =
getCloudFileType("https://storage.googleapis.com/bucket-name/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType3);

auto gcsType4 = getCloudFileType("gs://bucket-name/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType4);

auto unkonwnType =
getCloudFileType("https://storage.test.com/bucket-name/key-name");
EXPECT_EQ(CloudFileType::UNKNOWN, unkonwnType);
Expand Down

0 comments on commit 2a52574

Please sign in to comment.