From e3213f74c377e3889e06b924a96f52cba7212ac6 Mon Sep 17 00:00:00 2001 From: Matt Brown Date: Thu, 1 Jun 2023 14:02:04 -0400 Subject: [PATCH] Add flags to track replica queries and synthetic requests --- src/Index.hack | 7 ++++++- src/Query/DeleteQuery.php | 34 ++++++++++++++++++++++++++++------ src/Query/FromClause.php | 28 +++++++++++++++++++--------- src/Query/InsertQuery.php | 14 ++++++++++++-- src/Query/JoinProcessor.php | 35 +++++++++++++++++++++++++++++++++-- src/Query/Query.php | 26 +++++++++++++++++++++----- src/Query/SelectQuery.php | 8 +++++--- src/Query/UpdateQuery.php | 32 +++++++++++++++++++++++++------- src/QueryContext.php | 31 +++++++++++++++++++++++++++++++ src/Server.php | 18 +++++++++++++++++- src/Types.php | 2 +- tests/SharedSetup.php | 7 +++++++ 12 files changed, 205 insertions(+), 37 deletions(-) diff --git a/src/Index.hack b/src/Index.hack index 9a8ffc1..9a150b2 100644 --- a/src/Index.hack +++ b/src/Index.hack @@ -1,5 +1,10 @@ namespace Slack\SQLFake; final class Index { - public function __construct(public string $name, public string $type, public keyset $fields) {} + public function __construct( + public string $name, + public string $type, + public keyset $fields, + public bool $vitess_sharding_key = false, + ) {} } diff --git a/src/Query/DeleteQuery.php b/src/Query/DeleteQuery.php index 8af1686..e782b62 100644 --- a/src/Query/DeleteQuery.php +++ b/src/Query/DeleteQuery.php @@ -12,7 +12,7 @@ public function __construct(public string $sql) {} public function execute(AsyncMysqlConnection $conn): int { $this->fromClause as nonnull; list($database, $table_name) = Query::parseTableName($conn, $this->fromClause['name']); - $data = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[]); + $table_data = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[], keyset[]); $schema = QueryContext::getSchema($database, $table_name); Metrics::trackQuery(QueryType::DELETE, $conn->getServer()->name, $table_name, $this->sql); @@ -26,10 +26,26 @@ public function execute(AsyncMysqlConnection $conn): int { } } - return $this->applyWhere($conn, $data[0], $data[1], $columns, $schema?->indexes) + return $this->applyWhere( + $conn, + $table_data[0], + $table_data[1], + $table_data[2] ?? keyset[], + $columns, + $schema?->indexes, + ) |> $this->applyOrderBy($conn, $$) |> $this->applyLimit($$) - |> $this->applyDelete($conn, $database, $table_name, $$, $data[0], $data[1], $schema); + |> $this->applyDelete( + $conn, + $database, + $table_name, + $$, + $table_data[0], + $table_data[1], + $table_data[2] ?? keyset[], + $schema, + ); } /** @@ -42,11 +58,14 @@ protected function applyDelete( dataset $filtered_rows, dataset $original_table, index_refs $index_refs, + keyset $dirty_pks, ?TableSchema $table_schema, ): int { $rows_to_delete = Keyset\keys($filtered_rows); - $remaining_rows = - Dict\filter_with_key($original_table, ($row_num, $_) ==> !C\contains_key($rows_to_delete, $row_num)); + $remaining_rows = Dict\filter_with_key( + $original_table, + ($row_num, $_) ==> !C\contains_key($rows_to_delete, $row_num), + ); $rows_affected = C\count($original_table) - C\count($remaining_rows); if ($table_schema is nonnull) { @@ -57,6 +76,7 @@ protected function applyDelete( $table_schema->vitess_sharding->keyspace, 'INDEX', keyset[$table_schema->vitess_sharding->sharding_key], + true, ); } @@ -70,11 +90,13 @@ protected function applyDelete( $index_refs[$index_name] = $specific_index_refs; } } + + unset($dirty_pks[$row_id]); } } // write it back to the database - $conn->getServer()->saveTable($database, $table_name, $remaining_rows, $index_refs); + $conn->getServer()->saveTable($database, $table_name, $remaining_rows, $index_refs, $dirty_pks); return $rows_affected; } } diff --git a/src/Query/FromClause.php b/src/Query/FromClause.php index 5cb3976..4ad62f2 100644 --- a/src/Query/FromClause.php +++ b/src/Query/FromClause.php @@ -37,17 +37,20 @@ public function aliasRecentExpression(string $name): void { public function process( AsyncMysqlConnection $conn, string $sql, - ): (dataset, index_refs, vec, dict) { + ): (dataset, index_refs, keyset, vec, dict) { $data = dict[]; $is_first_table = true; $index_refs = dict[]; + $dirty_pks = keyset[]; + $indexes = vec[]; $columns = dict[]; foreach ($this->tables as $table) { $schema = null; $new_index_refs = dict[]; + $new_dirty_pks = keyset[]; $new_indexes = vec[]; if (Shapes::keyExists($table, 'subquery')) { @@ -63,12 +66,17 @@ public function process( $name = $table['alias'] ?? $table_name; $schema = QueryContext::getSchema($database, $table_name); if ($schema === null && QueryContext::$strictSchemaMode) { - throw - new SQLFakeRuntimeException("Table $table_name not found in schema and strict mode is enabled"); + throw new SQLFakeRuntimeException( + "Table $table_name not found in schema and strict mode is enabled", + ); } - list($res, $new_index_refs) = - $conn->getServer()->getTableData($database, $table_name) ?: tuple(dict[], dict[]); + $table_data = $conn->getServer()->getTableData($database, $table_name) ?: + tuple(dict[], dict[], keyset[]); + + $res = $table_data[0]; + $new_index_refs = $table_data[1]; + $new_dirty_pks = $table_data[2] ?? keyset[]; if (C\count($this->tables) > 1) { $new_index_refs = Dict\map_keys($new_index_refs, $k ==> $name.'.'.$k); @@ -94,6 +102,7 @@ public function process( $prefix.$schema->vitess_sharding->keyspace, 'INDEX', keyset[$prefix.$schema->vitess_sharding->sharding_key], + true, ); } @@ -111,6 +120,7 @@ public function process( } $index_refs = Dict\merge($index_refs, $new_index_refs); + $dirty_pks = Keyset\union($dirty_pks, $new_dirty_pks); } $new_dataset = dict[]; @@ -156,10 +166,10 @@ public function process( if ($data || !$is_first_table) { // do the join here. based on join type, pass in $data and $res to filter. and aliases - list($data, $index_refs) = JoinProcessor::process( + list($data, $index_refs, $dirty_pks) = JoinProcessor::process( $conn, - tuple($data, $index_refs), - tuple($new_dataset, $new_index_refs), + tuple($data, $index_refs, keyset[]), + tuple($new_dataset, $new_index_refs, $new_dirty_pks), $name, $table['join_type'], $table['join_operator'] ?? null, @@ -180,6 +190,6 @@ public function process( } } - return tuple($data, $index_refs, $indexes, $columns); + return tuple($data, $index_refs, $dirty_pks, $indexes, $columns); } } diff --git a/src/Query/InsertQuery.php b/src/Query/InsertQuery.php index d882af3..5b53e19 100644 --- a/src/Query/InsertQuery.php +++ b/src/Query/InsertQuery.php @@ -18,7 +18,11 @@ public function __construct(public string $table, public string $sql, public boo */ public function execute(AsyncMysqlConnection $conn): int { list($database, $table_name) = Query::parseTableName($conn, $this->table); - list($table, $index_refs) = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[]); + $data = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[], keyset[]); + + $table = $data[0]; + $index_refs = $data[1]; + $dirty_pks = $data[2] ?? keyset[]; Metrics::trackQuery(QueryType::INSERT, $conn->getServer()->name, $table_name, $this->sql); @@ -65,6 +69,7 @@ public function execute(AsyncMysqlConnection $conn): int { $table_schema->vitess_sharding->keyspace, 'INDEX', keyset[$table_schema->vitess_sharding->sharding_key], + true, ); } @@ -113,6 +118,7 @@ public function execute(AsyncMysqlConnection $conn): int { dict[$row_id => $existing_row], $table, $index_refs, + $dirty_pks, $this->updateExpressions, $table_schema, $row, @@ -135,12 +141,16 @@ public function execute(AsyncMysqlConnection $conn): int { $index_refs[$index_name] = $specific_index_refs; } + if (QueryContext::$inRequest) { + $dirty_pks[] = $primary_key; + } + $table[$primary_key] = $row; $rows_affected++; } // write it back to the database - $conn->getServer()->saveTable($database, $table_name, $table, $index_refs); + $conn->getServer()->saveTable($database, $table_name, $table, $index_refs, $dirty_pks); return $rows_affected; } } diff --git a/src/Query/JoinProcessor.php b/src/Query/JoinProcessor.php index e15c92f..032788e 100644 --- a/src/Query/JoinProcessor.php +++ b/src/Query/JoinProcessor.php @@ -62,6 +62,8 @@ public static function process( $left_mappings = dict[]; $right_mappings = dict[]; + $dirty_pks = keyset[]; + switch ($join_type) { case JoinType::JOIN: case JoinType::STRAIGHT: @@ -77,6 +79,9 @@ public static function process( $left_mappings[$left_row_id][] = $insert_id; $right_mappings[$right_row_id] ??= keyset[]; $right_mappings[$right_row_id][] = $insert_id; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) { + $dirty_pks[] = $insert_id; + } } } } @@ -104,6 +109,9 @@ public static function process( $right_mappings[$right_row_id] ??= keyset[]; $right_mappings[$right_row_id][] = $insert_id; $any_match = true; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) { + $dirty_pks[] = $insert_id; + } } } @@ -120,6 +128,10 @@ public static function process( $insert_id = C\count($out) - 1; $left_mappings[$left_row_id] ??= keyset[]; $left_mappings[$left_row_id][] = $insert_id; + + if (isset($left_dataset[2][$left_row_id])) { + $dirty_pks[] = $insert_id; + } } } break; @@ -146,6 +158,9 @@ public static function process( $left_mappings[$left_row_id][] = $insert_id; $right_mappings[$right_row_id] ??= keyset[]; $right_mappings[$right_row_id][] = $insert_id; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) { + $dirty_pks[] = $insert_id; + } } } @@ -164,6 +179,9 @@ public static function process( $left_mappings[$left_row_id][] = $insert_id; $right_mappings[$right_row_id] ??= keyset[]; $right_mappings[$right_row_id][] = $insert_id; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) { + $dirty_pks[] = $insert_id; + } } } break; @@ -183,6 +201,9 @@ public static function process( $left_mappings[$left_row_id][] = $insert_id; $right_mappings[$right_row_id] ??= keyset[]; $right_mappings[$right_row_id][] = $insert_id; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) { + $dirty_pks[] = $insert_id; + } } } } @@ -198,7 +219,7 @@ public static function process( $right_indexes, ); - return tuple(dict($out), $index_refs); + return tuple(dict($out), $index_refs, $dirty_pks); } /** @@ -308,6 +329,7 @@ private static function processHashJoin( $left_mappings = dict[]; $right_mappings = dict[]; + $dirty_pks = keyset[]; switch ($join_type) { case JoinType::JOIN: @@ -322,6 +344,9 @@ private static function processHashJoin( $left_mappings[$left_row_id][] = $insert_id; $right_mappings[$k] ??= keyset[]; $right_mappings[$k][] = $insert_id; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$k])) { + $dirty_pks[] = $insert_id; + } } } break; @@ -348,6 +373,9 @@ private static function processHashJoin( $right_mappings[$k] ??= keyset[]; $right_mappings[$k][] = $insert_id; $any_match = true; + if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$k])) { + $dirty_pks[] = $insert_id; + } } } @@ -364,6 +392,9 @@ private static function processHashJoin( $insert_id = C\count($out) - 1; $left_mappings[$left_row_id] ??= keyset[]; $left_mappings[$left_row_id][] = $insert_id; + if (isset($left_dataset[2][$left_row_id])) { + $dirty_pks[] = $insert_id; + } } } break; @@ -380,7 +411,7 @@ private static function processHashJoin( $right_indexes, ); - return tuple(dict($out), $index_refs); + return tuple(dict($out), $index_refs, $dirty_pks); } private static function getIndexRefsFromMappings( diff --git a/src/Query/Query.php b/src/Query/Query.php index 5192d56..a7852a8 100644 --- a/src/Query/Query.php +++ b/src/Query/Query.php @@ -27,6 +27,7 @@ protected function applyWhere( AsyncMysqlConnection $conn, dataset $data, index_refs $index_refs, + keyset $dirty_pks, ?dict $columns, ?vec $indexes, ): dataset { @@ -36,16 +37,25 @@ protected function applyWhere( return $data; } + $all_matched = false; + if ($columns is nonnull && $indexes) { - $all_matched = false; $data = QueryPlanner::filterWithIndexes($data, $index_refs, $columns, $indexes, $where, inout $all_matched); + } + + if (!$all_matched) { + $data = Dict\filter($data, $row ==> (bool)$where->evaluate($row, $conn)); + } - if ($all_matched) { - return $data; + if (QueryContext::$useReplica && QueryContext::$inRequest && QueryContext::$preventReplicaReadsAfterWrites) { + $intersection = Keyset\intersect(Keyset\keys($data), $dirty_pks); + + if ($intersection !== keyset[]) { + throw new \Exception('Replica read after write: '.(QueryContext::$query ?? '')); } } - return Dict\filter($data, $row ==> (bool)$where->evaluate($row, $conn)); + return $data; } /** @@ -177,6 +187,7 @@ protected function applySet( dataset $filtered_rows, dataset $original_table, index_refs $index_refs, + keyset $dirty_pks, vec $set_clause, ?TableSchema $table_schema, /* for dupe inserts only */ @@ -229,6 +240,7 @@ protected function applySet( $table_schema->vitess_sharding->keyspace, 'INDEX', keyset[$table_schema->vitess_sharding->sharding_key], + true, ); } } @@ -338,6 +350,10 @@ protected function applySet( $index_refs[$index_name] = $specific_index_refs; } + if (QueryContext::$inRequest) { + $dirty_pks[] = $new_row_id; + } + if ($new_row_id !== $row_id) { // Remap keys to preserve insertion order when primary key has changed $original_table = Dict\pull_with_key( @@ -354,7 +370,7 @@ protected function applySet( } // write it back to the database - $conn->getServer()->saveTable($database, $table_name, $original_table, $index_refs); + $conn->getServer()->saveTable($database, $table_name, $original_table, $index_refs, $dirty_pks); return tuple($update_count, $original_table, $index_refs); } diff --git a/src/Query/SelectQuery.php b/src/Query/SelectQuery.php index 4cc609a..961f236 100644 --- a/src/Query/SelectQuery.php +++ b/src/Query/SelectQuery.php @@ -55,7 +55,7 @@ public function execute(AsyncMysqlConnection $conn, ?row $_ = null): dataset { // FROM clause handling - builds a data set including extracting rows from tables, applying joins $this->applyFrom($conn) // WHERE caluse - filter out any rows that don't match it - |> $this->applyWhere($conn, $$[0], $$[1], $$[3], $$[2]) + |> $this->applyWhere($conn, $$[0], $$[1], $$[2], $$[4], $$[3]) // GROUP BY clause - may group the rows if necessary. all clauses after this need to know how to handled both grouped and ungrouped inputs |> $this->applyGroupBy($conn, $$) // HAVING clause, filter out any rows not matching it @@ -76,12 +76,14 @@ public function execute(AsyncMysqlConnection $conn, ?row $_ = null): dataset { * The FROM clause of the query gets processed first, retrieving data from tables, executing subqueries, and handling joins * This is also where we build up the $columns list which is commonly used throughout the entire library to map column references to index_refs in this dataset */ - protected function applyFrom(AsyncMysqlConnection $conn): (dataset, index_refs, vec, dict) { + protected function applyFrom( + AsyncMysqlConnection $conn, + ): (dataset, index_refs, keyset, vec, dict) { $from = $this->fromClause; if ($from === null) { // we put one empty row when there is no FROM so that queries like "SELECT 1" will return a row - return tuple(dict[0 => dict[]], dict[], vec[], dict[]); + return tuple(dict[0 => dict[]], dict[], keyset[], vec[], dict[]); } return $from->process($conn, $this->sql); diff --git a/src/Query/UpdateQuery.php b/src/Query/UpdateQuery.php index 48927ff..220f074 100644 --- a/src/Query/UpdateQuery.php +++ b/src/Query/UpdateQuery.php @@ -9,7 +9,7 @@ public function __construct(public from_table $updateClause, public string $sql, public vec $setClause = vec[]; public function execute(AsyncMysqlConnection $conn): int { - list($tableName, $database, $data, $index_refs) = $this->processUpdateClause($conn); + list($tableName, $database, $data, $index_refs, $dirty_pks) = $this->processUpdateClause($conn); Metrics::trackQuery(QueryType::UPDATE, $conn->getServer()->name, $tableName, $this->sql); $schema = QueryContext::getSchema($database, $tableName); @@ -22,10 +22,27 @@ public function execute(AsyncMysqlConnection $conn): int { } } - list($rows_affected, $_, $_) = $this->applyWhere($conn, $data, $index_refs, $columns, $schema?->indexes) + list($rows_affected, $_, $_) = $this->applyWhere( + $conn, + $data, + $index_refs, + $dirty_pks, + $columns, + $schema?->indexes, + ) |> $this->applyOrderBy($conn, $$) |> $this->applyLimit($$) - |> $this->applySet($conn, $database, $tableName, $$, $data, $index_refs, $this->setClause, $schema); + |> $this->applySet( + $conn, + $database, + $tableName, + $$, + $data, + $index_refs, + $dirty_pks, + $this->setClause, + $schema, + ); return $rows_affected; } @@ -34,10 +51,11 @@ public function execute(AsyncMysqlConnection $conn): int { * process the UPDATE clause to retrieve the table * add a row identifier to each element in the result which we can later use to update the underlying table */ - protected function processUpdateClause(AsyncMysqlConnection $conn): (string, string, dataset, index_refs) { + protected function processUpdateClause( + AsyncMysqlConnection $conn, + ): (string, string, dataset, index_refs, keyset) { list($database, $tableName) = Query::parseTableName($conn, $this->updateClause['name']); - list($table_data, $index_refs) = - $conn->getServer()->getTableData($database, $tableName) ?? tuple(dict[], dict[]); - return tuple($tableName, $database, $table_data, $index_refs); + $data = $conn->getServer()->getTableData($database, $tableName) ?? tuple(dict[], dict[], keyset[]); + return tuple($tableName, $database, $data[0], $data[1], $data[2] ?? keyset[]); } } diff --git a/src/QueryContext.php b/src/QueryContext.php index 6e0e030..ac26e5d 100644 --- a/src/QueryContext.php +++ b/src/QueryContext.php @@ -29,6 +29,22 @@ */ public static bool $requireIndexes = false; + /** + * Require the presence of the table's Vitess sharding key + */ + public static bool $requireVitessShardingKey = false; + + /** + * Whether or not to use a replica + */ + public static bool $useReplica = false; + + /** + * Whether or not we're currently simulating a web request. + * This controls replica behaviour. + */ + public static bool $inRequest = false; + /** * 1: quiet, print nothing * 2: verbose, print every query as it executes @@ -42,6 +58,12 @@ */ public static bool $skipVitessValidation = false; + /** + * If true hack-sql-fake will attempt to detect references to updated rows + * after they've been written in the same synthetic request. + */ + public static bool $preventReplicaReadsAfterWrites = false; + public static ?string $query = null; /** @@ -57,4 +79,13 @@ public static function getSchema(string $database, string $table): ?TableSchema { return self::$schema[$database][$table] ?? null; } + + public static function startRequest(): void { + self::$inRequest = true; + } + + public static function endRequest(): void { + self::$inRequest = false; + Server::cleanDirtyTables(); + } } diff --git a/src/Server.php b/src/Server.php index 80a7731..70208ac 100644 --- a/src/Server.php +++ b/src/Server.php @@ -121,6 +121,7 @@ public function saveTable( string $name, dataset $rows, index_refs $index_refs, + keyset $dirty_pks, ): void { // create table if not exists if (!C\contains_key($this->databases, $dbname)) { @@ -128,6 +129,21 @@ public function saveTable( } // save rows - $this->databases[$dbname][$name] = tuple($rows, $index_refs); + $this->databases[$dbname][$name] = tuple($rows, $index_refs, $dirty_pks); + } + + /** + * Remove records of primary keys that have been updated in this simulated request + */ + public static function cleanDirtyTables(): void { + foreach (self::$instances as $instance) { + foreach ($instance->databases as $database_name => $database) { + foreach ($database as $table_name => $table_data) { + if (isset($table_data[2])) { + $instance->databases[$database_name][$table_name][2] = keyset[]; + } + } + } + } } } diff --git a/src/Types.php b/src/Types.php index 1d233c7..43e5266 100644 --- a/src/Types.php +++ b/src/Types.php @@ -11,7 +11,7 @@ // vec of rows can be a stored table, a query result set, or an intermediate state for either of those type dataset = dict; type index_refs = dict>; -type table_data = (dataset, index_refs); +type table_data = (dataset, index_refs, keyset); // a database is a collection of named tables type database = dict; diff --git a/tests/SharedSetup.php b/tests/SharedSetup.php index f4cd519..a52c10f 100644 --- a/tests/SharedSetup.php +++ b/tests/SharedSetup.php @@ -31,6 +31,7 @@ final class SharedSetup { 6 => keyset[4, 6], ], ], + keyset[], ); $table4_data = tuple( @@ -47,6 +48,7 @@ final class SharedSetup { 7 => keyset[1003, 1004], ], ], + keyset[], ); $table5_data = tuple( @@ -64,6 +66,7 @@ final class SharedSetup { 0x2 => keyset[1003], ], ], + keyset[], ); $association_table_data = tuple( @@ -114,6 +117,7 @@ final class SharedSetup { 1003 => keyset[3], ], ], + keyset[], ); $table6_data = tuple( @@ -125,6 +129,7 @@ final class SharedSetup { 1004 => dict['id' => 1004, 'position' => '25'], ], dict[], + keyset[], ); // populate database state @@ -158,6 +163,7 @@ final class SharedSetup { 4 => dict['id' => 4, 'name' => 'Benjamin Ampersand'], ], dict[], + keyset[], ), 'vt_table2' => tuple( dict[ @@ -167,6 +173,7 @@ final class SharedSetup { 14 => dict['id' => 14, 'vt_table1_id' => 4, 'description' => 'no'], ], dict[], + keyset[], ), ];