From d33d9c4e71c73ad2ddcbf9df93a74eae9c7bc353 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Tue, 30 Jul 2024 08:09:51 -0700 Subject: [PATCH] feat(actions): add docs propagation action for columns --- .github/workflows/datahub-actions-docker.yml | 43 +- datahub-actions/setup.py | 6 +- .../src/datahub_actions/api/action_graph.py | 10 + .../plugin/action/mcl_utils.py | 56 ++ .../plugin/action/propagation/__init__.py | 13 + .../action/propagation/docs/__init__.py | 13 + .../propagation/docs/propagation_action.py | 645 ++++++++++++++++++ .../action/propagation/propagation_utils.py | 173 +++++ .../plugin/action/stats_util.py | 204 ++++++ datahub-actions/tests/unit/test_helpers.py | 17 +- docker/config/doc_propagation_action.yaml | 34 + 11 files changed, 1176 insertions(+), 38 deletions(-) create mode 100644 datahub-actions/src/datahub_actions/plugin/action/mcl_utils.py create mode 100644 datahub-actions/src/datahub_actions/plugin/action/propagation/__init__.py create mode 100644 datahub-actions/src/datahub_actions/plugin/action/propagation/docs/__init__.py create mode 100644 datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py create mode 100644 datahub-actions/src/datahub_actions/plugin/action/propagation/propagation_utils.py create mode 100644 datahub-actions/src/datahub_actions/plugin/action/stats_util.py create mode 100644 docker/config/doc_propagation_action.yaml diff --git a/.github/workflows/datahub-actions-docker.yml b/.github/workflows/datahub-actions-docker.yml index 791edbcd..2ff45b74 100644 --- a/.github/workflows/datahub-actions-docker.yml +++ b/.github/workflows/datahub-actions-docker.yml @@ -55,16 +55,18 @@ jobs: uses: actions/checkout@v3 - name: Docker meta id: docker_meta - uses: crazy-max/ghaction-docker-meta@v1 + uses: docker/metadata-action@v5 with: images: | acryldata/datahub-actions - tag-custom: ${{ needs.setup.outputs.tag }} - tag-custom-only: true + tags: | + type=raw,value=${{ needs.setup.outputs.tag }} + # tag-custom: ${{ needs.setup.outputs.tag }} + # tag-custom-only: true - name: Set up QEMU - uses: docker/setup-qemu-action@v2 + uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: Login to DockerHub uses: docker/login-action@v2 with: @@ -78,8 +80,7 @@ jobs: tags: ${{ steps.docker_meta.outputs.tags }} push: ${{ needs.setup.outputs.publish == 'true' }} target: final - build-args: - "GEM_FURY_TOKEN=${{ secrets.GEMFURY_PULL_TOKEN }}" + build-args: 'GEM_FURY_TOKEN=${{ secrets.GEMFURY_PULL_TOKEN }}' slim_image: name: Build & Push Image to DockerHub (slim) runs-on: ubuntu-latest @@ -89,13 +90,13 @@ jobs: - name: Check out the repo (slim) uses: actions/checkout@v3 - name: Docker meta (slim) - id: docker_meta - uses: crazy-max/ghaction-docker-meta@v1 + id: docker_meta_slim + uses: docker/metadata-action@v5 with: images: | - acryldata/datahub-actions - tag-custom: ${{ needs.setup.outputs.tag }} - tag-custom-only: true + acryldata/datahub-actions-slim + tags: | + type=raw,value=${{ needs.setup.outputs.tag }} - name: Set up QEMU (slim) uses: docker/setup-qemu-action@v2 - name: Set up Docker Buildx (slim) @@ -105,14 +106,6 @@ jobs: with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} - - name: Docker meta (slim) - id: docker_meta_slim - uses: crazy-max/ghaction-docker-meta@v1 - with: - images: | - acryldata/datahub-actions-slim - tag-custom: ${{ needs.setup.outputs.tag }} - tag-custom-only: true - 
name: Build & Push Image (slim) uses: docker/build-push-action@v6 with: @@ -151,7 +144,7 @@ jobs: # output: 'trivy-results.sarif' # severity: 'CRITICAL,HIGH' # ignore-unfixed: true - # vuln-type: "os,library" + # vuln-type: "os,library" # - name: Upload Trivy scan results to GitHub Security tab # uses: github/codeql-action/upload-sarif@v2 # with: @@ -161,12 +154,12 @@ jobs: contents: read # for actions/checkout to fetch code security-events: write # for github/codeql-action/upload-sarif to upload SARIF results actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status - name: "[Monitoring] Scan slim action images for vulnerabilities" + name: '[Monitoring] Scan slim action images for vulnerabilities' runs-on: ubuntu-latest needs: [setup, slim_image] steps: - name: Checkout # adding checkout step just to make trivy upload happy - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Download image (slim) uses: ishworkh/docker-image-artifact-download@v1 if: ${{ needs.setup.outputs.publish != 'true' }} @@ -183,8 +176,8 @@ jobs: output: 'trivy-results.sarif' severity: 'CRITICAL,HIGH' ignore-unfixed: true - vuln-type: "os,library" + vuln-type: 'os,library' - name: Upload Trivy scan results to GitHub Security tab (slim) uses: github/codeql-action/upload-sarif@v2 with: - sarif_file: 'trivy-results.sarif' \ No newline at end of file + sarif_file: 'trivy-results.sarif' diff --git a/datahub-actions/setup.py b/datahub-actions/setup.py index 0c992a67..c4a3d744 100644 --- a/datahub-actions/setup.py +++ b/datahub-actions/setup.py @@ -30,7 +30,7 @@ def get_long_description(): return description -acryl_datahub_min_version = os.environ.get("ACRYL_DATAHUB_MIN_VERSION") or "0.12.1.5" +acryl_datahub_min_version = os.environ.get("ACRYL_DATAHUB_MIN_VERSION") or "0.13.3.6rc1" base_requirements = { f"acryl-datahub[datahub-kafka]>={acryl_datahub_min_version}", @@ -86,6 +86,7 @@ def get_long_description(): "snowflake_tag_propagation": { f"acryl-datahub[snowflake]>={acryl_datahub_min_version}" }, + "doc_propagation": set(), # Transformer Plugins (None yet) } @@ -138,6 +139,7 @@ def get_long_description(): "tag_propagation", "term_propagation", "snowflake_tag_propagation", + "doc_propagation", ] for dependency in plugins[plugin] ), @@ -158,6 +160,7 @@ def get_long_description(): "tag_propagation", "term_propagation", "snowflake_tag_propagation", + "doc_propagation", ] for dependency in plugins[plugin] ), @@ -173,6 +176,7 @@ def get_long_description(): "tag_propagation = datahub_actions.plugin.action.tag.tag_propagation_action:TagPropagationAction", "term_propagation = datahub_actions.plugin.action.term.term_propagation_action:TermPropagationAction", "snowflake_tag_propagation = datahub_actions.plugin.action.snowflake.tag_propagator:SnowflakeTagPropagatorAction", + "doc_propagation = datahub_actions.plugin.action.propagation.docs.propagation_action:DocPropagationAction", ], "datahub_actions.transformer.plugins": [], "datahub_actions.source.plugins": [], diff --git a/datahub-actions/src/datahub_actions/api/action_graph.py b/datahub-actions/src/datahub_actions/api/action_graph.py index 24ddc270..7186a982 100644 --- a/datahub-actions/src/datahub_actions/api/action_graph.py +++ b/datahub-actions/src/datahub_actions/api/action_graph.py @@ -186,6 +186,16 @@ def get_downstreams(self, entity_urn: str) -> List[str]: return entities return [] + def get_upstreams(self, entity_urn: str) -> List[str]: + url_frag = 
f"/relationships?direction=OUTGOING&types=List(DownstreamOf)&urn={urllib.parse.quote(entity_urn)}" + url = f"{self.graph._gms_server}{url_frag}" + response = self.graph._get_generic(url) + if response["count"] > 0: + relnships = response["relationships"] + entities = [x["entity"] for x in relnships] + return entities + return [] + def get_relationships( self, entity_urn: str, direction: str, relationship_types: List[str] ) -> List[str]: diff --git a/datahub-actions/src/datahub_actions/plugin/action/mcl_utils.py b/datahub-actions/src/datahub_actions/plugin/action/mcl_utils.py new file mode 100644 index 00000000..d4b37cc8 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/mcl_utils.py @@ -0,0 +1,56 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Callable + +from datahub.metadata.schema_classes import MetadataChangeLogClass + +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.event.event_registry import METADATA_CHANGE_LOG_EVENT_V1_TYPE + + +class MCLProcessor: + """ + A utility class to register and process MetadataChangeLog events. + """ + + def __init__(self) -> None: + self.entity_aspect_processors: dict[str, dict[str, Callable]] = {} + pass + + def is_mcl(self, event: EventEnvelope) -> bool: + return event.event_type is METADATA_CHANGE_LOG_EVENT_V1_TYPE + + def register_processor( + self, entity_type: str, aspect: str, processor: Callable + ) -> None: + if entity_type not in self.entity_aspect_processors: + self.entity_aspect_processors[entity_type] = {} + self.entity_aspect_processors[entity_type][aspect] = processor + + def process(self, event: EventEnvelope) -> Any: + + if isinstance(event.event, MetadataChangeLogClass): + entity_type = event.event.entityType + aspect = event.event.aspectName + if ( + entity_type in self.entity_aspect_processors + and aspect in self.entity_aspect_processors[entity_type] + ): + return self.entity_aspect_processors[entity_type][aspect]( + entity_urn=event.event.entityUrn, + aspect_name=event.event.aspectName, + aspect_value=event.event.aspect, + previous_aspect_value=event.event.previousAspectValue, + ) diff --git a/datahub-actions/src/datahub_actions/plugin/action/propagation/__init__.py b/datahub-actions/src/datahub_actions/plugin/action/propagation/__init__.py new file mode 100644 index 00000000..48955489 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/propagation/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/__init__.py b/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/__init__.py new file mode 100644 index 00000000..48955489 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py b/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py new file mode 100644 index 00000000..2774b900 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py @@ -0,0 +1,645 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import time +from typing import Any, Iterable, List, Optional + +from datahub.configuration.common import ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.metadata.schema_classes import ( + AuditStampClass, + DocumentationAssociationClass, + DocumentationClass, + EditableSchemaMetadataClass, +) +from datahub.metadata.schema_classes import EntityChangeEventClass as EntityChangeEvent +from datahub.metadata.schema_classes import ( + GenericAspectClass, + MetadataAttributionClass, + MetadataChangeLogClass, +) +from datahub.utilities.urns.urn import Urn +from pydantic import BaseModel, Field, validator + +from datahub_actions.action.action import Action +from datahub_actions.api.action_graph import AcrylDataHubGraph +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.pipeline.pipeline_context import PipelineContext +from datahub_actions.plugin.action.mcl_utils import MCLProcessor +from datahub_actions.plugin.action.propagation.propagation_utils import ( + get_unique_siblings, +) +from datahub_actions.plugin.action.stats_util import ( + ActionStageReport, + EventProcessingStats, +) + +logger = logging.getLogger(__name__) + + +class DocPropagationDirective(BaseModel): + propagate: bool = Field( + description="Indicates whether the documentation should be propagated." + ) + doc_string: Optional[str] = Field( + default=None, description="Documentation string to be propagated." 
+ ) + operation: str = Field( + description="Operation to be performed on the documentation. Can be ADD, MODIFY or REMOVE." + ) + entity: str = Field( + description="Entity URN from which the documentation is propagated. This will either be the same as the origin or the via entity, depending on the propagation path." + ) + origin: str = Field( + description="Origin entity for the documentation. This is the entity that triggered the documentation propagation.", + ) + via: Optional[str] = Field( + None, + description="Via entity for the documentation. This is the direct entity that the documentation was propagated through.", + ) + actor: Optional[str] = Field( + None, + description="Actor that triggered the documentation propagation.", + ) + + +class SourceDetails(BaseModel): + origin: Optional[str] = Field( + None, + description="Origin entity for the documentation. This is the entity that triggered the documentation propagation.", + ) + via: Optional[str] = Field( + None, + description="Via entity for the documentation. This is the direct entity that the documentation was propagated through.", + ) + propagated: Optional[str] = Field( + None, + description="Indicates whether the documentation was propagated.", + ) + actor: Optional[str] = Field( + None, + description="Actor that triggered the documentation propagation.", + ) + + @validator("propagated", pre=True) + def convert_boolean_to_lowercase_string(cls, v: Any) -> Optional[str]: + if isinstance(v, bool): + return str(v).lower() + return v + + +class DocPropagationConfig(ConfigModel): + """ + Configuration model for documentation propagation. + + Attributes: + enabled (bool): Indicates whether documentation propagation is enabled or not. Default is True. + columns_enabled (bool): Indicates whether column documentation propagation is enabled or not. Default is True. + datasets_enabled (bool): Indicates whether dataset level documentation propagation is enabled or not. Default is False. + + Example: + config = DocPropagationConfig(enabled=True) + """ + + enabled: bool = Field( + True, + description="Indicates whether documentation propagation is enabled or not.", + example=True, + ) + columns_enabled: bool = Field( + True, + description="Indicates whether column documentation propagation is enabled or not.", + example=True, + ) + # TODO: Currently this flag does nothing. Datasets are NOT supported for docs propagation. 
+ datasets_enabled: bool = Field( + False, + description="Indicates whether dataset level documentation propagation is enabled or not.", + example=False, + ) + + +def get_field_path(schema_field_urn: str) -> str: + urn = Urn.create_from_string(schema_field_urn) + return urn.get_entity_id()[1] + + +def get_field_doc_from_dataset( + graph: AcrylDataHubGraph, dataset_urn: str, schema_field_urn: str +) -> Optional[str]: + editableSchemaMetadata = graph.graph.get_aspect( + dataset_urn, EditableSchemaMetadataClass + ) + if editableSchemaMetadata is not None: + if editableSchemaMetadata.editableSchemaFieldInfo is not None: + field_info = [ + x + for x in editableSchemaMetadata.editableSchemaFieldInfo + if x.fieldPath == get_field_path(schema_field_urn) + ] + if field_info: + return field_info[0].description + return None + + +ECE_EVENT_TYPE = "EntityChangeEvent_v1" + + +class DocPropagationAction(Action): + def __init__(self, config: DocPropagationConfig, ctx: PipelineContext): + super().__init__() + self.action_urn: str + if not ctx.pipeline_name.startswith("urn:li:dataHubAction"): + self.action_urn = f"urn:li:dataHubAction:{ctx.pipeline_name}" + else: + self.action_urn = ctx.pipeline_name + + self.config: DocPropagationConfig = config + self.last_config_refresh: float = 0 + self.ctx = ctx + self.mcl_processor = MCLProcessor() + self.actor_urn = "urn:li:corpuser:__datahub_system" + + self.mcl_processor.register_processor( + "schemaField", + "documentation", + self.process_schema_field_documentation, + ) + self.refresh_config() + self._stats = ActionStageReport() + self._stats.start() + + def name(self) -> str: + return "DocPropagator" + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action": + action_config = DocPropagationConfig.parse_obj(config_dict or {}) + logger.info(f"Doc Propagation Config action configured with {action_config}") + return cls(action_config, ctx) + + def process_schema_field_documentation( + self, + entity_urn: str, + aspect_name: str, + aspect_value: GenericAspectClass, + previous_aspect_value: Optional[GenericAspectClass], + ) -> Optional[DocPropagationDirective]: + if aspect_name == "documentation": + logger.debug("Processing 'documentation' MCL") + if self.config.columns_enabled: + current_docs = DocumentationClass.from_obj( + json.loads(aspect_value.value) + ) + old_docs = ( + None + if previous_aspect_value is None + else DocumentationClass.from_obj( + json.loads(previous_aspect_value.value) + ) + ) + if current_docs.documentations: + # we assume that the first documentation is the primary one + # we can change this later + current_documentation_instance = current_docs.documentations[0] + source_details = ( + (current_documentation_instance.attribution.sourceDetail) + if current_documentation_instance.attribution + else {} + ) + origin_entity = source_details.get("origin") + if old_docs is None or not old_docs.documentations: + return DocPropagationDirective( + propagate=True, + doc_string=current_documentation_instance.documentation, + operation="ADD", + entity=entity_urn, + origin=origin_entity, + via=entity_urn, + actor=self.actor_urn, + ) + else: + if ( + current_docs.documentations[0].documentation + != old_docs.documentations[0].documentation + ): + return DocPropagationDirective( + propagate=True, + doc_string=current_documentation_instance.documentation, + operation="MODIFY", + entity=entity_urn, + origin=origin_entity, + via=entity_urn, + actor=self.actor_urn, + ) + return None + + def should_propagate( + self, event: 
EventEnvelope + ) -> Optional[DocPropagationDirective]: + + if self.mcl_processor.is_mcl(event): + return self.mcl_processor.process(event) + if event.event_type == "EntityChangeEvent_v1": + assert isinstance(event.event, EntityChangeEvent) + # logger.info(f"Received event {event}") + assert self.ctx.graph is not None + semantic_event = event.event + if ( + semantic_event.category == "DOCUMENTATION" + and self.config is not None + and self.config.enabled + ): + logger.debug("Processing EntityChangeEvent Documentation Change") + if self.config.columns_enabled and ( + semantic_event.entityType == "schemaField" + ): + if semantic_event.parameters: + parameters = semantic_event.parameters + else: + parameters = semantic_event._inner_dict.get( + "__parameters_json", {} + ) + doc_string = parameters.get("description") + origin = parameters.get("origin") + origin = origin or semantic_event.entityUrn + via = ( + semantic_event.entityUrn + if origin != semantic_event.entityUrn + else None + ) + logger.debug(f"Origin: {origin}") + logger.debug(f"Via: {via}") + logger.debug(f"Doc string: {doc_string}") + logger.debug(f"Semantic event {semantic_event}") + if doc_string: + return DocPropagationDirective( + propagate=True, + doc_string=doc_string, + operation=semantic_event.operation, + entity=semantic_event.entityUrn, + origin=origin, + via=via, # if origin is set, then via is the entity itself + actor=( + semantic_event.auditStamp.actor + if semantic_event.auditStamp + else self.actor_urn + ), + ) + return None + + def modify_docs_on_columns( + self, + graph: AcrylDataHubGraph, + operation: str, + schema_field_urn: str, + dataset_urn: str, + field_doc: Optional[str], + context: SourceDetails, + ) -> Optional[MetadataChangeProposalWrapper]: + if context.origin == schema_field_urn: + # No need to propagate to self + return None + + if not dataset_urn.startswith("urn:li:dataset"): + logger.error( + f"Invalid dataset urn {dataset_urn}. Must start with urn:li:dataset" + ) + return None + + auditStamp = AuditStampClass( + time=int(time.time() * 1000.0), actor=self.actor_urn + ) + + source_details = context.dict(exclude_none=True) + attribution: MetadataAttributionClass = MetadataAttributionClass( + source=self.action_urn, + time=auditStamp.time, + actor=self.actor_urn, + sourceDetail=source_details, + ) + documentations = graph.graph.get_aspect(schema_field_urn, DocumentationClass) + if documentations: + mutation_needed = False + action_sourced = False + # we check if there are any existing documentations generated by + # this action, if so, we update them + # otherwise, we add a new documentation entry sourced by this action + for doc_association in documentations.documentations: + if doc_association.attribution and doc_association.attribution.source: + if doc_association.attribution.source == self.action_urn: + action_sourced = True + if doc_association.documentation != field_doc: + mutation_needed = True + if operation == "ADD" or operation == "MODIFY": + doc_association.documentation = field_doc or "" + doc_association.attribution = attribution + elif operation == "REMOVE": + # TODO : should we remove the documentation or just set it to empty string? 
+ # Ideally we remove it + doc_association.documentation = "" + doc_association.attribution = attribution + if not action_sourced: + documentations.documentations.append( + DocumentationAssociationClass( + documentation=field_doc or "", + attribution=attribution, + ) + ) + mutation_needed = True + else: + # no docs found, create a new one + # we don't check editableSchemaMetadata because our goal is to + # propagate documentation to downstream entities + # UI will handle resolving priorities and conflicts + documentations = DocumentationClass( + documentations=[ + DocumentationAssociationClass( + documentation=field_doc or "", + attribution=attribution, + ) + ] + ) + mutation_needed = True + + if mutation_needed: + logger.debug( + f"Will emit documentation change proposal for {schema_field_urn} with {field_doc}" + ) + return MetadataChangeProposalWrapper( + entityUrn=schema_field_urn, + aspect=documentations, + ) + return None + + def refresh_config(self, event: Optional[EventEnvelope] = None) -> None: + """ + Fetches important configuration flags from the global settings entity to + override client-side settings. + If not found, it will use the client-side values. + """ + now = time.time() + try: + if now - self.last_config_refresh > 60 or self._is_settings_change(event): + assert self.ctx.graph + entity_dict = self.ctx.graph.graph.get_entity_raw( + "urn:li:globalSettings:0", ["globalSettingsInfo"] + ) + if entity_dict: + global_settings = entity_dict.get("aspects", {}).get( + "globalSettingsInfo" + ) + if global_settings: + doc_propagation_config = global_settings.get("value", {}).get( + "docPropagation" + ) + if doc_propagation_config: + if doc_propagation_config.get("enabled") is not None: + logger.info( + "Overwriting the asset-level config using globalSettings" + ) + self.config.enabled = doc_propagation_config.get( + "enabled" + ) + if ( + doc_propagation_config.get("columnPropagationEnabled") + is not None + ): + logger.info( + "Overwriting the column-level config using globalSettings" + ) + self.config.columns_enabled = ( + doc_propagation_config.get( + "columnPropagationEnabled" + ) + ) + except Exception: + # We don't want to fail the pipeline if we can't fetch the config + logger.warning( + "Error fetching global settings for doc propagation. Will try again in 1 minute.", + exc_info=True, + ) + self.last_config_refresh = now + + def _is_settings_change(self, event: Optional[EventEnvelope]) -> bool: + if event and isinstance(event.event, MetadataChangeLogClass): + entity_type = event.event.entityType + if entity_type == "globalSettings": + return True + return False + + def get_upstreams(self, graph: AcrylDataHubGraph, entity_urn: str) -> List[str]: + """ + Fetch the upstreams for an dataset or schema field. + Note that this DOES NOT support DataJob upstreams, or any intermediate nodes. + """ + import urllib.parse + + url_frag = f"/relationships?direction=OUTGOING&types=List(DownstreamOf)&urn={urllib.parse.quote(entity_urn)}" + url = f"{graph.graph._gms_server}{url_frag}" + response = graph.graph._get_generic(url) + if response["count"] > 0: + relnships = response["relationships"] + entities = [x["entity"] for x in relnships] + return entities + return [] + + def _only_one_upstream_field( + self, + graph: AcrylDataHubGraph, + downstream_field: str, + upstream_field: str, + ) -> bool: + """ + Check if there is only one upstream field for the downstream field. 
If upstream_field is provided, + it will also check if the upstream field is the only upstream + + TODO: We should cache upstreams because we make this fetch upstreams call FOR EVERY downstream that must be propagated to. + """ + upstreams = ( + graph.get_upstreams(entity_urn=downstream_field) + if hasattr(graph, "get_upstreams") + else self.get_upstreams(graph, downstream_field) + ) + # Use a set here in case there are duplicated upstream edges + upstream_fields = list( + {x for x in upstreams if x.startswith("urn:li:schemaField")} + ) + + # If we found no upstreams for the downstream field, simply skip. + if not upstream_fields: + logger.warning( + f"No upstream fields found. Skipping propagation to downstream {downstream_field}" + ) + return False + + # Convert the set to a list to access by index + result = len(upstream_fields) == 1 and upstream_fields[0] == upstream_field + if not result: + logger.warning( + f"Failed check for single upstream: Found upstream fields {upstream_fields} for downstream {downstream_field}. Expecting only one upstream field: {upstream_field}" + ) + return result + + def act(self, event: EventEnvelope) -> None: + assert self.ctx.graph + for mcp in self.act_async(event): + self.ctx.graph.graph.emit(mcp) + + def act_async( + self, event: EventEnvelope + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Process the event asynchronously and return the change proposals + """ + self.refresh_config(event) + if not self.config.enabled or not self.config.columns_enabled: + logger.warning("Doc propagation is disabled. Skipping event") + return + else: + logger.debug(f"Processing event {event}") + + if not self._stats.event_processing_stats: + self._stats.event_processing_stats = EventProcessingStats() + + stats = self._stats.event_processing_stats + stats.start(event) + + try: + doc_propagation_directive = self.should_propagate(event) + logger.debug( + f"Doc propagation directive for {event}: {doc_propagation_directive}" + ) + + if ( + doc_propagation_directive is not None + and doc_propagation_directive.propagate + ): + self._stats.increment_assets_processed(doc_propagation_directive.entity) + context = SourceDetails( + origin=doc_propagation_directive.origin, + via=doc_propagation_directive.via, + propagated=True, + actor=doc_propagation_directive.actor, + ) + assert self.ctx.graph + + # TODO: Put each mechanism behind a config flag to be controlled externally. + + # Step 1: Propagate to downstream entities + yield from self._propagate_to_downstreams( + doc_propagation_directive, context + ) + + # Step 2: Propagate to sibling entities + yield from self._propagate_to_siblings( + doc_propagation_directive, context + ) + + stats.end(event, success=True) + + except Exception: + logger.error(f"Error processing event {event}:", exc_info=True) + stats.end(event, success=False) + + def _propagate_to_downstreams( + self, doc_propagation_directive: DocPropagationDirective, context: SourceDetails + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Propagate the documentation to downstream entities. 
+ """ + assert self.ctx.graph + downstreams = self.ctx.graph.get_downstreams( + entity_urn=doc_propagation_directive.entity + ) + logger.debug( + f"Downstreams: {downstreams} for {doc_propagation_directive.entity}" + ) + entity_urn = doc_propagation_directive.entity + + if entity_urn.startswith("urn:li:schemaField"): + downstream_fields = { + x for x in downstreams if x.startswith("urn:li:schemaField") + } + for field in downstream_fields: + schema_field_urn = Urn.create_from_string(field) + parent_urn = schema_field_urn.get_entity_id()[0] + field_path = schema_field_urn.get_entity_id()[1] + + logger.debug( + f"Will {doc_propagation_directive.operation} documentation {doc_propagation_directive.doc_string} for {field_path} on {parent_urn}" + ) + + if parent_urn.startswith("urn:li:dataset"): + if self._only_one_upstream_field( + self.ctx.graph, + downstream_field=str(schema_field_urn), + upstream_field=entity_urn, + ): + maybe_mcp = self.modify_docs_on_columns( + self.ctx.graph, + doc_propagation_directive.operation, + field, + parent_urn, + field_doc=doc_propagation_directive.doc_string, + context=context, + ) + if maybe_mcp: + yield maybe_mcp + + elif parent_urn.startswith("urn:li:chart"): + logger.warning( + "Charts are expected to have fields that are dataset schema fields. Skipping for now..." + ) + + self._stats.increment_assets_impacted(field) + + elif entity_urn.startswith("urn:li:dataset"): + logger.debug( + "Dataset level documentation propagation is not yet supported!" + ) + + def _propagate_to_siblings( + self, doc_propagation_directive: DocPropagationDirective, context: SourceDetails + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Propagate the documentation to sibling entities. + """ + assert self.ctx.graph + entity_urn = doc_propagation_directive.entity + siblings = get_unique_siblings(self.ctx.graph, entity_urn) + + logger.debug(f"Siblings: {siblings} for {doc_propagation_directive.entity}") + + for sibling in siblings: + if entity_urn.startswith("urn:li:schemaField") and sibling.startswith( + "urn:li:schemaField" + ): + parent_urn = Urn.create_from_string(sibling).get_entity_id()[0] + self._stats.increment_assets_impacted(sibling) + maybe_mcp = self.modify_docs_on_columns( + self.ctx.graph, + doc_propagation_directive.operation, + schema_field_urn=sibling, + dataset_urn=parent_urn, + field_doc=doc_propagation_directive.doc_string, + context=context, + ) + if maybe_mcp: + yield maybe_mcp + + def close(self) -> None: + return super().close() diff --git a/datahub-actions/src/datahub_actions/plugin/action/propagation/propagation_utils.py b/datahub-actions/src/datahub_actions/plugin/action/propagation/propagation_utils.py new file mode 100644 index 00000000..b203a952 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/propagation/propagation_utils.py @@ -0,0 +1,173 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import time +from abc import abstractmethod +from enum import Enum +from typing import Dict, Iterable, List, Optional, Tuple + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import make_schema_field_urn +from datahub.ingestion.graph.client import SearchFilterRule +from datahub.metadata.schema_classes import MetadataAttributionClass +from datahub.utilities.urns.urn import Urn +from pydantic.fields import Field +from pydantic.main import BaseModel + +from datahub_actions.api.action_graph import AcrylDataHubGraph + +SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system" + + +class RelationshipType(Enum): + LINEAGE = "lineage" # signifies all types of lineage + HIERARCHY = "hierarchy" # signifies all types of hierarchy + + +class DirectionType(Enum): + UP = "up" # signifies upstream or parent (depending on relationship type) + DOWN = "down" # signifies downstream or child (depending on relationship type) + ALL = "all" # signifies all directions + + +class PropagationDirective(BaseModel): + propagate: bool + operation: str + relationship: RelationshipType = RelationshipType.LINEAGE + direction: DirectionType = DirectionType.UP + entity: str = Field( + description="Entity that currently triggered the propagation directive", + ) + origin: str = Field( + description="Origin entity for the association. This is the entity that triggered the propagation.", + ) + via: Optional[str] = Field( + None, + description="Via entity for the association. This is the direct entity that the propagation came through.", + ) + actor: Optional[str] = Field( + None, + description="Actor that triggered the propagation through the original association.", + ) + + +def get_attribution_and_context_from_directive( + action_urn: str, + propagation_directive: PropagationDirective, + actor: str = SYSTEM_ACTOR, + time: int = int(time.time() * 1000.0), +) -> Tuple[MetadataAttributionClass, str]: + """ + Given a propagation directive, return the attribution and context for + the directive. + Attribution is the official way to track the source of metadata in + DataHub. + Context is the older way to track the source of metadata in DataHub. + We populate both to ensure compatibility with older versions of DataHub. + """ + source_detail: dict[str, str] = { + "origin": propagation_directive.origin, + "propagated": "true", + } + if propagation_directive.actor: + source_detail["actor"] = propagation_directive.actor + else: + source_detail["actor"] = actor + if propagation_directive.via: + source_detail["via"] = propagation_directive.via + context_dict: dict[str, str] = {} + context_dict.update(source_detail) + return ( + MetadataAttributionClass( + time=time, + actor=actor, + source=action_urn, + sourceDetail=source_detail, + ), + json.dumps(context_dict), + ) + + +class SelectedAsset(BaseModel): + """ + A selected asset is a data structure that represents an asset that has been + selected for processing by a propagator. + """ + + urn: str # URN of the asset that has been selected + target_entity_type: str # entity type that is being targeted by the propagator. e.g. schemaField even if asset is of type dataset + + +class ComposablePropagator: + + @abstractmethod + def asset_filters(self) -> Dict[str, Dict[str, List[SearchFilterRule]]]: + """ + Returns a dictionary of asset filters that are used to filter the assets + based on the configuration of the action. 
+ """ + pass + + @abstractmethod + def process_one_asset( + self, asset: SelectedAsset, operation: str + ) -> Iterable[PropagationDirective]: + """ + Given an asset, returns a list of propagation directives + + :param asset_urn: URN of the asset + :param target_entity_type: The entity type of the target entity (Note: + this can be different from the entity type of the asset. e.g. we + might process a dataset while the target entity_type is a column + (schemaField)) + :param operation: The operation that triggered the propagation (ADD / + REMOVE) + :return: A list of PropagationDirective objects + """ + pass + + +def get_unique_siblings(graph: AcrylDataHubGraph, entity_urn: str) -> list[str]: + """ + Get unique siblings for the entity urn + """ + + if entity_urn.startswith("urn:li:schemaField"): + parent_urn = Urn.create_from_string(entity_urn).get_entity_id()[0] + entity_field_path = Urn.create_from_string(entity_urn).get_entity_id()[1] + # Does my parent have siblings? + siblings: Optional[models.SiblingsClass] = graph.graph.get_aspect( + parent_urn, + models.SiblingsClass, + ) + if siblings and siblings.siblings: + other_siblings = [x for x in siblings.siblings if x != parent_urn] + if len(other_siblings) == 1: + target_sibling = other_siblings[0] + # now we need to find the schema field in this sibling that + # matches us + if target_sibling.startswith("urn:li:dataset"): + schema_fields = graph.graph.get_aspect( + target_sibling, models.SchemaMetadataClass + ) + if schema_fields: + for schema_field in schema_fields.fields: + if schema_field.fieldPath == entity_field_path: + # we found the sibling field + schema_field_urn = make_schema_field_urn( + target_sibling, schema_field.fieldPath + ) + return [schema_field_urn] + return [] diff --git a/datahub-actions/src/datahub_actions/plugin/action/stats_util.py b/datahub-actions/src/datahub_actions/plugin/action/stats_util.py new file mode 100644 index 00000000..c4a9c7ef --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/stats_util.py @@ -0,0 +1,204 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import json +from datetime import datetime, timezone +from enum import Enum +from typing import Dict, Optional + +import pydantic +from datahub.ingestion.api.report import Report, SupportsAsObj +from pydantic import BaseModel + +from datahub_actions.action.action import Action +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.event.event_registry import ( + ENTITY_CHANGE_EVENT_V1_TYPE, + METADATA_CHANGE_LOG_EVENT_V1_TYPE, + EntityChangeEvent, + MetadataChangeLogEvent, +) +from datahub_actions.pipeline.pipeline_context import PipelineContext + + +class EventProcessingStats(BaseModel): + """ + A class to represent the event-oriented processing stats for a pipeline. + Note: Might be merged into ActionStats in the future. 
+ """ + + last_seen_event_time: Optional[str] = pydantic.Field( + None, description="The event time of the last event we processed" + ) + last_event_processed_time: Optional[str] = pydantic.Field( + None, description="The time at which we processed the last event" + ) + last_seen_event_time_success: Optional[str] = pydantic.Field( + None, description="The event time of the last event we processed successfully" + ) + last_event_processed_time_success: Optional[str] = pydantic.Field( + None, description="The time at which we processed the last event successfully" + ) + last_seen_event_time_failure: Optional[str] = pydantic.Field( + None, description="The event time of the last event we processed unsuccessfully" + ) + last_event_processed_time_failure: Optional[str] = pydantic.Field( + None, description="The time at which we processed the last event unsuccessfully" + ) + + @classmethod + def _get_event_time(cls, event: EventEnvelope) -> Optional[str]: + """ + Get the event time from the event. + """ + if event.event_type == ENTITY_CHANGE_EVENT_V1_TYPE: + if isinstance(event.event, EntityChangeEvent): + return ( + datetime.fromtimestamp( + event.event.auditStamp.time / 1000.0, tz=timezone.utc + ).isoformat() + if event.event.auditStamp + else None + ) + elif event.event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE: + if isinstance(event.event, MetadataChangeLogEvent): + return ( + datetime.fromtimestamp( + event.event.auditHeader.time / 1000.0, tz=timezone.utc + ).isoformat() + if event.event.auditHeader + else None + ) + return None + + def start(self, event: EventEnvelope) -> None: + """ + Update the stats based on the event. + """ + self.last_event_processed_time = datetime.now(tz=timezone.utc).isoformat() + self.last_seen_event_time = self._get_event_time(event) + + def end(self, event: EventEnvelope, success: bool) -> None: + """ + Update the stats based on the event. + """ + + if success: + self.last_seen_event_time_success = ( + self._get_event_time(event) or self.last_seen_event_time_success + ) + self.last_event_processed_time_success = datetime.now( + timezone.utc + ).isoformat() + else: + self.last_seen_event_time_failure = ( + self._get_event_time(event) or self.last_seen_event_time_failure + ) + self.last_event_processed_time_failure = datetime.now( + timezone.utc + ).isoformat() + + def __str__(self) -> str: + return json.dumps(self.dict(), indent=2) + + +class StageStatus(str, Enum): + SUCCESS = "success" + FAILURE = "failure" + RUNNING = "running" + STOPPED = "stopped" + + +class ActionStageReport(BaseModel): + # All stats here are only for the current run of the current stage. + + # Attributes that should be aggregated across runs should be prefixed with "total_". + # Only ints can be aggregated. + + start_time: int = 0 + + end_time: int = 0 + + total_assets_to_process: int = -1 # -1 if unknown + + total_assets_processed: int = 0 + + total_actions_executed: int = 0 + + total_assets_impacted: int = 0 + + event_processing_stats: Optional[EventProcessingStats] = None + + status: Optional[StageStatus] = None + + def start(self) -> None: + self.start_time = int(datetime.now().timestamp() * 1000) + self.status = StageStatus.RUNNING + + def end(self, success: bool) -> None: + self.end_time = int(datetime.now().timestamp() * 1000) + self.status = StageStatus.SUCCESS if success else StageStatus.FAILURE + + def increment_assets_processed(self, asset: str) -> None: + # TODO: If we want to track unique assets, use a counting set. 
+ # For now, just increment + self.total_assets_processed += 1 + + def increment_assets_impacted(self, asset: str) -> None: + # TODO: If we want to track unique assets, use a counting set. + # For now, just increment + self.total_assets_impacted += 1 + + def as_obj(self) -> dict: + return Report.to_pure_python_obj(self) + + def aggregatable_stats(self) -> Dict[str, int]: + all_items = self.dict() + + stats = {k: v for k, v in all_items.items() if k.startswith("total_")} + + # If total_assets_to_process is unknown, don't include it. + if self.total_assets_to_process == -1: + stats.pop("total_assets_to_process") + + # Add a few additional special cases of aggregatable stats. + if self.event_processing_stats: + for key, value in self.event_processing_stats.dict().items(): + if value is not None: + stats[f"event_processing_stats.{key}"] = str(value) + + return stats + + +class ReportingAction(Action, abc.ABC): + def __init__(self, ctx: PipelineContext): + super().__init__() + self.ctx = ctx + + self.action_urn: str + if "urn:li:dataHubAction:" in ctx.pipeline_name: + # The pipeline name might get a prefix before the urn:li:... part. + # We need to remove that prefix to get the urn:li:dataHubAction part. + action_urn_part = ctx.pipeline_name.split("urn:li:dataHubAction:")[1] + self.action_urn = f"urn:li:dataHubAction:{action_urn_part}" + else: + self.action_urn = f"urn:li:dataHubAction:{ctx.pipeline_name}" + + @abc.abstractmethod + def get_report(self) -> ActionStageReport: + pass + + +assert isinstance(ActionStageReport(), SupportsAsObj) diff --git a/datahub-actions/tests/unit/test_helpers.py b/datahub-actions/tests/unit/test_helpers.py index 2d7b5adb..b799b056 100644 --- a/datahub-actions/tests/unit/test_helpers.py +++ b/datahub-actions/tests/unit/test_helpers.py @@ -41,18 +41,11 @@ # Mocked Metadata Change Log representing a Domain change for a Dataset. metadata_change_log_event = MetadataChangeLogEvent.from_class( MetadataChangeLogClass( - "dataset", - "UPSERT", - None, - "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", - None, - "domains", - None, - None, - None, - None, - None, - AuditStampClass(0, "urn:li:corpuser:datahub"), + entityType="dataset", + changeType="UPSERT", + entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + aspectName="domains", + created=AuditStampClass(0, "urn:li:corpuser:datahub"), ) ) diff --git a/docker/config/doc_propagation_action.yaml b/docker/config/doc_propagation_action.yaml new file mode 100644 index 00000000..bb822b66 --- /dev/null +++ b/docker/config/doc_propagation_action.yaml @@ -0,0 +1,34 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+name: ${DATAHUB_ACTIONS_DOC_PROPAGATION_CONSUMER_GROUP_ID:-datahub_doc_propagation_action} +enabled: ${DATAHUB_ACTIONS_DOC_PROPAGATION_ENABLED:-true} +source: + type: 'kafka' + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} + topic_routes: + mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1} + pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1} +action: + type: doc_propagation + config: + # Action-specific configs (map) + columns_enabled: ${DATAHUB_ACTIONS_DOC_PROPAGATION_COLUMNS_ENABLED:-true} + +datahub: + server: 'http://${DATAHUB_GMS_HOST:-localhost}:${DATAHUB_GMS_PORT:-8080}' + extra_headers: + Authorization: 'Basic ${DATAHUB_SYSTEM_CLIENT_ID:-__datahub_system}:${DATAHUB_SYSTEM_CLIENT_SECRET:-JohnSnowKnowsNothing}'
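
Usage note (editorial, not part of the patch): a minimal sketch of how the models added in propagation_action.py could be exercised once a build containing this patch is installed. The import path and class names come from the patch itself; the schemaField URN and documentation string below are illustrative assumptions, and the YAML file above (docker/config/doc_propagation_action.yaml) remains the intended way to actually run the action.

# Minimal sketch, assuming the patched acryl-datahub-actions package is installed.
from datahub_actions.plugin.action.propagation.docs.propagation_action import (
    DocPropagationConfig,
    DocPropagationDirective,
)

# Mirrors docker/config/doc_propagation_action.yaml: column-level propagation on;
# dataset-level propagation is left off (not yet supported by this action).
config = DocPropagationConfig.parse_obj({"enabled": True, "columns_enabled": True})

# Shape of the directive the action derives when a schemaField description is added.
# The URN and doc string are hypothetical examples, not values from the patch.
directive = DocPropagationDirective(
    propagate=True,
    doc_string="Unique identifier for each order",
    operation="ADD",
    entity="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD),field_foo)",
    origin="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD),field_foo)",
)

print(config.json())
print(directive.json())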