From 9b93fd1f84d03ddab4c0051caef67885b932757a Mon Sep 17 00:00:00 2001 From: Parag Badani Date: Mon, 10 Jun 2024 15:40:44 +0530 Subject: [PATCH] KSQL-12194, KSQL-12195 | Classify missing schema exception and record too large exceptions as user errors. --- .../ksql/query/MissingSchemaClassifier.java | 51 +++++++++++++++++ .../io/confluent/ksql/query/QueryBuilder.java | 2 + .../ksql/query/RecordTooLargeClassifier.java | 56 +++++++++++++++++++ .../schema/registry/SchemaRegistryUtil.java | 13 ++++- .../query/MissingSchemaClassifierTest.java | 49 ++++++++++++++++ .../query/MissingSubjectClassifierTest.java | 1 - .../query/RecordTooLargeClassifierTest.java | 48 ++++++++++++++++ 7 files changed, 217 insertions(+), 3 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingSchemaClassifier.java create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/RecordTooLargeClassifier.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSchemaClassifierTest.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/RecordTooLargeClassifierTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingSchemaClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingSchemaClassifier.java new file mode 100644 index 000000000000..3ca496e1d9e7 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingSchemaClassifier.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; + +import io.confluent.ksql.query.QueryError.Type; +import io.confluent.ksql.schema.registry.SchemaRegistryUtil; + +/** + * {@code MissingSchemaClassifier} classifies missing SR schema exceptions as user error + */ +public class MissingSchemaClassifier implements QueryErrorClassifier { + private static final Logger LOG = LoggerFactory.getLogger(MissingSchemaClassifier.class); + + private final String queryId; + + public MissingSchemaClassifier(final String queryId) { + this.queryId = Objects.requireNonNull(queryId, "queryId"); + } + + @Override + public Type classify(final Throwable e) { + final Type type = SchemaRegistryUtil.isSchemaNotFoundErrorCode(e) ? Type.USER : Type.UNKNOWN; + + if (type == Type.USER) { + LOG.info( + "Classified error as USER error based on missing SR schema. Query ID: {} Exception: {}", + queryId, + e.getMessage()); + } + + return type; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 790cb382ca9a..07f484f253e5 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -656,7 +656,9 @@ private QueryErrorClassifier getConfiguredQueryErrorClassifier( final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId) .and(new AuthorizationClassifier(applicationId)) .and(new KsqlFunctionClassifier(applicationId)) + .and(new RecordTooLargeClassifier(applicationId)) .and(new MissingSubjectClassifier(applicationId)) + .and(new MissingSchemaClassifier(applicationId)) .and(new SchemaAuthorizationClassifier(applicationId)) .and(new KsqlSerializationClassifier(applicationId)); return buildConfiguredClassifiers(ksqlConfig, applicationId) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/RecordTooLargeClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/RecordTooLargeClassifier.java new file mode 100644 index 000000000000..b554ceed5bf7 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/RecordTooLargeClassifier.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import io.confluent.ksql.query.QueryError.Type; +import java.util.Objects; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code RecordTooLargeClassifier} classifies records too large to be produced as user exception. + */ +public class RecordTooLargeClassifier implements QueryErrorClassifier { + private static final Logger LOG = LoggerFactory.getLogger(RecordTooLargeClassifier.class); + + private final String queryId; + + public RecordTooLargeClassifier(final String queryId) { + this.queryId = Objects.requireNonNull(queryId, "queryId"); + } + + @Override + public Type classify(final Throwable e) { + final Type type = e instanceof StreamsException + && ExceptionUtils.getRootCause(e) instanceof RecordTooLargeException + ? Type.USER + : Type.UNKNOWN; + + if (type == Type.USER) { + LOG.info( + "Classified RecordTooLargeException error as USER error. Query ID: {} Exception: {}. " + + "Consider setting ksql.streams.max.request.size property to a higher value.", + queryId, + e.getMessage() + ); + } + + return type; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java index 090efd958672..3e229adcf781 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java @@ -46,8 +46,10 @@ public final class SchemaRegistryUtil { @VisibleForTesting public static final int SUBJECT_NOT_FOUND_ERROR_CODE = 40401; + public static final int SCHEMA_NOT_FOUND_ERROR_CODE = 40403; private SchemaRegistryUtil() { + super(); } public static void cleanupInternalTopicSchemas( @@ -185,6 +187,11 @@ public static boolean isSubjectNotFoundErrorCode(final Throwable error) { && ((RestClientException) error).getErrorCode() == SUBJECT_NOT_FOUND_ERROR_CODE); } + public static boolean isSchemaNotFoundErrorCode(final Throwable error) { + return (error instanceof RestClientException + && ((RestClientException) error).getErrorCode() == SCHEMA_NOT_FOUND_ERROR_CODE); + } + public static boolean isAuthErrorCode(final Throwable error) { return (error instanceof RestClientException && ((((RestClientException) error).getStatus() == HttpStatus.SC_UNAUTHORIZED) @@ -192,7 +199,9 @@ public static boolean isAuthErrorCode(final Throwable error) { } private static boolean isRetriableError(final Throwable error) { - return !isSubjectNotFoundErrorCode(error) && !isAuthErrorCode(error); + return !isSubjectNotFoundErrorCode(error) + && !isSchemaNotFoundErrorCode(error) + && !isAuthErrorCode(error); } private static void hardDeleteSubjectWithRetries( @@ -201,7 +210,7 @@ private static void hardDeleteSubjectWithRetries( try { ExecutorUtil.executeWithRetries( () -> schemaRegistryClient.deleteSubject(subject, true), - error -> isRetriableError(error) + SchemaRegistryUtil::isRetriableError ); } catch (final RestClientException e) { if (isAuthErrorCode(e)) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSchemaClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSchemaClassifierTest.java new file mode 100644 index 000000000000..3b2ee7df6774 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSchemaClassifierTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import org.junit.Test; + +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class MissingSchemaClassifierTest { + @Test + public void shouldClassifyMissingSchemaAsUserError() { + // Given: + final Exception e = new RestClientException("foo", 404, 40403); + + // When: + final QueryError.Type type = new MissingSchemaClassifier("").classify(e); + + // Then: + assertThat(type, is(QueryError.Type.USER)); + } + + @Test + public void shouldClassifyOtherExceptionAsUnknownException() { + // Given: + final Exception e = new Exception("foo"); + + // When: + final QueryError.Type type = new MissingSchemaClassifier("").classify(e); + + // Then: + assertThat(type, is(QueryError.Type.UNKNOWN)); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSubjectClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSubjectClassifierTest.java index 2ac11df4a2fa..d79a970710cd 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSubjectClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSubjectClassifierTest.java @@ -16,7 +16,6 @@ package io.confluent.ksql.query; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.junit.Test; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/RecordTooLargeClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/RecordTooLargeClassifierTest.java new file mode 100644 index 000000000000..99b40e1b6e4a --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/RecordTooLargeClassifierTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.streams.errors.StreamsException; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class RecordTooLargeClassifierTest { + + @Test + public void shouldClassifyRecordTooLargeExceptionAsUserError() { + // Given: + final Exception e = new StreamsException( + "Error encountered trying to send record to topic foo", + new KafkaException( + "Cannot execute transactional method because we are in an error state", + new RecordTooLargeException( + "The message is 1084728 bytes when serialized which is larger than 1048576, which" + + " is the value of the max.request.size configuration.") + ) + ); + + // When: + final QueryError.Type type = new RecordTooLargeClassifier("").classify(e); + + // Then: + assertThat(type, is(QueryError.Type.USER)); + } + +} \ No newline at end of file