diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 45ab5d960..be7a82066 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Union, Type, Tuple, Callable, Set -from dbt.adapters.base.relation import InformationSchema +from dbt.adapters.base.relation import InformationSchema, SchemaSearchMap from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability from dbt.contracts.graph.manifest import Manifest @@ -361,7 +361,7 @@ def get_catalog( schema_map = self._get_catalog_schemas(manifest) # info_schema: [relation_schema] if len(schema_map) > 1: raise dbt.exceptions.CompilationError( - f"Expected only one database in get_catalog, found " f"{list(schema_map)}" + f"Expected only one database in get_catalog, found {list(schema_map)}" ) with executor(self.config) as tpe: @@ -384,24 +384,31 @@ def get_catalog( def get_catalog_by_relations( self, manifest: Manifest, relations: Set[BaseRelation] ) -> Tuple[agate.Table, List[Exception]]: - info_schemas = {r.information_schema() for r in relations} - if len(info_schemas) > 1: + schema_map = SchemaSearchMap() + for relation in relations: + schema_map.add(relation) + if len(schema_map) > 1: + raise dbt.exceptions.CompilationError( + f"Expected only one database in get_catalog, found {list(schema_map)}" + ) + # TODO: remove this prior to merge + elif len(schema_map) == 0: raise dbt.exceptions.CompilationError( - f"Expected only one database in get_catalog, found " f"{list(info_schemas)}" + "Expected at least one database in get_catalog, found None" ) with executor(self.config) as tpe: futures: List[Future[agate.Table]] = [] - for info_schema in info_schemas: - for relation in relations: + for info, schemas in schema_map.items(): + for schema in schemas: futures.append( tpe.submit_connected( self, - str(relation), - self._get_one_catalog_by_relations, - information_schema=info_schema, - relations=[relation], - manifest=manifest, + schema, + self._get_one_catalog, + info, + [schema], + manifest, ) ) catalogs, exceptions = catch_as_completed(futures)