Skip to content

Commit

Permalink
Merge pull request #894 from Aiven-Open/jjaakola-aiven-fix-schema-rea…
Browse files Browse the repository at this point in the history
…der-offset-handling-on-invalid-records

fix: move offset always, also with invalid processed records
  • Loading branch information
keejon authored Jun 7, 2024
2 parents a189931 + a18f9b5 commit cd49348
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
8 changes: 6 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ def handle_messages(self) -> None:
assert message_key is not None
key = json_decode(message_key)
except JSONDecodeError:
LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset())
# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
self.offset = msg.offset()
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
continue
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
LOG.error(
Expand All @@ -380,7 +382,9 @@ def handle_messages(self) -> None:
try:
value = self._parse_message_value(message_value)
except JSONDecodeError:
LOG.exception("Invalid JSON in msg.value() at offset %s", msg.offset())
# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
self.offset = msg.offset()
LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
continue

self.handle_msg(key, value)
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
"""

from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import (
KafkaSchemaReader,
Expand All @@ -20,6 +23,7 @@
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

import json
import pytest
import random
import time
Expand Down Expand Up @@ -184,3 +188,58 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
schema_reader.handle_messages()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP


def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic() -> None:
    """Verify the reader's offset follows every consumed record, valid or not.

    The mocked consumer hands out one well-formed record, then a record with
    an unparsable key, then a record with an unparsable value, and finally an
    empty batch. After each non-empty batch the reader's offset must equal
    that record's offset while the reader is still not ready; after the empty
    batch the reader must report ready and switch to the post-startup
    message limit.
    """

    def build_message(key: object, value: object, offset: int) -> Mock:
        # Every record in this test is error-free; only key/value/offset vary.
        record = Mock(spec=Message)
        record.error.return_value = None
        record.key.return_value = key
        record.value.return_value = value
        record.offset.return_value = offset
        return record

    schema_payload = json.dumps(
        {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
    ).encode()

    valid_record = build_message(
        b'{"keytype":"SCHEMA","subject1":"test","version":1,"magic":1}',
        schema_payload,
        1,
    )
    bad_key_record = build_message("invalid-key", schema_payload, 2)
    bad_value_record = build_message(
        b'{"keytype":"SCHEMA","subject3":"test","version":1,"magic":1}',
        "invalid-value",
        3,
    )

    kafka_consumer = Mock(spec=KafkaConsumer)
    # One batch per handle_messages() call; the trailing empty batch lets the
    # reader evaluate readiness without consuming anything new.
    kafka_consumer.consume.side_effect = [
        [valid_record],
        [bad_key_record],
        [bad_value_record],
        [],
    ]
    # Return tuple (beginning, end), end offset is the next upcoming record offset
    kafka_consumer.get_watermark_offsets.return_value = (0, 4)

    reader = KafkaSchemaReader(
        config=DEFAULTS,
        offset_watcher=OffsetWatcher(),
        key_formatter=Mock(spec=KeyFormatter),
        master_coordinator=None,
        database=InMemoryDatabase(),
    )
    reader.consumer = kafka_consumer
    reader.offset = 0
    assert reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP

    # Each of the three records must move the offset, yet the reader stays
    # not-ready until a later call re-evaluates readiness.
    for expected_offset in (1, 2, 3):
        reader.handle_messages()
        assert reader.offset == expected_offset
        assert reader.ready is False

    reader.handle_messages()  # call last time to call _is_ready()
    assert reader.offset == 3
    assert reader.ready is True
    assert reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP

0 comments on commit cd49348

Please sign in to comment.