Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36103: [C++] Initial device sync API #37040

Merged
merged 29 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
22c0c98
GH-36103: [C++] Initial device-specific synchronization API
zeroshade Aug 1, 2023
20d8781
more tests
zeroshade Aug 7, 2023
e6ec912
some cleanup
zeroshade Aug 7, 2023
c114b91
linting stuff
zeroshade Aug 7, 2023
3ca47c8
fix comments and cleanup
zeroshade Aug 7, 2023
3978429
lint
zeroshade Aug 7, 2023
8a6148b
fix redundant 'virtual'
zeroshade Aug 7, 2023
fd2edab
fix windows compile error
zeroshade Aug 7, 2023
23ba278
Update cpp/src/arrow/device.h
zeroshade Aug 9, 2023
c12dedb
updates from feedback
zeroshade Aug 9, 2023
2405c85
updates from feedback
zeroshade Aug 10, 2023
6374cea
fix test
zeroshade Aug 11, 2023
f3ad6b2
updates from feedback
zeroshade Aug 14, 2023
4b11843
i'm dumb
zeroshade Aug 14, 2023
e84667b
Fixed
zeroshade Aug 14, 2023
d51ea87
clean up windows warning
zeroshade Aug 14, 2023
4849c81
remove thread-safety, leave it for consumers to impl if they need
zeroshade Aug 15, 2023
e23709e
overhaul update from feedback, redesign for RAII
zeroshade Aug 16, 2023
9aeb27d
update to MakeDeviceSyncEvent
zeroshade Aug 16, 2023
e4c07df
updates from review feedback
zeroshade Aug 18, 2023
3c63d2a
make docs build happy
zeroshade Aug 18, 2023
ac317e0
make the linter happy
zeroshade Aug 18, 2023
859c385
Update cpp/src/arrow/device.h
zeroshade Aug 21, 2023
4021156
fix nits
zeroshade Aug 21, 2023
df46ff9
slight fix in tests
zeroshade Aug 21, 2023
de7c319
shift stream wait to Stream object
zeroshade Aug 22, 2023
9544495
linting
zeroshade Aug 22, 2023
0b74c00
Update cpp/src/arrow/c/bridge.cc
zeroshade Aug 22, 2023
1fdcd0a
rename to WrapDeviceSyncEvent
zeroshade Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
// segment-keys is used to refine the partitioning. However, segment-keys are different in
// that they partition only consecutive rows into a single group. Such a partition of
// consecutive rows is called a segment group. For example, consider a column X with
// values [A, A, B, A] at row-indices [0, 1, 2, 3]. A regular group-by aggregation with keys
// [X] yields a row-index partitioning [[0, 1, 3], [2]] whereas a segmented-group-by
// values [A, A, B, A] at row-indices [0, 1, 2, 3]. A regular group-by aggregation with
// keys [X] yields a row-index partitioning [[0, 1, 3], [2]] whereas a segmented-group-by
// aggregation with segment-keys [X] yields [[0, 1], [2], [3]].
//
// The implementation first segments the input using the segment-keys, then groups by the
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ class ARROW_EXPORT Buffer {
static Result<std::shared_ptr<Buffer>> ViewOrCopy(
std::shared_ptr<Buffer> source, const std::shared_ptr<MemoryManager>& to);

virtual std::shared_ptr<DeviceSync> get_device_sync() { return nullptr; }
zeroshade marked this conversation as resolved.
Show resolved Hide resolved

protected:
bool is_mutable_;
bool is_cpu_;
Expand Down
43 changes: 23 additions & 20 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ struct ExportedArrayPrivateData : PoolAllocationMixin<ExportedArrayPrivateData>
SmallVector<struct ArrowArray*, 4> child_pointers_;

std::shared_ptr<ArrayData> data_;

RawSyncEvent sync_event_;
std::shared_ptr<DeviceSync> sync_;

ExportedArrayPrivateData() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
Expand All @@ -547,10 +546,10 @@ void ReleaseExportedArray(struct ArrowArray* array) {
}
DCHECK_NE(array->private_data, nullptr);
auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
if (pdata->sync_event_.sync_event != nullptr &&
pdata->sync_event_.release_func != nullptr) {
pdata->sync_event_.release_func(pdata->sync_event_.sync_event);
if (pdata->sync_) {
pdata->sync_->clear_event();
}

delete pdata;

ArrowArrayMarkReleased(array);
Expand Down Expand Up @@ -591,7 +590,7 @@ struct ArrayExporter {
// Store owning pointer to ArrayData
export_.data_ = data;

export_.sync_event_ = RawSyncEvent();
export_.sync_ = nullptr;
return Status::OK();
}

Expand Down Expand Up @@ -714,11 +713,11 @@ Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> ValidateDeviceIn
return std::make_pair(device_type, device_id);
}

Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
Status ExportDeviceArray(const Array& array, std::shared_ptr<DeviceSync>& sync,
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) {
if (sync_event.sync_event != nullptr && sync_event.release_func) {
return Status::Invalid(
"Must provide a release event function if providing a non-null event");
void* sync_event{nullptr};
if (sync) {
ARROW_ASSIGN_OR_RAISE(sync_event, sync->get_event());
}

SchemaExportGuard guard(out_schema);
Expand All @@ -739,19 +738,20 @@ Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
exporter.Finish(&out->array);

auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
pdata->sync_event_ = sync_event;
out->sync_event = sync_event.sync_event;
pdata->sync_ = sync;
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
out->sync_event = sync_event;

guard.Detach();
return Status::OK();
}

Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event,
Status ExportDeviceRecordBatch(const RecordBatch& batch,
std::shared_ptr<DeviceSync>& sync,
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowDeviceArray* out,
struct ArrowSchema* out_schema) {
if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
return Status::Invalid(
"Must provide a release event function if providing a non-null event");
void* sync_event{nullptr};
if (sync) {
ARROW_ASSIGN_OR_RAISE(sync_event, sync->get_event());
}

// XXX perhaps bypass ToStructArray for speed?
Expand All @@ -776,8 +776,8 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event
exporter.Finish(&out->array);

auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
pdata->sync_event_ = sync_event;
out->sync_event = sync_event.sync_event;
pdata->sync_ = sync;
out->sync_event = sync_event;

guard.Detach();
return Status::OK();
Expand Down Expand Up @@ -1362,7 +1362,7 @@ namespace {
// The ArrowArray is released on destruction.
struct ImportedArrayData {
struct ArrowArray array_;
void* sync_event_;
std::shared_ptr<DeviceSync> device_sync;
zeroshade marked this conversation as resolved.
Show resolved Hide resolved

ImportedArrayData() {
ArrowArrayMarkReleased(&array_); // Initially released
Expand Down Expand Up @@ -1395,6 +1395,8 @@ class ImportedBuffer : public Buffer {

~ImportedBuffer() override {}

std::shared_ptr<DeviceSync> get_device_sync() override { return import_->device_sync; }

protected:
std::shared_ptr<ImportedArrayData> import_;
};
Expand All @@ -1409,7 +1411,8 @@ struct ArrayImporter {
ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id));
device_type_ = static_cast<DeviceAllocationType>(src->device_type);
RETURN_NOT_OK(Import(&src->array));
import_->sync_event_ = src->sync_event;
ARROW_ASSIGN_OR_RAISE(import_->device_sync,
memory_mgr_->MakeDeviceSync(src->sync_event));
// reset internal state before next import
memory_mgr_.reset();
device_type_ = DeviceAllocationType::kCPU;
Expand Down
29 changes: 10 additions & 19 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>

#include "arrow/c/abi.h"
#include "arrow/device.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
Expand Down Expand Up @@ -172,33 +173,22 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
///
/// @{

/// \brief EXPERIMENTAL: Type for freeing a sync event
///
/// If synchronization is necessary for accessing the data on a device,
/// a pointer to an event needs to be passed when exporting the device
/// array. It's the responsibility of the release function for the array
/// to release the event. Both can be null if no sync'ing is necessary.
struct RawSyncEvent {
void* sync_event = NULL;
std::function<void(void*)> release_func;
};

/// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray.
///
/// The resulting ArrowDeviceArray struct keeps the array data and buffers alive
/// until its release callback is called by the consumer. All buffers in
/// the provided array MUST have the same device_type, otherwise an error
/// will be returned.
///
/// If a non-null sync_event is provided, then the sync_release func must also be
/// non-null. If the sync_event is null, then the sync_release parameter is not called.
/// If sync is non-null, get_event will be called on it in order to
/// potentially provide an event for consumers to synchronize on.
///
/// \param[in] array Array object to export
/// \param[in] sync_event A struct containing what is needed for syncing if necessary
/// \param[in] sync shared_ptr to object derived from DeviceSync or null
/// \param[out] out C struct to export the array to
/// \param[out] out_schema optional C struct to export the array type to
ARROW_EXPORT
Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
Status ExportDeviceArray(const Array& array, std::shared_ptr<DeviceSync>& sync,
struct ArrowDeviceArray* out,
struct ArrowSchema* out_schema = NULLPTR);

Expand All @@ -212,15 +202,16 @@ Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
/// otherwise an error will be returned. If columns are on different devices,
/// they should be exported using different ArrowDeviceArray instances.
///
/// If a non-null sync_event is provided, then the sync_release func must also be
/// non-null. If the sync_event is null, then the sync_release parameter is ignored.
/// If sync is non-null, get_event will be called on it in order to
/// potentially provide an event for consumers to synchronize on.
///
/// \param[in] batch Record batch to export
/// \param[in] sync_event A struct containing what is needed for syncing if necessary
/// \param[in] sync shared_ptr to object derived from DeviceSync or null
/// \param[out] out C struct where to export the record batch
/// \param[out] out_schema optional C struct where to export the record batch schema
ARROW_EXPORT
Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event,
Status ExportDeviceRecordBatch(const RecordBatch& batch,
std::shared_ptr<DeviceSync>& sync,
struct ArrowDeviceArray* out,
struct ArrowSchema* out_schema = NULLPTR);

Expand Down
Loading
Loading