Skip to content

Commit

Permalink
mimic get_catalog behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Dec 19, 2023
1 parent 5221389 commit ccfeebd
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Union, Type, Tuple, Callable, Set

from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.base.relation import InformationSchema, SchemaSearchMap
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.contracts.graph.manifest import Manifest

Expand Down Expand Up @@ -361,7 +361,7 @@ def get_catalog(
schema_map = self._get_catalog_schemas(manifest) # info_schema: [relation_schema]
if len(schema_map) > 1:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog, found " f"{list(schema_map)}"
f"Expected only one database in get_catalog, found {list(schema_map)}"
)

with executor(self.config) as tpe:
Expand All @@ -384,24 +384,31 @@ def get_catalog(
def get_catalog_by_relations(
self, manifest: Manifest, relations: Set[BaseRelation]
) -> Tuple[agate.Table, List[Exception]]:
info_schemas = {r.information_schema() for r in relations}
if len(info_schemas) > 1:
schema_map = SchemaSearchMap()
for relation in relations:
schema_map.add(relation)
if len(schema_map) > 1:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog, found {list(schema_map)}"
)
# TODO: remove this prior to merge
elif len(schema_map) == 0:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog, found " f"{list(info_schemas)}"
"Expected at least one database in get_catalog, found None"
)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info_schema in info_schemas:
for relation in relations:
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(
tpe.submit_connected(
self,
str(relation),
self._get_one_catalog_by_relations,
information_schema=info_schema,
relations=[relation],
manifest=manifest,
schema,
self._get_one_catalog,
info,
[schema],
manifest,
)
)
catalogs, exceptions = catch_as_completed(futures)
Expand Down

0 comments on commit ccfeebd

Please sign in to comment.