From b7c3e5989227721dc1d137638ad296efe8de09d0 Mon Sep 17 00:00:00 2001 From: Xin Zhang Date: Wed, 12 Feb 2025 18:26:16 +0000 Subject: [PATCH] Add s3 metrics. --- .../storage_adapters/s3fs/S3FileSystem.cpp | 23 +++++++++++- .../hive/storage_adapters/s3fs/S3FileSystem.h | 30 ++++++++++++++++ .../s3fs/tests/S3FileSystemTest.cpp | 35 +++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 3af3aa9e0cec..bd5622661146 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -95,8 +95,12 @@ class S3ReadFile final : public ReadFile { Aws::S3::Model::HeadObjectRequest request; request.SetBucket(awsString(bucket_)); request.SetKey(awsString(key_)); - + ++S3Metrics::getInstance().metadataCalls; auto outcome = client_->HeadObject(request); + if (!outcome.IsSuccess()) { + ++S3Metrics::getInstance().getMetadataErrors; + } + S3Metrics::getInstance().getMetadataRetries += outcome.GetRetryCount(); VELOX_CHECK_AWS_OUTCOME( outcome, "Failed to get metadata for S3 object", bucket_, key_); length_ = outcome.GetResult().GetContentLength(); @@ -184,7 +188,13 @@ class S3ReadFile final : public ReadFile { request.SetRange(awsString(ss.str())); request.SetResponseStreamFactory( AwsWriteableStreamFactory(position, length)); + ++S3Metrics::getInstance().activeConnections; auto outcome = client_->GetObject(request); + if (!outcome.IsSuccess()) { + ++S3Metrics::getInstance().getObjectErrors; + } + S3Metrics::getInstance().getObjectRetries += outcome.GetRetryCount(); + --S3Metrics::getInstance().activeConnections; VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_); } @@ -237,7 +247,12 @@ class S3WriteFile::Impl { Aws::S3::Model::HeadObjectRequest request; request.SetBucket(awsString(bucket_)); request.SetKey(awsString(key_)); + ++S3Metrics::getInstance().metadataCalls; auto objectMetadata = client_->HeadObject(request); + if (!objectMetadata.IsSuccess()) { + ++S3Metrics::getInstance().getMetadataErrors; + } + S3Metrics::getInstance().getMetadataRetries += objectMetadata.GetRetryCount(); VELOX_CHECK(!objectMetadata.IsSuccess(), "S3 object already exists"); } @@ -305,6 +320,7 @@ class S3WriteFile::Impl { if (closed()) { return; } + ++S3Metrics::getInstance().startedUploads; uploadPart({currentPart_->data(), currentPart_->size()}, true); VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size()); // Complete the multipart upload. @@ -318,6 +334,11 @@ class S3WriteFile::Impl { request.SetMultipartUpload(std::move(completedUpload)); auto outcome = client_->CompleteMultipartUpload(request); + if (outcome.IsSuccess()) { + ++S3Metrics::getInstance().successfulUploads; + } else { + ++S3Metrics::getInstance().failedUploads; + } VELOX_CHECK_AWS_OUTCOME( outcome, "Failed to complete multiple part upload", bucket_, key_); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index c1e73198d48f..13cb731206bc 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -24,6 +24,36 @@ bool initializeS3(std::string_view logLevel = "FATAL"); void finalizeS3(); +struct S3Metrics { + static S3Metrics& getInstance() { + static S3Metrics instance; + return instance; + } + + S3Metrics(const S3Metrics&) = delete; + S3Metrics& operator=(const S3Metrics&) = delete; + S3Metrics(S3Metrics&&) = delete; + S3Metrics& operator=(S3Metrics&&) = delete; + + ~S3Metrics() = default; + + std::atomic activeConnections{0}; + std::atomic startedUploads{0}; + std::atomic failedUploads{0}; + std::atomic successfulUploads{0}; + std::atomic metadataCalls{0}; + std::atomic listStatusCalls{0}; + std::atomic listLocatedStatusCalls{0}; + std::atomic listObjectsCalls{0}; + std::atomic getObjectErrors{0}; + std::atomic getMetadataErrors{0}; + std::atomic getObjectRetries{0}; + std::atomic getMetadataRetries{0}; + + private: + S3Metrics() = default; +}; + /// Implementation of S3 filesystem and file interface. /// We provide a registration method for read and write files so the appropriate /// type of file can be constructed based on a filename. diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index c97bff4ee894..c58b8dbfe121 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -265,4 +265,39 @@ TEST_F(S3FileSystemTest, invalidConnectionSettings) { VELOX_ASSERT_THROW( filesystems::S3FileSystem("", hiveConfig), "Invalid duration"); } + +TEST_F(S3FileSystemTest, metrics) { + const auto bucketName = "metrics"; + const auto file = "test.txt"; + const auto filename = localPath(bucketName) + "/" + file; + const auto s3File = s3URI(bucketName, file); + + auto hiveConfig = minioServer_->hiveConfig(); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); + + EXPECT_EQ(S3Metrics::getInstance().metadataCalls, 0); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + EXPECT_EQ(S3Metrics::getInstance().metadataCalls, 1); + + std::string dataContent = + "Dance me to your beauty with a burning violin" + "Dance me through the panic till I'm gathered safely in" + "Lift me like an olive branch and be my homeward dove" + "Dance me to the end of love"; + + writeFile->append(dataContent); + EXPECT_EQ(S3Metrics::getInstance().startedUploads, 0); + EXPECT_EQ(S3Metrics::getInstance().successfulUploads, 0); + writeFile->close(); + EXPECT_EQ(S3Metrics::getInstance().startedUploads, 1); + EXPECT_EQ(S3Metrics::getInstance().successfulUploads, 1); + + EXPECT_EQ(S3Metrics::getInstance().metadataCalls, 1); + auto readFile = s3fs.openFileForRead(s3File); + EXPECT_EQ(S3Metrics::getInstance().metadataCalls, 2); + + readFile->pread(0, dataContent.length()); +} } // namespace facebook::velox::filesystems