DQX engine refactor and docs update #138

Merged · 10 commits · Jan 29, 2025

2 changes: 2 additions & 0 deletions demos/dqx_demo_library.py
@@ -104,6 +104,7 @@
""")

dq_engine = DQEngine(WorkspaceClient())

status = dq_engine.validate_checks(checks)
print(status.has_errors)
print(status.errors)
@@ -334,5 +335,6 @@ def ends_with_foo(col_name: str) -> Column:
input_df = spark.createDataFrame([["str1"], ["foo"], ["str3"]], schema)

dq_engine = DQEngine(WorkspaceClient())

valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
display(valid_and_quarantined_df)
12 changes: 6 additions & 6 deletions demos/dqx_demo_tool.py
@@ -84,7 +84,7 @@

ws = WorkspaceClient()
dq_engine = DQEngine(ws)
run_config = dq_engine.load_run_config(run_config="default", assume_user=True)
run_config = dq_engine.load_run_config(run_config_name="default", assume_user=True)

# read the input data, limit to 1000 rows for demo purpose
input_df = spark.read.format(run_config.input_format).load(run_config.input_location).limit(1000)
@@ -101,14 +101,14 @@
print(yaml.safe_dump(checks))

# save generated checks to location specified in the default run configuration inside workspace installation folder
dq_engine.save_checks(checks, run_config_name="default")
dq_engine.save_checks_in_installation(checks, run_config_name="default")
# or save it to an arbitrary workspace location
#dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")

# COMMAND ----------

# MAGIC %md
# MAGIC ### Prepare checks manually (optional)
# MAGIC ### Prepare checks manually and save in the workspace (optional)
# MAGIC
# MAGIC You can modify the check candidates generated by the profiler to suit your needs. Alternatively, you can create checks manually, as demonstrated below, without using the profiler.

@@ -161,7 +161,7 @@

dq_engine = DQEngine(WorkspaceClient())
# save checks to location specified in the default run configuration inside workspace installation folder
dq_engine.save_checks(checks, run_config_name="default")
dq_engine.save_checks_in_installation(checks, run_config_name="default")
# or save it to an arbitrary workspace location
#dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")

@@ -175,7 +175,7 @@
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

run_config = dq_engine.load_run_config(run_config="default", assume_user=True)
run_config = dq_engine.load_run_config(run_config_name="default", assume_user=True)

# read the data, limit to 1000 rows for demo purpose
bronze_df = spark.read.format(run_config.input_format).load(run_config.input_location).limit(1000)
Expand All @@ -186,7 +186,7 @@
dq_engine = DQEngine(WorkspaceClient())

# load checks from location defined in the run configuration
checks = dq_engine.load_checks(assume_user=True, run_config_name="default")
checks = dq_engine.load_checks_from_installation(assume_user=True, run_config_name="default")
# or load checks from arbitrary workspace file
# checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/checks.yml")
print(checks)
5 changes: 2 additions & 3 deletions docs/dqx/docs/demos.mdx
@@ -4,8 +4,7 @@ sidebar_position: 4

# Demos

After the [installation](/docs/installation) of the framework,
you can import the following notebooks in the Databricks workspace to try it out:
Install the framework as described in the [installation](/docs/installation) guide, and import the following notebooks in the Databricks workspace to try it out:
* [DQX Demo Notebook (library)](https://github.com/databrickslabs/dqx/blob/main/demos/dqx_demo_library.py) - demonstrates how to use DQX as a library.
* [DQX Demo Notebook (tool)](https://github.com/databrickslabs/dqx/blob/main/demos/dqx_demo_tool.py) - demonstrates how to use DQX when installed in the workspace, including usage of DQX dashboards.
* [DQX Demo Notebook (tool)](https://github.com/databrickslabs/dqx/blob/main/demos/dqx_demo_tool.py) - demonstrates how to use DQX as a tool when installed in the workspace.
* [DQX DLT Demo Notebook](https://github.com/databrickslabs/dqx/blob/main/demos/dqx_dlt_demo.py) - demonstrates how to use DQX with Delta Live Tables (DLT).
15 changes: 8 additions & 7 deletions docs/dqx/docs/dev/contributing.mdx
@@ -93,12 +93,9 @@ make lint
make test
```

Configure auth to Databricks workspace for integration testing by configuring credentials.

If you want to run the tests from an IDE you must setup `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).

Setup required environment variables for executing integration tests and code coverage:
Set up the required environment variables for executing integration tests and code coverage from the command line.
Note that integration tests are run automatically when you create a Pull Request in GitHub.
You can also run them from a local machine by configuring authentication to a Databricks workspace as below:
```shell
export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>
@@ -119,9 +116,13 @@ Calculate test coverage and display report in html:
make coverage
```

If you want to run integration tests from your IDE, you must set up a `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).
The name of the debug environment that you define must be `ws`. A sketch of this file is shown below.
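
For illustration only, here is a hedged sketch of creating that file from Python; the structure (a mapping from the debug environment name to environment variables) follows the pytester instructions linked above, the variables mirror the exports shown earlier, and your setup may require additional ones:

```python
import json
from pathlib import Path

# Hypothetical content of ~/.databricks/debug-env.json: the debug environment
# name ("ws") maps to the environment variables the integration tests expect.
debug_env = {
    "ws": {
        "DATABRICKS_HOST": "https://<workspace-url>",
        "DATABRICKS_CLUSTER_ID": "<cluster-id>",
    }
}

path = Path.home() / ".databricks" / "debug-env.json"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(debug_env, indent=2))
```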

## Running CLI from the local repo

Once you clone the repo locally and install Databricks CLI you can run labs CLI commands.
Once you clone the repo locally and install the Databricks CLI, you can run labs CLI commands from the root of the repository.
As with other Databricks CLI commands, you can specify the profile to use with `--profile`.

Authenticate your current machine to your Databricks Workspace:
81 changes: 57 additions & 24 deletions docs/dqx/docs/guide.mdx
@@ -24,22 +24,23 @@ from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

df = spark.read.table("catalog1.schema1.table1")
input_df = spark.read.table("catalog1.schema1.table1")

# profile input data
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(df)
summary_stats, profiles = profiler.profile(input_df)

# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"

# save checks in the workspace
dq_engine = DQEngine(ws)
# in arbitrary workspace location

# save checks in arbitrary workspace location
dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")
# in workspace location specified in the run config (only works if DQX is installed in the workspace)
dq_engine.save_checks(checks, run_config_name="default")
# save checks in the installation folder specified in the default run config (only works if DQX is installed in the workspace)
dq_engine.save_checks_in_installation(checks, run_config_name="default")

# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
@@ -153,9 +154,9 @@ Fields:
- `check`: column expression containing "function" (the check function to apply), "arguments" (the check function arguments), and "col_name" (the column name as `str` that the check will be applied to) or "col_names" (the column names as `array` that the check will be applied to).
- (optional) `name` for the check: autogenerated if not provided. A sketch of a single check using these fields is shown below.
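
For illustration, a single check expressed as metadata might look like the following sketch; the structure is assumed to mirror the YAML examples later in this guide (with the column name passed via `arguments`), so treat the concrete layout there as authoritative:

```python
# Hypothetical example of one check defined as metadata.
check = {
    "criticality": "error",                  # mark failing records as errors
    "check": {
        "function": "is_not_null",           # check function to apply
        "arguments": {"col_name": "col1"},   # column the check is applied to
    },
    "name": "col1_is_not_null",              # optional; autogenerated if omitted
}
```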

#### Loading and execution methods
### Loading and execution methods

**Method 1: load checks from a workspace file in the installation folder**
#### Method 1: Loading checks from a workspace file in the installation folder

If DQX is installed in the workspace, you can load checks based on the run configuration:

@@ -164,9 +165,10 @@ from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# load check file specified in the run configuration
checks = dq_engine.load_checks(assume_user=True, run_config_name="default")
checks = dq_engine.load_checks_from_installation(assume_user=True, run_config_name="default")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
@@ -175,9 +177,7 @@ valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```

Checks are validated automatically as part of the `apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods.

**Method 2: load checks from a workspace file**
#### Method 2: Loading checks from a workspace file

The checks can also be loaded from any file in the Databricks workspace:

@@ -188,6 +188,8 @@ from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/checks.yml")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

@@ -197,7 +199,7 @@ valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)

Checks are validated automatically as part of the `apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods.

**Method 3: load checks from a local file**
#### Method 3: Loading checks from a local file

Checks can also be loaded from a file in the local file system:

Expand All @@ -208,6 +210,8 @@ from databricks.sdk import WorkspaceClient
checks = DQEngine.load_checks_from_local_file("checks.yml")
dq_engine = DQEngine(WorkspaceClient())

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

@@ -217,13 +221,15 @@ valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)

### Quality rules defined as code

**Method 1: using DQX classes**
#### Method 1: Using DQX classes

```python
from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQEngine, DQRuleColSet, DQRule
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRuleColSet, DQRule
from databricks.sdk import WorkspaceClient


dq_engine = DQEngine(WorkspaceClient())

checks = DQRuleColSet( # define rule for multiple columns at once
@@ -239,16 +245,18 @@ checks = DQRuleColSet( # define rule for multiple columns at once
check=value_is_in_list('col4', ['1', '2']))
]

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
```

See details of the check functions [here](/docs/reference#quality-rules--functions).
See details of the check functions [here](/docs/reference#quality-rules).

**Method 2: using yaml config**
#### Method 2: Using yaml config

```python
import yaml
@@ -282,22 +290,24 @@ checks = yaml.safe_load("""
- 2
""")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```

See details of the check functions [here](/docs/reference/#quality-rules--functions).
See details of the check functions [here](/docs/reference#quality-rules).

### Integration with DLT (Delta Live Tables)

DLT provides [expectations](https://docs.databricks.com/en/delta-live-tables/expectations.html) to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail.
The example below demonstrates how to integrate DQX with DLT to provide comprehensive quality information.
The DQX integration does not use expectations with DLT but DQX own methods.
The DQX integration with DLT does not use DLT Expectations but DQX's own methods.

**Option 1: apply quality rules and quarantine bad records**
#### Option 1: Apply quality rules and quarantine bad records

```python
import dlt
@@ -326,7 +336,7 @@ def quarantine():
return dq_engine.get_invalid(df)
```

**Option 2: apply quality rules as additional columns (`_warning` and `_error`)**
#### Option 2: Apply quality rules and report issues as additional columns

```python
import dlt
@@ -367,6 +377,29 @@ After executing the command:
Note: the dashboards only use the quarantined data as input, as defined during the installation process.
If you change the quarantine table in the run config after the deployment (`quarantine_table` field), you need to update the dashboard queries accordingly.

## Explore Quality Rules and Create Custom Checks
## Quality Rules and Creation of Custom Checks

Discover the full list of available data quality rules and learn how to define your own custom checks in our [Reference](/docs/reference#quality-rules) section.
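
As a quick illustration, a custom check is simply a function that returns a PySpark `Column`; the sketch below mirrors the `ends_with_foo` example from the demo notebook changed in this PR and assumes `make_condition` is importable from `databricks.labs.dqx.col_functions`:

```python
import pyspark.sql.functions as F
from pyspark.sql import Column

from databricks.labs.dqx.col_functions import make_condition


def ends_with_foo(col_name: str) -> Column:
    # Flag rows whose value in the given column ends with "foo"; make_condition
    # pairs the boolean failure condition with a message and a check name.
    column = F.col(col_name)
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
```

Such a function can then be referenced from metadata-defined checks by passing `globals()` to `apply_checks_by_metadata`, as the demo notebook does.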

## Details on DQX Engine and Workspace Client

To perform data quality checking with DQX, you need to create a `DQEngine` object.
The engine requires a Databricks workspace client for authentication and interaction with the Databricks workspace.

When running the code on a Databricks workspace (e.g. in a notebook or as a job), the workspace client is automatically authenticated.
For external environments (e.g. CI servers or local machines), you can authenticate using any method supported by the Databricks SDK. Detailed instructions are available in the [default authentication flow](https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html#default-authentication-flow).
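
For example, here is a minimal sketch of explicit token-based authentication from an external environment; the environment variable names are illustrative placeholders, and any other authentication method supported by the Databricks SDK works as well:

```python
import os

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

# Pass the workspace URL and a personal access token to the client explicitly.
ws = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

dq_engine = DQEngine(ws)
```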

If you use Databricks [configuration profiles](https://docs.databricks.com/dev-tools/auth.html#configuration-profiles) or Databricks-specific [environment variables](https://docs.databricks.com/dev-tools/auth.html#environment-variables) for authentication, you only need the following code to create a workspace client:
```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

ws = WorkspaceClient()

# use the workspace client to create the DQX engine
dq_engine = DQEngine(ws)
```

For details on the specific methods available in the engine, refer to the [reference](/docs/reference#dq-engine-methods) section.

Discover the full list of available data quality rules and learn how to define your own custom checks in our [Reference](/docs/reference) section.
Information on testing applications that use `DQEngine` can be found [here](/docs/reference#testing-applications-using-dqx).