Skip to content

Commit

Permalink
table I/O for big tz dates (#5846)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored Jun 21, 2024
1 parent 1fb6b33 commit 789f39c
Show file tree
Hide file tree
Showing 27 changed files with 603 additions and 15 deletions.
24 changes: 12 additions & 12 deletions ydb/library/yql/minikql/mkql_type_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2606,23 +2606,23 @@ bool DeserializeTzTimestamp(TStringBuf buf, ui64& timestamp, ui16& tzId) {
}

// Serializes a TzDate32 value to `out` as sizeof(ui32) value bytes followed
// by sizeof(ui16) timezone-id bytes (6 bytes in total).
//
// The value is reinterpreted as unsigned, byte-swapped, and XOR-ed with 0x80;
// the timezone id is byte-swapped. DeserializeTzDate32 applies the inverse
// transformation (XOR with 0x80, then byte swap).
// NOTE(review): on a little-endian host the XOR toggles the top bit of the
// first byte written, i.e. presumably the sign bit of the big-endian form --
// confirm this is the intended encoding.
//
// Only the encoded value is written. Writing the plain swapped `date` in
// addition would emit the value twice and break the 4 + 2 byte layout.
void SerializeTzDate32(i32 date, ui16 tzId, IOutputStream& out) {
    const ui32 value = 0x80 ^ SwapBytes((ui32)date);
    tzId = SwapBytes(tzId);
    out.Write(&value, sizeof(value));
    out.Write(&tzId, sizeof(tzId));
}

// Serializes a TzDatetime64 value to `out` as sizeof(ui64) value bytes
// followed by sizeof(ui16) timezone-id bytes (10 bytes in total).
//
// The value is reinterpreted as unsigned, byte-swapped, and XOR-ed with 0x80;
// the timezone id is byte-swapped. DeserializeTzDatetime64 applies the inverse
// transformation (XOR with 0x80, then byte swap).
// NOTE(review): on a little-endian host the XOR toggles the top bit of the
// first byte written, i.e. presumably the sign bit of the big-endian form --
// confirm this is the intended encoding.
//
// Only the encoded value is written. Writing the plain swapped `datetime` in
// addition would emit the value twice and break the 8 + 2 byte layout.
void SerializeTzDatetime64(i64 datetime, ui16 tzId, IOutputStream& out) {
    const ui64 value = 0x80 ^ SwapBytes((ui64)datetime);
    tzId = SwapBytes(tzId);
    out.Write(&value, sizeof(value));
    out.Write(&tzId, sizeof(tzId));
}

// Serializes a TzTimestamp64 value to `out` as sizeof(ui64) value bytes
// followed by sizeof(ui16) timezone-id bytes (10 bytes in total).
//
// The value is reinterpreted as unsigned, byte-swapped, and XOR-ed with 0x80;
// the timezone id is byte-swapped. DeserializeTzTimestamp64 applies the
// inverse transformation (XOR with 0x80, then byte swap).
// NOTE(review): on a little-endian host the XOR toggles the top bit of the
// first byte written, i.e. presumably the sign bit of the big-endian form --
// confirm this is the intended encoding.
//
// Only the encoded value is written. Writing the plain swapped `timestamp` in
// addition would emit the value twice and break the 8 + 2 byte layout.
void SerializeTzTimestamp64(i64 timestamp, ui16 tzId, IOutputStream& out) {
    const ui64 value = 0x80 ^ SwapBytes((ui64)timestamp);
    tzId = SwapBytes(tzId);
    out.Write(&value, sizeof(value));
    out.Write(&tzId, sizeof(tzId));
}

Expand All @@ -2631,8 +2631,8 @@ bool DeserializeTzDate32(TStringBuf buf, i32& date, ui16& tzId) {
return false;
}

date = ReadUnaligned<i32>(buf.data());
date = SwapBytes(date);
auto value = ReadUnaligned<ui32>(buf.data());
date = (i32)(SwapBytes(value ^ 0x80));
if (date < NUdf::MIN_DATE32 || date > NUdf::MAX_DATE32) {
return false;
}
Expand All @@ -2651,8 +2651,8 @@ bool DeserializeTzDatetime64(TStringBuf buf, i64& datetime, ui16& tzId) {
return false;
}

datetime = ReadUnaligned<i64>(buf.data());
datetime = SwapBytes(datetime);
auto value = ReadUnaligned<ui64>(buf.data());
datetime = (i64)(SwapBytes(0x80 ^ value));
if (datetime < NUdf::MIN_DATETIME64 || datetime > NUdf::MAX_DATETIME64) {
return false;
}
Expand All @@ -2671,8 +2671,8 @@ bool DeserializeTzTimestamp64(TStringBuf buf, i64& timestamp, ui16& tzId) {
return false;
}

timestamp = ReadUnaligned<i64>(buf.data());
timestamp = SwapBytes(timestamp);
auto value = ReadUnaligned<ui64>(buf.data());
timestamp = (i64)(SwapBytes(0x80 ^ value));
if (timestamp < NUdf::MIN_TIMESTAMP64 || timestamp > NUdf::MAX_TIMESTAMP64) {
return false;
}
Expand Down
129 changes: 129 additions & 0 deletions ydb/library/yql/providers/common/codec/yql_codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,21 @@ NYT::TNode DataValueToNode(const NKikimr::NUdf::TUnboxedValuePod& value, NKikimr
NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
return NYT::TNode(ToString(TStringBuf(value.AsStringRef())));
}
case NUdf::TDataType<NUdf::TTzDate32>::Id: {
TStringStream out;
out << value.Get<i32>() << "," << NKikimr::NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId());
return NYT::TNode(out.Str());
}
case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
TStringStream out;
out << value.Get<i64>() << "," << NKikimr::NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId());
return NYT::TNode(out.Str());
}
case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
TStringStream out;
out << value.Get<i64>() << "," << NKikimr::NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId());
return NYT::TNode(out.Str());
}
}
YQL_ENSURE(false, "Unsupported type: " << static_cast<int>(dataType->GetSchemeType()));
}
Expand Down Expand Up @@ -1084,6 +1099,57 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef());
}

