Skip to content

Commit

Permalink
chore: update dagster version (#2401)
Browse files Browse the repository at this point in the history
* chore: update `dagster` version

* fix: `cast` wide asset types

* fix: format open collective `staging` models appropriately
  • Loading branch information
Jabolol authored Oct 27, 2024
1 parent 205b2bb commit e40bae4
Show file tree
Hide file tree
Showing 7 changed files with 1,718 additions and 1,533 deletions.
3,196 changes: 1,684 additions & 1,512 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ boltons = "^24.0.0"
click = "^8.1.7"
clickhouse-connect = "^0.7.16"
cloud-sql-python-connector = { extras = ["pg8000"], version = "^1.6.0" }
dagster = "1.8.6"
dagster = "^1.8.6"
dagster-dbt = "^0.24.0"
dagster-embedded-elt = "^0.24.0"
dagster-gcp = "^0.24.0"
Expand Down Expand Up @@ -54,14 +54,14 @@ sqlalchemy = "^2.0.25"
textual = "^0.52.1"
redis = "^5.0.7"
githubkit = "^0.11.6"
sqlmesh = {extras = ["trino"], version = "^0.125.0"}
sqlmesh = { extras = ["trino"], version = "^0.125.0" }
dagster-duckdb = "^0.24.0"
dagster-duckdb-polars = "^0.24.0"
google-cloud-bigquery-storage = "^2.25.0"
dagster-sqlmesh = "0.2.0.dev3"
google-auth = "^2.34.0"
pillow = "^10.4.0"
dagster-k8s = "0.24.6"
dagster-k8s = "^0.24.6"


[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ with source as (

renamed as (
select
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
from source
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ with source as (

renamed as (
select
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
from source
)

Expand Down
14 changes: 10 additions & 4 deletions warehouse/oso_dagster/factories/bq2clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from dataclasses import dataclass, field
from typing import Optional, Sequence, Dict, List, Tuple
from typing import Optional, Sequence, Dict, List, Tuple, cast
from dagster import (
asset,
AssetExecutionContext,
MaterializeResult,
)
from dagster_gcp import BigQueryResource, GCSResource
from google.cloud.bigquery import Client as BQClient
from .common import AssetFactoryResponse, AssetDeps
from .common import AssetFactoryResponse, AssetDeps, GenericAsset
from ..resources import ClickhouseResource
from ..utils.bq import BigQueryTableConfig, export_to_gcs
from ..utils.errors import UnsupportedTableColumn
Expand Down Expand Up @@ -182,7 +182,12 @@ def bq2clickhouse_asset(
# Also ensure that the expected destination exists. Even if we
# will delete this keeps the `OVERWRITE` mode logic simple
create_table(
ch_client, destination_table_name, columns, index, order_by, if_not_exists=True
ch_client,
destination_table_name,
columns,
index,
order_by,
if_not_exists=True,
)
context.log.info(f"Ensured destination table {destination_table_name}")
create_table(ch_client, temp_dest, columns, index, if_not_exists=False)
Expand All @@ -208,4 +213,5 @@ def bq2clickhouse_asset(
}
)

return AssetFactoryResponse([bq2clickhouse_asset])
# https://github.com/opensource-observer/oso/issues/2403
return AssetFactoryResponse([cast(GenericAsset, bq2clickhouse_asset)])
7 changes: 4 additions & 3 deletions warehouse/oso_dagster/factories/bq_dts.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from dataclasses import dataclass, field
from typing import Optional, Sequence
from typing import Optional, Sequence, cast
from dagster import (
asset,
AssetExecutionContext,
MaterializeResult,
)
from dagster_gcp import BigQueryResource
from .common import AssetFactoryResponse
from .common import AssetFactoryResponse, GenericAsset
from ..constants import impersonate_service_account
from ..resources import BigQueryDataTransferResource
from ..utils import (
Expand Down Expand Up @@ -88,4 +88,5 @@ def _bq_dts_asset(
}
)

return AssetFactoryResponse([_bq_dts_asset])
# https://github.com/opensource-observer/oso/issues/2403
return AssetFactoryResponse([cast(GenericAsset, _bq_dts_asset)])
16 changes: 11 additions & 5 deletions warehouse/oso_dagster/factories/gcs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import re
from typing import Optional, Sequence, Dict
from typing import Optional, Sequence, Dict, cast
from dataclasses import dataclass, field

import arrow
from google.api_core.exceptions import NotFound
from google.cloud.bigquery.job import CopyJobConfig
from dagster import (
AssetsDefinition,
asset,
asset_sensor,
job,
Expand All @@ -21,7 +22,7 @@
)
from dagster_gcp import BigQueryResource, GCSResource

from .common import AssetFactoryResponse
from .common import AssetFactoryResponse, GenericAsset
from ..utils import (
ensure_dataset,
DatasetOptions,
Expand Down Expand Up @@ -239,13 +240,13 @@ def gcs_clean_up_job():
gcs_clean_up_op()

@asset_sensor(
asset_key=gcs_asset.key,
asset_key=cast(AssetsDefinition, gcs_asset).key,
name=f"{config.name}_clean_up_sensor",
job=gcs_clean_up_job,
default_status=DefaultSensorStatus.STOPPED,
)
def gcs_clean_up_sensor(
context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry
context: SensorEvaluationContext, _gcs: GCSResource, asset_event: EventLogEntry
):
yield RunRequest(
run_key=context.cursor,
Expand All @@ -258,4 +259,9 @@ def gcs_clean_up_sensor(
),
)

return AssetFactoryResponse([gcs_asset], [gcs_clean_up_sensor], [gcs_clean_up_job])
# https://github.com/opensource-observer/oso/issues/2403
return AssetFactoryResponse(
[cast(GenericAsset, gcs_asset)],
[gcs_clean_up_sensor],
[gcs_clean_up_job],
)

0 comments on commit e40bae4

Please sign in to comment.