diff --git a/Spanner/src/Database.php b/Spanner/src/Database.php index edeee0ae53e1..a02ad71220e0 100644 --- a/Spanner/src/Database.php +++ b/Spanner/src/Database.php @@ -46,17 +46,23 @@ use Google\Cloud\Spanner\Admin\Database\V1\RestoreDatabaseRequest; use Google\Cloud\Spanner\Admin\Database\V1\UpdateDatabaseDdlRequest; use Google\Cloud\Spanner\Admin\Database\V1\UpdateDatabaseRequest; +use Google\Cloud\Spanner\Operation; use Google\Cloud\Spanner\Session\Session; use Google\Cloud\Spanner\Session\SessionPoolInterface; use Google\Cloud\Spanner\Transaction; use Google\Cloud\Spanner\V1\BatchCreateSessionsRequest; +use Google\Cloud\Spanner\V1\BatchWriteRequest; use Google\Cloud\Spanner\V1\Client\SpannerClient as GapicSpannerClient; use Google\Cloud\Spanner\V1\DeleteSessionRequest; -use Google\Cloud\Spanner\V1\SpannerClient; +use Google\Cloud\Spanner\V1\Mutation; +use Google\Cloud\Spanner\V1\Mutation\Delete; +use Google\Cloud\Spanner\V1\Mutation\Write; use Google\Cloud\Spanner\V1\BatchWriteResponse; -use Google\Cloud\Spanner\V1\SpannerClient as GapicSpannerClient; use Google\Cloud\Spanner\V1\TypeCode; use Google\Protobuf\Duration; +use Google\Protobuf\ListValue; +use Google\Protobuf\Struct; +use Google\Protobuf\Value; use Google\Rpc\Code; use GuzzleHttp\Promise\PromiseInterface; @@ -226,6 +232,17 @@ class Database */ private $returnInt64AsObject; + /** + * @var array + */ + private $mutationSetters = [ + 'insert' => 'setInsert', + 'update' => 'setUpdate', + 'insertOrUpdate' => 'setInsertOrUpdate', + 'replace' => 'setReplace', + 'delete' => 'setDelete' + ]; + /** * Create an object representing a Database. * @@ -1886,13 +1903,27 @@ public function batchWrite(array $mutationGroups, array $options = []) ); $mutationGroups = array_map(fn ($x) => $x->toArray(), $mutationGroups); + array_walk( + $mutationGroups, + fn(&$x) => $x['mutations'] = $this->parseMutations($x['mutations']) + ); try { - return $this->connection->batchWrite([ - 'database' => $this->name(), + list($data, $optionalArgs) = $this->splitOptionalArgs($options); + $data += [ 'session' => $session->name(), 'mutationGroups' => $mutationGroups - ] + $options); + ]; + + return $this->createAndSendRequest( + GapicSpannerClient::class, + 'batchWrite', + $data, + $optionalArgs, + BatchWriteRequest::class, + $this->name, + $this->routeToLeader + ); } finally { $this->isRunningTransaction = false; $session->setExpiration(); @@ -2594,6 +2625,108 @@ private function databaseIdOnly($name) } } + private function parseMutations($rawMutations) + { + if (!is_array($rawMutations)) { + return []; + } + + $mutations = []; + foreach ($rawMutations as $mutation) { + $type = array_keys($mutation)[0]; + $data = $mutation[$type]; + + switch ($type) { + case Operation::OP_DELETE: + if (isset($data['keySet'])) { + $data['keySet'] = $this->formatKeySet($data['keySet']); + } + + $operation = $this->serializer->decodeMessage( + new Delete, + $data + ); + break; + default: + $operation = new Write; + $operation->setTable($data['table']); + $operation->setColumns($data['columns']); + + $modifiedData = []; + foreach ($data['values'] as $key => $param) { + $modifiedData[$key] = $this->fieldValue($param); + } + + $list = new ListValue; + $list->setValues($modifiedData); + $values = [$list]; + $operation->setValues($values); + + break; + } + + $setterName = $this->mutationSetters[$type]; + $mutation = new Mutation; + $mutation->$setterName($operation); + $mutations[] = $mutation; + } + return $mutations; + } + + /** + * @param mixed $param + * @return Value + */ + private function fieldValue($param) + { + $field = new Value; + $value = $this->formatValueForApi($param); + + $setter = null; + switch (array_keys($value)[0]) { + case 'string_value': + $setter = 'setStringValue'; + break; + case 'number_value': + $setter = 'setNumberValue'; + break; + case 'bool_value': + $setter = 'setBoolValue'; + break; + case 'null_value': + $setter = 'setNullValue'; + break; + case 'struct_value': + $setter = 'setStructValue'; + $modifiedParams = []; + foreach ($param as $key => $value) { + $modifiedParams[$key] = $this->fieldValue($value); + } + $value = new Struct; + $value->setFields($modifiedParams); + + break; + case 'list_value': + $setter = 'setListValue'; + $modifiedParams = []; + foreach ($param as $item) { + $modifiedParams[] = $this->fieldValue($item); + } + $list = new ListValue; + $list->setValues($modifiedParams); + $value = $list; + + break; + } + + $value = is_array($value) ? current($value) : $value; + if ($setter) { + $field->$setter($value); + } + + return $field; + } + /** * Represent the class in a more readable and digestable fashion. * diff --git a/Spanner/src/MutationTrait.php b/Spanner/src/MutationTrait.php index 51ca50c20789..2361de3b5836 100644 --- a/Spanner/src/MutationTrait.php +++ b/Spanner/src/MutationTrait.php @@ -28,8 +28,6 @@ */ trait MutationTrait { - use ArrayTrait; - /** * @var array */ @@ -334,6 +332,6 @@ private function flattenKeySet(KeySet $keySet) $keys['keys'] = $this->getValueMapper()->encodeValuesAsSimpleType($keys['keys'], true); } - return $this->arrayFilterRemoveNull($keys); + return array_filter($keys, fn ($v) => !is_null($v));; } } diff --git a/Spanner/tests/Unit/DatabaseTest.php b/Spanner/tests/Unit/DatabaseTest.php index 37373365037c..35f555d47bd3 100644 --- a/Spanner/tests/Unit/DatabaseTest.php +++ b/Spanner/tests/Unit/DatabaseTest.php @@ -44,14 +44,18 @@ use Google\Cloud\Spanner\Tests\ResultGeneratorTrait; use Google\Cloud\Spanner\Timestamp; use Google\Cloud\Spanner\Transaction; +use Google\Cloud\Spanner\V1\BatchWriteRequest\MutationGroup; use Google\Cloud\Spanner\V1\Client\SpannerClient; use Google\Cloud\Spanner\V1\CommitRequest; use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type; use Google\Cloud\Spanner\V1\ExecuteBatchDmlRequest; use Google\Cloud\Spanner\V1\ExecuteSqlRequest; +use Google\Cloud\Spanner\V1\Mutation; use Google\Cloud\Spanner\V1\ReadRequest; use Google\Cloud\Spanner\V1\TransactionSelector; use Google\Protobuf\Duration; +use Google\Protobuf\ListValue; +use Google\Protobuf\Value; use Google\Rpc\Code; use PHPUnit\Framework\TestCase; use Prophecy\Argument; @@ -848,28 +852,26 @@ public function testSnapshotNestedTransaction() public function testBatchWrite() { - $expectedMutationGroup = ['mutations' => [ - [ - Operation::OP_INSERT_OR_UPDATE => [ - 'table' => 'foo', - 'columns' => ['bar1', 'bar2'], - 'values' => [1, 2] - ] - ] - ]]; - $this->connection->batchWrite(Argument::allOf( - Argument::withEntry( - 'database', - DatabaseAdminClient::databaseName( - self::PROJECT, - self::INSTANCE, - self::DATABASE - ) - ), - Argument::withEntry('session', $this->session->name()), - Argument::withEntry('mutationGroups', [$expectedMutationGroup]) - ))->shouldBeCalled()->willReturn(['foo result']); + $expectedMutationGroup = new MutationGroup(['mutations' => [ + new Mutation(['insert_or_update' => new Mutation\Write([ + 'table' => 'foo', + 'columns' => ['bar1', 'bar2'], + 'values' => [new ListValue(['values' => [ + new Value(['string_value' => '1']), + new Value(['string_value' => '2']), + ]])] + ])]) + ]]); + $this->mockSendRequest( + SpannerClient::class, + 'batchWrite', + function ($request) use ($expectedMutationGroup) { + return $request->getSession() === $this->session->name() + && $request->getMutationGroups()[0] == $expectedMutationGroup; + }, + ['foo result'] + ); $mutationGroups = [ ($this->database->mutationGroup(false)) @@ -879,10 +881,9 @@ public function testBatchWrite() ) ]; - $this->refreshOperation($this->database, $this->connection->reveal()); + $this->refreshOperation($this->database, $this->requestHandler->reveal(), $this->serializer); $result = $this->database->batchWrite($mutationGroups); - $this->assertIsArray($result); } public function testRunTransaction() @@ -1105,15 +1106,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_INSERT]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1140,15 +1141,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_INSERT]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1175,15 +1176,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_UPDATE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_UPDATE]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_UPDATE]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1210,15 +1211,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_UPDATE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_UPDATE]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_UPDATE]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1245,15 +1246,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_INSERT_OR_UPDATE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT_OR_UPDATE]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT_OR_UPDATE]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1280,15 +1281,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_INSERT_OR_UPDATE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT_OR_UPDATE]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_INSERT_OR_UPDATE]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1315,15 +1316,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_REPLACE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_REPLACE]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_REPLACE]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1350,15 +1351,15 @@ function ($arg) use ($table, $row) { if ($arg['mutations'][0][OPERATION::OP_REPLACE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_REPLACE]['columns'][0] !== array_keys($row)[0]) { return false; } - + if ($arg['mutations'][0][OPERATION::OP_REPLACE]['values'][0][0] !== current($row)) { return false; } - + return true; }, $this->commitResponse() @@ -1385,15 +1386,15 @@ function ($arg) use ($table, $keys) { if ($arg['mutations'][0][Operation::OP_DELETE]['table'] !== $table) { return false; } - + if ($arg['mutations'][0][Operation::OP_DELETE]['keySet']['keys'][0][0] !== (string) $keys[0]) { return false; } - + if ($arg['mutations'][0][Operation::OP_DELETE]['keySet']['keys'][1][0] !== $keys[1]) { return false; } - + return true; }, $this->commitResponse()