Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]feat: Implement S3 Metrics Collection and Reporting #12213

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,5 +593,12 @@ void registerVeloxMetrics() {
// The number of times that storage IOs get throttled in a storage cluster.
DEFINE_METRIC(
kMetricStorageGlobalThrottled, facebook::velox::StatType::COUNT);

#ifdef VELOX_ENABLE_S3
DEFINE_METRIC(
kMetricS3ActiveConnections, facebook::velox::StatType::COUNT);
DEFINE_METRIC(
kMetricS3MetadataCalls, facebook::velox::StatType::COUNT);
#endif
}
} // namespace facebook::velox
6 changes: 6 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,10 @@ constexpr folly::StringPiece kMetricStorageGlobalThrottled{

constexpr folly::StringPiece kMetricStorageNetworkThrottled{
"velox.storage_network_throttled_count"};

constexpr folly::StringPiece kMetricS3ActiveConnections{
"velox.presto_hive_s3_presto_s3_file_system_active_connections_total_count"};

constexpr folly::StringPiece kMetricS3MetadataCalls{
"velox.presto_hive_s3_presto_s3_file_system_metadata_calls_one_minute_count"};
} // namespace facebook::velox
13 changes: 13 additions & 0 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ PeriodicStatsReporter::PeriodicStatsReporter(const Options& options)
cache_(options.cache),
arbitrator_(options.arbitrator),
spillMemoryPool_(options.spillMemoryPool),
fileSystem_(options.fileSystem),
options_(options) {}

void PeriodicStatsReporter::start() {
Expand All @@ -84,6 +85,10 @@ void PeriodicStatsReporter::start() {
"report_spill_stats",
[this]() { reportSpillStats(); },
options_.spillStatsIntervalMs);
addTask(
"report_file_system_stats",
[this]() {reportFileSystemMetrics();},
options_.filesystemStatsIntervalMs);
}

void PeriodicStatsReporter::stop() {
Expand Down Expand Up @@ -256,4 +261,12 @@ void PeriodicStatsReporter::reportSpillStats() {
RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
}

void PeriodicStatsReporter::reportFileSystemMetrics() {
if (fileSystem_ == nullptr) {
return;
}
fileSystem_->reportMetrics();
}


} // namespace facebook::velox
10 changes: 8 additions & 2 deletions velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ class PeriodicStatsReporter {
const memory::MemoryPool* spillMemoryPool{nullptr};
uint64_t spillStatsIntervalMs{60'000};

const filesystems::FileSystem* fileSystem{nullptr};
uint64_t filesystemStatsIntervalMs{60'000};

std::string toString() const {
return fmt::format(
"allocatorStatsIntervalMs:{}, cacheStatsIntervalMs:{}, "
"arbitratorStatsIntervalMs:{}, spillStatsIntervalMs:{}",
"arbitratorStatsIntervalMs:{}, spillStatsIntervalMs:{}, filesystemStatsIntervalMs:{}",
allocatorStatsIntervalMs,
cacheStatsIntervalMs,
arbitratorStatsIntervalMs,
spillStatsIntervalMs);
spillStatsIntervalMs,
filesystemStatsIntervalMs);
}
};

Expand Down Expand Up @@ -95,11 +99,13 @@ class PeriodicStatsReporter {
void reportAllocatorStats();
void reportArbitratorStats();
void reportSpillStats();
void reportFileSystemMetrics();

const velox::memory::MemoryAllocator* const allocator_{nullptr};
const velox::cache::AsyncDataCache* const cache_{nullptr};
const velox::memory::MemoryArbitrator* const arbitrator_{nullptr};
const velox::memory::MemoryPool* const spillMemoryPool_{nullptr};
const velox::filesystems::FileSystem* const fileSystem_{nullptr};
const Options options_;

cache::CacheStats lastCacheStats_;
Expand Down
8 changes: 8 additions & 0 deletions velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ struct FileSystemOptions {
bool readAheadEnabled{false};
};

struct FileSystemMetrics {
virtual ~FileSystemMetrics() = default;
};

/// An abstract FileSystem
class FileSystem {
public:
Expand All @@ -95,6 +99,10 @@ class FileSystem {
/// Returns the name of the File System
virtual std::string name() const = 0;

virtual void reportMetrics() const {
VELOX_NYI();
}

/// Returns the file path without the fs scheme prefix such as "local:" prefix
/// for local file system.
virtual std::string_view extractPath(std::string_view path) const {
Expand Down
11 changes: 11 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"
Expand Down Expand Up @@ -97,6 +99,7 @@ class S3ReadFile final : public ReadFile {
request.SetKey(awsString(key_));

auto outcome = client_->HeadObject(request);
S3FileSystem::metrics->metadataCalls.fetch_add(1);
VELOX_CHECK_AWS_OUTCOME(
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
Expand Down Expand Up @@ -179,8 +182,10 @@ class S3ReadFile final : public ReadFile {
request.SetRange(awsString(ss.str()));
request.SetResponseStreamFactory(
AwsWriteableStreamFactory(position, length));
S3FileSystem::metrics->activeConnections.fetch_add(1);
auto outcome = client_->GetObject(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);
S3FileSystem::metrics->activeConnections.fetch_sub(1);
}

Aws::S3::S3Client* client_;
Expand Down Expand Up @@ -233,6 +238,7 @@ class S3WriteFile::Impl {
request.SetBucket(awsString(bucket_));
request.SetKey(awsString(key_));
auto objectMetadata = client_->HeadObject(request);
S3FileSystem::metrics->metadataCalls.fetch_add(1);
VELOX_CHECK(!objectMetadata.IsSuccess(), "S3 object already exists");
}

Expand Down Expand Up @@ -761,4 +767,9 @@ std::string S3FileSystem::name() const {
return "S3";
}

void S3FileSystem::reportMetrics() const {
RECORD_METRIC_VALUE(kMetricS3ActiveConnections, S3FileSystem::metrics->activeConnections);
RECORD_METRIC_VALUE(kMetricS3MetadataCalls, S3FileSystem::metrics->metadataCalls);
}

} // namespace facebook::velox::filesystems
17 changes: 17 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ bool initializeS3(std::string_view logLevel = "FATAL");

void finalizeS3();


struct S3FileSystemMetrics : FileSystemMetrics {
std::atomic<uint64_t> activeConnections{0};
std::atomic<uint64_t> metadataCalls{0};

void reset() {
activeConnections.store(0);
metadataCalls.store(0);
}
};

/// Implementation of S3 filesystem and file interface.
/// We provide a registration method for read and write files so the appropriate
/// type of file can be constructed based on a filename.
Expand All @@ -35,6 +46,8 @@ class S3FileSystem : public FileSystem {

std::string name() const override;

void reportMetrics() const override;

std::unique_ptr<ReadFile> openFileForRead(
std::string_view s3Path,
const FileOptions& options = {},
Expand Down Expand Up @@ -74,9 +87,13 @@ class S3FileSystem : public FileSystem {

std::string getLogLevelName() const;

static std::shared_ptr<S3FileSystemMetrics> metrics;

protected:
class Impl;
std::shared_ptr<Impl> impl_;
};

std::shared_ptr<S3FileSystemMetrics> S3FileSystem::metrics = std::make_shared<S3FileSystemMetrics>();

} // namespace facebook::velox::filesystems