case NUdf::TDataType<NUdf::TTzDate32>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
if (isTableFormat) {
i32 value;
ui16 tzId = 0;
YQL_ENSURE(DeserializeTzDate32(nextString, value, tzId));
data = NUdf::TUnboxedValuePod(value);
data.SetTimezoneId(tzId);
} else {
data = ValueFromString(NUdf::EDataSlot::TzDate32, nextString);
YQL_ENSURE(data, "incorrect tz date format for value " << nextString);
}

return data;
}

case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
if (isTableFormat) {
i64 value;
ui16 tzId = 0;
YQL_ENSURE(DeserializeTzDatetime64(nextString, value, tzId));
data = NUdf::TUnboxedValuePod(value);
data.SetTimezoneId(tzId);
} else {
data = ValueFromString(NUdf::EDataSlot::TzDatetime64, nextString);
YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString);
}

return data;
}

case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
if (isTableFormat) {
i64 value;
ui16 tzId = 0;
YQL_ENSURE(DeserializeTzTimestamp64(nextString, value, tzId));
data = NUdf::TUnboxedValuePod(value);
data.SetTimezoneId(tzId);
} else {
data = ValueFromString(NUdf::EDataSlot::TzTimestamp64, nextString);
YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString);
}

return data;
}

default:
YQL_ENSURE(false, "Unsupported data type: " << schemeType);
}
Expand Down Expand Up @@ -2198,6 +2264,39 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtType
break;
}

case NUdf::TDataType<NUdf::TTzDate32>::Id: {
ui16 tzId = SwapBytes(value.GetTimezoneId());
ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>());
ui32 size = sizeof(data) + sizeof(tzId);
buf.Write(StringMarker);
buf.WriteVarI32(size);
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
break;
}

case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
ui16 tzId = SwapBytes(value.GetTimezoneId());
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
ui32 size = sizeof(data) + sizeof(tzId);
buf.Write(StringMarker);
buf.WriteVarI32(size);
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
break;
}

case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
ui16 tzId = SwapBytes(value.GetTimezoneId());
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
ui32 size = sizeof(data) + sizeof(tzId);
buf.Write(StringMarker);
buf.WriteVarI32(size);
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
break;
}

case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
buf.Write(StringMarker);
NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
Expand Down Expand Up @@ -2480,6 +2579,36 @@ void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, cons
break;
}

case NUdf::TDataType<NUdf::TTzDate32>::Id: {
ui16 tzId = SwapBytes(value.GetTimezoneId());
ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>());
ui32 size = sizeof(data) + sizeof(tzId);
buf.WriteMany((const char*)&size, sizeof(size));
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
break;
}

case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
ui16 tzId = SwapBytes(value.GetTimezoneId());
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
ui32 size = sizeof(data) + sizeof(tzId);
buf.WriteMany((const char*)&size, sizeof(size));
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
break;
}

case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
ui16 tzId = SwapBytes(value.GetTimezoneId());
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
ui32 size = sizeof(data) + sizeof(tzId);
buf.WriteMany((const char*)&size, sizeof(size));
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
break;
}

