Skip to content

Commit

Permalink
Fixes for SloNN PR (#13592)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximyurchuk authored Jan 21, 2025
1 parent 8390c23 commit 440c81e
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 55 deletions.
37 changes: 37 additions & 0 deletions ydb/tests/library/common/workload_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import re

import yatest.common


def get_pool(self, ydb_cli_path, endpoint, database, user, query):
    """Run *query* through the YDB CLI and return the resource pool that served it.

    The CLI's ``--stats full`` report is currently the only interface that
    exposes which resource pool handled a query, hence shelling out instead
    of using the SDK.  ``self`` is unused; it is kept for call-site
    compatibility with test-class method delegation.
    """
    cli_args = [
        ydb_cli_path,
        '-e', 'grpc://' + endpoint,
        '-d', database,
        '--user', user,
        '--no-password',
        'sql',
        '-s', query,
        '--stats', 'full',
        '--format', 'json-unicode',
    ]
    execution = yatest.common.execute(cli_args, wait=True)
    return _find_resource_pool_id(execution.stdout.decode("utf-8"))


def _find_resource_pool_id(text):
key = '"ResourcePoolId"'
if key in text:
start_idx = text.find(key) + len(key)
# Skip spaces and colon
while start_idx < len(text) and text[start_idx] in ' :':
start_idx += 1
# Skip opening double quote
if start_idx < len(text) and text[start_idx] == '"':
start_idx += 1
end_idx = text.find('"', start_idx)
if end_idx != -1:
return text[start_idx:end_idx]
return None
1 change: 1 addition & 0 deletions ydb/tests/library/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PY_SRCS(
common/path_types.py
common/types.py
common/wait_for.py
common/workload_manager.py
kv/__init__.py
kv/helpers.py
harness/__init__.py
Expand Down
5 changes: 3 additions & 2 deletions ydb/tests/sql/large/test_bulkupserts_tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import random
import ydb

from ydb.tests.sql.lib.test_base import TpchTestBaseH1, TestBase
from ydb.tests.sql.lib.test_base import TpchTestBaseH1
from ydb.tests.sql.lib.helpers import split_data_into_fixed_size_chunks


class TestTpchBulkUpsertsOperations(TpchTestBaseH1):
Expand Down Expand Up @@ -153,7 +154,7 @@ def bulk_upsert_operation(data_slice):
self.tpch_bulk_upsert_col_types()
)

for chunk in TestBase.split_data_into_fixed_size_chunks(upsert_data, 10000):
for chunk in split_data_into_fixed_size_chunks(upsert_data, 10000):
bulk_upsert_operation(chunk)

# Verify data integrity by fetching all records with the special l_suppkey
Expand Down
9 changes: 5 additions & 4 deletions ydb/tests/sql/large/test_insertinto_selectfrom.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import concurrent.futures
import random
from ydb.tests.sql.lib.test_base import TpchTestBaseH1, TestBase
from ydb.tests.sql.lib.test_base import TpchTestBaseH1
from ydb.tests.sql.lib.helpers import split_data_into_fixed_size_chunks


class TestConcurrentInsertAndCount(TpchTestBaseH1):
Expand Down Expand Up @@ -45,7 +46,7 @@ def insert_data():

data.append(lineitem)

for chunk in TestBase.split_data_into_fixed_size_chunks(data, 1000):
for chunk in split_data_into_fixed_size_chunks(data, 1000):
if not count_future.done():
self.bulk_upsert_operation(lineitem_table, chunk)

Expand Down Expand Up @@ -132,7 +133,7 @@ def upsert_data(session, chunk):
raise

# Split all data to chunks and UPSERT them
for chunk in TestBase.split_data_into_fixed_size_chunks(data, 1000):
for chunk in split_data_into_fixed_size_chunks(data, 1000):
if not count_future.done():
self.transactional(lambda session: upsert_data(session, chunk))

Expand Down Expand Up @@ -205,7 +206,7 @@ def insert_data():
data.append(lineitem)

# Split all data to chunks and UPSERT them
for chunk in TestBase.split_data_into_fixed_size_chunks(data, 100):
for chunk in split_data_into_fixed_size_chunks(data, 100):
query_texts = []
for lineitem in chunk:
query_texts.append(self.build_lineitem_upsert_query(lineitem_table, lineitem))
Expand Down
3 changes: 2 additions & 1 deletion ydb/tests/sql/large/test_tiering.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import random
import ydb
from ydb.tests.sql.lib.test_base import TestBase
from ydb.tests.sql.lib.helpers import split_data_into_fixed_size_chunks
from ydb.tests.sql.lib.test_s3 import S3Base
import time

Expand Down Expand Up @@ -183,5 +184,5 @@ def bulk_upsert_operation(data_slice):
column_types
)

