diff --git a/fbpcf/gcp/GCSUtil.cpp b/fbpcf/gcp/GCSUtil.cpp index 6f17306d..ee67b21c 100644 --- a/fbpcf/gcp/GCSUtil.cpp +++ b/fbpcf/gcp/GCSUtil.cpp @@ -58,4 +58,8 @@ GCSObjectReference uriToObjectReference(std::string url) { return GCSObjectReference{bucket, path.substr(pos + 1)}; } +std::unique_ptr createGCSClient() { + return std::make_unique(); +} + } // namespace fbpcf::gcp diff --git a/fbpcf/gcp/GCSUtil.h b/fbpcf/gcp/GCSUtil.h index 69839b41..6820c30c 100644 --- a/fbpcf/gcp/GCSUtil.h +++ b/fbpcf/gcp/GCSUtil.h @@ -22,6 +22,5 @@ struct GCSObjectReference { }; GCSObjectReference uriToObjectReference(std::string url); -std::unique_ptr createGCSClient( - const GCSClientOption& option); +std::unique_ptr createGCSClient(); } // namespace fbpcf::gcp diff --git a/fbpcf/io/cloud_util/CloudFileUtil.cpp b/fbpcf/io/cloud_util/CloudFileUtil.cpp index 2f39cba9..15863dcd 100644 --- a/fbpcf/io/cloud_util/CloudFileUtil.cpp +++ b/fbpcf/io/cloud_util/CloudFileUtil.cpp @@ -5,10 +5,17 @@ * LICENSE file in the root directory of this source tree. */ -#include "fbpcf/io/cloud_util/CloudFileUtil.h" +#include +#include #include + #include "fbpcf/aws/S3Util.h" #include "fbpcf/exception/PcfException.h" +#include "fbpcf/gcp/GCSUtil.h" +#include "fbpcf/io/cloud_util/CloudFileUtil.h" +#include "fbpcf/io/cloud_util/GCSClient.h" +#include "fbpcf/io/cloud_util/GCSFileReader.h" +#include "fbpcf/io/cloud_util/GCSFileUploader.h" #include "fbpcf/io/cloud_util/S3Client.h" #include "fbpcf/io/cloud_util/S3FileReader.h" #include "fbpcf/io/cloud_util/S3FileUploader.h" @@ -16,15 +23,17 @@ namespace fbpcf::cloudio { CloudFileType getCloudFileType(const std::string& filePath) { - // S3 file format: - // 1. https://bucket-name.s3.region.amazonaws.com/key-name - // 2. https://bucket-name.s3-region.amazonaws.com/key-name - // 3. s3://bucket-name/key-name - // GCS file format: - // 1. https://storage.cloud.google.com/bucket-name/key-name - // 2. https://bucket-name.storage.googleapis.com/key-name - // 3. https://storage.googleapis.com/bucket-name/key-name - // 4. gs://bucket-name/key-name + /* + * S3 file format: + * 1. https://bucket-name.s3.region.amazonaws.com/key-name + * 2. https://bucket-name.s3-region.amazonaws.com/key-name + * 3. s3://bucket-name/key-name + * GCS file format: + * 1. https://storage.cloud.google.com/bucket-name/key-name + * 2. https://bucket-name.storage.googleapis.com/key-name + * 3. https://storage.googleapis.com/bucket-name/key-name + * 4. gs://bucket-name/key-name + */ static const re2::RE2 s3Regex1( "https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+"); static const re2::RE2 s3Regex2( @@ -58,8 +67,12 @@ std::unique_ptr getCloudFileReader(const std::string& filePath) { fbpcf::cloudio::S3Client::getInstance( fbpcf::aws::S3ClientOption{.region = ref.region}) .getS3Client()); + } else if (fileType == CloudFileType::GCS) { + return std::make_unique( + fbpcf::cloudio::GCSClient::getInstance(fbpcf::gcp::GCSClientOption{}) + .getGCSClient()); } else { - return nullptr; + throw fbpcf::PcfException("Not supported yet."); } } @@ -73,6 +86,11 @@ std::unique_ptr getCloudFileUploader( fbpcf::aws::S3ClientOption{.region = ref.region}) .getS3Client(), filePath); + } else if (fileType == CloudFileType::GCS) { + return std::make_unique( + fbpcf::cloudio::GCSClient::getInstance(fbpcf::gcp::GCSClientOption{}) + .getGCSClient(), + filePath); } else { throw fbpcf::PcfException("Not supported yet."); } diff --git a/fbpcf/io/cloud_util/GCSClient.cpp b/fbpcf/io/cloud_util/GCSClient.cpp new file mode 100644 index 00000000..06829fc0 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSClient.cpp @@ -0,0 +1,17 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include "fbpcf/io/cloud_util/GCSClient.h" + +namespace fbpcf::cloudio { +GCSClient& GCSClient::getInstance(const fbpcf::gcp::GCSClientOption& option) { + static GCSClient GCSClient(option); + return GCSClient; +} +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSClient.h b/fbpcf/io/cloud_util/GCSClient.h new file mode 100644 index 00000000..049ff171 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSClient.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +#include +#include "fbpcf/gcp/GCSUtil.h" + +namespace fbpcf::cloudio { + +class GCSClient { + private: + explicit GCSClient(const fbpcf::gcp::GCSClientOption& option) { + GCSClient_ = fbpcf::gcp::createGCSClient(); + } + + public: + static GCSClient& getInstance(const fbpcf::gcp::GCSClientOption& option); + + std::shared_ptr getGCSClient() { + return GCSClient_; + } + + private: + std::shared_ptr GCSClient_; +}; + +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileReader.cpp b/fbpcf/io/cloud_util/GCSFileReader.cpp index e88cc377..b18d7229 100644 --- a/fbpcf/io/cloud_util/GCSFileReader.cpp +++ b/fbpcf/io/cloud_util/GCSFileReader.cpp @@ -12,8 +12,8 @@ namespace fbpcf::cloudio { -template -std::string GCSFileReader::readBytes( +// template +std::string GCSFileReader::readBytes( const std::string& filePath, std::size_t start, std::size_t end) { @@ -29,15 +29,16 @@ std::string GCSFileReader::readBytes( return ss.str(); } -template -size_t GCSFileReader::getFileContentLength( - const std::string& filePath) { +// template +size_t GCSFileReader::getFileContentLength(const std::string& filePath) { const auto& ref = fbpcf::gcp::uriToObjectReference(filePath); auto outcome = GCSClient_->GetObjectMetadata(ref.bucket, ref.key); if (!outcome) { - throw GcpException{"Error getting object metadata for object " + ref.key}; + throw GcpException{ + "Error getting object metadata for object " + ref.key + + " Reason: " + outcome.status().message()}; } - return outcome.size(); + return outcome->size(); } } // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileReader.h b/fbpcf/io/cloud_util/GCSFileReader.h index fa77e639..441ca81b 100644 --- a/fbpcf/io/cloud_util/GCSFileReader.h +++ b/fbpcf/io/cloud_util/GCSFileReader.h @@ -14,10 +14,9 @@ #include "fbpcf/io/cloud_util/IFileReader.h" namespace fbpcf::cloudio { -template class GCSFileReader : public IFileReader { public: - explicit GCSFileReader(std::shared_ptr client) + explicit GCSFileReader(std::shared_ptr client) : GCSClient_{std::move(client)} {} std::string readBytes( @@ -28,7 +27,7 @@ class GCSFileReader : public IFileReader { size_t getFileContentLength(const std::string& filePath) override; private: - std::shared_ptr GCSClient_; + std::shared_ptr GCSClient_; }; } // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileUploader.cpp b/fbpcf/io/cloud_util/GCSFileUploader.cpp new file mode 100644 index 00000000..cb1ab287 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSFileUploader.cpp @@ -0,0 +1,45 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include "fbpcf/exception/GcpException.h" +#include "fbpcf/gcp/GCSUtil.h" +#include "fbpcf/io/cloud_util/GCSFileUploader.h" + +namespace fbpcf::cloudio { +static const std::string FILE_TYPE = "text/csv"; + +void GCSFileUploader::init() {} + +int32_t GCSFileUploader::upload(std::vector& buf) { + XLOG(INFO) << "Start resumable upload. "; + const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_); + std::string bucket_ = ref.bucket; + std::string object_ = ref.key; + + namespace gcs = ::google::cloud::storage; + using ::google::cloud::StatusOr; + std::string str(buf.begin(), buf.end()); + + StatusOr object_metadata = gcsClient_->InsertObject( + bucket_, object_, str, gcs::ContentType(FILE_TYPE)); + + if (!object_metadata) { + throw GcpException{ + "Resumable upload failed: " + object_metadata.status().message()}; + return 0; + } + XLOG(INFO) << " Resumable upload successful "; + XLOG(INFO) << "Bucket: " << bucket_ << ", Object Name: " << object_; + return str.size(); +} + +int GCSFileUploader::complete() { + return 0; +} +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileUploader.h b/fbpcf/io/cloud_util/GCSFileUploader.h new file mode 100644 index 00000000..c099ec00 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSFileUploader.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +#include +#include "fbpcf/io/cloud_util/IFileUploader.h" + +namespace fbpcf::cloudio { +class GCSFileUploader : public IFileUploader { + public: + explicit GCSFileUploader( + std::shared_ptr gcsClient, + const std::string& filePath) + : gcsClient_{std::move(gcsClient)}, filePath_{filePath} { + init(); + } + int upload(std::vector& buf) override; + int complete() override; + + private: + void init() override; + + std::shared_ptr gcsClient_; + const std::string filePath_; +}; + +} // namespace fbpcf::cloudio