-
Notifications
You must be signed in to change notification settings - Fork 606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added test which checks that data is unchanged after changing ttl set… #13680
Merged
aavdonkin
merged 7 commits into
ydb-platform:main
from
aavdonkin:check_data_unchanged_ttl
Jan 24, 2025
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
3f7161d
Added test which checks that data is unchanged after changing ttl set…
aavdonkin 0196df4
Full answers of selects are compared
aavdonkin e8bb933
Different buckets for test
aavdonkin 14f41c8
Reduced timeout
aavdonkin 1be9437
Wait actualization in sys
aavdonkin 170ed3d
Fixed issues
aavdonkin df452ef
Removed second alters
aavdonkin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,14 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
def get_all_rows(answer): | ||
result = [] | ||
for set in answer: | ||
result += set.rows | ||
return result | ||
|
||
|
||
class TestDeleteS3Ttl(TllTieringTestBase): | ||
''' Implements https://github.com/ydb-platform/ydb/issues/13467 ''' | ||
|
||
test_name = "delete_s3_ttl" | ||
row_count = 10 ** 7 | ||
|
@@ -22,10 +28,194 @@ def setup_class(cls): | |
cls.s3_client.create_bucket(cls.cold_bucket) | ||
cls.s3_client.create_bucket(cls.frozen_bucket) | ||
|
||
def portions_actualized_in_sys(self, table): | ||
portions = table.get_portion_stat_by_tier() | ||
logger.info(f"portions: {portions}, blobs: {table.get_blob_stat_by_tier()}") | ||
return "__DEFAULT" in portions and self.row_count <= portions["__DEFAULT"]["Rows"] | ||
|
||
def get_row_count_by_date(self, table_path: str, past_days: int) -> int: | ||
return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"] | ||
|
||
def test(self): | ||
def test_data_unchanged_after_ttl_change(self): | ||
''' Implements https://github.com/ydb-platform/ydb/issues/13542 ''' | ||
self.row_count = 100000 | ||
aavdonkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
single_upsert_row_count = 10000 | ||
test_name = 'test_data_unchanged_after_ttl_change' | ||
cold_bucket = 'cold_uc' | ||
frozen_bucket = 'frozen_uc' | ||
self.s3_client.create_bucket(cold_bucket) | ||
self.s3_client.create_bucket(frozen_bucket) | ||
test_dir = f"{self.ydb_client.database}/{test_name}" | ||
table_path = f"{test_dir}/table" | ||
secret_prefix = test_name | ||
access_key_id_secret_name = f"{secret_prefix}_key_id" | ||
access_key_secret_secret_name = f"{secret_prefix}_key_secret" | ||
cold_eds_path = f"{test_dir}/{cold_bucket}" | ||
frozen_eds_path = f"{test_dir}/{frozen_bucket}" | ||
|
||
days_to_medium = 2000 | ||
medium_bucket = 'medium' | ||
self.s3_client.create_bucket(medium_bucket) | ||
medium_eds_path = f"{test_dir}/{medium_bucket}" | ||
# Expect empty buckets to avoid unintentional data deletion/modification | ||
|
||
if self.s3_client.get_bucket_stat(cold_bucket) != (0, 0): | ||
raise Exception("Bucket for cold data is not empty") | ||
if self.s3_client.get_bucket_stat(frozen_bucket) != (0, 0): | ||
raise Exception("Bucket for frozen data is not empty") | ||
if self.s3_client.get_bucket_stat(medium_bucket) != (0, 0): | ||
raise Exception("Bucket for medium data is not empty") | ||
|
||
self.ydb_client.query(f""" | ||
CREATE TABLE `{table_path}` ( | ||
ts Timestamp NOT NULL, | ||
s String, | ||
val Uint64, | ||
PRIMARY KEY(ts), | ||
) | ||
WITH (STORE = COLUMN) | ||
""" | ||
) | ||
|
||
logger.info(f"Table {table_path} created") | ||
|
||
self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'") | ||
self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'") | ||
|
||
self.ydb_client.query(f""" | ||
CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH ( | ||
SOURCE_TYPE="ObjectStorage", | ||
LOCATION="{self.s3_client.endpoint}/{cold_bucket}", | ||
AUTH_METHOD="AWS", | ||
AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}", | ||
AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}", | ||
AWS_REGION="{self.s3_client.region}" | ||
) | ||
""") | ||
|
||
self.ydb_client.query(f""" | ||
CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH ( | ||
SOURCE_TYPE="ObjectStorage", | ||
LOCATION="{self.s3_client.endpoint}/{frozen_bucket}", | ||
AUTH_METHOD="AWS", | ||
AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}", | ||
AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}", | ||
AWS_REGION="{self.s3_client.region}" | ||
) | ||
""") | ||
|
||
self.ydb_client.query(f""" | ||
CREATE EXTERNAL DATA SOURCE `{medium_eds_path}` WITH ( | ||
SOURCE_TYPE="ObjectStorage", | ||
LOCATION="{self.s3_client.endpoint}/{medium_bucket}", | ||
AUTH_METHOD="AWS", | ||
AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}", | ||
AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}", | ||
AWS_REGION="{self.s3_client.region}" | ||
) | ||
""") | ||
table = ColumnTableHelper(self.ydb_client, table_path) | ||
|
||
cur_rows = 0 | ||
while cur_rows < self.row_count: | ||
self.ydb_client.query(""" | ||
$row_count = %i; | ||
$from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64); | ||
$to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64); | ||
$dt = $to_us - $from_us; | ||
$k = ((1ul << 64) - 1) / CAST($dt - 1 as Double); | ||
$rows= ListMap(ListFromRange(0, $row_count), ($i)->{ | ||
$us = CAST(RandomNumber($i) / $k as Uint64) + $from_us; | ||
$ts = Unwrap(CAST($us as Timestamp)); | ||
return <| | ||
ts: $ts, | ||
s: 'some date:' || CAST($ts as String), | ||
val: $us | ||
|>; | ||
}); | ||
upsert into `%s` | ||
select * FROM AS_TABLE($rows); | ||
""" % (min(self.row_count - cur_rows, single_upsert_row_count), table_path)) | ||
cur_rows = table.get_row_count() | ||
logger.info(f"{cur_rows} rows inserted in total, portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}") | ||
|
||
logger.info(f"Rows older than {self.days_to_cool} days: {self.get_row_count_by_date(table_path, self.days_to_cool)}") | ||
logger.info(f"Rows older than {self.days_to_freeze} days: {self.get_row_count_by_date(table_path, self.days_to_freeze)}") | ||
|
||
if not self.wait_for(lambda: self.portions_actualized_in_sys(table), 120): | ||
raise Exception(".sys reports incorrect data portions") | ||
|
||
answer = self.ydb_client.query(f"SELECT * from `{table_path}` ORDER BY ts") | ||
data = get_all_rows(answer) | ||
|
||
def change_ttl_and_check(days_to_cool, days_to_medium, days_to_freeze): | ||
t0 = time.time() | ||
stmt = f""" | ||
ALTER TABLE `{table_path}` SET (TTL = | ||
Interval("P{days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`, | ||
Interval("P{days_to_medium}D") TO EXTERNAL DATA SOURCE `{medium_eds_path}`, | ||
Interval("P{days_to_freeze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}` | ||
ON ts | ||
) | ||
""" | ||
logger.info(stmt) | ||
self.ydb_client.query(stmt) | ||
logger.info(f"TTL set in {time.time() - t0} seconds") | ||
|
||
def data_distributes_across_tiers(): | ||
cold_bucket_stat = self.s3_client.get_bucket_stat(cold_bucket) | ||
frozen_bucket_stat = self.s3_client.get_bucket_stat(frozen_bucket) | ||
medium_bucket_stat = self.s3_client.get_bucket_stat(medium_bucket) | ||
logger.info(f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}") | ||
# TODO FIXME | ||
# We can not expect proper distribution of data across tiers due to https://github.com/ydb-platform/ydb/issues/13525 | ||
# So we wait until some data appears in any bucket | ||
return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0 or medium_bucket_stat[0] != 0 | ||
|
||
if not self.wait_for(lambda: data_distributes_across_tiers(), 120): | ||
raise Exception("Data eviction has not been started") | ||
|
||
answer1 = self.ydb_client.query(f"SELECT * from `{table_path}` ORDER BY ts") | ||
data1 = get_all_rows(answer1) | ||
logger.info("Old record count {} new record count {}".format(len(data), len(data1))) | ||
if data1 != data: | ||
raise Exception("Data changed after ttl change, was {} now {}".format(data, data1)) | ||
|
||
t0 = time.time() | ||
stmt = f""" | ||
ALTER TABLE `{table_path}` SET (TTL = | ||
Interval("PT800M") TO EXTERNAL DATA SOURCE `{cold_eds_path}`, | ||
Interval("PT850M") TO EXTERNAL DATA SOURCE `{medium_eds_path}`, | ||
Interval("PT900M") TO EXTERNAL DATA SOURCE `{frozen_eds_path}` | ||
ON ts | ||
) | ||
""" | ||
logger.info(stmt) | ||
self.ydb_client.query(stmt) | ||
logger.info(f"TTL set in {time.time() - t0} seconds") | ||
|
||
# TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523 | ||
def data_deleted_from_buckets(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тоже копипаста, кажется должно быть методом класса There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Нет, тут есть отличие, 3 бакета вместо 2 |
||
cold_bucket_stat = self.s3_client.get_bucket_stat(cold_bucket) | ||
medium_bucket_stat = self.s3_client.get_bucket_stat(medium_bucket) | ||
frozen_bucket_stat = self.s3_client.get_bucket_stat(frozen_bucket) | ||
logger.info( | ||
f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}") | ||
return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0 and medium_bucket_stat[0] == 0 | ||
|
||
if not self.wait_for(lambda: data_deleted_from_buckets(), 120): | ||
# raise Exception("not all data deleted") TODO FIXME after https://github.com/ydb-platform/ydb/issues/13535 | ||
pass | ||
answer1 = self.ydb_client.query(f"SELECT * from `{table_path}` ORDER BY ts") | ||
data1 = get_all_rows(answer1) | ||
logger.info("Old record count {} new record count {}".format(len(data), len(data1))) | ||
if data1 != data: | ||
raise Exception("Data changed after ttl change, was {} now {}".format(data, data1)) | ||
|
||
change_ttl_and_check(self.days_to_cool, days_to_medium, self.days_to_freeze) | ||
|
||
def test_ttl_delete(self): | ||
''' Implements https://github.com/ydb-platform/ydb/issues/13467 ''' | ||
test_dir = f"{self.ydb_client.database}/{self.test_name}" | ||
table_path = f"{test_dir}/table" | ||
secret_prefix = self.test_name | ||
|
@@ -105,14 +295,7 @@ def test(self): | |
logger.info(f"Rows older than {self.days_to_cool} days: {self.get_row_count_by_date(table_path, self.days_to_cool)}") | ||
logger.info(f"Rows older than {self.days_to_freeze} days: {self.get_row_count_by_date(table_path, self.days_to_freeze)}") | ||
|
||
def portions_actualized_in_sys(): | ||
portions = table.get_portion_stat_by_tier() | ||
logger.info(f"portions: {portions}, blobs: {table.get_blob_stat_by_tier()}") | ||
if len(portions) != 1 or "__DEFAULT" not in portions: | ||
raise Exception("Data not in __DEFAULT teir") | ||
return self.row_count <= portions["__DEFAULT"]["Rows"] | ||
|
||
if not self.wait_for(lambda: portions_actualized_in_sys(), 120): | ||
if not self.wait_for(lambda: self.portions_actualized_in_sys(table), 120): | ||
raise Exception(".sys reports incorrect data portions") | ||
|
||
t0 = time.time() | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Тестовый бинарь похоже перестал проходить с первого раза
#13680 (comment)
Нужно чтобы проходил (тут предлагаю @zverevgeny решить как лучше поступить: перенести в large/засплитить выполнение тестов/убыстрить их)