diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index f591ac3a5..652071d04 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -353,7 +353,9 @@ def handle_messages(self) -> None: assert message_key is not None key = json_decode(message_key) except JSONDecodeError: - LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset()) + # Invalid entry shall also move the offset so Karapace makes progress towards ready state. + self.offset = msg.offset() + LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset()) continue except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc: LOG.error( @@ -380,7 +382,9 @@ def handle_messages(self) -> None: try: value = self._parse_message_value(message_value) except JSONDecodeError: - LOG.exception("Invalid JSON in msg.value() at offset %s", msg.offset()) + # Invalid entry shall also move the offset so Karapace makes progress towards ready state. + self.offset = msg.offset() + LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset()) continue self.handle_msg(key, value) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 052c3ef7f..39ac0dfab 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -6,9 +6,12 @@ """ from concurrent.futures import ThreadPoolExecutor +from confluent_kafka import Message from dataclasses import dataclass from karapace.config import DEFAULTS from karapace.in_memory_database import InMemoryDatabase +from karapace.kafka.consumer import KafkaConsumer +from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher from karapace.schema_reader import ( KafkaSchemaReader, @@ -20,6 +23,7 @@ from tests.base_testcase import BaseTestCase from unittest.mock import Mock +import json import pytest import random import time @@ -184,3 +188,58 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: schema_reader.handle_messages() assert schema_reader.ready is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP + + +def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic() -> None: + key_formatter_mock = Mock(spec=KeyFormatter) + consumer_mock = Mock(spec=KafkaConsumer) + + schema_str = json.dumps( + {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]} + ).encode() + + ok1_message = Mock(spec=Message) + ok1_message.key.return_value = b'{"keytype":"SCHEMA","subject1":"test","version":1,"magic":1}' + ok1_message.error.return_value = None + ok1_message.value.return_value = schema_str + ok1_message.offset.return_value = 1 + invalid_key_message = Mock(spec=Message) + invalid_key_message.key.return_value = "invalid-key" + invalid_key_message.error.return_value = None + invalid_key_message.value.return_value = schema_str + invalid_key_message.offset.return_value = 2 + invalid_value_message = Mock(spec=Message) + invalid_value_message.key.return_value = b'{"keytype":"SCHEMA","subject3":"test","version":1,"magic":1}' + invalid_value_message.error.return_value = None + invalid_value_message.value.return_value = "invalid-value" + invalid_value_message.offset.return_value = 3 + + consumer_mock.consume.side_effect = [ok1_message], [invalid_key_message], [invalid_value_message], [] + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, 4) + + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=DEFAULTS, + offset_watcher=offset_watcher, + key_formatter=key_formatter_mock, + master_coordinator=None, + database=InMemoryDatabase(), + ) + schema_reader.consumer = consumer_mock + schema_reader.offset = 0 + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP + + schema_reader.handle_messages() + assert schema_reader.offset == 1 + assert schema_reader.ready is False + schema_reader.handle_messages() + assert schema_reader.offset == 2 + assert schema_reader.ready is False + schema_reader.handle_messages() + assert schema_reader.offset == 3 + assert schema_reader.ready is False + schema_reader.handle_messages() # call last time to call _is_ready() + assert schema_reader.offset == 3 + assert schema_reader.ready is True + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP