From 55ddce1ecd2411690bc6a268c35368b34a6df89b Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 7 Nov 2024 14:53:54 -0800 Subject: [PATCH] Implemented the integration test itest-http-ingest Added the GHA CI action for the test. --- .github/workflows/ci.yml | 5 + .../lsst/qserv/admin/cli/_integration_test.py | 36 +++ .../python/lsst/qserv/admin/cli/entrypoint.py | 29 ++ .../python/lsst/qserv/admin/cli/options.py | 6 + .../python/lsst/qserv/admin/cli/script.py | 16 + src/admin/python/lsst/qserv/admin/itest.py | 289 +++++++++++++++++- .../lsst/qserv/admin/qservCli/launch.py | 128 ++++++++ .../python/lsst/qserv/admin/qservCli/opt.py | 16 + .../lsst/qserv/admin/qservCli/qserv_cli.py | 55 ++++ .../lsst/qserv/admin/replicationInterface.py | 23 +- 10 files changed, 591 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e8afa81b1..1e51347a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -327,6 +327,11 @@ jobs: ./admin/local/cli/qserv --log-level DEBUG itest-http --reload --load-http \ --qserv-image ${{ needs.image-names.outputs.qserv-image }} \ --mariadb-image ${{ needs.image-names.outputs.mariadb-image }} + + - name: Run integration tests of ingesting user tables via the HTTP frontend + run: | + ./admin/local/cli/qserv --log-level DEBUG itest-http-ingest \ + --qserv-image ${{ needs.image-names.outputs.qserv-image }} - name: Check Qserv containers if: always() diff --git a/src/admin/python/lsst/qserv/admin/cli/_integration_test.py b/src/admin/python/lsst/qserv/admin/cli/_integration_test.py index 2c4988066..0f67d9a67 100644 --- a/src/admin/python/lsst/qserv/admin/cli/_integration_test.py +++ b/src/admin/python/lsst/qserv/admin/cli/_integration_test.py @@ -340,6 +340,42 @@ def run_integration_tests_http( ) +def run_integration_tests_http_ingest( + run_tests: bool, + keep_results: bool, + tests_yaml: str, +) -> bool: + """Top level script to run the integration tests of ingesting user tables via the HTTP frontend. + + Parameters + ---------- + run_tests : `bool` + True if the tests should be run. (False can be used to compare + previously generated test outputs.) + keep_results : `bool` + True if the results should be kept after the tests are run. + tests_yaml : `str` + Path to the yaml file that contains the details about running the + tests. The files will be merged, higher index files get priority. + + Returns + ------- + success : `bool` + `True` if loading succeeded and query outputs were the same as the inputs, otherwise `False`. + """ + + with open(tests_yaml) as f: + tests_data = yaml.safe_load(f.read()) + + if run_tests: + wait_for_czar(tests_data["czar-db-admin-uri"]) + wait_for_replication_system(tests_data["replication-controller-uri"]) + return itest.run_http_ingest( + http_frontend_uri=tests_data["qserv-http-uri"], + keep_results=keep_results, + ) + return True + def prepare_data( tests_yaml: str ) -> bool: diff --git a/src/admin/python/lsst/qserv/admin/cli/entrypoint.py b/src/admin/python/lsst/qserv/admin/cli/entrypoint.py index 6b003bce2..f63657aa6 100644 --- a/src/admin/python/lsst/qserv/admin/cli/entrypoint.py +++ b/src/admin/python/lsst/qserv/admin/cli/entrypoint.py @@ -63,6 +63,7 @@ option_results_protocol, option_run, option_run_tests, + option_keep_results, options_targs, option_tests_yaml, option_unload, @@ -428,6 +429,34 @@ def integration_test_http( sys.exit(0 if results.passed else 1) +@entrypoint.command() +@option_repl_connection( + help=option_repl_connection.keywords["help"] + + " If provided will wait for the replication system to be responsive before loading data (does not guarantee system readyness)." +) +@option_run_tests() +@option_keep_results() +@option_tests_yaml() +def integration_test_http_ingest( + repl_connection: str, + run_tests: bool, + keep_results: bool, + tests_yaml: str, +) -> None: + """Run integration tests of the ingesting user tables via the HTTP frontend. + + TESTS_YAML is the yaml file path that contains connection information and describes tests to load and run. + """ + + results = script.integration_test_http_ingest( + repl_connection=repl_connection, + run_tests=run_tests, + keep_results=keep_results, + tests_yaml=tests_yaml, + ) + click.echo(str(results)) + sys.exit(0 if results else 1) + @entrypoint.command() @option_tests_yaml() diff --git a/src/admin/python/lsst/qserv/admin/cli/options.py b/src/admin/python/lsst/qserv/admin/cli/options.py index 26bfee53f..fec4f12e5 100644 --- a/src/admin/python/lsst/qserv/admin/cli/options.py +++ b/src/admin/python/lsst/qserv/admin/cli/options.py @@ -280,6 +280,12 @@ def __call__(self, f: Callable) -> Callable: default=True, ) +option_keep_results = partial( + click.option, + "--keep-results/--no-keep-results", + help="Keep or delet results after finishing the tests. Defaults to --no-keep-results.", + default=False, +) option_compare_results = partial( click.option, diff --git a/src/admin/python/lsst/qserv/admin/cli/script.py b/src/admin/python/lsst/qserv/admin/cli/script.py index 0ddd17ad3..4f72673cf 100644 --- a/src/admin/python/lsst/qserv/admin/cli/script.py +++ b/src/admin/python/lsst/qserv/admin/cli/script.py @@ -1095,6 +1095,22 @@ def integration_test_http( ) +def integration_test_http_ingest( + repl_connection: str, + run_tests: bool, + keep_results: bool, + tests_yaml: str, +) -> bool: + if repl_connection is not None: + _do_smig_block(admin_smig_dir, "replica", repl_connection) + + return _integration_test.run_integration_tests_http_ingest( + run_tests=run_tests, + keep_results=keep_results, + tests_yaml=tests_yaml, + ) + + def prepare_data( tests_yaml: str, ) -> bool: diff --git a/src/admin/python/lsst/qserv/admin/itest.py b/src/admin/python/lsst/qserv/admin/itest.py index 9235cfd65..75effc420 100644 --- a/src/admin/python/lsst/qserv/admin/itest.py +++ b/src/admin/python/lsst/qserv/admin/itest.py @@ -18,19 +18,22 @@ # # You should have received a copy of the GNU General Public License +import csv from filecmp import dircmp import json import logging import urllib3 import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder import os import re import shutil import time -from typing import Any, Dict, Generator, List, Optional, TextIO, Union +from typing import Any, Collection, Dict, Generator, List, Optional, Sequence, TextIO, Union import subprocess from urllib.parse import urljoin from urllib.parse import urlparse +from .replicationInterface import repl_api_version _log = logging.getLogger(__name__) @@ -996,3 +999,287 @@ def compareQueryResults(run_cases: List[str], outputs_dir: str) -> List[ITestCas _log.info(str(result)) return results + +def run_http_ingest( + http_frontend_uri: str, + keep_results: bool, +) -> bool: + """Test ingesting user tables into Qserv and querying the tables. + + Parameters + ---------- + http_frontend_uri : `str` + The uri to use to connect to the HTPP frontend. + keep_results : `bool` + If `True` then keep the results of the test, otherwise delete them. + """ + + schema = [ + {"name": "id", "type": "INT"}, + {"name": "val", "type": "VARCHAR(32)"}, + {"name": "active", "type": "BOOL"}, + ] + indexes = [ + {"index": "idx_id", + "spec": "UNIQUE", + "comment": "The unique primary key index.", + "columns": [ + {"columnn": "id", "length": 0, "ascending": 1} + ]}, + {"index": "idx_val", + "spec": "DEFAULT", + "comment": "The non-unique index on the string values.", + "columns": [ + {"columnn": "val", "length": 32, "ascending": 1} + ]}, + ] + rows = [ + ["1", "Andy", "1"], + ["2", "Bob", "0"], + ["3", "Charlie", "1"], + ] + + database = "user_testdb" + table_json = "json" + table_csv = "csv" + + _log.debug("Testing user database: %s", database) + + # Supress the warning about the self-signed certificate + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # Run the cleanup step to ensure no such database (and tables in it) exists after + # prior attempts to run the test. + try: + _http_delete_database(http_frontend_uri, database) + except Exception as e: + _log.warning("Failed to delete user database: %s, error: %s", database, e) + + # Create the table and ingest data using the JSON option. Then query the table. + try: + _http_ingest_data_json(http_frontend_uri, database, table_json, schema, indexes, rows) + except Exception as e: + _log.error("Failed to ingest data into table: %s of user database: %s, error: %s", table_json, database, e) + return False + try: + _http_query_table(http_frontend_uri, database, table_json, rows) + except Exception as e: + _log.error("Failed to query table: %s of user database: %s, error: ", table_json, database, e) + return False + + # Create the table and ingest data using the CSV option. Then query the table. + try: + _http_ingest_data_csv(http_frontend_uri, database, table_csv, schema, indexes, rows) + except Exception as e: + _log.error("Failed to ingest data into table: %s of user database: %s, error: %s", table_csv, database, e) + return False + try: + _http_query_table(http_frontend_uri, database, table_csv, rows) + except Exception as e: + _log.error("Failed to query table: %s of user database: %s, error: ", table_csv, database, e) + + # Cleanup the tables and the database in two separate steps unless the user + # requested to keep the results. + if not keep_results: + for table in [table_json, table_csv]: + try: + _http_delete_table(http_frontend_uri, database, table) + except Exception as e: + _log.error("Failed to delete table: %s from user database: %s, error: %s", table, database, e) + return False + try: + _http_delete_database(http_frontend_uri, database) + except Exception as e: + _log.error("Failed to delete user database: %s, error: %s", database, e) + return False + + return True + +def _http_delete_database( + http_frontend_uri: str, + database: str, +) -> None: + """Delete an existing user database. + + Parameters + ---------- + http : `str` + The uri to use to connect to the HTPP frontend. + database : `str` + The name of the database to delete. + """ + _log.debug("Deleting user database: %s", database) + data = { + "version": repl_api_version, + } + url = str(urljoin(http_frontend_uri, f"/ingest/database/{database}")) + req = requests.delete(url, json=data, verify=False) + req.raise_for_status() + res = req.json() + if res["success"] == 0: + error = res["error"] + raise RuntimeError(f"Failed to delete user database: {database}, error: {error}") + +def _http_delete_table( + http_frontend_uri: str, + database: str, + table: str, +) -> None: + """Delete an existing table from the user database. + + Parameters + ---------- + http_frontend_uri : `str` + The uri to use to connect to the HTPP frontend. + database : `str` + The name of the database where the table is located. + table : `str` + The name of the table to delete. + """ + _log.debug("Deleting table: %s from user database: %s", table, database) + data = { + "version": repl_api_version, + } + url = str(urljoin(http_frontend_uri, f"/ingest/table/{database}/{table}")) + req = requests.delete(url, json=data, verify=False) + req.raise_for_status() + res = req.json() + if res["success"] == 0: + error = res["error"] + raise RuntimeError(f"Failed to delete table: {table} from user database: {database}, error: {error}") + +def _http_ingest_data_json( + http_frontend_uri: str, + database: str, + table: str, + schema: List[Dict[str, str]], + indexes: List[Dict[str, Sequence[Collection[str]]]], + rows: List[List[Any]], +) -> None: + """Ingest data into an existing table of the user database. + + Parameters + ---------- + http_frontend_uri : `str` + The uri to use to connect to the HTPP frontend. + database : `str` + The name of the database where the table is located. + table : `str` + The name of the table where the data will be ingested. + schema : `list` [`dict` [`str`, `str`]] + The schema of the table to be created. + indexes : `list` [`dict` [`str`, `list` [`list` [`str`]]]] + The indexes of the table to be created. + """ + _log.debug("Ingesting JSON data into table: %s of user database: %s", table, database) + data = { + "version": repl_api_version, + "database" : database, + "table": table, + "schema": schema, + "indexes": indexes, + "rows": rows, + } + url = str(urljoin(http_frontend_uri, "/ingest/data")) + req = requests.post(url, json=data, verify=False) + req.raise_for_status() + res = req.json() + if res["success"] == 0: + error = res["error"] + raise RuntimeError(f"Failed to create and load the table: {table} in user database {database}, error: {error}") + +def _http_ingest_data_csv( + http_frontend_uri: str, + database: str, + table: str, + schema: List[Dict[str, str]], + indexes: List[Dict[str, Sequence[Collection[str]]]], + rows: List[List[Any]], +) -> None: + """Create the table and ingest the data into the table. + + Parameters + ---------- + http_frontend_uri : `str` + The uri to use to connect to the HTPP frontend. + database : `str` + The name of the database where the table is located. + table : `str` + The name of the table where the data will be ingested. + schema : `list` [`dict` [`str`, `str`]] + The schema of the table to be created. + indexes : `list` [`dict` [`str`, `list` [`list` [`str`]]]] + The indexes of the table to be created. + """ + _log.debug("Ingesting CSV data into table: %s of user database: %s", table, database) + base_dir = "/tmp" + schema_file = "schema.json" + schema_file_path = os.path.join(base_dir, schema_file) + indexes_file = "indexes.json" + indexes_file_path = os.path.join(base_dir, indexes_file) + rows_file = "rows.csv" + rows_file_path = os.path.join(base_dir, rows_file) + + with open(schema_file_path, "w") as f: + json.dump(schema, f) + with open(indexes_file_path, "w") as f: + json.dump(indexes, f) + with open(rows_file_path, "w") as f: + csv_writer = csv.writer(f) + for row in rows: + csv_writer.writerow(row) + + encoder = MultipartEncoder( + fields = { + "version": (None, str(repl_api_version)), + "database" : (None, database), + "table": (None, table), + "fields_terminated_by": (None, ","), + "schema": (schema_file, open(schema_file_path, "rb"), "application/json"), + "indexes": (indexes_file, open(indexes_file_path, "rb"), "application/json"), + "rows": (rows_file, open(rows_file_path, "rb"), "text/csv"), + } + ) + url = str(urljoin(http_frontend_uri, "/ingest/csv")) + req = requests.post(url, data=encoder, headers={'Content-Type': encoder.content_type}, verify=False) + req.raise_for_status() + res = req.json() + if res["success"] == 0: + error = res["error"] + raise RuntimeError(f"Failed to create and load the table: {table} in user database {database}, error: {error}") + +def _http_query_table( + http_frontend_uri: str, + database: str, + table: str, + expected_rows: List[List[Any]], +) -> None: + """Query an existing table of the user database. + + Parameters + ---------- + http_frontend_uri : `str` + The uri to use to connect to the HTPP frontend. + database : `str` + The name of the database where the table is located. + table : `str` + The name of the table that will be queried. + expected_rows : `list` [`list` [`Any`]] + The expected data in the table + """ + _log.debug("Querying table: %s of user database: %s", table, database) + data = { + "version": repl_api_version, + "database" : database, + "query": f"SELECT `id`,`val`,`active` FROM `{table}` ORDER BY id ASC", + } + url = str(urljoin(http_frontend_uri, "/query")) + req = requests.post(url, json=data, verify=False) + req.raise_for_status() + res = req.json() + if res["success"] == 0: + error = res["error"] + raise RuntimeError(f"Failed to query the table: {table} in user database: {database}, error: {error}") + received_rows = res["rows"] + if received_rows != expected_rows: + raise RuntimeError(f"Query result mismatch for table: {table} in user database: {database}, expected: {expected_rows}, got: {received_rows}") diff --git a/src/admin/python/lsst/qserv/admin/qservCli/launch.py b/src/admin/python/lsst/qserv/admin/qservCli/launch.py index 912bb997b..1af901d22 100644 --- a/src/admin/python/lsst/qserv/admin/qservCli/launch.py +++ b/src/admin/python/lsst/qserv/admin/qservCli/launch.py @@ -1176,6 +1176,100 @@ def add_flag_if(val: Optional[bool], true_flag: str, false_flag: str, args: List result = subprocess.run(args) return result.returncode +def integration_test_http_ingest( + qserv_root: str, + itest_container_http_ingest: str, + itest_volume: str, + qserv_image: str, + bind: List[str], + itest_file: str, + dry: bool, + project: str, + run_tests: bool, + keep_results: bool, + tests_yaml: str, + wait: int, + remove: bool, +) -> int: + """Run integration tests of the HTTP frontend. + + Parameters + ---------- + qserv_root : `str` + The path to the qserv source folder. + itest_container_http_ingest : `str` + The name to give the container. + itest_volume : `str` + The name of the volume used to host integration test data. + qserv_image : `str` + The name of the image to run. + bind : `List[str]` + One of ["all", "python", "bin", "lib64", "lua", "qserv", "etc"]. + If provided, selected build artifact directories will be bound into + their install locations in the container. If "all" is provided then all + the locations will be bound. Allows for local iterative build & test + without having to rebuild the docker image. + itest_file : `str` + The path to the yaml file that contains integration test execution data. + dry : `bool` + If True do not run the command; print what would have been run. + project : `str` + The name used for qserv instance customizations. + run_tests : bool + If False will skip test execution. + keep_results : bool + If True will not remove ingested user tables and the database. + tests_yaml : str + Path to the yaml that contains settings for integration test execution. + wait : `int` + How many seconds to wait before launching the integration test container. + remove : `bool` + True if the containers should be removed after executing tests. + + Returns + ------- + returncode : `int` + The returncode of "entrypoint integration-test-http-ingest". + """ + if wait: + _log.info(f"Waiting {wait} seconds for qserv to stabilize.") + time.sleep(wait) + _log.info("Continuing.") + + with open(itest_file) as f: + tests_data = yaml.safe_load(f.read()) + + args = [ + "docker", + "run", + "--init", + "--name", + itest_container_http_ingest, + "--mount", + f"src={itest_file},dst=/usr/local/etc/integration_tests.yaml,type=bind", + "--mount", + f"src={itest_volume},dst={tests_data['testdata-output']},type=volume", + "--mount", + f"src={os.path.join(qserv_root, testdata_subdir)},dst={tests_data['qserv-testdata-dir']},type=bind", + ] + if remove: + args.append("--rm") + if bind: + args.extend(bind_args(qserv_root=qserv_root, bind_names=bind)) + add_network_option(args, project) + args.extend([qserv_image, "entrypoint", "--log-level", "DEBUG", "integration-test-http-ingest"]) + + args.append("--run-tests" if run_tests else "--no-run-tests") + args.append("--keep-results" if keep_results else "--no-keep-results") + + if tests_yaml: + args.extend(["--tests-yaml", tests_yaml]) + if dry: + print(" ".join(args)) + return 0 + _log.debug(f"Running {' '.join(args)}") + result = subprocess.run(args) + return result.returncode def itest( qserv_root: str, @@ -1307,6 +1401,40 @@ def itest_http( stop_db_returncode = stop_itest_ref(itest_ref_container, dry) if remove else 0 return returncode or stop_db_returncode +def itest_http_ingest( + qserv_root: str, + itest_http_ingest_container: str, + qserv_image: str, + bind: List[str], + itest_file: str, + dry: bool, + project: str, + run_tests: bool, + keep_results: bool, + tests_yaml: str, + wait: int, + remove: bool, +) -> int: + """Run integration tests of ingesting user tables via the HTTP frontend. + """ + itest_volumes = make_itest_volumes(project) + returncode = integration_test_http_ingest( + qserv_root, + itest_http_ingest_container, + itest_volumes.exe, + qserv_image, + bind, + itest_file, + dry, + project, + run_tests, + keep_results, + tests_yaml, + wait, + remove, + ) + return returncode + def itest_rm(project: str, dry: bool) -> None: """Remove integration test volumes. diff --git a/src/admin/python/lsst/qserv/admin/qservCli/opt.py b/src/admin/python/lsst/qserv/admin/qservCli/opt.py index 80b980042..6c92175bb 100644 --- a/src/admin/python/lsst/qserv/admin/qservCli/opt.py +++ b/src/admin/python/lsst/qserv/admin/qservCli/opt.py @@ -442,6 +442,12 @@ def help(self, preamble: str) -> str: ev=env_project, val=lambda ev_val: f"{ev_val}_itest_http", ) +itest_http_ingest_container_default = OptDefault( + opt=["--itest-http-ingest-container"], + default="itest_http_ingest", + ev=env_project, + val=lambda ev_val: f"{ev_val}_itest_http_ingest", +) test_container_default = OptDefault( opt=["--test-container"], default="test", @@ -746,6 +752,7 @@ def describe(self) -> str: required=True, ) + option_itest_http_container_name = partial( click.option, *itest_http_container_default.opt, @@ -754,6 +761,15 @@ def describe(self) -> str: required=True, ) +option_itest_http_ingest_container_name = partial( + click.option, + *itest_http_ingest_container_default.opt, + help=itest_http_ingest_container_default.help("The name to give the integration test container for testing" + " user table ingest via the HTTP frontend."), + default=itest_http_ingest_container_default.val(), + required=True, +) + option_test_container_name = partial( click.option, diff --git a/src/admin/python/lsst/qserv/admin/qservCli/qserv_cli.py b/src/admin/python/lsst/qserv/admin/qservCli/qserv_cli.py index f4a657559..19aa9bca3 100644 --- a/src/admin/python/lsst/qserv/admin/qservCli/qserv_cli.py +++ b/src/admin/python/lsst/qserv/admin/qservCli/qserv_cli.py @@ -39,6 +39,7 @@ option_reload, option_load_http, option_run_tests, + option_keep_results, option_compare_results, option_case, option_repl_connection, @@ -70,6 +71,7 @@ option_itest_ref_container_name, option_itest_file, option_itest_http_container_name, + option_itest_http_ingest_container_name, option_jobs, env_ltd_password, env_ltd_user, @@ -116,6 +118,7 @@ "update-schema", "itest", "itest-http", + "itest-http-ingest", "itest-rm", "prepare-data", "run-dev", @@ -647,6 +650,58 @@ def itest_http( ) sys.exit(returncode) +@qserv.command() +@option_qserv_image() +@option_qserv_root() +@option_project() +@option_itest_http_ingest_container_name() +@option_bind() +@option_itest_file() +@option_run_tests() +@option_keep_results() +@option_tests_yaml() +@click.option( + "--wait", + help="How many seconds to wait before running load and test. " + "This is useful for allowing qserv to boot if the qserv containers " + "are started at the same time as this container. " + f"Default is {click.style('0', fg='green', bold=True)}.", + default=0, +) +@option_remove() +@option_dry() +def itest_http_ingest( + qserv_root: str, + itest_http_ingest_container: str, + qserv_image: str, + bind: List[str], + itest_file: str, + dry: bool, + project: str, + run_tests: bool, + keep_results: bool, + tests_yaml: str, + wait: int, + remove: bool, +) -> None: + """Run integration tests for ingesting user tables via the HTTP frontend. + + Launches a lite-qserv container and uses it to run integration tests.""" + returncode = launch.itest_http_ingest( + qserv_root=qserv_root, + itest_http_ingest_container=itest_http_ingest_container, + qserv_image=qserv_image, + bind=bind, + itest_file=itest_file, + dry=dry, + project=project, + run_tests=run_tests, + keep_results=keep_results, + tests_yaml=tests_yaml, + wait=wait, + remove=remove, + ) + sys.exit(returncode) @qserv.command() diff --git a/src/admin/python/lsst/qserv/admin/replicationInterface.py b/src/admin/python/lsst/qserv/admin/replicationInterface.py index 369e9f0dd..ad33a3e23 100644 --- a/src/admin/python/lsst/qserv/admin/replicationInterface.py +++ b/src/admin/python/lsst/qserv/admin/replicationInterface.py @@ -41,6 +41,8 @@ chunk_info_file = "chunk_info.json" +repl_api_version = 39 + _log = logging.getLogger(__name__) @@ -230,7 +232,6 @@ def __init__( self.repl_ctrl = urlparse(repl_ctrl_uri) self.auth_key = auth_key self.admin_auth_key = admin_auth_key - self.repl_api_version = 39 _log.debug(f"ReplicationInterface %s", self.repl_ctrl) def version(self) -> str: @@ -278,7 +279,7 @@ def start_transaction(self, database: str) -> int: _log.debug("start_transaction database: %s", database) res = _post( url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/trans", - data=json.dumps(dict(database=database, auth_key=self.auth_key, version=self.repl_api_version,)), + data=json.dumps(dict(database=database, auth_key=self.auth_key, version=repl_api_version,)), ) return int(res["databases"][database]["transactions"][0]["id"]) @@ -293,7 +294,7 @@ def commit_transaction(self, transaction_id: int) -> None: _log.debug("commit_transaction transaction_id: %s", transaction_id) res = _put( url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/trans/{transaction_id}?abort=0", - data=json.dumps(dict(auth_key=self.auth_key, version=self.repl_api_version,)), + data=json.dumps(dict(auth_key=self.auth_key, version=repl_api_version,)), ) def ingest_chunk_config(self, transaction_id: int, chunk_id: str) -> ChunkLocation: @@ -315,7 +316,7 @@ def ingest_chunk_config(self, transaction_id: int, chunk_id: str) -> ChunkLocati res = _post( url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/chunk", data=json.dumps(dict(transaction_id=transaction_id, chunk=chunk_id, auth_key=self.auth_key, - version=self.repl_api_version,)), + version=repl_api_version,)), ) return ChunkLocation(chunk_id, res["location"]["host"], str(res["location"]["port"]), res["location"]["http_host"], str(res["location"]["http_port"])) @@ -338,7 +339,7 @@ def ingest_chunk_configs(self, transaction_id: int, chunk_ids: List[int]) -> Lis res = _post( url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/chunks", data=json.dumps(dict(transaction_id=transaction_id, chunks=chunk_ids, auth_key=self.auth_key, - version=self.repl_api_version,)), + version=repl_api_version,)), ) return [ChunkLocation(l["chunk"], l["host"], str(l["port"]), l["http_host"], str(l["http_port"])) for l in res["location"]] @@ -359,7 +360,7 @@ def ingest_regular_table(self, transaction_id: int) -> List[RegularLocation]: """ _log.debug("ingest_regular_table transaction_id: %s", transaction_id) res = _get( - url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/regular?version={self.repl_api_version}", + url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/regular?version={repl_api_version}", data=json.dumps(dict(auth_key=self.auth_key, transaction_id=transaction_id,)), ) return [RegularLocation(location["host"], str(location["port"]), @@ -480,7 +481,7 @@ def build_table_stats( auth_key=self.auth_key, admin_auth_key=self.admin_auth_key, instance_id=instance_id, - version=self.repl_api_version, + version=repl_api_version, ), ), ) @@ -496,7 +497,7 @@ def publish_database(self, database: str) -> None: _log.debug("publish_database database: %s", database) _put( url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/database/{database}", - data=json.dumps(dict(auth_key=self.auth_key, version=self.repl_api_version,)), + data=json.dumps(dict(auth_key=self.auth_key, version=repl_api_version,)), ) def ingest_chunks_data( @@ -639,8 +640,8 @@ def delete_database( admin : `bool` True if the admin auth key should be used. """ - if admin: data = dict(admin_auth_key=self.admin_auth_key, version=self.repl_api_version,) - else: data = dict(auth_key=self.auth_key, version=self.repl_api_version,) + if admin: data = dict(admin_auth_key=self.admin_auth_key, version=repl_api_version,) + else: data = dict(auth_key=self.auth_key, version=repl_api_version,) _log.debug("delete_database database:%s, data:%s", database, data) def warn_if_not_exist(res: Dict[Any, Any], url: str) -> None: @@ -666,6 +667,6 @@ def get_databases(self) -> List[str]: databases : List[str] The database names. """ - url = f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/replication/config?version={self.repl_api_version}" + url = f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/replication/config?version={repl_api_version}" res = _get(url, data=json.dumps({})) return [db["database"] for db in res["config"]["databases"] or []]