case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
auto str = json.AsStringRef();
Expand Down
63 changes: 63 additions & 0 deletions ydb/library/yql/providers/yt/codec/codegen/ut/yt_codec_cg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,42 @@ Y_UNIT_TEST_SUITE(TYtCodegenCodec) {
UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
}

Y_UNIT_TEST(TestReadTzDate32) {
    // Input: 4-byte length prefix (6 = 4 value bytes + 2 tz-id bytes),
    // followed by the encoded value and the timezone id.
    using TReader = void(*)(TInputBuf&, void*);
    TStringBuf encoded = "\6\0\0\0\x7F\xFF\xFE\xFE\0\1"sv;
    TReadSetup setup("ReadTzDate32", encoded);
    const auto reader = (TReader)setup.Codegen_->GetPointerToFunction(setup.Func_);
    NUdf::TUnboxedValue val;
    reader(setup.Buf_, &val);
    // The generated reader must recover both the signed value and the tz id.
    UNIT_ASSERT_VALUES_EQUAL(val.Get<i32>(), -258);
    UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
}

Y_UNIT_TEST(TestReadTzDatetime64) {
    // Input: 4-byte length prefix (10 = 8 value bytes + 2 tz-id bytes),
    // followed by the encoded value and the timezone id.
    using TReader = void(*)(TInputBuf&, void*);
    TStringBuf encoded = "\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFD\0\1"sv;
    TReadSetup setup("ReadTzDatetime64", encoded);
    const auto reader = (TReader)setup.Codegen_->GetPointerToFunction(setup.Func_);
    NUdf::TUnboxedValue val;
    reader(setup.Buf_, &val);
    // The generated reader must recover both the signed value and the tz id.
    UNIT_ASSERT_VALUES_EQUAL(val.Get<i64>(), -259);
    UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
}

Y_UNIT_TEST(TestReadTzTimestamp64) {
    // Input: 4-byte length prefix (10 = 8 value bytes + 2 tz-id bytes),
    // followed by the encoded value and the timezone id.
    using TReader = void(*)(TInputBuf&, void*);
    TStringBuf encoded = "\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFC\0\1"sv;
    TReadSetup setup("ReadTzTimestamp64", encoded);
    const auto reader = (TReader)setup.Codegen_->GetPointerToFunction(setup.Func_);
    NUdf::TUnboxedValue val;
    reader(setup.Buf_, &val);
    // The generated reader must recover both the signed value and the tz id.
    UNIT_ASSERT_VALUES_EQUAL(val.Get<i64>(), -260);
    UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
}

Y_UNIT_TEST(TestWriteTzDate) {
TWriteSetup setup("WriteTzDate");
typedef void(*TFunc)(TOutputBuf&, ui16, ui16);
Expand All @@ -888,6 +924,33 @@ Y_UNIT_TEST_SUITE(TYtCodegenCodec) {
setup.Buf_.Finish();
UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\x0a\0\0\0\0\0\0\0\0\0\1\4\0\1"sv).Quote());
}

Y_UNIT_TEST(TestWriteTzDate32) {
    // Expected output: 4-byte length prefix (6), 4 encoded value bytes,
    // 2 timezone-id bytes.
    using TWriter = void(*)(TOutputBuf&, i32, ui16);
    TWriteSetup setup("WriteTzDate32");
    const auto writer = (TWriter)setup.Codegen_->GetPointerToFunction(setup.Func_);
    writer(setup.Buf_, -258, 1);
    // Flush buffered bytes into the test writer before comparing.
    setup.Buf_.Finish();
    UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\6\0\0\0\x7F\xFF\xFE\xFE\0\1"sv).Quote());
}

Y_UNIT_TEST(TestWriteTzDatetime64) {
    // Expected output: 4-byte length prefix (10), 8 encoded value bytes,
    // 2 timezone-id bytes.
    using TWriter = void(*)(TOutputBuf&, i64, ui16);
    TWriteSetup setup("WriteTzDatetime64");
    const auto writer = (TWriter)setup.Codegen_->GetPointerToFunction(setup.Func_);
    writer(setup.Buf_, -259, 1);
    // Flush buffered bytes into the test writer before comparing.
    setup.Buf_.Finish();
    UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFD\0\1"sv).Quote());
}

Y_UNIT_TEST(TestWriteTzTimestamp64) {
    // Expected output: 4-byte length prefix (10), 8 encoded value bytes,
    // 2 timezone-id bytes.
    using TWriter = void(*)(TOutputBuf&, i64, ui16);
    TWriteSetup setup("WriteTzTimestamp64");
    const auto writer = (TWriter)setup.Codegen_->GetPointerToFunction(setup.Func_);
    writer(setup.Buf_, -260, 1);
    // Flush buffered bytes into the test writer before comparing.
    setup.Buf_.Finish();
    UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFC\0\1"sv).Quote());
}
#endif
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/providers/yt/codec/codegen/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,15 @@ IF (NOT MKQL_DISABLE_CODEGEN)
ReadTzDate
ReadTzDatetime
ReadTzTimestamp
ReadTzDate32
ReadTzDatetime64
ReadTzTimestamp64
WriteTzDate
WriteTzDatetime
WriteTzTimestamp
WriteTzDate32
WriteTzDatetime64
WriteTzTimestamp64
GetWrittenBytes
FillZero
)
Expand Down
69 changes: 69 additions & 0 deletions ydb/library/yql/providers/yt/codec/codegen/yt_codec_bc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,45 @@ extern "C" void ReadTzTimestamp(void* vbuf, void* vpod) {
(new (vpod) NUdf::TUnboxedValuePod(data))->SetTimezoneId(tzId);
}

