Skip to content

Commit

Permalink
[C++] Use anonymous namespace in arrow/IPC/reader.cc
Browse files Browse the repository at this point in the history
  • Loading branch information
pegasas committed Aug 9, 2023
1 parent 2972e49 commit 5fa2346
Showing 1 changed file with 40 additions and 40 deletions.
80 changes: 40 additions & 40 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ Status InvalidMessageType(MessageType expected, MessageType actual) {
} \
} while (0)

} // namespace

// ----------------------------------------------------------------------
// Record batch read path

Expand Down Expand Up @@ -634,7 +632,7 @@ Status GetCompressionExperimental(const flatbuf::Message* message,
return Status::OK();
}

static Status ReadContiguousPayload(io::InputStream* file,
Status ReadContiguousPayload(io::InputStream* file,
std::unique_ptr<Message>* message) {
ARROW_ASSIGN_OR_RAISE(*message, ReadMessage(file));
if (*message == nullptr) {
Expand All @@ -643,27 +641,6 @@ static Status ReadContiguousPayload(io::InputStream* file,
return Status::OK();
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const std::shared_ptr<Schema>& schema, const DictionaryMemo* dictionary_memo,
const IpcReadOptions& options, io::InputStream* file) {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, options,
reader.get());
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Message& message, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options) {
CHECK_MESSAGE_TYPE(MessageType::RECORD_BATCH, message.type());
CHECK_HAS_BODY(message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options,
reader.get());
}

Result<RecordBatchWithMetadata> ReadRecordBatchInternal(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const std::vector<bool>& inclusion_mask, IpcReadContext& context,
Expand Down Expand Up @@ -764,22 +741,6 @@ Status UnpackSchemaMessage(const Message& message, const IpcReadOptions& options
out_schema, field_inclusion_mask, swap_endian);
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
io::RandomAccessFile* file) {
std::shared_ptr<Schema> out_schema;
// Empty means do not use
std::vector<bool> inclusion_mask;
IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo), options, false);
RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
&inclusion_mask, &out_schema));
ARROW_ASSIGN_OR_RAISE(
auto batch_and_custom_metadata,
ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file));
return batch_and_custom_metadata.batch;
}

Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
DictionaryKind* kind, io::RandomAccessFile* file) {
const flatbuf::Message* message = nullptr;
Expand Down Expand Up @@ -851,6 +812,45 @@ Status ReadDictionary(const Message& message, const IpcReadContext& context,
return ReadDictionary(*message.metadata(), context, kind, reader.get());
}

} // namespace

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
io::RandomAccessFile* file) {
std::shared_ptr<Schema> out_schema;
// Empty means do not use
std::vector<bool> inclusion_mask;
IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo), options, false);
RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
&inclusion_mask, &out_schema));
ARROW_ASSIGN_OR_RAISE(
auto batch_and_custom_metadata,
ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file));
return batch_and_custom_metadata.batch;
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const std::shared_ptr<Schema>& schema, const DictionaryMemo* dictionary_memo,
const IpcReadOptions& options, io::InputStream* file) {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, options,
reader.get());
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Message& message, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options) {
CHECK_MESSAGE_TYPE(MessageType::RECORD_BATCH, message.type());
CHECK_HAS_BODY(message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options,
reader.get());
}

// Streaming format decoder
class StreamDecoderInternal : public MessageDecoderListener {
public:
Expand Down

0 comments on commit 5fa2346

Please sign in to comment.