diff --git a/src/karapace/backup/api.py b/src/karapace/backup/api.py
index 3da9a2304..6e3628414 100644
--- a/src/karapace/backup/api.py
+++ b/src/karapace/backup/api.py
@@ -112,7 +112,7 @@ def normalize_topic_name(
     topic_option: str | None,
     config: Config,
 ) -> TopicName:
-    return TopicName(topic_option or config["topic_name"])
+    return TopicName(topic_option or config.topic_name)
 
 
 class BackupVersion(Enum):
@@ -354,17 +354,17 @@ def _handle_restore_topic_legacy(
 ) -> None:
     if skip_topic_creation:
         return
-    if config["topic_name"] != instruction.topic_name:
+    if config.topic_name != instruction.topic_name:
         LOG.warning(
             "Not creating topic, because the name %r from the config and the name %r from the CLI differ.",
-            config["topic_name"],
+            config.topic_name,
             instruction.topic_name,
         )
         return
     _maybe_create_topic(
         config=config,
         name=instruction.topic_name,
-        replication_factor=config["replication_factor"],
+        replication_factor=config.replication_factor,
         topic_configs={"cleanup.policy": "compact"},
     )
 
@@ -441,9 +441,7 @@ def restore_backup(
         see Kafka implementation.
     :raises BackupTopicAlreadyExists: if backup version is V3 and topic already exists
     """
-    key_formatter = (
-        KeyFormatter() if topic_name == constants.DEFAULT_SCHEMA_TOPIC or config.get("force_key_correction", False) else None
-    )
+    key_formatter = KeyFormatter() if topic_name == constants.DEFAULT_SCHEMA_TOPIC or config.force_key_correction else None
 
     backup_version = BackupVersion.identify(backup_location)
     backend_type = backup_version.reader
@@ -591,7 +589,7 @@ def create_backup(
             started_at=start_time,
             finished_at=end_time,
             partition_count=1,
-            replication_factor=replication_factor if replication_factor is not None else config["replication_factor"],
+            replication_factor=replication_factor if replication_factor is not None else config.replication_factor,
             topic_configurations=topic_configurations,
             data_files=[data_file] if data_file else [],
         )
diff --git a/src/karapace/backup/cli.py b/src/karapace/backup/cli.py
index 7125b1e04..9ae73ba75 100644
--- a/src/karapace/backup/cli.py
+++ b/src/karapace/backup/cli.py
@@ -12,7 +12,7 @@
 from aiokafka.errors import BrokerResponseError
 from collections.abc import Iterator
 from karapace.backup.api import VerifyLevel
-from karapace.config import Config, read_config
+from karapace.config import Config, read_env_file
 
 import argparse
 import contextlib
@@ -89,8 +89,7 @@ def parse_args() -> argparse.Namespace:
 
 
 def get_config(args: argparse.Namespace) -> Config:
-    with open(args.config) as buffer:
-        return read_config(buffer)
+    return read_env_file(args.config)
 
 
 def dispatch(args: argparse.Namespace) -> None:
diff --git a/src/karapace/config.py b/src/karapace/config.py
index 4ae51f62d..2d933a5cc 100644
--- a/src/karapace/config.py
+++ b/src/karapace/config.py
@@ -37,6 +37,7 @@ class Config(BaseSettings):
     consumer_request_max_bytes: int = 67108864
     consumer_idle_disconnect_timeout: int = 0
     fetch_min_bytes: int = 1
+    force_key_correction: bool = False
    group_id: str = "schema-registry"
     http_request_max_size: int | None = None
     host: str = "127.0.0.1"
@@ -250,12 +251,8 @@ def write_env_file(dot_env_path: Path, config: Config) -> None:
     dot_env_path.write_text(config.to_env_str())
 
 
-# def read_config(config_handler: IO) -> Config:
-#     try:
-#         config = json_decode(config_handler)
-#     except JSONDecodeError as ex:
-#         raise InvalidConfiguration("Configuration is not a valid JSON") from ex
-#     return set_config_defaults(config)
+def read_env_file(env_file_path: str) -> Config:
+    return Config(_env_file=env_file_path, _env_file_encoding="utf-8")
 
 
 def create_client_ssl_context(config: Config) -> ssl.SSLContext | None:
diff --git a/src/karapace/kafka_utils.py b/src/karapace/kafka_utils.py
index ede5e7023..02eed2e64 100644
--- a/src/karapace/kafka_utils.py
+++ b/src/karapace/kafka_utils.py
@@ -13,35 +13,35 @@
 def kafka_admin_from_config(config: Config) -> KafkaAdminClient:
     return KafkaAdminClient(
-        bootstrap_servers=config["bootstrap_uri"],
-        client_id=config["client_id"],
-        security_protocol=config["security_protocol"],
-        sasl_mechanism=config["sasl_mechanism"],
-        sasl_plain_username=config["sasl_plain_username"],
-        sasl_plain_password=config["sasl_plain_password"],
-        ssl_cafile=config["ssl_cafile"],
-        ssl_certfile=config["ssl_certfile"],
-        ssl_keyfile=config["ssl_keyfile"],
+        bootstrap_servers=config.bootstrap_uri,
+        client_id=config.client_id,
+        security_protocol=config.security_protocol,
+        sasl_mechanism=config.sasl_mechanism,
+        sasl_plain_username=config.sasl_plain_username,
+        sasl_plain_password=config.sasl_plain_password,
+        ssl_cafile=config.ssl_cafile,
+        ssl_certfile=config.ssl_certfile,
+        ssl_keyfile=config.ssl_keyfile,
     )
 
 
 @contextlib.contextmanager
 def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaConsumer]:
     consumer = KafkaConsumer(
-        bootstrap_servers=config["bootstrap_uri"],
+        bootstrap_servers=config.bootstrap_uri,
         topic=topic,
         enable_auto_commit=False,
-        client_id=config["client_id"],
-        security_protocol=config["security_protocol"],
-        ssl_cafile=config["ssl_cafile"],
-        ssl_certfile=config["ssl_certfile"],
-        ssl_keyfile=config["ssl_keyfile"],
-        sasl_mechanism=config["sasl_mechanism"],
-        sasl_plain_username=config["sasl_plain_username"],
-        sasl_plain_password=config["sasl_plain_password"],
+        client_id=config.client_id,
+        security_protocol=config.security_protocol,
+        ssl_cafile=config.ssl_cafile,
+        ssl_certfile=config.ssl_certfile,
+        ssl_keyfile=config.ssl_keyfile,
+        sasl_mechanism=config.sasl_mechanism,
+        sasl_plain_username=config.sasl_plain_username,
+        sasl_plain_password=config.sasl_plain_password,
         auto_offset_reset="earliest",
-        session_timeout_ms=config["session_timeout_ms"],
-        metadata_max_age_ms=config["metadata_max_age_ms"],
+        session_timeout_ms=config.session_timeout_ms,
+        metadata_max_age_ms=config.metadata_max_age_ms,
     )
     try:
         yield consumer
@@ -52,15 +52,16 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
 @contextlib.contextmanager
 def kafka_producer_from_config(config: Config) -> Iterator[KafkaProducer]:
     producer = KafkaProducer(
-        bootstrap_servers=config["bootstrap_uri"],
-        security_protocol=config["security_protocol"],
-        ssl_cafile=config["ssl_cafile"],
-        ssl_certfile=config["ssl_certfile"],
-        ssl_keyfile=config["ssl_keyfile"],
-        sasl_mechanism=config["sasl_mechanism"],
-        sasl_plain_username=config["sasl_plain_username"],
-        sasl_plain_password=config["sasl_plain_password"],
+        bootstrap_servers=config.bootstrap_uri,
+        security_protocol=config.security_protocol,
+        ssl_cafile=config.ssl_cafile,
+        ssl_certfile=config.ssl_certfile,
+        ssl_keyfile=config.ssl_keyfile,
+        sasl_mechanism=config.sasl_mechanism,
+        sasl_plain_username=config.sasl_plain_username,
+        sasl_plain_password=config.sasl_plain_password,
         retries=0,
+        session_timeout_ms=config.session_timeout_ms,
     )
     try:
         yield producer
diff --git a/tests/integration/backup/test_avro_export.py b/tests/integration/backup/test_avro_export.py
index 041023580..ac9adaa8a 100644
--- a/tests/integration/backup/test_avro_export.py
+++ b/tests/integration/backup/test_avro_export.py
@@ -7,7 +7,7 @@
 from karapace.backup import api
 from karapace.backup.api import BackupVersion
 from karapace.client import Client
-from karapace.config import set_config_defaults
+from karapace.config import Config
 from karapace.utils import json_encode
 from pathlib import Path
 from tests.integration.utils.cluster import RegistryDescription
@@ -110,12 +110,9 @@ async def test_export_anonymized_avro_schemas(
 
     # Get the backup
     export_location = tmp_path / "export.log"
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
     api.create_backup(
         config=config,
         backup_location=export_location,
diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py
index 08076dbde..a8fca6da6 100644
--- a/tests/integration/backup/test_legacy_backup.py
+++ b/tests/integration/backup/test_legacy_backup.py
@@ -10,7 +10,7 @@
 from karapace.backup.errors import StaleConsumerError
 from karapace.backup.poll_timeout import PollTimeout
 from karapace.client import Client
-from karapace.config import set_config_defaults
+from karapace.config import Config
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.common import KafkaError
 from karapace.kafka.consumer import KafkaConsumer
@@ -52,12 +52,10 @@ async def test_backup_get(
 
     # Get the backup
     backup_location = tmp_path / "schemas.log"
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+
     api.create_backup(
         config=config,
         backup_location=backup_location,
@@ -85,11 +83,9 @@ async def test_backup_restore_and_get_non_schema_topic(
 ) -> None:
     test_topic_name = new_random_name("non-schemas")
 
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+
     admin_client.new_topic(name=test_topic_name)
 
     # Restore from backup
@@ -154,13 +150,10 @@ async def test_backup_restore(
 ) -> None:
     subject = "subject-1"
     test_data_path = Path("tests/integration/test_data/")
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-            "force_key_correction": True,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+    config.force_key_correction = True
 
     # Test basic restore functionality
     restore_location = test_data_path / f"test_restore_{backup_file_version}.log"
@@ -252,9 +245,10 @@ async def test_stale_consumer(
     tmp_path: Path,
 ) -> None:
     await insert_data(registry_async_client)
-    config = set_config_defaults(
-        {"bootstrap_uri": kafka_servers.bootstrap_servers, "topic_name": registry_cluster.schemas_topic}
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+
     with pytest.raises(StaleConsumerError) as e:
         # The proper way to test this would be with quotas by throttling our client to death while using a very short
         # poll timeout. However, we have no way to set up quotas because all Kafka clients available to us do not
@@ -278,9 +272,10 @@ async def test_message_error(
     tmp_path: Path,
 ) -> None:
     await insert_data(registry_async_client)
-    config = set_config_defaults(
-        {"bootstrap_uri": kafka_servers.bootstrap_servers, "topic_name": registry_cluster.schemas_topic}
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+
     with pytest.raises(InvalidTopicError):
         with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock:
             poll_mock.return_value = StubMessage(error=KafkaError(KafkaError.TOPIC_EXCEPTION))
diff --git a/tests/integration/backup/test_session_timeout.py b/tests/integration/backup/test_session_timeout.py
index f527c8c09..b953b577b 100644
--- a/tests/integration/backup/test_session_timeout.py
+++ b/tests/integration/backup/test_session_timeout.py
@@ -5,7 +5,7 @@
 from aiokafka.errors import NoBrokersAvailable
 from confluent_kafka.admin import NewTopic
 from karapace.backup.api import BackupVersion, create_backup
-from karapace.config import Config, DEFAULTS, set_config_defaults
+from karapace.config import Config
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka_utils import kafka_producer_from_config
 from pathlib import Path
@@ -55,9 +55,9 @@ def test_producer_with_custom_kafka_properties_does_not_fail(
     This test ensures that the `session.timeout.ms` can be injected in the kafka config
     so that the exception isn't raised
     """
-    config = set_config_defaults(
-        Config(bootstrap_uri=kafka_server_session_timeout.bootstrap_servers, session_timeout_ms=SESSION_TIMEOUT_MS)
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_server_session_timeout.bootstrap_servers[0]
+    config.session_timeout_ms = SESSION_TIMEOUT_MS
 
     admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
     admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
@@ -101,6 +101,12 @@ def test_producer_with_custom_kafka_properties_fail(
     admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
     admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
 
+    config = Config()
+    # TODO: This test is broken. It has used localhost:9092 when it should use
+    # the configured broker from kafka_server_session_timeout.
+    # config.bootstrap_uri = kafka_server_session_timeout.bootstrap_servers[0]
+    config.bootstrap_uri = "localhost:9092"
+
     with pytest.raises(NoBrokersAvailable):
-        with kafka_producer_from_config(DEFAULTS) as producer:
+        with kafka_producer_from_config(config) as producer:
             _ = producer
diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py
index 6f2e5df35..f03adc4ea 100644
--- a/tests/integration/backup/test_v3_backup.py
+++ b/tests/integration/backup/test_v3_backup.py
@@ -17,7 +17,7 @@
 from karapace.backup.errors import BackupDataRestorationError, EmptyPartition
 from karapace.backup.poll_timeout import PollTimeout
 from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations
-from karapace.config import Config, set_config_defaults
+from karapace.config import Config
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.consumer import KafkaConsumer
 from karapace.kafka.producer import KafkaProducer
@@ -49,12 +49,10 @@ def config_fixture(
     kafka_servers: KafkaServers,
     registry_cluster: RegistryDescription,
 ) -> Config:
-    return set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+    return config
 
 
 @pytest.fixture(scope="function", name="config_file")
@@ -67,13 +65,10 @@ def config_file_fixture(
     file_path = directory_path / "config.json"
     try:
         file_path.write_text(
-            json.dumps(
-                {
-                    "bootstrap_uri": kafka_servers.bootstrap_servers,
-                    "topic_name": registry_cluster.schemas_topic,
-                },
-                indent=2,
-            )
+            f"""\
+            BOOTSTRAP_URI={kafka_servers.bootstrap_servers[0]}
+            TOPIC_NAME={registry_cluster.schemas_topic}
+            """
         )
         yield file_path
     finally:
@@ -557,23 +552,20 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is
     # Make sure topic doesn't exist beforehand.
     _delete_topic(admin_client, topic_name)
 
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
 
     class LowTimeoutProducer:
         def __init__(self):
             self._producer = KafkaProducer(
-                bootstrap_servers=config["bootstrap_uri"],
-                security_protocol=config["security_protocol"],
-                ssl_cafile=config["ssl_cafile"],
-                ssl_certfile=config["ssl_certfile"],
-                ssl_keyfile=config["ssl_keyfile"],
-                sasl_mechanism=config["sasl_mechanism"],
-                sasl_plain_username=config["sasl_plain_username"],
-                sasl_plain_password=config["sasl_plain_password"],
+                bootstrap_servers=config.bootstrap_uri,
+                security_protocol=config.security_protocol,
+                ssl_cafile=config.ssl_cafile,
+                ssl_certfile=config.ssl_certfile,
+                ssl_keyfile=config.ssl_keyfile,
+                sasl_mechanism=config.sasl_mechanism,
+                sasl_plain_username=config.sasl_plain_username,
+                sasl_plain_password=config.sasl_plain_password,
                 socket_timeout_ms=5000,
             )
@@ -606,11 +598,8 @@ def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_p
     # Make sure topic doesn't exist beforehand.
     _delete_topic(admin_client, topic_name)
 
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
 
     class FailToSendProducer(KafkaProducer):
         def send(self, *args, **kwargs):
@@ -619,14 +608,14 @@ def send(self, *args, **kwargs):
     class FailToSendProducerContext:
         def __init__(self):
             self._producer = FailToSendProducer(
-                bootstrap_servers=config["bootstrap_uri"],
-                security_protocol=config["security_protocol"],
-                ssl_cafile=config["ssl_cafile"],
-                ssl_certfile=config["ssl_certfile"],
-                ssl_keyfile=config["ssl_keyfile"],
-                sasl_mechanism=config["sasl_mechanism"],
-                sasl_plain_username=config["sasl_plain_username"],
-                sasl_plain_password=config["sasl_plain_password"],
+                bootstrap_servers=config.bootstrap_uri,
+                security_protocol=config.security_protocol,
+                ssl_cafile=config.ssl_cafile,
+                ssl_certfile=config.ssl_certfile,
+                ssl_keyfile=config.ssl_keyfile,
+                sasl_mechanism=config.sasl_mechanism,
+                sasl_plain_username=config.sasl_plain_username,
+                sasl_plain_password=config.sasl_plain_password,
             )
 
         def __enter__(self):
@@ -656,11 +645,8 @@ def test_backup_restoration_fails_when_producer_send_fails_on_buffer_error(
     # Make sure topic doesn't exist beforehand.
     _delete_topic(admin_client, topic_name)
 
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
 
     class FailToSendProducer(KafkaProducer):
         def send(self, *args, **kwargs):
@@ -672,14 +658,14 @@ def poll(self, timeout: float) -> None:  # pylint: disable=unused-argument
     class FailToSendProducerContext:
         def __init__(self):
             self._producer = FailToSendProducer(
-                bootstrap_servers=config["bootstrap_uri"],
-                security_protocol=config["security_protocol"],
-                ssl_cafile=config["ssl_cafile"],
-                ssl_certfile=config["ssl_certfile"],
-                ssl_keyfile=config["ssl_keyfile"],
-                sasl_mechanism=config["sasl_mechanism"],
-                sasl_plain_username=config["sasl_plain_username"],
-                sasl_plain_password=config["sasl_plain_password"],
+                bootstrap_servers=config.bootstrap_uri,
+                security_protocol=config.security_protocol,
+                ssl_cafile=config.ssl_cafile,
+                ssl_certfile=config.ssl_certfile,
+                ssl_keyfile=config.ssl_keyfile,
+                sasl_mechanism=config.sasl_mechanism,
+                sasl_plain_username=config.sasl_plain_username,
+                sasl_plain_password=config.sasl_plain_password,
             )
 
         def __enter__(self):
@@ -706,11 +692,9 @@ def test_backup_restoration_override_replication_factor(
 ) -> None:
     backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic
     metadata_path = backup_directory / f"{new_topic.topic}.metadata"
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-        }
-    )
+
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
 
     # pupulate the topic and create a backup
     for i in range(10):
@@ -1194,12 +1178,9 @@ def test_backup_creation_succeeds_no_duplicate_offsets(
     producer.flush()
 
     backup_location = tmp_path / "fails.log"
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": new_topic.topic,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = new_topic.topic
 
     class SlowConsumer(KafkaConsumer):
         def poll(self, *args, **kwargs):
diff --git a/tests/integration/config/log4j.properties b/tests/integration/config/log4j.properties
index 83a1a93a7..b0c806b4c 100644
--- a/tests/integration/config/log4j.properties
+++ b/tests/integration/config/log4j.properties
@@ -1,6 +1,6 @@
 # Unspecified loggers and loggers with additivity=true output to server.log
 # Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
-log4j.rootLogger=INFO, kafkaAppender
+log4j.rootLogger=DEBUG, kafkaAppender
 
 log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
@@ -42,11 +42,11 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level (output to server.log)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=DEBUG
+log4j.logger.org.apache.kafka=DEBUG
 
 # Change to DEBUG or TRACE to enable request logging
-log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.logger.kafka.request.logger=DEBUG, requestAppender
 log4j.additivity.kafka.request.logger=false
 
 # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
@@ -54,7 +54,7 @@ log4j.additivity.kafka.request.logger=false
 #log4j.logger.kafka.network.Processor=TRACE, requestAppender
 #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
 #log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.logger.kafka.network.RequestChannel$=DEBUG, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
 
 log4j.logger.kafka.controller=TRACE, controllerAppender
@@ -67,5 +67,5 @@ log4j.logger.state.change.logger=INFO, stateChangeAppender
 log4j.additivity.state.change.logger=false
 
 # Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
-log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
+log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
 log4j.additivity.kafka.authorizer.logger=false
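
For reference, a minimal usage sketch of the env-file based configuration introduced above (a sketch only: it assumes the dotenv key names exercised by the config_file fixture, pydantic BaseSettings' _env_file support used by read_env_file(), and a hypothetical file path and broker address):

    # Sketch: load Karapace configuration from a dotenv-style file via read_env_file().
    from pathlib import Path

    from karapace.config import Config, read_env_file

    env_file = Path("/tmp/karapace.env")  # hypothetical location
    env_file.write_text(
        "BOOTSTRAP_URI=localhost:9092\n"  # expected to map to Config.bootstrap_uri
        "TOPIC_NAME=_schemas\n"  # expected to map to Config.topic_name
    )

    config: Config = read_env_file(str(env_file))
    assert config.bootstrap_uri == "localhost:9092"
    assert config.topic_name == "_schemas"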