From 5a7ea75cd93b5f5eb02b7c384748109a86585d89 Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Thu, 30 May 2024 17:56:20 +0300 Subject: [PATCH] Add HDD and SSD limit_bytes counters (#4765) --- ydb/core/protos/counters_schemeshard.proto | 3 + .../tablet/tablet_counters_aggregator.cpp | 12 + ydb/core/tx/schemeshard/schemeshard_impl.cpp | 22 +- ydb/core/tx/schemeshard/schemeshard_impl.h | 3 +- .../tx/schemeshard/schemeshard_info_types.cpp | 54 +++- .../tx/schemeshard/schemeshard_info_types.h | 23 +- .../functional/tenants/test_db_counters.py | 281 ++++++++++++++++-- ydb/tests/library/harness/daemon.py | 48 +-- ydb/tests/library/harness/ydb_fixtures.py | 26 +- 9 files changed, 380 insertions(+), 92 deletions(-) diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 1dd87237b3de..07820555042b 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -212,6 +212,9 @@ enum ESimpleCounters { COUNTER_DISK_SPACE_TABLES_DATA_BYTES_ON_HDD = 168 [(CounterOpts) = {Name: "DiskSpaceTablesDataBytesOnHdd"}]; COUNTER_DISK_SPACE_TABLES_INDEX_BYTES_ON_HDD = 169 [(CounterOpts) = {Name: "DiskSpaceTablesIndexBytesOnHdd"}]; COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES_ON_HDD = 170 [(CounterOpts) = {Name: "DiskSpaceTablesTotalBytesOnHdd"}]; + + COUNTER_DISK_SPACE_SOFT_QUOTA_BYTES_ON_SSD = 171 [(CounterOpts) = {Name: "DiskSpaceSoftQuotaBytesOnSsd"}]; + COUNTER_DISK_SPACE_SOFT_QUOTA_BYTES_ON_HDD = 172 [(CounterOpts) = {Name: "DiskSpaceSoftQuotaBytesOnHdd"}]; } enum ECumulativeCounters { diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp index c50dc994205c..bf77a032fa90 100644 --- a/ydb/core/tablet/tablet_counters_aggregator.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator.cpp @@ -773,6 +773,8 @@ class TTabletMon { TCounterPtr ResourcesStorageUsedBytesOnSsd; TCounterPtr ResourcesStorageUsedBytesOnHdd; TCounterPtr 
ResourcesStorageLimitBytes; + TCounterPtr ResourcesStorageLimitBytesOnSsd; + TCounterPtr ResourcesStorageLimitBytesOnHdd; TCounterPtr ResourcesStorageTableUsedBytes; TCounterPtr ResourcesStorageTableUsedBytesOnSsd; TCounterPtr ResourcesStorageTableUsedBytesOnHdd; @@ -814,6 +816,8 @@ class TTabletMon { TCounterPtr DiskSpaceTablesTotalBytesOnHdd; TCounterPtr DiskSpaceTopicsTotalBytes; TCounterPtr DiskSpaceSoftQuotaBytes; + TCounterPtr DiskSpaceSoftQuotaBytesOnSsd; + TCounterPtr DiskSpaceSoftQuotaBytesOnHdd; TCounterPtr StreamShardsCount; TCounterPtr StreamShardsQuota; @@ -872,6 +876,10 @@ class TTabletMon { "resources.storage.used_bytes.hdd", false); ResourcesStorageLimitBytes = ydbGroup->GetNamedCounter("name", "resources.storage.limit_bytes", false); + ResourcesStorageLimitBytesOnSsd = ydbGroup->GetNamedCounter("name", + "resources.storage.limit_bytes.ssd", false); + ResourcesStorageLimitBytesOnHdd = ydbGroup->GetNamedCounter("name", + "resources.storage.limit_bytes.hdd", false); ResourcesStorageTableUsedBytes = ydbGroup->GetNamedCounter("name", "resources.storage.table.used_bytes", false); ResourcesStorageTableUsedBytesOnSsd = ydbGroup->GetNamedCounter("name", @@ -948,6 +956,8 @@ class TTabletMon { DiskSpaceTablesTotalBytesOnHdd = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTablesTotalBytesOnHdd)"); DiskSpaceTopicsTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTopicsTotalBytes)"); DiskSpaceSoftQuotaBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceSoftQuotaBytes)"); + DiskSpaceSoftQuotaBytesOnSsd = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceSoftQuotaBytesOnSsd)"); + DiskSpaceSoftQuotaBytesOnHdd = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceSoftQuotaBytesOnHdd)"); StreamShardsCount = appGroup->GetCounter("SUM(SchemeShard/StreamShardsCount)"); StreamShardsQuota = appGroup->GetCounter("SUM(SchemeShard/StreamShardsQuota)"); @@ -988,6 +998,8 @@ class TTabletMon { if (DiskSpaceTablesTotalBytes) { 
ResourcesStorageLimitBytes->Set(DiskSpaceSoftQuotaBytes->Val()); + ResourcesStorageLimitBytesOnSsd->Set(DiskSpaceSoftQuotaBytesOnSsd->Val()); + ResourcesStorageLimitBytesOnHdd->Set(DiskSpaceSoftQuotaBytesOnHdd->Val()); ResourcesStorageTableUsedBytes->Set(DiskSpaceTablesTotalBytes->Val()); ResourcesStorageTableUsedBytesOnSsd->Set(DiskSpaceTablesTotalBytesOnSsd->Val()); ResourcesStorageTableUsedBytesOnHdd->Set(DiskSpaceTablesTotalBytesOnHdd->Val()); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index d4ab9de1ea60..439d43e042eb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -7222,15 +7222,15 @@ void TSchemeShard::ChangeDiskSpaceTablesTotalBytes(i64 delta) { TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES].Add(delta); } -void TSchemeShard::ChangeDiskSpaceTables(EUserFacingStorageType storageType, i64 dataDelta, i64 indexDelta) { +void TSchemeShard::AddDiskSpaceTables(EUserFacingStorageType storageType, ui64 data, ui64 index) { if (storageType == EUserFacingStorageType::Ssd) { - TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_DATA_BYTES_ON_SSD].Add(dataDelta); - TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_INDEX_BYTES_ON_SSD].Add(indexDelta); - TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES_ON_SSD].Add(dataDelta + indexDelta); + TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_DATA_BYTES_ON_SSD].Add(data); + TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_INDEX_BYTES_ON_SSD].Add(index); + TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES_ON_SSD].Add(data + index); } else if (storageType == EUserFacingStorageType::Hdd) { - TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_DATA_BYTES_ON_HDD].Add(dataDelta); - TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_INDEX_BYTES_ON_HDD].Add(indexDelta); - TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES_ON_HDD].Add(dataDelta + 
indexDelta); + TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_DATA_BYTES_ON_HDD].Add(data); + TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_INDEX_BYTES_ON_HDD].Add(index); + TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES_ON_HDD].Add(data + index); } } @@ -7250,6 +7250,14 @@ void TSchemeShard::ChangeDiskSpaceSoftQuotaBytes(i64 delta) { TabletCounters->Simple()[COUNTER_DISK_SPACE_SOFT_QUOTA_BYTES].Add(delta); } +void TSchemeShard::AddDiskSpaceSoftQuotaBytes(EUserFacingStorageType storageType, ui64 addend) { + if (storageType == EUserFacingStorageType::Ssd) { + TabletCounters->Simple()[COUNTER_DISK_SPACE_SOFT_QUOTA_BYTES_ON_SSD].Add(addend); + } else if (storageType == EUserFacingStorageType::Hdd) { + TabletCounters->Simple()[COUNTER_DISK_SPACE_SOFT_QUOTA_BYTES_ON_HDD].Add(addend); + } +} + void TSchemeShard::Handle(TEvSchemeShard::TEvLogin::TPtr &ev, const TActorContext &ctx) { Execute(CreateTxLogin(ev), ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index dfa710b88163..109cbe1b23a0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1380,11 +1380,12 @@ class TSchemeShard void ChangeDiskSpaceTablesDataBytes(i64 delta) override; void ChangeDiskSpaceTablesIndexBytes(i64 delta) override; void ChangeDiskSpaceTablesTotalBytes(i64 delta) override; - void ChangeDiskSpaceTables(EUserFacingStorageType storageType, i64 dataDelta, i64 indexDelta) override; + void AddDiskSpaceTables(EUserFacingStorageType storageType, ui64 data, ui64 index) override; void ChangeDiskSpaceTopicsTotalBytes(ui64 value) override; void ChangeDiskSpaceQuotaExceeded(i64 delta) override; void ChangeDiskSpaceHardQuotaBytes(i64 delta) override; void ChangeDiskSpaceSoftQuotaBytes(i64 delta) override; + void AddDiskSpaceSoftQuotaBytes(EUserFacingStorageType storageType, ui64 addend) override; NLogin::TLoginProvider LoginProvider; diff --git 
a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 8127ca4f2009..949ad97cea8a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -174,6 +174,48 @@ bool TSubDomainInfo::CheckDiskSpaceQuotas(IQuotaCounters* counters) { return false; } +void TSubDomainInfo::CountDiskSpaceQuotas(IQuotaCounters* counters, const TDiskSpaceQuotas& quotas) { + if (quotas.HardQuota != 0) { + counters->ChangeDiskSpaceHardQuotaBytes(quotas.HardQuota); + } + if (quotas.SoftQuota != 0) { + counters->ChangeDiskSpaceSoftQuotaBytes(quotas.SoftQuota); + } + for (const auto& [poolKind, poolQuotas] : quotas.StoragePoolsQuotas) { + if (poolQuotas.SoftQuota != 0) { + counters->AddDiskSpaceSoftQuotaBytes(GetUserFacingStorageType(poolKind), poolQuotas.SoftQuota); + } + } +} + +void TSubDomainInfo::CountDiskSpaceQuotas(IQuotaCounters* counters, const TDiskSpaceQuotas& prev, const TDiskSpaceQuotas& next) { + i64 hardDelta = next.HardQuota - prev.HardQuota; + if (hardDelta != 0) { + counters->ChangeDiskSpaceHardQuotaBytes(hardDelta); + } + i64 softDelta = next.SoftQuota - prev.SoftQuota; + if (softDelta != 0) { + counters->ChangeDiskSpaceSoftQuotaBytes(softDelta); + } + for (const auto& [poolKind, newPoolQuotas] : next.StoragePoolsQuotas) { + const auto* oldPoolQuotas = prev.StoragePoolsQuotas.FindPtr(poolKind); + ui64 addend = newPoolQuotas.SoftQuota - (oldPoolQuotas ? 
oldPoolQuotas->SoftQuota : 0u); + if (addend != 0u) { + counters->AddDiskSpaceSoftQuotaBytes(GetUserFacingStorageType(poolKind), addend); + } + } + for (const auto& [poolKind, oldPoolQuotas] : prev.StoragePoolsQuotas) { + if (const auto* newPoolQuotas = next.StoragePoolsQuotas.FindPtr(poolKind); + !newPoolQuotas + ) { + ui64 addend = -oldPoolQuotas.SoftQuota; + if (addend != 0u) { + counters->AddDiskSpaceSoftQuotaBytes(GetUserFacingStorageType(poolKind), addend); + } + } + } +} + void TSubDomainInfo::AggrDiskSpaceUsage(IQuotaCounters* counters, const TPartitionStats& newAggr, const TPartitionStats& oldAggr) { DiskSpaceUsage.Tables.DataSize += (newAggr.DataSize - oldAggr.DataSize); counters->ChangeDiskSpaceTablesDataBytes(newAggr.DataSize - oldAggr.DataSize); @@ -188,21 +230,21 @@ void TSubDomainInfo::AggrDiskSpaceUsage(IQuotaCounters* counters, const TPartiti for (const auto& [poolKind, newStoragePoolStats] : newAggr.StoragePoolsStats) { const auto* oldStats = oldAggr.StoragePoolsStats.FindPtr(poolKind); - const i64 dataSizeIncrement = newStoragePoolStats.DataSize - (oldStats ? oldStats->DataSize : 0u); - const i64 indexSizeIncrement = newStoragePoolStats.IndexSize - (oldStats ? oldStats->IndexSize : 0u); + const ui64 dataSizeIncrement = newStoragePoolStats.DataSize - (oldStats ? oldStats->DataSize : 0u); + const ui64 indexSizeIncrement = newStoragePoolStats.IndexSize - (oldStats ? 
oldStats->IndexSize : 0u); auto& [dataSize, indexSize] = DiskSpaceUsage.StoragePoolsUsage[poolKind]; dataSize += dataSizeIncrement; indexSize += indexSizeIncrement; - counters->ChangeDiskSpaceTables(GetUserFacingStorageType(poolKind), dataSizeIncrement, indexSizeIncrement); + counters->AddDiskSpaceTables(GetUserFacingStorageType(poolKind), dataSizeIncrement, indexSizeIncrement); } for (const auto& [poolKind, oldStoragePoolStats] : oldAggr.StoragePoolsStats) { if (const auto* newStats = newAggr.StoragePoolsStats.FindPtr(poolKind); !newStats) { - const i64 dataSizeDecrement = oldStoragePoolStats.DataSize; - const i64 indexSizeDecrement = oldStoragePoolStats.IndexSize; + const ui64 dataSizeDecrement = oldStoragePoolStats.DataSize; + const ui64 indexSizeDecrement = oldStoragePoolStats.IndexSize; auto& [dataSize, indexSize] = DiskSpaceUsage.StoragePoolsUsage[poolKind]; dataSize -= dataSizeDecrement; indexSize -= indexSizeDecrement; - counters->ChangeDiskSpaceTables(GetUserFacingStorageType(poolKind), -dataSizeDecrement, -indexSizeDecrement); + counters->AddDiskSpaceTables(GetUserFacingStorageType(poolKind), -dataSizeDecrement, -indexSizeDecrement); } } } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 1c39c38faac6..ee48a2b0bf5e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -1295,11 +1295,12 @@ struct IQuotaCounters { virtual void ChangeDiskSpaceTablesDataBytes(i64 delta) = 0; virtual void ChangeDiskSpaceTablesIndexBytes(i64 delta) = 0; virtual void ChangeDiskSpaceTablesTotalBytes(i64 delta) = 0; - virtual void ChangeDiskSpaceTables(EUserFacingStorageType storageType, i64 dataDelta, i64 indexDelta) = 0; + virtual void AddDiskSpaceTables(EUserFacingStorageType storageType, ui64 data, ui64 index) = 0; virtual void ChangeDiskSpaceTopicsTotalBytes(ui64 value) = 0; virtual void ChangeDiskSpaceQuotaExceeded(i64 delta) = 0; virtual 
void ChangeDiskSpaceHardQuotaBytes(i64 delta) = 0; virtual void ChangeDiskSpaceSoftQuotaBytes(i64 delta) = 0; + virtual void AddDiskSpaceSoftQuotaBytes(EUserFacingStorageType storageType, ui64 addend) = 0; }; struct TSubDomainInfo: TSimpleRefCount { @@ -1631,25 +1632,9 @@ struct TSubDomainInfo: TSimpleRefCount { return TDuration::Seconds(DatabaseQuotas->ttl_min_run_internal_seconds()); } - static void CountDiskSpaceQuotas(IQuotaCounters* counters, const TDiskSpaceQuotas& quotas) { - if (quotas.HardQuota != 0) { - counters->ChangeDiskSpaceHardQuotaBytes(quotas.HardQuota); - } - if (quotas.SoftQuota != 0) { - counters->ChangeDiskSpaceSoftQuotaBytes(quotas.SoftQuota); - } - } + static void CountDiskSpaceQuotas(IQuotaCounters* counters, const TDiskSpaceQuotas& quotas); - static void CountDiskSpaceQuotas(IQuotaCounters* counters, const TDiskSpaceQuotas& prev, const TDiskSpaceQuotas& next) { - i64 hardDelta = i64(next.HardQuota) - i64(prev.HardQuota); - if (hardDelta != 0) { - counters->ChangeDiskSpaceHardQuotaBytes(hardDelta); - } - i64 softDelta = i64(next.SoftQuota) - i64(prev.SoftQuota); - if (softDelta != 0) { - counters->ChangeDiskSpaceSoftQuotaBytes(softDelta); - } - } + static void CountDiskSpaceQuotas(IQuotaCounters* counters, const TDiskSpaceQuotas& prev, const TDiskSpaceQuotas& next); static void CountStreamShardsQuota(IQuotaCounters* counters, const i64 delta) { counters->ChangeStreamShardsQuota(delta); diff --git a/ydb/tests/functional/tenants/test_db_counters.py b/ydb/tests/functional/tenants/test_db_counters.py index 7d323486c3a6..a5340adbdd2f 100644 --- a/ydb/tests/functional/tenants/test_db_counters.py +++ b/ydb/tests/functional/tenants/test_db_counters.py @@ -3,22 +3,37 @@ import os import time import requests +import subprocess +from google.protobuf import json_format +import pytest -from hamcrest import ( - assert_that, - equal_to, - greater_than, - not_none -) +from hamcrest import assert_that, equal_to, greater_than, not_none +from 
ydb.tests.library.common.msgbus_types import MessageBusStatus +from ydb.tests.library.common.protobuf_ss import AlterTableRequest from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.harness.util import LogLevels +from ydb.tests.library.harness.ydb_fixtures import ydb_database_ctx +from ydb.tests.library.matchers.response_matchers import ProtobufWithStatusMatcher from ydb.tests.oss.ydb_sdk_import import ydb logger = logging.getLogger(__name__) +def get_db_counters(mon_port, service): + counters_url = f"http://localhost:{mon_port}/counters/counters%3D{service}/json" + reply = requests.get(counters_url) + if reply.status_code == 204: + return None + + assert_that(reply.status_code, equal_to(200)) + counters = reply.json() + + assert_that(counters, not_none()) + return counters + + class BaseDbCounters(object): @classmethod def setup_class(cls): @@ -66,19 +81,6 @@ def create_table(self, driver, table): ) ) - def get_db_counters(self, service, node_index=0): - mon_port = self.cluster.slots[node_index + 1].mon_port - counters_url = 'http://localhost:{}/counters/counters%3D{}/json'.format(mon_port, service) - reply = requests.get(counters_url) - if reply.status_code == 204: - return None - - assert_that(reply.status_code, equal_to(200)) - ret = reply.json() - - assert_that(ret, not_none()) - return ret - def check_db_counters(self, sensors_to_check, group): table = os.path.join(self.database, 'table') @@ -98,7 +100,7 @@ def check_db_counters(self, sensors_to_check, group): for i in range(30): checked = 0 - counters = self.get_db_counters('db') + counters = get_db_counters(self.cluster.slots[1].mon_port, 'db') if counters: sensors = counters['sensors'] for sensor in sensors: @@ -129,3 +131,242 @@ def test_case(self): 'Responses/Bytes', } self.check_db_counters(sensors_to_check, 'kqp') + + +@pytest.fixture(scope="function") +def ydb_database(ydb_cluster, 
ydb_root, ydb_safe_test_name): + database = os.path.join(ydb_root, ydb_safe_test_name) + + with ydb_database_ctx(ydb_cluster, database, storage_pools={"hdd": 1, "hdd1": 1}): + yield database + + +def ydbcli_db_schema_exec(node, operation_proto): + endpoint = f"{node.host}:{node.port}" + args = [ + node.binary_path, + f"--server=grpc://{endpoint}", + "db", + "schema", + "exec", + operation_proto, + ] + command = subprocess.run(args, capture_output=True) + assert command.returncode == 0, command.stderr.decode("utf-8") + + +def alter_database_quotas(node, database_path, database_quotas): + logger.debug(f"adding storage quotas to db {database_path}") + alter_proto = """ModifyScheme { + OperationType: ESchemeOpAlterExtSubDomain + WorkingDir: "%s" + SubDomain { + Name: "%s" + DatabaseQuotas { + %s + } + } + }""" % ( + os.path.dirname(database_path), + os.path.basename(database_path), + database_quotas, + ) + + ydbcli_db_schema_exec(node, alter_proto) + + +def create_table(session, table): + session.execute_scheme( + f""" + CREATE TABLE `{table}` ( + key Int32, + value String FAMILY custom, + PRIMARY KEY (key), + FAMILY default (DATA = "hdd"), + FAMILY custom (DATA = "hdd1") + ); + """ + ) + + +def alter_partition_config(client, table, partition_config): + response = client.send_and_poll_request( + AlterTableRequest(os.path.dirname(table), os.path.basename(table)) + .with_partition_config(partition_config) + .protobuf + ) + assert_that(response, ProtobufWithStatusMatcher(MessageBusStatus.MSTATUS_OK)) + + +def insert_data(session, table): + session.transaction().execute( + f""" + UPSERT INTO `{table}` ( + key, + value + ) + VALUES + (1, "foo"), + (2, "bar"), + (3, "baz"); + """, + commit_tx=True, + ) + + +def drop_table(session, table): + session.drop_table(table) + + +def describe(client, path): + return client.describe(path, token="") + + +def check_disk_quota_exceedance(client, database, retries, sleep_duration): + for attempt in range(retries): + path_description = 
describe(client, database) + domain_description = path_description.PathDescription.DomainDescription + quota_exceeded = domain_description.DomainState.DiskQuotaExceeded + logger.debug( + f"attempt: {attempt}\n" + f"database storage usage: {domain_description.DiskSpaceUsage}\n" + f"quotas: {domain_description.DatabaseQuotas}\n" + f"quota exceedance state: {quota_exceeded}" + ) + if quota_exceeded: + return + time.sleep(sleep_duration) + + assert False, "database did not move into DiskQuotaExceeded state" + + +def check_counters(mon_port, sensors_to_check, retries, sleep_duration): + for attempt in range(retries + 1): + counters = get_db_counters(mon_port, "ydb") + correct_sensors = 0 + if counters: + for sensor in counters["sensors"]: + for target_name, expected_value in sensors_to_check.items(): + if sensor["labels"]["name"] == target_name: + logger.debug( + f"sensor {target_name}: expected {expected_value}, " + f'got {sensor["value"]} in {sleep_duration * attempt} seconds' + ) + if sensor["value"] == expected_value: + correct_sensors += 1 + if correct_sensors == len(sensors_to_check): + return + + logger.debug( + f"got {correct_sensors} out of {len(sensors_to_check)} correct sensors " + f"in {sleep_duration * attempt} seconds" + ) + time.sleep(sleep_duration) + + assert False, ( + f"didn't receive expected values for sensors {sensors_to_check.keys()} " + f"in {sleep_duration * retries} seconds" + ) + + +class TestStorageCounters: + def test_storage_counters(self, ydb_cluster, ydb_database, ydb_client_session): + database_path = ydb_database + node = ydb_cluster.nodes[1] + + alter_database_quotas( + node, + database_path, + """ + storage_quotas { + unit_kind: "hdd" + data_size_hard_quota: 2 + data_size_soft_quota: 1 + } + storage_quotas { + unit_kind: "hdd1" + data_size_hard_quota: 20 + data_size_soft_quota: 10 + } + """, + ) + + client = ydb_cluster.client + quotas = describe(client, database_path).PathDescription.DomainDescription.DatabaseQuotas.storage_quotas + 
assert len(quotas) == 2 + assert json_format.MessageToDict(quotas[0], preserving_proto_field_name=True) == { + "unit_kind": "hdd", + "data_size_hard_quota": "2", + "data_size_soft_quota": "1", + } + assert json_format.MessageToDict(quotas[1], preserving_proto_field_name=True) == { + "unit_kind": "hdd1", + "data_size_hard_quota": "20", + "data_size_soft_quota": "10", + } + + slot_mon_port = ydb_cluster.slots[1].mon_port + # Note 1: limit_bytes is equal to the database's SOFT quota + # Note 2: .hdd counter aggregates quotas across all storage pool kinds with prefix "hdd", i.e. "hdd" and "hdd1" + # Note 3: 200 seconds can sometimes be not enough + check_counters(slot_mon_port, {"resources.storage.limit_bytes.hdd": 11}, retries=60, sleep_duration=5) + + pool = ydb_client_session(database_path) + with pool.checkout() as session: + table = os.path.join(database_path, "table") + + create_table(session, table) + + old_partition_config = describe(client, table).PathDescription.Table.PartitionConfig + # this forces MemTable to be written out to the storage pools sooner + old_partition_config.CompactionPolicy.InMemForceSizeToSnapshot = 1 + alter_partition_config(client, table, old_partition_config) + new_partition_config = describe(client, table).PathDescription.Table.PartitionConfig + assert_that(new_partition_config.CompactionPolicy.InMemForceSizeToSnapshot, equal_to(1)) + + insert_data(session, table) + check_disk_quota_exceedance(client, database_path, retries=10, sleep_duration=5) + + usage = describe(client, table).PathDescription.TableStats.StoragePools.PoolsUsage + assert len(usage) == 2 + assert json_format.MessageToDict(usage[0], preserving_proto_field_name=True) == { + "PoolKind": "hdd", + "DataSize": "50", + "IndexSize": "0", + } + assert json_format.MessageToDict(usage[1], preserving_proto_field_name=True) == { + "PoolKind": "hdd1", + "DataSize": "71", + "IndexSize": "0", + } + + # Note: .hdd counter aggregates usage across all storage pool kinds with prefix 
"hdd", i.e. "hdd" and "hdd1" + check_counters( + slot_mon_port, + { + "resources.storage.used_bytes": 121, + "resources.storage.used_bytes.ssd": 0, + "resources.storage.used_bytes.hdd": 121, + "resources.storage.table.used_bytes": 121, + "resources.storage.table.used_bytes.ssd": 0, + "resources.storage.table.used_bytes.hdd": 121, + }, + retries=60, + sleep_duration=5, + ) + + drop_table(session, table) + + check_counters( + slot_mon_port, + { + "resources.storage.used_bytes": 0, + "resources.storage.used_bytes.ssd": 0, + "resources.storage.used_bytes.hdd": 0, + "resources.storage.table.used_bytes": 0, + "resources.storage.table.used_bytes.ssd": 0, + "resources.storage.table.used_bytes.hdd": 0, + }, + retries=60, + sleep_duration=5, + ) diff --git a/ydb/tests/library/harness/daemon.py b/ydb/tests/library/harness/daemon.py index 3c0ee0419f46..f5e2885545d6 100644 --- a/ydb/tests/library/harness/daemon.py +++ b/ydb/tests/library/harness/daemon.py @@ -35,26 +35,22 @@ def extract_stderr_details(stderr_file, max_lines=0): class DaemonError(RuntimeError): def __init__(self, message, stdout, stderr, exit_code, max_stderr_lines=0): - super(DaemonError, self).__init__( '\n'.join( [ "Daemon failed with message: {message}.".format(message=message), "Process exit_code = {exit_code}.".format(exit_code=exit_code), "Stdout file name: \n{}".format(stdout if stdout is not None else "is not present."), - "Stderr file name: \n{}".format(stderr if stderr is not None else "is not present.") - ] + extract_stderr_details(stderr, max_stderr_lines) + "Stderr file name: \n{}".format(stderr if stderr is not None else "is not present."), + ] + + extract_stderr_details(stderr, max_stderr_lines) ) ) class SeveralDaemonErrors(RuntimeError): def __init__(self, exceptions): - super(SeveralDaemonErrors, self).__init__( - "\n".join( - str(x) for x in exceptions - ) - ) + super(SeveralDaemonErrors, self).__init__("\n".join(str(x) for x in exceptions)) class Daemon(object): @@ -67,7 +63,7 @@ def 
__init__( stdout_file=yatest_common.work_path('stdout'), stderr_file=yatest_common.work_path('stderr'), stderr_on_error_lines=0, - core_pattern=None + core_pattern=None, ): self.__cwd = cwd self.__timeout = timeout @@ -123,7 +119,7 @@ def start(self): stdout=self.__stdout_file, stderr=stderr_stream, wait=False, - core_pattern=self.__core_pattern + core_pattern=self.__core_pattern, ) wait_for(self.is_alive, self.__timeout) @@ -225,7 +221,14 @@ class ExternalNodeDaemon(object): def __init__(self, host): self._host = host self._ssh_username = param_constants.ssh_username - self._ssh_options = ["-o", "UserKnownHostsFile=/dev/null", "-o", "StrictHostKeyChecking=no", "-o", "LogLevel=ERROR"] + self._ssh_options = [ + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "StrictHostKeyChecking=no", + "-o", + "LogLevel=ERROR", + ] self.logger = logger.getChild(self.__class__.__name__) @property @@ -255,8 +258,7 @@ def copy_file_or_dir(self, file_or_dir, target_path): self.ssh_command(['sudo', 'rm', '-rf', target_path], raise_on_error=True) self._run_in_subprocess( - ["scp"] + self._ssh_options + [ '-r', file_or_dir, self._path_at_host(transit_path)], - raise_on_error=True + ["scp"] + self._ssh_options + ['-r', file_or_dir, self._path_at_host(transit_path)], raise_on_error=True ) self.ssh_command(["sudo", "mv", transit_path, target_path], raise_on_error=True) @@ -268,19 +270,19 @@ def logs_directory(self): def cleanup_logs(self): self.ssh_command("sudo dmesg --clear", raise_on_error=True) self.ssh_command( - 'sudo rm -rf {}/* && sudo service rsyslog restart'.format(self.logs_directory), - raise_on_error=True) + 'sudo rm -rf {}/* && sudo service rsyslog restart'.format(self.logs_directory), raise_on_error=True + ) def sky_get_and_move(self, rb_torrent, item_to_move, target_path): self.ssh_command(['sky get -d %s %s' % (self._artifacts_path, rb_torrent)], raise_on_error=True) self.ssh_command( - ['sudo mv %s %s' % (os.path.join(self._artifacts_path, item_to_move), target_path)], - 
raise_on_error=True + ['sudo mv %s %s' % (os.path.join(self._artifacts_path, item_to_move), target_path)], raise_on_error=True ) def send_signal(self, signal): self.ssh_command( - "ps aux | grep %d | grep -v daemon | grep -v grep | awk '{ print $2 }' | xargs sudo kill -%d" % ( + "ps aux | grep %d | grep -v daemon | grep -v grep | awk '{ print $2 }' | xargs sudo kill -%d" + % ( int(self.ic_port), int(signal), ) @@ -288,13 +290,15 @@ def send_signal(self, signal): def kill_process_and_daemon(self): self.ssh_command( - "ps aux | grep daemon | grep %d | grep -v grep | awk '{ print $2 }' | xargs sudo kill -%d" % ( + "ps aux | grep daemon | grep %d | grep -v grep | awk '{ print $2 }' | xargs sudo kill -%d" + % ( int(self.ic_port), int(signal.SIGKILL), ) ) self.ssh_command( - "ps aux | grep %d | grep -v grep | awk '{ print $2 }' | xargs sudo kill -%d" % ( + "ps aux | grep %d | grep -v grep | awk '{ print $2 }' | xargs sudo kill -%d" + % ( int(self.ic_port), int(signal.SIGKILL), ) @@ -314,4 +318,6 @@ def _run_in_subprocess(self, command, raise_on_error=False): self.logger.exception("Ssh command failed with output = " + e.output.decode("utf-8", errors="replace")) raise else: - self.logger.info("Ssh command failed with output (it was ignored) = " + e.output.decode("utf-8", errors="replace")) + self.logger.info( + "Ssh command failed with output (it was ignored) = " + e.output.decode("utf-8", errors="replace") + ) diff --git a/ydb/tests/library/harness/ydb_fixtures.py b/ydb/tests/library/harness/ydb_fixtures.py index 5a7fcef94d1b..5a2bbda07a72 100644 --- a/ydb/tests/library/harness/ydb_fixtures.py +++ b/ydb/tests/library/harness/ydb_fixtures.py @@ -24,7 +24,7 @@ 'SCHEME_BOARD_SUBSCRIBER': LogLevels.WARN, 'TX_DATASHARD': LogLevels.DEBUG, 'CHANGE_EXCHANGE': LogLevels.DEBUG, - } + }, ) @@ -75,24 +75,15 @@ def ydb_safe_test_name(request): @contextlib.contextmanager -def ydb_database_ctx(ydb_cluster, database_path, node_count=1, timeout_seconds=20): - ''' ??? 
''' +def ydb_database_ctx(ydb_cluster, database_path, node_count=1, timeout_seconds=20, storage_pools={'hdd': 1}): + '''???''' assert os.path.abspath(database_path), 'database_path should be an (absolute) path, not a database name' - ydb_cluster.remove_database( - database_path, - timeout_seconds=timeout_seconds - ) + ydb_cluster.remove_database(database_path, timeout_seconds=timeout_seconds) logger.debug("create database %s: create path and declare internals", database_path) - ydb_cluster.create_database( - database_path, - storage_pool_units_count={ - 'hdd': 1 - }, - timeout_seconds=timeout_seconds - ) + ydb_cluster.create_database(database_path, storage_pool_units_count=storage_pools, timeout_seconds=timeout_seconds) logger.debug("create database %s: start nodes and construct internals", database_path) database_nodes = ydb_cluster.register_and_start_slots(database_path, node_count) @@ -104,10 +95,7 @@ def ydb_database_ctx(ydb_cluster, database_path, node_count=1, timeout_seconds=2 yield database_path logger.debug("destroy database %s: remove path and dismantle internals", database_path) - ydb_cluster.remove_database( - database_path, - timeout_seconds=timeout_seconds - ) + ydb_cluster.remove_database(database_path, timeout_seconds=timeout_seconds) logger.debug("destroy database %s: stop nodes", database_path) ydb_cluster.unregister_and_stop_slots(database_nodes) @@ -139,6 +127,7 @@ def stop_driver(): request.addfinalizer(stop_driver) return driver + return _make_driver @@ -153,6 +142,7 @@ def stop_pool(): request.addfinalizer(stop_pool) return pool + return _make_pool