diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py
index 540de023e..f829fe897 100644
--- a/singer_sdk/connectors/sql.py
+++ b/singer_sdk/connectors/sql.py
@@ -881,6 +881,8 @@ def discover_catalog_entry(
         schema_name: str,
         table_name: str,
         is_view: bool,  # noqa: FBT001
+        *,
+        reflect_indices: bool = True,
     ) -> CatalogEntry:
         """Create `CatalogEntry` object for the given table or a view.
 
@@ -890,6 +892,7 @@ def discover_catalog_entry(
             schema_name: Schema name to inspect
             table_name: Name of the table or a view
             is_view: Flag whether this object is a view, returned by `get_object_names`
+            reflect_indices: Whether to reflect indices
 
         Returns:
             `CatalogEntry` object for the given table or a view
@@ -905,11 +908,12 @@ def discover_catalog_entry(
 
         # An element of the columns list is ``None`` if it's an expression and is
         # returned in the ``expressions`` list of the reflected index.
-        possible_primary_keys.extend(
-            index_def["column_names"]  # type: ignore[misc]
-            for index_def in inspected.get_indexes(table_name, schema=schema_name)
-            if index_def.get("unique", False)
-        )
+        if reflect_indices:
+            possible_primary_keys.extend(
+                index_def["column_names"]  # type: ignore[misc]
+                for index_def in inspected.get_indexes(table_name, schema=schema_name)
+                if index_def.get("unique", False)
+            )
 
         key_properties = next(iter(possible_primary_keys), None)
 
@@ -960,9 +964,19 @@ def discover_catalog_entry(
             replication_key=None,  # Must be defined by user
         )
 
-    def discover_catalog_entries(self) -> list[dict]:
+    def discover_catalog_entries(
+        self,
+        *,
+        exclude_schemas: t.Sequence[str] = (),
+        reflect_indices: bool = True,
+    ) -> list[dict]:
         """Return a list of catalog entries from discovery.
 
+        Args:
+            exclude_schemas: A list of schema names to exclude from discovery.
+            reflect_indices: Whether to reflect indices to detect potential primary
+                keys.
+
         Returns:
             The discovered catalog entries as a list.
         """
@@ -970,6 +984,9 @@ def discover_catalog_entries(self) -> list[dict]:
         engine = self._engine
         inspected = sa.inspect(engine)
         for schema_name in self.get_schema_names(engine, inspected):
+            if schema_name in exclude_schemas:
+                continue
+
             # Iterate through each table and view
             for table_name, is_view in self.get_object_names(
                 engine,
@@ -982,6 +999,7 @@ def discover_catalog_entries(self) -> list[dict]:
                     schema_name,
                     table_name,
                     is_view,
+                    reflect_indices=reflect_indices,
                 )
                 result.append(catalog_entry.to_dict())
 
@@ -1217,8 +1235,7 @@ def prepare_schema(self, schema_name: str) -> None:
         Args:
             schema_name: The target schema name.
         """
-        schema_exists = self.schema_exists(schema_name)
-        if not schema_exists:
+        if not self.schema_exists(schema_name):
             self.create_schema(schema_name)
 
     def prepare_table(
diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py
index cccbe4fd5..398ef49f0 100644
--- a/singer_sdk/tap_base.py
+++ b/singer_sdk/tap_base.py
@@ -657,6 +657,9 @@ class SQLTap(Tap):
     querying a database's system tables).
     """
 
+    exclude_schemas: t.Sequence[str] = []
+    """Hard-coded list of stream names to skip when discovering the catalog."""
+
     _tap_connector: SQLConnector | None = None
 
     def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
@@ -700,7 +703,9 @@ def catalog_dict(self) -> dict:
         connector = self.tap_connector
 
         result: dict[str, list[dict]] = {"streams": []}
-        result["streams"].extend(connector.discover_catalog_entries())
+        result["streams"].extend(
+            connector.discover_catalog_entries(exclude_schemas=self.exclude_schemas),
+        )
 
         self._catalog_dict = result
         return self._catalog_dict
diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py
index b4452d961..a8b39521e 100644
--- a/tests/core/test_connector_sql.py
+++ b/tests/core/test_connector_sql.py
@@ -351,6 +351,32 @@ def test_adapt_column_type(self, connector: DuckDBConnector):
             assert result.keys() == ["id", "name"]
             assert result.cursor.description[1][1] == "STRING"
 
+    @pytest.mark.parametrize(
+        "exclude_schemas,expected_streams",
+        [
+            ([], 1),
+            (["memory.my_schema"], 0),
+        ],
+    )
+    def test_discover_catalog_entries_exclude_schemas(
+        self,
+        connector: DuckDBConnector,
+        exclude_schemas: list[str],
+        expected_streams: int,
+    ):
+        with connector._engine.connect() as conn, conn.begin():
+            conn.execute(sa.text("CREATE SCHEMA my_schema"))
+            conn.execute(
+                sa.text(
+                    "CREATE TABLE my_schema.test_table (id INTEGER PRIMARY KEY, name STRING)",  # noqa: E501
+                )
+            )
+        entries = connector.discover_catalog_entries(
+            exclude_schemas=exclude_schemas,
+            reflect_indices=False,
+        )
+        assert len(entries) == expected_streams
+
 
 def test_adapter_without_json_serde():
     registry.register(