Skip to content

Commit

Permalink
Trigger a reset failed state if invalidating cache fails #4009
Browse files Browse the repository at this point in the history
  • Loading branch information
greenape committed Jun 16, 2022
1 parent ccdb885 commit ac314d8
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 54 deletions.
119 changes: 65 additions & 54 deletions flowmachine/flowmachine/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,64 +792,75 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
log("Resetting state machine.")
current_state, this_thread_is_owner = q_state_machine.reset()
if this_thread_is_owner:
log("Reset state machine.")
con = get_db().engine
try:
log("Getting table reference.")
table_reference_to_this_query = self.get_table()
if table_reference_to_this_query is not self:
log("Invalidating table reference cache.")
table_reference_to_this_query.invalidate_db_cache(
cascade=cascade, drop=drop
) # Remove any Table pointing as this query
except (ValueError, NotImplementedError) as e:
log("Query not stored - no table..")
pass # This cache record isn't actually stored
try:
log = partial(log, table_name=self.fully_qualified_table_name)
except NotImplementedError:
pass # Not a storable by default table
try:
dep_ids = [
rec[0]
for rec in get_db().fetch(
f"SELECT query_id FROM cache.dependencies WHERE depends_on='{self.query_id}'"
)
]
with con.begin():
con.execute(
"DELETE FROM cache.cached WHERE query_id=%s", (self.query_id,)
)
log("Deleted cache record.")
if drop:
con.execute(
"DROP TABLE IF EXISTS {}".format(
self.fully_qualified_table_name
)
log("Reset state machine.")
con = get_db().engine
try:
log("Getting table reference.")
table_reference_to_this_query = self.get_table()
if table_reference_to_this_query is not self:
log("Invalidating table reference cache.")
table_reference_to_this_query.invalidate_db_cache(
cascade=cascade, drop=drop
) # Remove any Table pointing as this query
except (ValueError, NotImplementedError) as e:
log("Query not stored - no table..")
pass # This cache record isn't actually stored
try:
log = partial(log, table_name=self.fully_qualified_table_name)
except NotImplementedError:
pass # Not a storable by default table
try:
dep_ids = [
rec[0]
for rec in get_db().fetch(
f"SELECT query_id FROM cache.dependencies WHERE depends_on='{self.query_id}'"
)
log("Dropped cache table.")

if cascade:
for dep_id in dep_ids:
dep = get_obj_or_stub(get_db(), dep_id)
log(
"Cascading to dependent.",
dependency=dep.fully_qualified_table_name,
]
with con.begin():
con.execute(
"DELETE FROM cache.cached WHERE query_id=%s",
(self.query_id,),
)
dep.invalidate_db_cache()
log("Deleted cache record.")
if drop:
con.execute(
"DROP TABLE IF EXISTS {}".format(
self.fully_qualified_table_name
)
)
log("Dropped cache table.")

if cascade:
for dep_id in dep_ids:
dep = get_obj_or_stub(get_db(), dep_id)
log(
"Cascading to dependent.",
dependency=dep.fully_qualified_table_name,
)
dep.invalidate_db_cache()
else:
log("Not cascading to dependents.")
except NotImplementedError:
logger.info("Table has no standard name.")
# Outside of cache schema table
if schema is not None:
full_name = "{}.{}".format(schema, name)
log("Dropping table outside cache schema.", table_name=full_name)
else:
log("Not cascading to dependents.")
except NotImplementedError:
logger.info("Table has no standard name.")
# Outside of cache schema table
if schema is not None:
full_name = "{}.{}".format(schema, name)
else:
full_name = name
log("Dropping table outside cache schema.", table_name=full_name)
with con.begin():
con.execute("DROP TABLE IF EXISTS {}".format(full_name))
q_state_machine.finish_resetting()
full_name = name
with con.begin():
con.execute("DROP TABLE IF EXISTS {}".format(full_name))
q_state_machine.finish_resetting()
except Exception as exc:
logger.error(
"Query reset failed.",
query_id=self.query_id,
action="invalidate_db_cache",
exception=exc,
)
q_state_machine.raise_error()
raise exc
elif q_state_machine.is_resetting:
log("Query is being reset from elsewhere, waiting for reset to finish.")
while q_state_machine.is_resetting:
Expand Down
19 changes: 19 additions & 0 deletions flowmachine/flowmachine/core/query_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class QueryState(str, Enum):
EXECUTING = ("executing", "is currently running")
ERRORED = ("errored", "finished with an error")
RESETTING = ("resetting", "is being reset")
RESET_FAILED = ("reset_failed", "error during reset")
KNOWN = ("known", "is known, but has not yet been run")

def __new__(cls, name, desc, **kwargs):
Expand Down Expand Up @@ -78,6 +79,7 @@ class QueryStateMachine:
- errored, when a query has been run but failed to succeed
- cancelled, when execution was terminated by the user
- resetting, when a previously run query is being purged from cache
- reset_failed, when an error occurred during reset
When the query is in a queued, executing, or resetting state, methods which need
to use the results of the query should wait. The `wait_until_complete` method
Expand Down Expand Up @@ -139,6 +141,12 @@ def __init__(self, redis_client: StrictRedis, query_id: str, db_id: str):
self.state_machine.on(
QueryEvent.FINISH_RESET, QueryState.RESETTING, QueryState.KNOWN
)
self.state_machine.on(
QueryEvent.ERROR, QueryState.RESETTING, QueryState.RESET_FAILED
)
self.state_machine.on(
QueryEvent.RESET, QueryState.RESET_FAILED, QueryState.RESETTING
)

@property
def current_query_state(self) -> QueryState:
Expand Down Expand Up @@ -206,6 +214,17 @@ def is_errored(self) -> bool:
"""
return self.current_query_state == QueryState.ERRORED

@property
def is_errored_in_reset(self) -> bool:
    """
    Report whether the query is in the reset_failed state.

    Returns
    -------
    bool
        True if the current query state is RESET_FAILED (an error
        occurred during reset), otherwise False
    """
    state_now = self.current_query_state
    return state_now == QueryState.RESET_FAILED

@property
def is_resetting(self) -> bool:
"""
Expand Down

0 comments on commit ac314d8

Please sign in to comment.