Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: planet scale working #45

Merged
merged 26 commits into from
Feb 23, 2024
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a08e4b3
WIP Discovery working, need to get tables working, and then refactor
visch Feb 16, 2024
7359da5
Working with access issues fitlered out
visch Feb 17, 2024
248be32
Works with 100k+ tables
visch Feb 17, 2024
1b155d9
Fix decimals
visch Feb 17, 2024
49f125a
Dynamic PlanetScale detection
visch Feb 17, 2024
a1a0580
Readme updated, planet scale working
visch Feb 17, 2024
f06bffd
Swap config name for sqlalchemy options
visch Feb 19, 2024
6c90cfb
Add is_vitess configuration
visch Feb 20, 2024
685a2e9
Merge main's poetry.lock file in
visch Feb 20, 2024
84fc4b3
Make linter happy
visch Feb 21, 2024
8c2d124
Fix config validation, and messup with connect function call
visch Feb 21, 2024
8809b65
Passes all tests, squashed some bugs!
visch Feb 21, 2024
7644319
make mypy happy
visch Feb 21, 2024
0b07d39
PlanetScale tap pointer
visch Feb 21, 2024
c84d384
fix merge conflicts
visch Feb 22, 2024
f7520c9
Fix sqlalchemy_options documentation
visch Feb 22, 2024
6c0e974
Fix vitess config check
visch Feb 22, 2024
fa8ac91
Update README.md
visch Feb 22, 2024
7e2a11b
Update README.md
visch Feb 22, 2024
3ac1ceb
Apply Edgars suggestions
visch Feb 22, 2024
d986bba
Merge branch 'planet_scale' of github.com:MeltanoLabs/tap-mysql into …
visch Feb 22, 2024
164b94f
Match tap.py config docs with README
visch Feb 22, 2024
6c40462
A bit cleaner is_vitess check
visch Feb 22, 2024
1b34310
config key doesn't get set when value is None. I didn't expect that!
visch Feb 22, 2024
563e229
Merge branch 'main' into planet_scale
edgarrmondragon Feb 23, 2024
76b0b11
Merge branch 'main' into planet_scale
visch Feb 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Dynamic PlanetScale detection
visch committed Feb 17, 2024
commit 49f125a86c082c58639462f5b6513d5a87919ef4
24 changes: 23 additions & 1 deletion tap_mysql/client.py
Original file line number Diff line number Diff line change
@@ -43,6 +43,21 @@ def patched_conform(
class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

def __init__(
self,
is_vitess: bool = False,
*args,
**kwargs
) -> None:
"""Initialize the SQL connector.

Args:
is_vitess: Is this a vitess instance?
sqlalchemy_url: Optional URL for the connection.
"""
self.is_vitess = is_vitess
super().__init__(*args, **kwargs)

def create_engine(self) -> Engine:
"""Creates and returns a new engine. Do not call outside of _engine.

@@ -218,6 +233,9 @@ def discover_catalog_entry(
Returns:
`CatalogEntry` object for the given table or a view
"""
if self.is_vitess is False and is_view is False:
return super().discover_catalog_entry(engine, inspected, schema_name, table_name, is_view)
# For vitess views, we can't use DESCRIBE as it's not supported for views so we do the below
# Initialize unique stream name
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
@@ -337,6 +355,9 @@ def get_table_columns(
Returns:
An ordered list of column objects.
"""
if self.is_vitess:
return self.get_table_columns_vitess(full_table_name, column_names)
# If Vitess Instance then we can't use DESCRIBE as it's not supported for views so we do below
if full_table_name not in self._table_cols_cache:
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
with self._connect() as conn:
@@ -405,6 +426,7 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
query = query.filter(replication_key_col >= start_val)

with self.connector._connect() as conn:
conn.exec_driver_sql("set workload=olap")
if self.connector.is_vitess:
conn.exec_driver_sql("set workload=olap")
for row in conn.execute(query):
yield dict(row)
7 changes: 7 additions & 0 deletions tap_mysql/tap.py
Original file line number Diff line number Diff line change
@@ -301,6 +301,13 @@ def discover_streams(self) -> list[Stream]:
Returns:
List of discovered Stream objects.
"""
with self.connector._connect() as conn:
# Check if we are using PlanetScale, if so we need to let our connector know
# Ideally we'd just check to see if we're on Vitess, but I don't know how to do that quickly
output = conn.execute("select variable_value from performance_schema.global_variables where variable_name='version_comment' and variable_value like 'PlanetScale%'")
if len(output.fetchall()) > 0:
self.logger.info("Instance has been detected to be a Vitess instance. Using Vitess configuration.")
self.connector.is_vitess = True
return [
MySQLStream(self, catalog_entry, connector=self.connector)
for catalog_entry in self.catalog_dict["streams"]