Skip to content

Commit

Permalink
Test deleting from s3 by ttl (#13533)
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored Jan 20, 2025
1 parent 29374cb commit fdde655
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 0 deletions.
164 changes: 164 additions & 0 deletions ydb/tests/olap/ttl_tiering/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import yatest.common
import os
import time
import ydb
import logging
import boto3
import requests
from library.recipes import common as recipes_common

from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator


logger = logging.getLogger(__name__)


class S3Client:
    """Thin wrapper over boto3 for the S3 operations the tiering tests need.

    Stores the endpoint/region/credentials so callers can re-read them later
    (e.g. when building EXTERNAL DATA SOURCE DDL statements).
    """

    def __init__(self, endpoint, region, key_id, key_secret):
        self.endpoint = endpoint
        self.region = region
        self.key_id = key_id
        self.key_secret = key_secret

        session = boto3.session.Session()
        # Resource interface: used for object-level iteration (get_bucket_stat).
        self.s3 = session.resource(
            service_name="s3",
            aws_access_key_id=key_id,
            aws_secret_access_key=key_secret,
            region_name=region,
            endpoint_url=endpoint
        )
        # Client interface: used for bucket-level administrative calls.
        self.client = session.client(
            service_name="s3",
            aws_access_key_id=key_id,
            aws_secret_access_key=key_secret,
            region_name=region,
            endpoint_url=endpoint
        )

    def create_bucket(self, name: str):
        """Create bucket *name* on the configured endpoint."""
        self.client.create_bucket(Bucket=name)

    def get_bucket_stat(self, bucket_name: str) -> tuple[int, int]:
        """Return ``(object_count, total_size_bytes)`` for *bucket_name*.

        Iterates every object in the bucket, so intended for small test buckets.
        """
        bucket = self.s3.Bucket(bucket_name)
        count = 0
        size = 0
        for obj in bucket.objects.all():
            count += 1
            size += obj.size
        return (count, size)


class YdbClient:
    """Minimal YDB query helper: one driver plus a shared query session pool."""

    def __init__(self, endpoint, database):
        self.database = database
        self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
        self.session_pool = ydb.QuerySessionPool(self.driver)

    def stop(self):
        """Shut down the session pool first, then the underlying driver."""
        self.session_pool.stop()
        self.driver.stop()

    def wait_connection(self, timeout=5):
        """Block until the driver is connected; fail fast within *timeout* seconds."""
        self.driver.wait(timeout, fail_fast=True)

    def query(self, statement):
        """Execute *statement* with retries and return its result sets."""
        return self.session_pool.execute_with_retries(statement)


class ColumnTableHelper:
    """Read-only statistics helpers for the column table at *path*.

    Queries go through the ``.sys`` system views of the table, so they reflect
    the shard-internal portion/blob layout rather than user-visible data only.
    """

    def __init__(self, ydb_client: YdbClient, path: str):
        self.ydb_client = ydb_client
        self.path = path

    def get_row_count(self) -> int:
        """Total number of rows currently stored in the table."""
        return self.ydb_client.query(f"select count(*) as Rows from `{self.path}`")[0].rows[0]["Rows"]

    def get_portion_count(self) -> int:
        """Number of portions reported by the portion-stats system view."""
        return self.ydb_client.query(f"select count(*) as Rows from `{self.path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]

    def get_portion_stat_by_tier(self) -> dict[str, dict[str, int]]:
        """Map tier name -> ``{"Rows": ..., "Portions": ...}`` aggregated over portions."""
        results = self.ydb_client.query(f"select TierName, sum(Rows) as Rows, count(*) as Portions from `{self.path}/.sys/primary_index_portion_stats` group by TierName")
        return {row["TierName"]: {"Rows": row["Rows"], "Portions": row["Portions"]} for result_set in results for row in result_set.rows}

    def get_blob_stat_by_tier(self) -> dict[str, dict[str, int]]:
        """Map tier name -> ``{"Portions": ..., "BlobSize": ..., "BlobCount": ...}``.

        Aggregates blob ranges per (tablet, portion) first, then rolls them up
        per tier. (Annotation fixed: the method returns dicts, not pairs.)
        """
        stmt = f"""
            select TierName, count(*) as Portions, sum(BlobSize) as BlobSize, sum(BlobCount) as BlobCount from (
                select TabletId, PortionId, TierName, sum(BlobRangeSize) as BlobSize, count(*) as BlobCount from `{self.path}/.sys/primary_index_stats` group by TabletId, PortionId, TierName
            ) group by TierName
        """
        results = self.ydb_client.query(stmt)
        return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows}


class TllTieringTestBase:
    """Base class for TTL tiering tests: one YDB cluster plus a local moto S3.

    After ``setup_class`` runs, subclasses can use ``cls.cluster``,
    ``cls.ydb_client`` and ``cls.s3_client``.
    """

    @classmethod
    def setup_class(cls):
        cls._setup_ydb()
        cls._setup_s3()

    @classmethod
    def teardown_class(cls):
        # Stop the moto daemon first, then the YDB client and cluster.
        recipes_common.stop_daemon(cls.s3_pid)
        cls.ydb_client.stop()
        cls.cluster.stop()

    @classmethod
    def _setup_ydb(cls):
        """Start a KiKiMR cluster configured so tiering/compaction act immediately."""
        ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
        logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
        config = KikimrConfigGenerator(
            extra_feature_flags={
                "enable_external_data_sources": True,
                "enable_tiering_in_column_shard": True
            },
            # All lags/limits zeroed so eviction is not delayed in tests.
            column_shard_config={
                "lag_for_compaction_before_tierings_ms": 0,
                "compaction_actualization_lag_ms": 0,
                "optimizer_freshness_check_duration_ms": 0,
                "small_portion_detect_size_limit": 0,
            }
        )
        cls.cluster = KiKiMR(config)
        cls.cluster.start()
        node = cls.cluster.nodes[1]
        cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
        cls.ydb_client.wait_connection()

    @classmethod
    def _setup_s3(cls):
        """Start a local moto S3 server daemon and wire up an S3Client to it."""
        s3_pid_file = "s3.pid"
        moto_server_path = os.environ["MOTO_SERVER_PATH"]

        port_manager = yatest.common.network.PortManager()
        port = port_manager.get_port()
        endpoint = f"http://localhost:{port}"
        command = [yatest.common.binary_path(moto_server_path), "s3", "--port", str(port)]

        def is_s3_ready():
            # Readiness probe: any successful HTTP response from moto counts.
            try:
                response = requests.get(endpoint)
                response.raise_for_status()
                return True
            except requests.RequestException as err:
                # Use the module logger, not the root logger, for consistency.
                logger.debug(err)
                return False

        recipes_common.start_daemon(
            command=command, environment=None, is_alive_check=is_s3_ready, pid_file_name=s3_pid_file
        )

        with open(s3_pid_file, 'r') as f:
            cls.s3_pid = int(f.read())

        # Credentials are arbitrary placeholders: moto accepts any key pair.
        cls.s3_client = S3Client(endpoint, "us-east-1", "fake_key_id", "fake_key_secret")

    @staticmethod
    def wait_for(condition_func, timeout_seconds):
        """Poll *condition_func* once per second until truthy or timeout.

        Returns True if the condition was met within *timeout_seconds*,
        False otherwise.
        """
        t0 = time.time()
        while time.time() - t0 < timeout_seconds:
            if condition_func():
                return True
            time.sleep(1)
        return False
163 changes: 163 additions & 0 deletions ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import time
import logging
from .base import TllTieringTestBase, ColumnTableHelper

logger = logging.getLogger(__name__)


class TestDeleteS3Ttl(TllTieringTestBase):
    """Implements https://github.com/ydb-platform/ydb/issues/13467."""

    test_name = "delete_s3_ttl"
    row_count = 10 ** 7
    single_upsert_row_count = 10 ** 6
    cold_bucket = "cold"
    frozen_bucket = "frozen"
    # TTL thresholds in days; generated timestamps span 2010..2030 so both apply.
    days_to_cool = 1000
    days_to_freeze = 3000

    @classmethod
    def setup_class(cls):
        super().setup_class()
        cls.s3_client.create_bucket(cls.cold_bucket)
        cls.s3_client.create_bucket(cls.frozen_bucket)

    def get_row_count_by_date(self, table_path: str, past_days: int) -> int:
        """Count rows in *table_path* whose ``ts`` is older than *past_days* days."""
        return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"]

    def test(self):
        """End-to-end: fill a column table, tier to S3 by TTL, then shrink the TTL
        and check the externally tiered data is deleted from the buckets."""
        test_dir = f"{self.ydb_client.database}/{self.test_name}"
        table_path = f"{test_dir}/table"
        secret_prefix = self.test_name
        access_key_id_secret_name = f"{secret_prefix}_key_id"
        access_key_secret_secret_name = f"{secret_prefix}_key_secret"
        cold_eds_path = f"{test_dir}/{self.cold_bucket}"
        frozen_eds_path = f"{test_dir}/{self.frozen_bucket}"

        # Expect empty buckets to avoid unintentional data deletion/modification
        if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
            raise Exception("Bucket for cold data is not empty")
        if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0):
            raise Exception("Bucket for frozen data is not empty")

        self.ydb_client.query(f"""
            CREATE TABLE `{table_path}` (
                ts Timestamp NOT NULL,
                s String,
                val Uint64,
                PRIMARY KEY(ts),
            )
            WITH (STORE = COLUMN)
            """
        )

        logger.info(f"Table {table_path} created")

        # Secrets referenced by the external data sources below.
        self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
        self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")

        self.ydb_client.query(f"""
            CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
                AWS_REGION="{self.s3_client.region}"
            )
        """)

        self.ydb_client.query(f"""
            CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_client.endpoint}/{self.frozen_bucket}",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
                AWS_REGION="{self.s3_client.region}"
            )
        """)
        table = ColumnTableHelper(self.ydb_client, table_path)

        # Fill the table in batches with pseudo-random timestamps in 2010..2030.
        cur_rows = 0
        while cur_rows < self.row_count:
            self.ydb_client.query("""
                $row_count = %i;
                $from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64);
                $to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64);
                $dt = $to_us - $from_us;
                $k = ((1ul << 64) - 1) / CAST($dt - 1 as Double);
                $rows= ListMap(ListFromRange(0, $row_count), ($i)->{
                    $us = CAST(RandomNumber($i) / $k as Uint64) + $from_us;
                    $ts = Unwrap(CAST($us as Timestamp));
                    return <|
                        ts: $ts,
                        s: 'some date:' || CAST($ts as String),
                        val: $us
                    |>;
                });
                upsert into `%s`
                select * FROM AS_TABLE($rows);
            """ % (min(self.row_count - cur_rows, self.single_upsert_row_count), table_path))
            cur_rows = table.get_row_count()
            logger.info(f"{cur_rows} rows inserted in total, portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}")

        logger.info(f"Rows older than {self.days_to_cool} days: {self.get_row_count_by_date(table_path, self.days_to_cool)}")
        logger.info(f"Rows older than {self.days_to_freeze} days: {self.get_row_count_by_date(table_path, self.days_to_freeze)}")

        def portions_actualized_in_sys():
            # All data must start in the __DEFAULT tier before TTL is set.
            portions = table.get_portion_stat_by_tier()
            logger.info(f"portions: {portions}, blobs: {table.get_blob_stat_by_tier()}")
            if len(portions) != 1 or "__DEFAULT" not in portions:
                raise Exception("Data not in __DEFAULT tier")
            return self.row_count <= portions["__DEFAULT"]["Rows"]

        if not self.wait_for(portions_actualized_in_sys, 120):
            raise Exception(".sys reports incorrect data portions")

        # Set a two-level TTL: cool data to the cold bucket, older data to frozen.
        t0 = time.time()
        stmt = f"""
            ALTER TABLE `{table_path}` SET (TTL =
                Interval("P{self.days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`,
                Interval("P{self.days_to_freeze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}`
                ON ts
            )
        """
        logger.info(stmt)
        self.ydb_client.query(stmt)
        logger.info(f"TTL set in {time.time() - t0} seconds")

        def data_distributes_across_tiers():
            cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
            frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
            logger.info(f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
            # TODO FIXME
            # We can not expect proper distribution of data across tiers due to https://github.com/ydb-platform/ydb/issues/13525
            # So we wait until some data appears in any bucket
            return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0

        if not self.wait_for(data_distributes_across_tiers, 600):
            raise Exception("Data eviction has not been started")

        # Shrink the TTL so the externally tiered data becomes eligible for deletion.
        t0 = time.time()
        stmt = f"""
            ALTER TABLE `{table_path}` SET (TTL =
                Interval("P{self.days_to_cool}D")
                ON ts
            )
        """
        logger.info(stmt)
        self.ydb_client.query(stmt)
        logger.info(f"TTL set in {time.time() - t0} seconds")

        # TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523
        def data_deleted_from_buckets():
            cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
            frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
            logger.info(
                f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
            return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0

        if not self.wait_for(data_deleted_from_buckets, 120):
            # raise Exception("not all data deleted") TODO FIXME after https://github.com/ydb-platform/ydb/issues/13535
            pass
27 changes: 27 additions & 0 deletions ydb/tests/olap/ttl_tiering/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Test suite: TTL-based tiering/eviction of YDB column tables to S3 (moto).
PY3TEST()
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
# Local S3 emulator used by the tests instead of a real AWS endpoint.
ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server")
# Verbose tiering logs help diagnose eviction timing issues in CI.
ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG")

TEST_SRCS(
    base.py
    ttl_delete_s3.py
)

SIZE(MEDIUM)

PEERDIR(
    ydb/tests/library
    ydb/public/sdk/python
    ydb/public/sdk/python/enable_v3_new_behavior
    contrib/python/boto3
    library/recipes/common
)

# Binaries started at runtime: the ydbd server and the moto S3 daemon.
DEPENDS(
    ydb/apps/ydbd
    contrib/python/moto/bin
)

END()

1 change: 1 addition & 0 deletions ydb/tests/olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ RECURSE(
scenario
docs
load
ttl_tiering
)

0 comments on commit fdde655

Please sign in to comment.