for chunk in TestBase.split_data_into_fixed_size_chunks(upsert_data, 10000):
for chunk in split_data_into_fixed_size_chunks(upsert_data, 10000):
bulk_upsert_operation(chunk)
16 changes: 9 additions & 7 deletions ydb/tests/sql/large/test_workload_manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# -*- coding: utf-8 -*-
import concurrent.futures
from ydb.tests.sql.lib.test_base import TpchTestBaseH1
from ydb.tests.sql.lib.test_wm import WorkloadManager
from ydb.tests.library.common import workload_manager
import ydb
import time
import pytest


class TestWorkloadManager(TpchTestBaseH1, WorkloadManager):
class TestWorkloadManager(TpchTestBaseH1):
def get_pool(self, user, query):
    # Delegate to the shared library helper: run `query` through the YDB CLI
    # as `user` against this test's endpoint/database and return the name of
    # the resource pool that served it.
    return workload_manager.get_pool(self, self.ydb_cli_path, self.get_endpoint(), self.get_database(), user, query)

def test_crud(self):
"""
Expand Down Expand Up @@ -92,7 +94,7 @@ def test_pool_classifier_with_init_timeout(self):
# Wait until resource pool fetches resource classifiers list
time.sleep(12)

self.verify_pool("testuser", resource_pool, f'select count(*) from `{table_name}`')
assert self.get_pool("testuser", f'select count(*) from `{table_name}`') == resource_pool

def test_pool_classifier_without_init_timeout(self):
"""
Expand Down Expand Up @@ -130,7 +132,7 @@ def test_pool_classifier_without_init_timeout(self):
)"""
self.query(pool_classifier_definition)

self.verify_pool("testuser", resource_pool, f'select count(*) from `{table_name}`')
assert self.get_pool("testuser", f'select count(*) from `{table_name}`') == resource_pool

def test_resource_pool_queue_size_limit(self):
"""
Expand Down Expand Up @@ -170,7 +172,7 @@ def test_resource_pool_queue_size_limit(self):
# Wait until resource pool fetches resource classifiers list
time.sleep(12)

self.verify_pool(testuser_username, resource_pool, f'select count(*) from `{table_name}`')
assert self.get_pool(testuser_username, f'select count(*) from `{table_name}`') == resource_pool

test_user_connection = self.create_connection(testuser_username)

Expand Down Expand Up @@ -263,8 +265,8 @@ def test_resource_pool_queue_resource_weight(self, run_count, use_classifiers):
# Wait until resource pool fetches resource classifiers list
time.sleep(12)

self.verify_pool(high_priority_user, high_resource_pool, f'select count(*) from `{table_name}`')
self.verify_pool(low_priority_user, low_resource_pool, f'select count(*) from `{table_name}`')
assert self.get_pool(high_priority_user, f'select count(*) from `{table_name}`') == high_resource_pool
assert self.get_pool(low_priority_user, f'select count(*) from `{table_name}`') == low_resource_pool

highpriority_user_connection = self.create_connection(high_priority_user)
lowpriority_user_connection = self.create_connection(low_priority_user)
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/sql/lib/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
def split_data_into_fixed_size_chunks(data, chunk_size):
    """Lazily yield consecutive slices of *data*, each at most *chunk_size* long."""
    chunk_starts = range(0, len(data), chunk_size)
    yield from (data[start:start + chunk_size] for start in chunk_starts)
11 changes: 9 additions & 2 deletions ydb/tests/sql/lib/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.common.types import Erasure
from .test_lib import TestLib
from .test_query import Query
from ydb.tests.library.harness.util import LogLevels


logger = logging.getLogger(__name__)


class TestBase(TestLib, Query):
class TestBase(Query):

@classmethod
def setup_class(cls):
Expand Down Expand Up @@ -195,3 +194,11 @@ def create_lineitem(self, update):

def tpch_est_records_count(self):
return 600_000_000

# Shared helper for tests that need to load rows in bulk.
def bulk_upsert_operation(self, table_name, data_slice):
    """Bulk-upsert *data_slice* rows into *table_name* using the TPC-H column types."""
    target_path = f"{self.database}/{table_name}"
    column_types = self.tpch_bulk_upsert_col_types()
    self.driver.table_client.bulk_upsert(target_path, data_slice, column_types)
14 changes: 0 additions & 14 deletions ydb/tests/sql/lib/test_lib.py

This file was deleted.

23 changes: 0 additions & 23 deletions ydb/tests/sql/lib/test_wm.py

This file was deleted.

3 changes: 1 addition & 2 deletions ydb/tests/sql/lib/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server")

PY_SRCS(
test_base.py
test_lib.py
helpers.py
test_query.py
test_s3.py
test_wm.py
)

PEERDIR(
Expand Down

0 comments on commit 440c81e

Please sign in to comment.