extern "C" void ReadTzDate32(void* vbuf, void* vpod) {
NCommon::TInputBuf& buf = *(NCommon::TInputBuf*)vbuf;
ui32 size;
buf.ReadMany((char*)&size, sizeof(size));
ui32 data;
buf.ReadMany((char*)&data, sizeof(data));
ui16 tzId;
buf.ReadMany((char*)&tzId, sizeof(tzId));
i32 value = SwapBytes(0x80 ^ data);
tzId = SwapBytes(tzId);
(new (vpod) NUdf::TUnboxedValuePod(value))->SetTimezoneId(tzId);
}

extern "C" void ReadTzDatetime64(void* vbuf, void* vpod) {
NCommon::TInputBuf& buf = *(NCommon::TInputBuf*)vbuf;
ui32 size;
buf.ReadMany((char*)&size, sizeof(size));
ui64 data;
buf.ReadMany((char*)&data, sizeof(data));
ui16 tzId;
buf.ReadMany((char*)&tzId, sizeof(tzId));
i64 value = SwapBytes(0x80 ^ data);
tzId = SwapBytes(tzId);
(new (vpod) NUdf::TUnboxedValuePod(value))->SetTimezoneId(tzId);
}

extern "C" void ReadTzTimestamp64(void* vbuf, void* vpod) {
NCommon::TInputBuf& buf = *(NCommon::TInputBuf*)vbuf;
ui32 size;
buf.ReadMany((char*)&size, sizeof(size));
ui64 data;
buf.ReadMany((char*)&data, sizeof(data));
ui16 tzId;
buf.ReadMany((char*)&tzId, sizeof(tzId));
i64 value = SwapBytes(0x80 ^ data);
tzId = SwapBytes(tzId);
(new (vpod) NUdf::TUnboxedValuePod(value))->SetTimezoneId(tzId);
}

extern "C" void WriteTzDate(void* vbuf, ui16 value, ui16 tzId) {
value = SwapBytes(value);
tzId = SwapBytes(tzId);
Expand Down Expand Up @@ -297,6 +336,36 @@ extern "C" void WriteTzTimestamp(void* vbuf, ui64 value, ui16 tzId) {
buf.WriteMany((const char*)&tzId, sizeof(tzId));
}

extern "C" void WriteTzDate32(void* vbuf, i32 value, ui16 tzId) {
ui32 data = 0x80 ^ SwapBytes((ui32)value);
tzId = SwapBytes(tzId);
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
const ui32 size = sizeof(data) + sizeof(tzId);
buf.WriteMany((const char*)&size, sizeof(size));
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
}

extern "C" void WriteTzDatetime64(void* vbuf, i64 value, ui16 tzId) {
ui64 data = 0x80 ^ SwapBytes((ui64)value);
tzId = SwapBytes(tzId);
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
const ui32 size = sizeof(data) + sizeof(tzId);
buf.WriteMany((const char*)&size, sizeof(size));
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
}

extern "C" void WriteTzTimestamp64(void* vbuf, i64 value, ui16 tzId) {
ui64 data = 0x80 ^ SwapBytes((ui64)value);
tzId = SwapBytes(tzId);
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
const ui32 size = sizeof(data) + sizeof(tzId);
buf.WriteMany((const char*)&size, sizeof(size));
buf.WriteMany((const char*)&data, sizeof(data));
buf.WriteMany((const char*)&tzId, sizeof(tzId));
}

extern "C" ui64 GetWrittenBytes(void* vbuf) {
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
return buf.GetWrittenBytes();
Expand Down
Loading

0 comments on commit 789f39c

Please sign in to comment.