Skip to content

Commit

Permalink
feat: add table/column lineage models (#21)
Browse files Browse the repository at this point in the history
add table/column lineage models
  • Loading branch information
xuan616 authored Jan 6, 2023
1 parent 7cbd052 commit 8d9bc1d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 10 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ The schema in ORM models follows the logic of [databuilder models](https://githu
Amundsenrds will be used in [databuilder](https://github.com/amundsen-io/amundsendatabuilder) and [metadatalibrary](https://github.com/amundsen-io/amundsenmetadatalibrary) for metadata storage and retrieval with relational databases.

## Requirements
- Python >= 3.6.x
- MySQL >= 5.7

- Python: >= 3.6
- MySQL: 5.7, 8

**Note**: Starting with amundsen-rds version 0.0.8, the corresponding Amundsen [metadata-service](https://github.com/amundsen-io/amundsen/tree/main/metadata) relies on SQLAlchemy ORM features that are supported only on MySQL 8.
## Instructions to configure venv
- In the terminal window, change directory to [amundsen-rds](https://github.com/amundsen-io/amundsenrds).
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""add table/column lineage
Revision ID: aa30b4276b9b
Revises: a539c998cc1e
Create Date: 2023-01-03 14:48:38.280768
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
revision = 'aa30b4276b9b'
down_revision = 'a539c998cc1e'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the ``table_lineage`` and ``column_lineage`` tables.

    Both tables share one layout: a source and a target resource key that
    together form the composite primary key, each a foreign key into the
    matching metadata table with ``ondelete='cascade'``, followed by the
    ``published_tag`` and ``publisher_last_updated_epoch_ms`` columns.
    """
    # (lineage table, source key column, target key column, referenced column)
    lineage_specs = (
        ('table_lineage', 'table_source_rk', 'table_target_rk', 'table_metadata.rk'),
        ('column_lineage', 'column_source_rk', 'column_target_rk', 'column_metadata.rk'),
    )

    # table_lineage is created first, then column_lineage.
    for table_name, source_rk, target_rk, referenced_rk in lineage_specs:
        op.create_table(
            table_name,
            sa.Column(
                source_rk,
                sa.String(length=1024, collation='latin1_general_cs'),
                nullable=False,
            ),
            sa.Column(
                target_rk,
                sa.String(length=1024, collation='latin1_general_cs'),
                nullable=False,
            ),
            sa.Column('published_tag', sa.String(length=128), nullable=False),
            sa.Column('publisher_last_updated_epoch_ms', sa.BigInteger(), nullable=False),
            sa.ForeignKeyConstraint([source_rk], [referenced_rk], ondelete='cascade'),
            sa.ForeignKeyConstraint([target_rk], [referenced_rk], ondelete='cascade'),
            sa.PrimaryKeyConstraint(source_rk, target_rk),
        )


def downgrade() -> None:
    """Drop the table/column lineage tables created by ``upgrade``."""
    # column_lineage is dropped first, then table_lineage.
    for lineage_table in ('column_lineage', 'table_lineage'):
        op.drop_table(lineage_table)
12 changes: 6 additions & 6 deletions amundsen_rds/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from amundsen_rds.models.badge import Badge
from amundsen_rds.models.cluster import Cluster
from amundsen_rds.models.column import (
ColumnBadge, ColumnDescription, ColumnStat, TableColumn
ColumnBadge, ColumnDescription, ColumnLineage, ColumnStat, TableColumn
)
from amundsen_rds.models.dashboard import (
Dashboard, DashboardBadge, DashboardChart, DashboardCluster,
Expand All @@ -20,22 +20,22 @@
Schema, SchemaDescription, SchemaProgrammaticDescription
)
from amundsen_rds.models.table import (
Table, TableBadge, TableDescription, TableFollower, TableOwner,
TableProgrammaticDescription, TableSource, TableTag, TableTimestamp,
TableUsage, TableWatermark
Table, TableBadge, TableDescription, TableFollower, TableLineage,
TableOwner, TableProgrammaticDescription, TableSource, TableTag,
TableTimestamp, TableUsage, TableWatermark
)
from amundsen_rds.models.tag import Tag
from amundsen_rds.models.updated_timestamp import UpdatedTimestamp
from amundsen_rds.models.user import User

RDSModel = Union[Application, ApplicationTable, Badge, Cluster,
TableColumn, ColumnBadge, ColumnDescription, ColumnStat,
TableColumn, ColumnBadge, ColumnDescription, ColumnLineage, ColumnStat,
Dashboard, DashboardBadge, DashboardChart, DashboardCluster,
DashboardDescription, DashboardExecution, DashboardFollower, DashboardGroup,
DashboardGroupDescription, DashboardOwner, DashboardQuery, DashboardTable,
DashboardTag, DashboardTimestamp, DashboardUsage, Database,
Schema, SchemaDescription, SchemaProgrammaticDescription, Table,
TableBadge, TableDescription, TableFollower, TableOwner,
TableBadge, TableDescription, TableFollower, TableLineage, TableOwner,
TableProgrammaticDescription, TableSource, TableTag, TableTimestamp,
TableUsage, TableWatermark, Tag, UpdatedTimestamp,
User]
Expand Down
16 changes: 16 additions & 0 deletions amundsen_rds/models/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,19 @@ class ColumnStat(Base):
nullable=False)
published_tag = Column(String(PUBLISHED_TAG_LEN), nullable=False)
publisher_last_updated_epoch_ms = Column(BigInteger, nullable=False)


class ColumnLineage(Base):
    """
    ORM mapping for the ``column_lineage`` table.

    Each row pairs a source column key with a target column key; the two
    keys together form the composite primary key.
    """
    __tablename__ = 'column_lineage'

    # Both keys are foreign keys to column_metadata.rk, declared with
    # ondelete='cascade'.
    column_source_rk = Column(
        String(KEY_LEN, **INDEX_KEY_COLLATION_ARGS),
        ForeignKey('column_metadata.rk', ondelete='cascade'),
        primary_key=True,
    )
    column_target_rk = Column(
        String(KEY_LEN, **INDEX_KEY_COLLATION_ARGS),
        ForeignKey('column_metadata.rk', ondelete='cascade'),
        primary_key=True,
    )

    published_tag = Column(
        String(PUBLISHED_TAG_LEN),
        nullable=False,
    )
    publisher_last_updated_epoch_ms = Column(
        BigInteger,
        nullable=False,
    )
16 changes: 16 additions & 0 deletions amundsen_rds/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,19 @@ class TableWatermark(Base):
nullable=False)
published_tag = Column(String(PUBLISHED_TAG_LEN), nullable=False)
publisher_last_updated_epoch_ms = Column(BigInteger, nullable=False)


class TableLineage(Base):
    """
    ORM mapping for the ``table_lineage`` table.

    Each row pairs a source table key with a target table key; the two
    keys together form the composite primary key.
    """
    __tablename__ = 'table_lineage'

    # Both keys are foreign keys to table_metadata.rk, declared with
    # ondelete='cascade'.
    table_source_rk = Column(
        String(KEY_LEN, **INDEX_KEY_COLLATION_ARGS),
        ForeignKey('table_metadata.rk', ondelete='cascade'),
        primary_key=True,
    )
    table_target_rk = Column(
        String(KEY_LEN, **INDEX_KEY_COLLATION_ARGS),
        ForeignKey('table_metadata.rk', ondelete='cascade'),
        primary_key=True,
    )

    published_tag = Column(
        String(PUBLISHED_TAG_LEN),
        nullable=False,
    )
    publisher_last_updated_epoch_ms = Column(
        BigInteger,
        nullable=False,
    )
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from setuptools import find_packages, setup

__version__ = '0.0.7'
__version__ = '0.0.8'


requirements = [
Expand Down

0 comments on commit 8d9bc1d

Please sign in to comment.