diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c3a6eb0eace0e..f05d89963ce45 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <mutex>
 #include <optional>
+#include <shared_mutex>
 #include <sstream>
 #include <string>
 #include <thread>
@@ -395,10 +396,15 @@ bool S3Options::Equals(const S3Options& other) const {
 
 namespace {
 
+Status ErrorS3Finalized() { return Status::Invalid("S3 subsystem is finalized"); }
+
 Status CheckS3Initialized() {
   if (!IsS3Initialized()) {
+    if (IsS3Finalized()) {
+      return ErrorS3Finalized();
+    }
     return Status::Invalid(
-        "S3 subsystem not initialized; please call InitializeS3() "
+        "S3 subsystem is not initialized; please call InitializeS3() "
         "before carrying out any S3-related operation");
   }
   return Status::OK();
@@ -697,6 +703,138 @@ void DisableRedirects(Aws::Client::ClientConfiguration* c) {
   DisableRedirectsImpl(&c->followRedirects);
 }
 
+// -----------------------------------------------------------------------
+// S3 client protection against use after finalization
+//
+// Applications are advised to call FinalizeS3() before process end.
+// However, once this is done, AWS APIs cannot reliably be called anymore
+// (even destructors may crash or trigger UB).
+// To prevent such issues, we wrap all S3Client instances in a special
+// structure (S3ClientHolder) that prevents usage of S3Client after
+// S3 has been finalized.
+//
+// See: GH-36346, GH-15054.
+
+class S3ClientFinalizer;
+
+class S3ClientLock {
+ public:
+  S3Client* get() { return client_.get(); }
+  S3Client* operator->() { return client_.get(); }
+
+ protected:
+  friend class S3ClientHolder;
+
+  // Locks the finalizer until the S3ClientLock goes out of scope.
+  std::shared_lock<std::shared_mutex> lock_;
+  std::shared_ptr<S3Client> client_;
+};
+
+class S3ClientHolder {
+ public:
+  /// \brief Return a RAII guard guaranteeing an S3Client is safe for use
+  ///
+  /// S3 finalization will be deferred until the returned S3ClientLock
+  /// goes out of scope.
+  /// An error is returned if S3 is already finalized.
+  Result<S3ClientLock> Lock();
+
+  S3ClientHolder(std::weak_ptr<S3ClientFinalizer> finalizer,
+                 std::shared_ptr<S3Client> client)
+      : finalizer_(std::move(finalizer)), client_(std::move(client)) {}
+
+  void Finalize();
+
+ protected:
+  std::mutex mutex_;
+  std::weak_ptr<S3ClientFinalizer> finalizer_;
+  std::shared_ptr<S3Client> client_;
+};
+
+class S3ClientFinalizer : public std::enable_shared_from_this<S3ClientFinalizer> {
+  using ClientHolderList = std::vector<std::weak_ptr<S3ClientHolder>>;
+
+ public:
+  Result<std::shared_ptr<S3ClientHolder>> AddClient(std::shared_ptr<S3Client> client) {
+    std::unique_lock lock(mutex_);
+    if (finalized_) {
+      return ErrorS3Finalized();
+    }
+
+    auto holder = std::make_shared<S3ClientHolder>(shared_from_this(), std::move(client));
+
+    // Remove expired entries before adding the new one
+    auto end = std::remove_if(
+        holders_.begin(), holders_.end(),
+        [](std::weak_ptr<S3ClientHolder> holder) { return holder.expired(); });
+    holders_.erase(end, holders_.end());
+    holders_.emplace_back(holder);
+    return holder;
+  }
+
+  void Finalize() {
+    std::unique_lock lock(mutex_);
+    finalized_ = true;
+
+    ClientHolderList finalizing = std::move(holders_);
+    lock.unlock();  // avoid lock ordering issue with S3ClientHolder::Finalize
+
+    // Finalize all client holders, such that no S3Client remains alive
+    // after this.
+    for (auto&& weak_holder : finalizing) {
+      auto holder = weak_holder.lock();
+      if (holder) {
+        holder->Finalize();
+      }
+    }
+  }
+
+  auto LockShared() { return std::shared_lock(mutex_); }
+
+ protected:
+  friend class S3ClientHolder;
+
+  std::shared_mutex mutex_;
+  ClientHolderList holders_;
+  bool finalized_ = false;
+};
+
+Result<S3ClientLock> S3ClientHolder::Lock() {
+  std::lock_guard lock(mutex_);
+  auto finalizer = finalizer_.lock();
+  if (!finalizer) {
+    return ErrorS3Finalized();
+  }
+  S3ClientLock client_lock;
+  // Lock the finalizer before examining it
+  client_lock.lock_ = finalizer->LockShared();
+  if (finalizer->finalized_) {
+    return ErrorS3Finalized();
+  }
+  // (the client can be cleared only if finalizer->finalized_ is true)
+  DCHECK(client_) << "inconsistent S3ClientHolder";
+  client_lock.client_ = client_;
+  return client_lock;
+}
+
+void S3ClientHolder::Finalize() {
+  std::lock_guard lock(mutex_);
+  client_.reset();
+}
+
+std::shared_ptr<S3ClientFinalizer> GetClientFinalizer() {
+  static auto finalizer = std::make_shared<S3ClientFinalizer>();
+  return finalizer;
+}
+
+Result<std::shared_ptr<S3ClientHolder>> GetClientHolder(
+    std::shared_ptr<S3Client> client) {
+  return GetClientFinalizer()->AddClient(std::move(client));
+}
+
+// -----------------------------------------------------------------------
+// S3 client factory: build S3Client from S3Options
+
 class ClientBuilder {
  public:
   explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
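Taken together, `S3ClientLock`, `S3ClientHolder` and `S3ClientFinalizer` implement a reader-writer gate: every client use holds the shared side of a `std::shared_mutex`, while finalization takes the exclusive side, so it blocks until in-flight calls drain and makes all later accesses fail cleanly. Below is a minimal self-contained sketch of that pattern — hypothetical names (`FinalizableRegistry`, `Guard`), not the Arrow API:

```cpp
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>

template <typename T>
class FinalizableRegistry {
 public:
  struct Guard {
    std::shared_lock<std::shared_mutex> lock;  // held while the resource is in use
    std::shared_ptr<T> resource;
    T* operator->() { return resource.get(); }
  };

  explicit FinalizableRegistry(std::shared_ptr<T> r) : resource_(std::move(r)) {}

  // Returns a guard that defers finalization, or nullopt if already finalized.
  std::optional<Guard> Acquire() {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    if (finalized_) return std::nullopt;  // fail instead of crashing later
    return Guard{std::move(lock), resource_};
  }

  void Finalize() {
    std::unique_lock<std::shared_mutex> lock(mutex_);  // waits for all Guards
    finalized_ = true;
    resource_.reset();  // destroy while the underlying APIs are still valid
  }

 private:
  std::shared_mutex mutex_;
  std::shared_ptr<T> resource_;
  bool finalized_ = false;
};

int main() {
  FinalizableRegistry<int> reg(std::make_shared<int>(7));
  if (auto guard = reg.Acquire()) {
    // ... use the resource; Finalize() blocks until this guard dies ...
  }
  reg.Finalize();
  return reg.Acquire().has_value() ? 1 : 0;  // acquisition now fails cleanly
}
```

Returning the guard by value keeps the shared lock alive exactly as long as the caller uses the resource, which is the role `S3ClientLock` plays above.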
@@ -705,7 +843,7 @@ class ClientBuilder {
 
   Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
 
-  Result<std::shared_ptr<S3Client>> BuildClient(
+  Result<std::shared_ptr<S3ClientHolder>> BuildClient(
       std::optional<io::IOContext> io_context = std::nullopt) {
     credentials_provider_ = options_.credentials_provider;
     if (!options_.region.empty()) {
@@ -778,7 +916,7 @@ class ClientBuilder {
         Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
         use_virtual_addressing);
     client->s3_retry_strategy_ = options_.retry_strategy;
-    return client;
+    return GetClientHolder(std::move(client));
   }
 
   const S3Options& options() const { return options_; }
@@ -839,7 +977,8 @@ class RegionResolver {
   }
 
   Result<std::string> ResolveRegionUncached(const std::string& bucket) {
-    return client_->GetBucketRegion(bucket);
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    return client_lock->GetBucketRegion(bucket);
   }
 
  protected:
@@ -849,13 +988,13 @@ class RegionResolver {
     DCHECK(builder_.options().endpoint_override.empty());
     // On Windows with AWS SDK >= 1.8, it is necessary to disable redirects (ARROW-10085).
     DisableRedirects(builder_.mutable_config());
-    return builder_.BuildClient().Value(&client_);
+    return builder_.BuildClient().Value(&holder_);
   }
 
   static std::shared_ptr<RegionResolver> instance_;
 
   ClientBuilder builder_;
-  std::shared_ptr<S3Client> client_;
+  std::shared_ptr<S3ClientHolder> holder_;
 
   std::mutex cache_mutex_;
   // XXX Should cache size be bounded?  It must be quite unusual to query millions
@@ -999,10 +1138,9 @@ Status SetObjectMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata
 
 // A RandomAccessFile that reads from a S3 object
 class ObjectInputFile final : public io::RandomAccessFile {
  public:
-  ObjectInputFile(std::shared_ptr<S3Client> client,
-                  const io::IOContext& io_context, const S3Path& path,
-                  int64_t size = kNoSize)
-      : client_(std::move(client)),
+  ObjectInputFile(std::shared_ptr<S3ClientHolder> holder, const io::IOContext& io_context,
+                  const S3Path& path, int64_t size = kNoSize)
+      : holder_(std::move(holder)),
         io_context_(io_context),
         path_(path),
         content_length_(size) {}
@@ -1019,7 +1157,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
     req.SetBucket(ToAwsString(path_.bucket));
     req.SetKey(ToAwsString(path_.key));
 
-    auto outcome = client_->HeadObject(req);
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    auto outcome = client_lock->HeadObject(req);
     if (!outcome.IsSuccess()) {
       if (IsNotFound(outcome.GetError())) {
         return PathNotFound(path_);
@@ -1065,7 +1204,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
   }
 
   Status Close() override {
-    client_ = nullptr;
+    holder_ = nullptr;
    closed_ = true;
     return Status::OK();
   }
@@ -1100,8 +1239,10 @@ class ObjectInputFile final : public io::RandomAccessFile {
     }
 
     // Read the desired range of bytes
-    ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
-                          GetObjectRange(client_.get(), path_, position, nbytes, out));
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    ARROW_ASSIGN_OR_RAISE(
+        S3Model::GetObjectResult result,
+        GetObjectRange(client_lock.get(), path_, position, nbytes, out));
 
     auto& stream = result.GetBody();
     stream.ignore(nbytes);
@@ -1140,7 +1281,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
   }
 
  protected:
-  std::shared_ptr<S3Client> client_;
+  std::shared_ptr<S3ClientHolder> holder_;
   const io::IOContext io_context_;
   S3Path path_;
@@ -1165,10 +1306,11 @@ class ObjectOutputStream final : public io::OutputStream {
   struct UploadState;
 
  public:
-  ObjectOutputStream(std::shared_ptr<S3Client> client, const io::IOContext& io_context,
-                     const S3Path& path, const S3Options& options,
+  ObjectOutputStream(std::shared_ptr<S3ClientHolder> holder,
+                     const io::IOContext& io_context, const S3Path& path,
+                     const S3Options& options,
                      const std::shared_ptr<const KeyValueMetadata>& metadata)
-      : client_(std::move(client)),
+      : holder_(std::move(holder)),
         io_context_(io_context),
         path_(path),
         metadata_(metadata),
@@ -1182,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
   }
 
   Status Init() {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     // Initiate the multi-part upload
     S3Model::CreateMultipartUploadRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
@@ -1199,7 +1343,7 @@ class ObjectOutputStream final : public io::OutputStream {
       req.SetContentType("application/octet-stream");
     }
 
-    auto outcome = client_->CreateMultipartUpload(req);
+    auto outcome = client_lock->CreateMultipartUpload(req);
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(
           std::forward_as_tuple("When initiating multiple part upload for key '",
@@ -1217,12 +1361,14 @@ class ObjectOutputStream final : public io::OutputStream {
       return Status::OK();
     }
 
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     S3Model::AbortMultipartUploadRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
     req.SetKey(ToAwsString(path_.key));
     req.SetUploadId(upload_id_);
 
-    auto outcome = client_->AbortMultipartUpload(req);
+    auto outcome = client_lock->AbortMultipartUpload(req);
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(
std::forward_as_tuple("When aborting multiple part upload for key '", path_.key, @@ -1230,7 +1376,7 @@ class ObjectOutputStream final : public io::OutputStream { "AbortMultipartUpload", outcome.GetError()); } current_part_.reset(); - client_ = nullptr; + holder_ = nullptr; closed_ = true; return Status::OK(); } @@ -1257,6 +1403,8 @@ class ObjectOutputStream final : public io::OutputStream { // Wait for in-progress uploads to finish (if async writes are enabled) return FlushAsync().Then([this]() { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + // At this point, all part uploads have finished successfully DCHECK_GT(part_number_, 1); DCHECK_EQ(upload_state_->completed_parts.size(), @@ -1270,7 +1418,7 @@ class ObjectOutputStream final : public io::OutputStream { req.SetUploadId(upload_id_); req.SetMultipartUpload(std::move(completed_upload)); - auto outcome = client_->CompleteMultipartUploadWithErrorFixup(std::move(req)); + auto outcome = client_lock->CompleteMultipartUploadWithErrorFixup(std::move(req)); if (!outcome.IsSuccess()) { return ErrorToStatus( std::forward_as_tuple("When completing multiple part upload for key '", @@ -1278,7 +1426,7 @@ class ObjectOutputStream final : public io::OutputStream { "CompleteMultipartUpload", outcome.GetError()); } - client_ = nullptr; + holder_ = nullptr; closed_ = true; return Status::OK(); }); @@ -1379,6 +1527,8 @@ class ObjectOutputStream final : public io::OutputStream { Status UploadPart(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + S3Model::UploadPartRequest req; req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); @@ -1388,7 +1538,7 @@ class ObjectOutputStream final : public io::OutputStream { if (!background_writes_) { req.SetBody(std::make_shared(data, nbytes)); - auto outcome = client_->UploadPart(req); + auto outcome = client_lock->UploadPart(req); if (!outcome.IsSuccess()) { return UploadPartError(req, outcome); } else { @@ -1412,10 +1562,13 @@ class ObjectOutputStream final : public io::OutputStream { upload_state_->pending_parts_completed = Future<>::Make(); } } - auto client = client_; - ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() { - return client->UploadPart(req); - })); + // XXX This callback returns Aws::Utils::Outcome, it cannot easily call + // `holder->Lock()` which returns arrow::Result. 
+      ARROW_ASSIGN_OR_RAISE(
+          auto fut,
+          SubmitIO(io_context_, [client_lock = std::move(client_lock), req]() mutable {
+            return client_lock->UploadPart(req);
+          }));
       // The closure keeps the buffer and the upload state alive
       auto state = upload_state_;
       auto part_number = part_number_;
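The background-write branch above moves the already-acquired `client_lock` into the I/O closure via a lambda init-capture, because the closure must return an `Aws::Utils::Outcome` and cannot surface a failed `holder->Lock()`. A minimal sketch of that capture technique, with hypothetical stand-in types:

```cpp
#include <future>
#include <iostream>
#include <mutex>
#include <utility>

// Stand-in for S3ClientLock: a RAII guard plus the operation it protects.
struct Guard {
  std::unique_lock<std::mutex> lock;
  int Use() { return 42; }  // stands in for client_lock->UploadPart(req)
};

int main() {
  std::mutex m;
  Guard guard{std::unique_lock<std::mutex>(m)};

  // Init-capture transfers ownership of the guard into the closure.
  // `mutable` is required so the moved-in member can be used non-const.
  auto task = std::async(std::launch::deferred,
                         [guard = std::move(guard)]() mutable { return guard.Use(); });

  // The lock is released only when the closure (and its captured guard) dies,
  // i.e. after the asynchronous work has run — same lifetime as above.
  std::cout << task.get() << "\n";
}
```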
@@ -1475,7 +1628,7 @@ class ObjectOutputStream final : public io::OutputStream {
   }
 
  protected:
-  std::shared_ptr<S3Client> client_;
+  std::shared_ptr<S3ClientHolder> holder_;
   const io::IOContext io_context_;
   const S3Path path_;
   const std::shared_ptr<const KeyValueMetadata> metadata_;
@@ -1520,7 +1673,7 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
   using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
   using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
 
-  std::shared_ptr<S3Client> client_;
+  std::shared_ptr<S3ClientHolder> holder_;
   io::IOContext io_context_;
   const std::string bucket_;
   const std::string base_dir_;
@@ -1540,11 +1693,11 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
     return self->DoWalk();
   }
 
-  TreeWalker(std::shared_ptr<S3Client> client, io::IOContext io_context,
+  TreeWalker(std::shared_ptr<S3ClientHolder> holder, io::IOContext io_context,
              std::string bucket, std::string base_dir, int32_t max_keys,
             ResultHandler result_handler, ErrorHandler error_handler,
             RecursionHandler recursion_handler)
-      : client_(std::move(client)),
+      : holder_(std::move(holder)),
         io_context_(io_context),
         bucket_(std::move(bucket)),
         base_dir_(std::move(base_dir)),
@@ -1595,8 +1748,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
     void SpawnListObjectsV2() {
       auto cb = *this;
       walker->task_group_->Append([cb]() mutable {
-        Result<S3Model::ListObjectsV2Outcome> result =
-            cb.walker->client_->ListObjectsV2(cb.req);
+        ARROW_ASSIGN_OR_RAISE(auto client_lock, cb.walker->holder_->Lock());
+        Result<S3Model::ListObjectsV2Outcome> result = client_lock->ListObjectsV2(cb.req);
         return cb(result);
       });
     }
@@ -1663,7 +1816,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
-  std::shared_ptr<S3Client> client_;
+  std::shared_ptr<S3ClientHolder> holder_;
   std::optional<S3Backend> backend_;
 
   const int32_t kListObjectsMaxKeys = 1000;
@@ -1675,7 +1828,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Result<bool> BucketExists(const std::string& bucket) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     S3Model::HeadBucketRequest req;
     req.SetBucket(ToAwsString(bucket));
-    auto outcome = client_->HeadBucket(req);
+    auto outcome = client_lock->HeadBucket(req);
     if (!outcome.IsSuccess()) {
       if (!IsNotFound(outcome.GetError())) {
         return ErrorToStatus(std::forward_as_tuple(
@@ -1709,11 +1864,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Status CreateBucket(const std::string& bucket) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     // Check bucket exists first.
     {
       S3Model::HeadBucketRequest req;
       req.SetBucket(ToAwsString(bucket));
-      auto outcome = client_->HeadBucket(req);
+      auto outcome = client_lock->HeadBucket(req);
 
       if (outcome.IsSuccess()) {
         return Status::OK();
@@ -1743,7 +1900,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
-    auto outcome = client_->CreateBucket(req);
+    auto outcome = client_lock->CreateBucket(req);
     if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
       return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
                            "CreateBucket", outcome.GetError());
@@ -1753,13 +1910,15 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     S3Model::PutObjectRequest req;
     req.SetBucket(ToAwsString(bucket));
     req.SetKey(ToAwsString(key));
     req.SetBody(std::make_shared<std::stringstream>(""));
     return OutcomeToStatus(
         std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
-        "PutObject", client_->PutObject(req));
+        "PutObject", client_lock->PutObject(req));
   }
 
   Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
@@ -1768,15 +1927,19 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Status DeleteObject(const std::string& bucket, const std::string& key) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     S3Model::DeleteObjectRequest req;
     req.SetBucket(ToAwsString(bucket));
     req.SetKey(ToAwsString(key));
     return OutcomeToStatus(
         std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
-        "DeleteObject", client_->DeleteObject(req));
+        "DeleteObject", client_lock->DeleteObject(req));
   }
 
   Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     S3Model::CopyObjectRequest req;
     req.SetBucket(ToAwsString(dest_path.bucket));
     req.SetKey(ToAwsString(dest_path.key));
@@ -1787,7 +1950,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
-        "CopyObject", client_->CopyObject(req));
+        "CopyObject", client_lock->CopyObject(req));
   }
 
   // On Minio, an empty "directory" doesn't satisfy the same API requests as
@@ -1799,6 +1962,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
   Result<bool> IsEmptyDirectory(
       const std::string& bucket, const std::string& key,
       const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     if (previous_outcome) {
       // Fetch the backend from the previous error
       DCHECK(!previous_outcome->IsSuccess());
@@ -1824,7 +1989,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
-    auto outcome = client_->HeadObject(req);
+    auto outcome = client_lock->HeadObject(req);
     if (outcome.IsSuccess()) {
       return true;
     }
@@ -1850,12 +2015,14 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Result<bool> IsNonEmptyDirectory(const S3Path& path) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     S3Model::ListObjectsV2Request req;
     req.SetBucket(ToAwsString(path.bucket));
     req.SetPrefix(ToAwsString(path.key) + kSep);
     req.SetDelimiter(Aws::String() + kSep);
     req.SetMaxKeys(1);
-    auto outcome = client_->ListObjectsV2(req);
+    auto outcome = client_lock->ListObjectsV2(req);
     if (outcome.IsSuccess()) {
       const S3Model::ListObjectsV2Result& r = outcome.GetResult();
       // In some cases, there may be 0 keys but some prefixes
@@ -1939,6 +2106,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
   Status Walk(const FileSelector& select, const std::string& bucket,
               const std::string& key, std::vector<FileInfo>* out) {
+    RETURN_NOT_OK(CheckS3Initialized());
+
     FileInfoCollector collector(bucket, key, select);
 
     auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
@@ -1960,7 +2129,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
-    return TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
+    return TreeWalker::WalkAsync(holder_, io_context_, bucket, key, kListObjectsMaxKeys,
                                  handle_results, handle_error, handle_recursion)
         .Then([collector, self]() { return collector->Finish(self.get()); });
@@ -2056,7 +2225,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Future<> DeleteObjectsAsync(const std::string& bucket,
                               const std::vector<std::string>& keys) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
     struct DeleteCallback {
-      const std::string bucket;
+      std::string bucket;
 
-      Status operator()(const S3Model::DeleteObjectsOutcome& outcome) {
+      Status operator()(const S3Model::DeleteObjectsOutcome& outcome) const {
         if (!outcome.IsSuccess()) {
           return ErrorToStatus("DeleteObjects", outcome.GetError());
         }
@@ -2089,8 +2260,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
     const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
-    DeleteCallback delete_cb{bucket};
-    auto client = client_;
+    const DeleteCallback delete_cb{bucket};
 
     std::vector<Future<>> futures;
     futures.reserve(keys.size() / chunk_size + 1);
@@ -2103,10 +2273,14 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
       req.SetBucket(ToAwsString(bucket));
-      ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
-                              return client->DeleteObjects(req);
-                            }));
-      futures.push_back(std::move(fut).Then(delete_cb));
+      ARROW_ASSIGN_OR_RAISE(
+          auto fut,
+          SubmitIO(io_context_,
+                   [holder = holder_, req = std::move(req), delete_cb]() -> Status {
+                     ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
+                     return delete_cb(client_lock->DeleteObjects(req));
+                   }));
+      futures.push_back(std::move(fut));
     }
 
     return AllComplete(futures);
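`DeleteObjectsAsync()` fans out one request per chunk because the S3 `DeleteObjects` API accepts at most 1000 keys per call (`kMultipleDeleteMaxKeys`). Note how this closure, unlike the one in `UploadPart()`, returns a `Status`, so it can capture the holder and re-acquire the lock inside. A minimal sketch of the same chunk/submit/join shape, using `std::async` as a hypothetical stand-in for `SubmitIO()`/`AllComplete()`:

```cpp
#include <algorithm>
#include <future>
#include <iostream>
#include <string>
#include <vector>

// Stands in for one DeleteObjects call; returns the number of keys handled.
int DeleteBatch(const std::vector<std::string>& batch) {
  return static_cast<int>(batch.size());
}

int main() {
  std::vector<std::string> keys(2500, "key");
  const size_t chunk_size = 1000;  // the DeleteObjects per-request key limit

  std::vector<std::future<int>> futures;
  futures.reserve(keys.size() / chunk_size + 1);
  for (size_t start = 0; start < keys.size(); start += chunk_size) {
    // One batch per chunk, moved into its closure like `req` above.
    std::vector<std::string> batch(
        keys.begin() + start,
        keys.begin() + std::min(keys.size(), start + chunk_size));
    futures.push_back(std::async(
        std::launch::async,
        [batch = std::move(batch)]() { return DeleteBatch(batch); }));
  }

  int total = 0;
  for (auto& fut : futures) total += fut.get();  // join, like AllComplete(futures)
  std::cout << "deleted " << total << " keys\n";
}
```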
@@ -2169,13 +2343,19 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
   Result<std::vector<std::string>> ListBuckets() {
-    auto outcome = client_->ListBuckets();
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+    auto outcome = client_lock->ListBuckets();
     return ProcessListBuckets(outcome);
   }
 
   Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
-    auto self = shared_from_this();
-    return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+    return DeferNotOk(SubmitIO(ctx,
+                               [client_lock = std::move(client_lock)]() mutable {
+                                 return client_lock->ListBuckets();
+                               }))
         // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
         .Then([](const Aws::S3::Model::ListBucketsOutcome& outcome) {
           return Impl::ProcessListBuckets(outcome);
@@ -2188,7 +2368,9 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
-    auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
+    RETURN_NOT_OK(CheckS3Initialized());
+
+    auto ptr = std::make_shared<ObjectInputFile>(holder_, fs->io_context(), path);
     RETURN_NOT_OK(ptr->Init());
     return ptr;
   }
@@ -2206,8 +2388,10 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
 
+    RETURN_NOT_OK(CheckS3Initialized());
+
     auto ptr =
-        std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
+        std::make_shared<ObjectInputFile>(holder_, fs->io_context(), path, info.size());
     RETURN_NOT_OK(ptr->Init());
     return ptr;
   }
@@ -2250,6 +2434,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
 std::string S3FileSystem::region() const { return impl_->region(); }
 
 Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
+  ARROW_ASSIGN_OR_RAISE(auto client_lock, impl_->holder_->Lock());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   FileInfo info;
   info.set_path(s);
@@ -2263,7 +2449,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
     S3Model::HeadBucketRequest req;
     req.SetBucket(ToAwsString(path.bucket));
 
-    auto outcome = impl_->client_->HeadBucket(req);
+    auto outcome = client_lock->HeadBucket(req);
     if (!outcome.IsSuccess()) {
       if (!IsNotFound(outcome.GetError())) {
         const auto msg = "When getting information for bucket '" + path.bucket + "': ";
@@ -2283,7 +2469,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
     req.SetBucket(ToAwsString(path.bucket));
     req.SetKey(ToAwsString(path.key));
 
-    auto outcome = impl_->client_->HeadObject(req);
+    auto outcome = client_lock->HeadObject(req);
     if (outcome.IsSuccess()) {
       // "File" object found
       FileObjectToInfo(outcome.GetResult(), &info);
@@ -2427,18 +2613,18 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
 
 Status S3FileSystem::DeleteDir(const std::string& s) {
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
-
   if (path.empty()) {
     return Status::NotImplemented("Cannot delete all S3 buckets");
   }
   RETURN_NOT_OK(impl_->DeleteDirContentsAsync(path.bucket, path.key).status());
   if (path.key.empty() && options().allow_bucket_deletion) {
     // Delete bucket
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, impl_->holder_->Lock());
     S3Model::DeleteBucketRequest req;
     req.SetBucket(ToAwsString(path.bucket));
     return OutcomeToStatus(
         std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
-        "DeleteBucket", impl_->client_->DeleteBucket(req));
+        "DeleteBucket", client_lock->DeleteBucket(req));
   } else if (path.key.empty()) {
     return Status::IOError("Would delete bucket '", path.bucket, "'. ",
                            "To delete buckets, enable the allow_bucket_deletion option.");
@@ -2480,6 +2666,8 @@ Status S3FileSystem::DeleteRootDirContents() {
 }
 
 Status S3FileSystem::DeleteFile(const std::string& s) {
+  ARROW_ASSIGN_OR_RAISE(auto client_lock, impl_->holder_->Lock());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));
 
@@ -2488,7 +2676,7 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
   req.SetBucket(ToAwsString(path.bucket));
   req.SetKey(ToAwsString(path.key));
 
-  auto outcome = impl_->client_->HeadObject(req);
+  auto outcome = client_lock->HeadObject(req);
   if (!outcome.IsSuccess()) {
     if (IsNotFound(outcome.GetError())) {
       return PathNotFound(path);
@@ -2562,7 +2750,9 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));
 
-  auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
+  RETURN_NOT_OK(CheckS3Initialized());
+
+  auto ptr = std::make_shared<ObjectOutputStream>(impl_->holder_, io_context(), path,
                                                   impl_->options(), metadata);
   RETURN_NOT_OK(ptr->Init());
   return ptr;
@@ -2581,17 +2771,20 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
 
 namespace {
 
-struct AwsInstance : public ::arrow::internal::Executor::Resource {
+struct AwsInstance {
   AwsInstance() : is_initialized_(false), is_finalized_(false) {}
   ~AwsInstance() { Finalize(/*from_destructor=*/true); }
 
   // Returns true iff the instance was newly initialized with `options`
   Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
-    bool expected = false;
+    // NOTE: The individual accesses are atomic but the entire sequence below is not.
+    // The application should serialize calls to InitializeS3() and FinalizeS3()
+    // (see docstrings).
     if (is_finalized_.load()) {
       return Status::Invalid("Attempt to initialize S3 after it has been finalized");
     }
-    if (is_initialized_.compare_exchange_strong(expected, true)) {
+    if (!is_initialized_.exchange(true)) {
+      // Not already initialized
       DoInitialize(options);
       return true;
     }
@@ -2600,17 +2793,22 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
 
   bool IsInitialized() { return !is_finalized_ && is_initialized_; }
 
+  bool IsFinalized() { return is_finalized_; }
+
   void Finalize(bool from_destructor = false) {
-    bool expected = true;
-    is_finalized_.store(true);
-    if (is_initialized_.compare_exchange_strong(expected, false)) {
+    if (is_finalized_.exchange(true)) {
+      // Already finalized
+      return;
+    }
+    if (is_initialized_.exchange(false)) {
+      // Was initialized
       if (from_destructor) {
         ARROW_LOG(WARNING)
             << " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
                "This could lead to a segmentation fault at exit";
-        RegionResolver::ResetDefaultInstance();
-        Aws::ShutdownAPI(aws_options_);
       }
+      GetClientFinalizer()->Finalize();
+      Aws::ShutdownAPI(aws_options_);
     }
   }
@@ -2670,21 +2868,13 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
   std::atomic<bool> is_finalized_;
 };
 
-std::shared_ptr<AwsInstance> CreateAwsInstance() {
-  auto instance = std::make_shared<AwsInstance>();
-  // Don't let S3 be shutdown until all Arrow threads are done using it
-  arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
-  io::internal::GetIOThreadPool()->KeepAlive(instance);
-  return instance;
-}
-
-AwsInstance& GetAwsInstance() {
-  static auto instance = CreateAwsInstance();
-  return *instance;
+AwsInstance* GetAwsInstance() {
+  static auto instance = std::make_unique<AwsInstance>();
+  return instance.get();
 }
 
 Result<bool> EnsureAwsInstanceInitialized(const S3GlobalOptions& options) {
-  return GetAwsInstance().EnsureInitialized(options);
+  return GetAwsInstance()->EnsureInitialized(options);
 }
 
 }  // namespace
@@ -2705,18 +2895,22 @@ Status EnsureS3Initialized() {
 }
 
 Status FinalizeS3() {
-  GetAwsInstance().Finalize();
+  GetAwsInstance()->Finalize();
   return Status::OK();
 }
 
 Status EnsureS3Finalized() { return FinalizeS3(); }
 
-bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }
+bool IsS3Initialized() { return GetAwsInstance()->IsInitialized(); }
+
+bool IsS3Finalized() { return GetAwsInstance()->IsFinalized(); }
 
 // -----------------------------------------------------------------------
 // Top-level utility functions
 
 Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
+  RETURN_NOT_OK(CheckS3Initialized());
+
   if (bucket.empty() || bucket.find_first_of(kSep) != bucket.npos ||
       internal::IsLikelyUri(bucket)) {
     return Status::Invalid("Not a valid bucket name: '", bucket, "'");
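The `AwsInstance` rewrite replaces `compare_exchange_strong` with `exchange()`, which returns the previous flag value, so exactly one caller observes each false-to-true transition and both initialization and teardown become idempotent. A standalone sketch of the idiom (not the Arrow code itself; as the NOTE above says, callers should still serialize init/finalize):

```cpp
#include <atomic>
#include <iostream>

class Subsystem {
 public:
  bool Initialize() {
    if (finalized_.load()) return false;  // refuse to initialize after finalize
    if (!initialized_.exchange(true)) {
      // Only the first caller sees the false -> true transition.
      std::cout << "initialized\n";
      return true;
    }
    return false;  // already initialized
  }

  void Finalize() {
    if (finalized_.exchange(true)) return;  // already finalized: no-op
    if (initialized_.exchange(false)) {
      std::cout << "finalized\n";  // teardown runs exactly once
    }
  }

 private:
  std::atomic<bool> initialized_{false};
  std::atomic<bool> finalized_{false};
};

int main() {
  Subsystem s;
  s.Initialize();
  s.Finalize();
  s.Finalize();  // safe: the exchange() gate makes repeated finalization a no-op
}
```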
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 0a7ca73ccb20c..cc870c5abea0d 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -334,15 +334,17 @@ struct ARROW_EXPORT S3GlobalOptions {
   int num_event_loop_threads = 1;
 };
 
-/// Initialize the S3 APIs. It is required to call this function at least once
-/// before using S3FileSystem.
+/// \brief Initialize the S3 APIs.
+///
+/// It is required to call this function at least once before using S3FileSystem.
 ///
 /// Once this function is called you MUST call FinalizeS3 before the end of the
 /// application in order to avoid a segmentation fault at shutdown.
 ARROW_EXPORT
 Status InitializeS3(const S3GlobalOptions& options);
 
-/// Ensure the S3 APIs are initialized, but only if not already done.
+/// \brief Ensure the S3 APIs are initialized, but only if not already done.
+///
 /// If necessary, this will call InitializeS3() with some default options.
 ARROW_EXPORT
 Status EnsureS3Initialized();
 
@@ -351,11 +353,25 @@ Status EnsureS3Initialized();
 ARROW_EXPORT
 bool IsS3Initialized();
 
-/// Shutdown the S3 APIs.
+/// Whether S3 has been finalized.
+ARROW_EXPORT
+bool IsS3Finalized();
+
+/// \brief Shutdown the S3 APIs.
+///
+/// This may wait for concurrent S3 calls to finish, so as to avoid race
+/// conditions. After this function has been called, all S3 calls will fail
+/// with an error.
+///
+/// Calls to InitializeS3() and FinalizeS3() should be serialized by the
+/// application (this also applies to EnsureS3Initialized() and
+/// EnsureS3Finalized()).
 ARROW_EXPORT
 Status FinalizeS3();
 
-/// Ensure the S3 APIs are shutdown, but only if not already done.
+/// \brief Ensure the S3 APIs are shut down, but only if not already done.
+///
+/// If necessary, this will call FinalizeS3().
 ARROW_EXPORT
 Status EnsureS3Finalized();
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index 766caccd5ce19..e76c7b9ffa730 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -253,9 +253,13 @@ cdef class S3FileSystem(FileSystem):
                  allow_bucket_creation=False, allow_bucket_deletion=False,
                  retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(
                      max_attempts=3)):
         cdef:
-            CS3Options options
+            optional[CS3Options] options
             shared_ptr[CS3FileSystem] wrapped
 
+        # Need to do this before initializing `options`, as the S3Options
+        # constructor has a debug check against use after S3 finalization.
+        ensure_s3_initialized()
+
         if access_key is not None and secret_key is None:
             raise ValueError(
                 'In order to initialize with explicit credentials both '
@@ -317,56 +321,60 @@ cdef class S3FileSystem(FileSystem):
             options = CS3Options.Defaults()
 
         if region is not None:
-            options.region = tobytes(region)
+            options.value().region = tobytes(region)
         if request_timeout is not None:
-            options.request_timeout = request_timeout
+            options.value().request_timeout = request_timeout
         if connect_timeout is not None:
-            options.connect_timeout = connect_timeout
+            options.value().connect_timeout = connect_timeout
         if scheme is not None:
-            options.scheme = tobytes(scheme)
+            options.value().scheme = tobytes(scheme)
         if endpoint_override is not None:
-            options.endpoint_override = tobytes(endpoint_override)
+            options.value().endpoint_override = tobytes(endpoint_override)
         if background_writes is not None:
-            options.background_writes = background_writes
+            options.value().background_writes = background_writes
         if default_metadata is not None:
             if not isinstance(default_metadata, KeyValueMetadata):
                 default_metadata = KeyValueMetadata(default_metadata)
-            options.default_metadata = pyarrow_unwrap_metadata(
+            options.value().default_metadata = pyarrow_unwrap_metadata(
                 default_metadata)
 
         if proxy_options is not None:
             if isinstance(proxy_options, dict):
-                options.proxy_options.scheme = tobytes(proxy_options["scheme"])
-                options.proxy_options.host = tobytes(proxy_options["host"])
-                options.proxy_options.port = proxy_options["port"]
+                options.value().proxy_options.scheme = tobytes(
+                    proxy_options["scheme"])
+                options.value().proxy_options.host = tobytes(
+                    proxy_options["host"])
+                options.value().proxy_options.port = proxy_options["port"]
                 proxy_username = proxy_options.get("username", None)
                 if proxy_username:
-                    options.proxy_options.username = tobytes(proxy_username)
+                    options.value().proxy_options.username = tobytes(
+                        proxy_username)
                 proxy_password = proxy_options.get("password", None)
                 if proxy_password:
-                    options.proxy_options.password = tobytes(proxy_password)
+                    options.value().proxy_options.password = tobytes(
+                        proxy_password)
             elif isinstance(proxy_options, str):
-                options.proxy_options = GetResultValue(
+                options.value().proxy_options = GetResultValue(
                     CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
             else:
                 raise TypeError(
                     "'proxy_options': expected 'dict' or 'str', "
                     f"got {type(proxy_options)} instead.")
 
-        options.allow_bucket_creation = allow_bucket_creation
-        options.allow_bucket_deletion = allow_bucket_deletion
+        options.value().allow_bucket_creation = allow_bucket_creation
+        options.value().allow_bucket_deletion = allow_bucket_deletion
 
         if isinstance(retry_strategy, AwsStandardS3RetryStrategy):
-            options.retry_strategy = CS3RetryStrategy.GetAwsStandardRetryStrategy(
+            options.value().retry_strategy = CS3RetryStrategy.GetAwsStandardRetryStrategy(
                 retry_strategy.max_attempts)
         elif isinstance(retry_strategy, AwsDefaultS3RetryStrategy):
-            options.retry_strategy = CS3RetryStrategy.GetAwsDefaultRetryStrategy(
+            options.value().retry_strategy = CS3RetryStrategy.GetAwsDefaultRetryStrategy(
                 retry_strategy.max_attempts)
         else:
             raise ValueError(f'Invalid retry_strategy {retry_strategy!r}')
 
         with nogil:
-            wrapped = GetResultValue(CS3FileSystem.Make(options))
+            wrapped = GetResultValue(CS3FileSystem.Make(options.value()))
 
         self.init(<shared_ptr[CFileSystem]> wrapped)
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 1c7dd448bc9ef..9e139be3e5918 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -38,9 +38,20 @@ cdef extern from * namespace "std" nogil:
 
 cdef extern from "<optional>" namespace "std" nogil:
     cdef cppclass optional[T]:
+        ctypedef T value_type
+        optional()
+        optional(nullopt_t)
+        optional(optional&) except +
+        optional(T&) except +
         c_bool has_value()
-        T value()
-        optional(T&)
+        T& value()
+        T& value_or[U](U& default_value)
+        void swap(optional&)
+        void reset()
+        T& emplace(...)
+        T& operator*()
+        # T* operator->() # Not Supported
+        optional& operator=(optional&)
+        optional& operator=[U](U&)
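The Cython change wraps `CS3Options` in `std::optional` so that the C++ constructor only runs after `ensure_s3_initialized()` has been called. A minimal C++ illustration of deferring construction with `std::optional` — `Options` and `Init()` are illustrative stand-ins, not Arrow APIs:

```cpp
#include <iostream>
#include <optional>

struct Options {
  // Imagine this constructor checks global subsystem state, like S3Options does
  // in debug builds.
  Options() { std::cout << "Options constructed\n"; }
  int region = 0;
};

void Init() { std::cout << "subsystem initialized\n"; }

int main() {
  std::optional<Options> options;  // declaration: no constructor call yet
  Init();                          // make construction safe first
  options.emplace();               // now run the Options constructor
  options->region = 1;             // access through the optional, like .value() above
  std::cout << options.value().region << "\n";
}
```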
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index a629db73e2c5b..b64680c87c8a5 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -20,6 +20,8 @@
 import os
 import pathlib
 import pickle
+import subprocess
+import sys
 
 import pytest
 import weakref
@@ -1821,3 +1823,51 @@ def check_copied_files(destination_dir):
     destination_dir5.mkdir()
     copy_files(source_dir, destination_dir5, chunk_size=1, use_threads=False)
     check_copied_files(destination_dir5)
+
+
+@pytest.mark.s3
+def test_s3_finalize():
+    # Once finalize_s3() has been called, most/all operations on S3
+    # filesystems should raise.
+    code = """if 1:
+        import pytest
+        from pyarrow.fs import (FileSystem, S3FileSystem,
+                                ensure_s3_initialized, finalize_s3)
+
+        fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt')
+        assert fs.region == 'eu-west-1'
+        f = fs.open_input_stream(path)
+        f.read(50)
+
+        finalize_s3()
+
+        with pytest.raises(ValueError, match="S3 .* finalized"):
+            f.read(50)
+        with pytest.raises(ValueError, match="S3 .* finalized"):
+            fs.open_input_stream(path)
+        with pytest.raises(ValueError, match="S3 .* finalized"):
+            S3FileSystem(anonymous=True)
+        with pytest.raises(ValueError, match="S3 .* finalized"):
+            FileSystem.from_uri('s3://mf-nwp-models/README.txt')
+        """
+    subprocess.check_call([sys.executable, "-c", code])
+
+
+@pytest.mark.s3
+def test_s3_finalize_region_resolver():
+    # Same as test_s3_finalize(), but exercising region resolution
+    code = """if 1:
+        import pytest
+        from pyarrow.fs import resolve_s3_region, ensure_s3_initialized, finalize_s3
+
+        resolve_s3_region('mf-nwp-models')
+
+        finalize_s3()
+
+        # Testing both cached and uncached accesses
+        with pytest.raises(ValueError, match="S3 .* finalized"):
+            resolve_s3_region('mf-nwp-models')
+        with pytest.raises(ValueError, match="S3 .* finalized"):
+            resolve_s3_region('voltrondata-labs-datasets')
+        """
+    subprocess.check_call([sys.executable, "-c", code])
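For reference, a sketch of the application-level lifecycle these docstrings and tests describe, written against the public C++ API; the region value is a placeholder and error handling is abbreviated:

```cpp
#include <iostream>
#include <memory>

#include "arrow/filesystem/s3fs.h"
#include "arrow/status.h"

int main() {
  // Initialize once, before any S3 use.
  arrow::fs::S3GlobalOptions global_options{};
  arrow::Status st = arrow::fs::InitializeS3(global_options);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }

  auto options = arrow::fs::S3Options::Anonymous();
  options.region = "us-east-1";  // placeholder region
  auto maybe_fs = arrow::fs::S3FileSystem::Make(options);
  if (maybe_fs.ok()) {
    std::shared_ptr<arrow::fs::S3FileSystem> fs = *maybe_fs;
    // ... perform S3 operations with `fs` here ...
  }

  // Finalize exactly once before process exit; afterwards any S3 call fails
  // with "S3 subsystem is finalized" instead of crashing at shutdown.
  st = arrow::fs::FinalizeS3();
  std::cout << "finalized: " << arrow::fs::IsS3Finalized() << std::endl;
  return st.ok() ? 0 : 1;
}
```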