From 84ca383c04311244f4521e1e8257d1c0382b3c69 Mon Sep 17 00:00:00 2001 From: Ziaratban Date: Thu, 23 Jan 2020 23:41:29 +0330 Subject: [PATCH] yii2 mongodb transaction --- src/ClientSession.php | 86 ++++++++++++++++ src/Collection.php | 98 ++++++++++-------- src/Command.php | 233 ++++++++++++++++-------------------------- src/Connection.php | 198 +++++++++++++++++++++++++++++++++++ src/Database.php | 15 +-- src/Transaction.php | 85 +++++++++++++++ 6 files changed, 522 insertions(+), 193 deletions(-) create mode 100644 src/ClientSession.php create mode 100644 src/Transaction.php diff --git a/src/ClientSession.php b/src/ClientSession.php new file mode 100644 index 000000000..347de8e17 --- /dev/null +++ b/src/ClientSession.php @@ -0,0 +1,86 @@ + + */ +class ClientSession extends \yii\base\BaseObject +{ + + /** + * @var Connection the database connection that this transaction is associated with. + */ + public $db; + + /** + * @var MongoDB\Driver\Session class represents a client session and Commands, + * queries, and write operations may then be associated the session. + * @see https://www.php.net/manual/en/class.mongodb-driver-session.php + */ + public $mongoSession; + + /** + * @var Transaction current transaction in session. this transaction can only be created once. + */ + private $_transaction = null; + + /** + * Start a new session in a connection. + * @param Connection $db + * @param Array $sessionOptions Creates a ClientSession for the given options + * @see https://www.php.net/manual/en/mongodb-driver-manager.startsession.php#refsect1-mongodb-driver-manager.startsession-parameters + * @return ClientSession return new session base on a session options for the given connection + */ + public static function start($db, $sessionOptions = []){ + Connection::prepareExecOptions($sessionOptions); + Yii::trace('Starting mongodb session ...', __METHOD__); + $db->trigger(Connection::EVENT_START_SESSION); + $newSession = new self([ + 'db' => $db, + 'mongoSession' => $db->manager->startSession($sessionOptions), + ]); + Yii::trace('MongoDB session started.', __METHOD__); + return $newSession; + } + + /** + * Get current transaction of session or create a new transaction once + * @return Transaction return current transaction + */ + public function getTransaction(){ + if($this->_transaction === null) + return $this->_transaction = new Transaction(['clientSession' => $this]); + return $this->_transaction; + } + + /** + * current session has a transaction? + * @return bool return true if transaction exists otherwise return false + */ + public function getHasTransaction(){ + return !empty($this->_transaction); + } + + /** + * End current session + */ + public function end(){ + $this->mongoSession->endSession(); + $db->trigger(Connection::EVENT_END_SESSION); + } +} diff --git a/src/Collection.php b/src/Collection.php index c897fd4cb..f6ea975c6 100644 --- a/src/Collection.php +++ b/src/Collection.php @@ -56,23 +56,25 @@ public function getFullName() /** * Drops this collection. + * @param array $execOptions -> goto Command::execute() * @throws Exception on failure. * @return bool whether the operation successful. */ - public function drop() + public function drop($execOptions = []) { - return $this->database->dropCollection($this->name); + return $this->database->dropCollection($this->name, $execOptions); } /** * Returns the list of defined indexes. * @return array list of indexes info. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::execute() * @since 2.1 */ - public function listIndexes($options = []) + public function listIndexes($options = [], $execOptions = []) { - return $this->database->createCommand()->listIndexes($this->name, $options); + return $this->database->createCommand()->listIndexes($this->name, $options, $execOptions); } /** @@ -107,23 +109,25 @@ public function listIndexes($options = []) * * See [[https://docs.mongodb.com/manual/reference/method/db.collection.createIndex/#options-for-all-index-types]] * for the full list of options. + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. * @since 2.1 */ - public function createIndexes($indexes) + public function createIndexes($indexes, $execOptions = []) { - return $this->database->createCommand()->createIndexes($this->name, $indexes); + return $this->database->createCommand()->createIndexes($this->name, $indexes, $execOptions); } /** * Drops collection indexes by name. * @param string $indexes wildcard for name of the indexes to be dropped. * You can use `*` to drop all indexes. + * @param array $execOptions -> goto Command::execute() * @return int count of dropped indexes. */ - public function dropIndexes($indexes) + public function dropIndexes($indexes, $execOptions = []) { - $result = $this->database->createCommand()->dropIndexes($this->name, $indexes); + $result = $this->database->createCommand()->dropIndexes($this->name, $indexes, $execOptions); return $result['nIndexesWas']; } @@ -144,13 +148,14 @@ public function dropIndexes($indexes) * ``` * * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::execute() * @throws Exception on failure. * @return bool whether the operation successful. */ - public function createIndex($columns, $options = []) + public function createIndex($columns, $options = [], $execOptions = []) { $index = array_merge(['key' => $columns], $options); - return $this->database->createCommand()->createIndexes($this->name, [$index]); + return $this->database->createCommand()->createIndexes($this->name, [$index], $execOptions); } /** @@ -171,17 +176,18 @@ public function createIndex($columns, $options = []) * ] * ``` * + * @param array $execOptions -> goto Command::execute() * @throws Exception on failure. * @return bool whether the operation successful. */ - public function dropIndex($columns) + public function dropIndex($columns, $execOptions = []) { $existingIndexes = $this->listIndexes(); $indexKey = $this->database->connection->getQueryBuilder()->buildSortFields($columns); foreach ($existingIndexes as $index) { if ($index['key'] == $indexKey) { - $this->database->createCommand()->dropIndexes($this->name, $index['name']); + $this->database->createCommand()->dropIndexes($this->name, $index['name'], $execOptions); return true; } } @@ -190,7 +196,7 @@ public function dropIndex($columns) $indexName = $this->database->connection->getQueryBuilder()->generateIndexName($indexKey); foreach ($existingIndexes as $index) { if ($index['name'] === $indexName) { - $this->database->createCommand()->dropIndexes($this->name, $index['name']); + $this->database->createCommand()->dropIndexes($this->name, $index['name'], $execOptions); return true; } } @@ -200,12 +206,13 @@ public function dropIndex($columns) /** * Drops all indexes for this collection. + * @param array $execOptions -> goto Command::execute() * @throws Exception on failure. * @return int count of dropped indexes. */ - public function dropAllIndexes() + public function dropAllIndexes($execOptions = []) { - $result = $this->database->createCommand()->dropIndexes($this->name, '*'); + $result = $this->database->createCommand()->dropIndexes($this->name, '*', $execOptions); return isset($result['nIndexesWas']) ? $result['nIndexesWas'] : 0; } @@ -215,15 +222,16 @@ public function dropAllIndexes() * @param array $condition query condition * @param array $fields fields to be selected * @param array $options query options (available since 2.1). + * @param array $execOptions -> goto Command::executeQuery() * @return \MongoDB\Driver\Cursor cursor for the search results * @see Query */ - public function find($condition = [], $fields = [], $options = []) + public function find($condition = [], $fields = [], $options = [], $execOptions = []) { if (!empty($fields)) { $options['projection'] = $fields; } - return $this->database->createCommand()->find($this->name, $condition, $options); + return $this->database->createCommand()->find($this->name, $condition, $options, $execOptions); } /** @@ -231,12 +239,13 @@ public function find($condition = [], $fields = [], $options = []) * @param array $condition query condition * @param array $fields fields to be selected * @param array $options query options (available since 2.1). + * @param array $execOptions -> goto Command::executeQuery() * @return array|null the single document. Null is returned if the query results in nothing. */ - public function findOne($condition = [], $fields = [], $options = []) + public function findOne($condition = [], $fields = [], $options = [], $execOptions = []) { $options['limit'] = 1; - $cursor = $this->find($condition, $fields, $options); + $cursor = $this->find($condition, $fields, $options, $execOptions); $rows = $cursor->toArray(); return empty($rows) ? null : current($rows); } @@ -249,9 +258,9 @@ public function findOne($condition = [], $fields = [], $options = []) * @return array|null the original document, or the modified document when $options['new'] is set. * @throws Exception on failure. */ - public function findAndModify($condition, $update, $options = []) + public function findAndModify($condition, $update, $options = [], $execOptions = []) { - return $this->database->createCommand()->findAndModify($this->name, $condition, $update, $options); + return $this->database->createCommand()->findAndModify($this->name, $condition, $update, $options, $execOptions); } /** @@ -259,23 +268,25 @@ public function findAndModify($condition, $update, $options = []) * @param array|object $data data to be inserted. * @param array $options list of options in format: optionName => optionValue. * @return \MongoDB\BSON\ObjectID new record ID instance. + * @param array $execOptions -> goto Command::executeBatch() * @throws Exception on failure. */ - public function insert($data, $options = []) + public function insert($data, $options = [], $execOptions = []) { - return $this->database->createCommand()->insert($this->name, $data, $options); + return $this->database->createCommand()->insert($this->name, $data, $options, $execOptions); } /** * Inserts several new rows into collection. * @param array $rows array of arrays or objects to be inserted. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::executeBatch() * @return array inserted data, each row will have "_id" key assigned to it. * @throws Exception on failure. */ - public function batchInsert($rows, $options = []) + public function batchInsert($rows, $options = [], $execOptions = []) { - $insertedIds = $this->database->createCommand()->batchInsert($this->name, $rows, $options); + $insertedIds = $this->database->createCommand()->batchInsert($this->name, $rows, $options, $execOptions); foreach ($rows as $key => $row) { $rows[$key]['_id'] = $insertedIds[$key]; } @@ -289,12 +300,13 @@ public function batchInsert($rows, $options = []) * @param array $condition description of the objects to update. * @param array $newData the object with which to update the matching records. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::executeBatch() * @return int|bool number of updated documents or whether operation was successful. * @throws Exception on failure. */ - public function update($condition, $newData, $options = []) + public function update($condition, $newData, $options = [], $execOptions = []) { - $writeResult = $this->database->createCommand()->update($this->name, $condition, $newData, $options); + $writeResult = $this->database->createCommand()->update($this->name, $condition, $newData, $options, $execOptions); return $writeResult->getModifiedCount() + $writeResult->getUpsertedCount(); } @@ -303,16 +315,17 @@ public function update($condition, $newData, $options = []) * @param array|object $data data to be updated/inserted. * @param array $options list of options in format: optionName => optionValue. * @return \MongoDB\BSON\ObjectID updated/new record id instance. + * @param array $execOptions -> goto Command::executeBatch() * @throws Exception on failure. */ - public function save($data, $options = []) + public function save($data, $options = [], $execOptions = []) { if (empty($data['_id'])) { - return $this->insert($data, $options); + return $this->insert($data, $options, $execOptions); } $id = $data['_id']; unset($data['_id']); - $this->update(['_id' => $id], ['$set' => $data], ['upsert' => true]); + $this->update(['_id' => $id], ['$set' => $data], ['upsert' => true], $execOptions); return is_object($id) ? $id : new ObjectID($id); } @@ -321,13 +334,14 @@ public function save($data, $options = []) * Removes data from the collection. * @param array $condition description of records to remove. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::executeBatch() * @return int|bool number of updated documents or whether operation was successful. * @throws Exception on failure. */ - public function remove($condition = [], $options = []) + public function remove($condition = [], $options = [], $execOptions = []) { $options = array_merge(['limit' => 0], $options); - $writeResult = $this->database->createCommand()->delete($this->name, $condition, $options); + $writeResult = $this->database->createCommand()->delete($this->name, $condition, $options, $execOptions); return $writeResult->getDeletedCount(); } @@ -338,9 +352,9 @@ public function remove($condition = [], $options = []) * @return int records count. * @since 2.1 */ - public function count($condition = [], $options = []) + public function count($condition = [], $options = [], $execOptions = []) { - return $this->database->createCommand()->count($this->name, $condition, $options); + return $this->database->createCommand()->count($this->name, $condition, $options, $execOptions); } /** @@ -351,9 +365,9 @@ public function count($condition = [], $options = []) * @return array|bool array of distinct values, or "false" on failure. * @throws Exception on failure. */ - public function distinct($column, $condition = [], $options = []) + public function distinct($column, $condition = [], $options = [], $execOptions = []) { - return $this->database->createCommand()->distinct($this->name, $column, $condition, $options); + return $this->database->createCommand()->distinct($this->name, $column, $condition, $options, $execOptions); } /** @@ -365,9 +379,9 @@ public function distinct($column, $condition = [], $options = []) * @return array|\MongoDB\Driver\Cursor the result of the aggregation. * @throws Exception on failure. */ - public function aggregate($pipelines, $options = []) + public function aggregate($pipelines, $options = [], $execOptions = []) { - return $this->database->createCommand()->aggregate($this->name, $pipelines, $options); + return $this->database->createCommand()->aggregate($this->name, $pipelines, $options, $execOptions); } /** @@ -385,9 +399,9 @@ public function aggregate($pipelines, $options = []) * @return array the result of the aggregation. * @throws Exception on failure. */ - public function group($keys, $initial, $reduce, $options = []) + public function group($keys, $initial, $reduce, $options = [], $execOptions = []) { - return $this->database->createCommand()->group($this->name, $keys, $initial, $reduce, $options); + return $this->database->createCommand()->group($this->name, $keys, $initial, $reduce, $options, $execOptions); } /** @@ -429,8 +443,8 @@ public function group($keys, $initial, $reduce, $options = []) * @return string|array the map reduce output collection name or output results. * @throws Exception on failure. */ - public function mapReduce($map, $reduce, $out, $condition = [], $options = []) + public function mapReduce($map, $reduce, $out, $condition = [], $options = [], $execOptions = []) { - return $this->database->createCommand()->mapReduce($this->name, $map, $reduce, $out, $condition, $options); + return $this->database->createCommand()->mapReduce($this->name, $map, $reduce, $out, $condition, $options, $execOptions); } } diff --git a/src/Command.php b/src/Command.php index e56b0b51c..85a629753 100644 --- a/src/Command.php +++ b/src/Command.php @@ -10,9 +10,6 @@ use MongoDB\BSON\ObjectID; use MongoDB\Driver\BulkWrite; use MongoDB\Driver\Exception\RuntimeException; -use MongoDB\Driver\ReadConcern; -use MongoDB\Driver\ReadPreference; -use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteResult; use Yii; use yii\base\InvalidConfigException; @@ -73,107 +70,30 @@ class Command extends BaseObject */ public $document = []; - /** - * @var ReadPreference|int|string|null command read preference. - */ - private $_readPreference; - /** - * @var WriteConcern|int|string|null write concern to be used by this command. - */ - private $_writeConcern; - /** - * @var ReadConcern|string read concern to be used by this command - */ - private $_readConcern; - + public $globalExecOptions = []; /** - * Returns read preference for this command. - * @return ReadPreference read preference. - */ - public function getReadPreference() - { - if (!is_object($this->_readPreference)) { - if ($this->_readPreference === null) { - $this->_readPreference = $this->db->manager->getReadPreference(); - } elseif (is_scalar($this->_readPreference)) { - $this->_readPreference = new ReadPreference($this->_readPreference); - } - } - return $this->_readPreference; - } - - /** - * Sets read preference for this command. - * @param ReadPreference|int|string|null $readPreference read reference, it can be specified as - * instance of [[ReadPreference]] or scalar mode value, for example: `ReadPreference::RP_PRIMARY`. - * @return $this self reference. - */ - public function setReadPreference($readPreference) - { - $this->_readPreference = $readPreference; - return $this; - } - - /** - * Returns write concern for this command. - * @return WriteConcern|null write concern to be used in this command. - */ - public function getWriteConcern() - { - if ($this->_writeConcern !== null) { - if (is_scalar($this->_writeConcern)) { - $this->_writeConcern = new WriteConcern($this->_writeConcern); - } - } - return $this->_writeConcern; - } - - /** - * Sets write concern for this command. - * @param WriteConcern|int|string|null $writeConcern write concern, it can be an instance of [[WriteConcern]] - * or its scalar mode value, for example: `majority`. - * @return $this self reference - */ - public function setWriteConcern($writeConcern) - { - $this->_writeConcern = $writeConcern; - return $this; - } - - /** - * Retuns read concern for this command. - * @return ReadConcern|string read concern to be used in this command. - */ - public function getReadConcern() - { - if ($this->_readConcern !== null) { - if (is_scalar($this->_readConcern)) { - $this->_readConcern = new ReadConcern($this->_readConcern); - } - } - return $this->_readConcern; - } - - /** - * Sets read concern for this command. - * @param ReadConcern|string $readConcern read concern, it can be an instance of [[ReadConcern]] or - * scalar level value, for example: 'local'. - * @return $this self reference - */ - public function setReadConcern($readConcern) - { - $this->_readConcern = $readConcern; - return $this; + * prepare execOptions for some purpose + * @param array|object by reference see Connection::prepareExceOptions + */ + private function prepareExecOptions(&$execOptions){ + $execOptions = empty($execOptions) ? $this->globalExecOptions : $execOptions; + $this->db->prepareExecOptions($execOptions); } /** * Executes this command. + * @param array $execOptions options for executeCommand + * Note: "readConcern" and "writeConcern" options will not default to corresponding values from the MongoDB + * Connection URI nor will the MongoDB server version be taken into account + * @see https://www.php.net/manual/en/mongodb-driver-server.executebulkwrite.php#refsect1-mongodb-driver-server.executebulkwrite-parameters * @return \MongoDB\Driver\Cursor result cursor. * @throws Exception on failure. */ - public function execute() + public function execute($execOptions = []) { + $this->prepareExecOptions($execOptions); + $databaseName = $this->databaseName === null ? $this->db->defaultDatabaseName : $this->databaseName; $token = $this->log([$databaseName, 'command'], $this->document, __METHOD__); @@ -183,7 +103,7 @@ public function execute() $this->db->open(); $mongoCommand = new \MongoDB\Driver\Command($this->document); - $cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $this->getReadPreference()); + $cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $execOptions); $cursor->setTypeMap($this->db->typeMap); $this->endProfile($token, __METHOD__); @@ -204,11 +124,15 @@ public function execute() * - 'insertedIds' - contains inserted IDs. * - 'result' - [[\MongoDB\Driver\WriteResult]] instance. * + * @param array $execOptions options for executeBulkWrite + * @see https://www.php.net/manual/en/mongodb-driver-server.executebulkwrite.php#refsect1-mongodb-driver-server.executebulkwrite-parameters * @throws Exception on failure. * @throws InvalidConfigException on invalid [[document]] format. */ - public function executeBatch($collectionName, $options = []) + public function executeBatch($collectionName, $options = [], $execOptions = []) { + $this->prepareExecOptions($execOptions); + $databaseName = $this->databaseName === null ? $this->db->defaultDatabaseName : $this->databaseName; $token = $this->log([$databaseName, $collectionName, 'bulkWrite'], $this->document, __METHOD__); @@ -236,7 +160,7 @@ public function executeBatch($collectionName, $options = []) } $this->db->open(); - $writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $this->getWriteConcern()); + $writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $execOptions); $this->endProfile($token, __METHOD__); } catch (RuntimeException $e) { @@ -254,11 +178,15 @@ public function executeBatch($collectionName, $options = []) * Executes this command as a mongo query * @param string $collectionName collection name * @param array $options query options. + * @param array $execOptions options for executeQuery + * @see https://www.php.net/manual/en/mongodb-driver-server.executequery.php#refsect1-mongodb-driver-server.executequery-parameters * @return \MongoDB\Driver\Cursor result cursor. * @throws Exception on failure */ - public function query($collectionName, $options = []) + public function query($collectionName, $options = [], $execOptions = []) { + $this->prepareExecOptions($execOptions); + $databaseName = $this->databaseName === null ? $this->db->defaultDatabaseName : $this->databaseName; $token = $this->log( @@ -273,17 +201,12 @@ public function query($collectionName, $options = []) __METHOD__ ); - $readConcern = $this->getReadConcern(); - if ($readConcern !== null) { - $options['readConcern'] = $readConcern; - } - try { $this->beginProfile($token, __METHOD__); $query = new \MongoDB\Driver\Query($this->document, $options); $this->db->open(); - $cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $this->getReadPreference()); + $cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $execOptions); $cursor->setTypeMap($this->db->typeMap); $this->endProfile($token, __METHOD__); @@ -297,13 +220,14 @@ public function query($collectionName, $options = []) /** * Drops database associated with this command. + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. */ - public function dropDatabase() + public function dropDatabase($execOptions = []) { $this->document = $this->db->getQueryBuilder()->dropDatabase(); - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } @@ -311,26 +235,28 @@ public function dropDatabase() * Creates new collection in database associated with this command.s * @param string $collectionName collection name * @param array $options collection options in format: "name" => "value" + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. */ - public function createCollection($collectionName, array $options = []) + public function createCollection($collectionName, array $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->createCollection($collectionName, $options); - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } /** * Drops specified collection. * @param string $collectionName name of the collection to be dropped. + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. */ - public function dropCollection($collectionName) + public function dropCollection($collectionName, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->dropCollection($collectionName); - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } @@ -348,13 +274,14 @@ public function dropCollection($collectionName) * * See [[https://docs.mongodb.com/manual/reference/method/db.collection.createIndex/#options-for-all-index-types]] * for the full list of options. + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. */ - public function createIndexes($collectionName, $indexes) + public function createIndexes($collectionName, $indexes, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->createIndexes($this->databaseName, $collectionName, $indexes); - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } @@ -362,28 +289,30 @@ public function createIndexes($collectionName, $indexes) * Drops collection indexes by name. * @param string $collectionName collection name. * @param string $indexes wildcard for name of the indexes to be dropped. + * @param array $execOptions -> goto Command::execute() * @return array result data. */ - public function dropIndexes($collectionName, $indexes) + public function dropIndexes($collectionName, $indexes, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->dropIndexes($collectionName, $indexes); - return current($this->execute()->toArray()); + return current($this->execute($execOptions)->toArray()); } /** * Returns information about current collection indexes. * @param string $collectionName collection name * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::execute() * @return array list of indexes info. * @throws Exception on failure. */ - public function listIndexes($collectionName, $options = []) + public function listIndexes($collectionName, $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->listIndexes($collectionName, $options); try { - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); } catch (Exception $e) { // The server may return an error if the collection does not exist. $notFoundCodes = [ @@ -405,13 +334,14 @@ public function listIndexes($collectionName, $options = []) * @param string $collectionName collection name * @param array $condition filter condition * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::execute() * @return int records count */ - public function count($collectionName, $condition = [], $options = []) + public function count($collectionName, $condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->count($collectionName, $condition, $options); - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['n']; } @@ -486,13 +416,14 @@ public function addDelete($condition, $options = []) * @param string $collectionName collection name * @param array $document document content * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::executeBatch() * @return ObjectID|bool inserted record ID, `false` - on failure. */ - public function insert($collectionName, $document, $options = []) + public function insert($collectionName, $document, $options = [], $execOptions = []) { $this->document = []; $this->addInsert($document); - $result = $this->executeBatch($collectionName, $options); + $result = $this->executeBatch($collectionName, $options, $execOptions); if ($result['result']->getInsertedCount() < 1) { return false; @@ -506,9 +437,10 @@ public function insert($collectionName, $document, $options = []) * @param string $collectionName collection name * @param array[] $documents documents list * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::executeBatch() * @return array|false list of inserted IDs, `false` on failure. */ - public function batchInsert($collectionName, $documents, $options = []) + public function batchInsert($collectionName, $documents, $options = [], $execOptions = []) { $this->document = []; foreach ($documents as $key => $document) { @@ -518,7 +450,7 @@ public function batchInsert($collectionName, $documents, $options = []) ]; } - $result = $this->executeBatch($collectionName, $options); + $result = $this->executeBatch($collectionName, $options, $execOptions); if ($result['result']->getInsertedCount() < 1) { return false; @@ -533,9 +465,10 @@ public function batchInsert($collectionName, $documents, $options = []) * @param array $condition filter condition * @param array $document data to be updated. * @param array $options update options. + * @param array $execOptions -> goto Command::executeBatch() * @return WriteResult write result. */ - public function update($collectionName, $condition, $document, $options = []) + public function update($collectionName, $condition, $document, $options = [], $execOptions = []) { $batchOptions = []; foreach (['bypassDocumentValidation'] as $name) { @@ -547,7 +480,7 @@ public function update($collectionName, $condition, $document, $options = []) $this->document = []; $this->addUpdate($condition, $document, $options); - $result = $this->executeBatch($collectionName, $batchOptions); + $result = $this->executeBatch($collectionName, $batchOptions, $execOptions); return $result['result']; } @@ -557,9 +490,10 @@ public function update($collectionName, $condition, $document, $options = []) * @param string $collectionName collection name. * @param array $condition filter condition. * @param array $options delete options. + * @param array $execOptions -> goto Command::executeBatch() * @return WriteResult write result. */ - public function delete($collectionName, $condition, $options = []) + public function delete($collectionName, $condition, $options = [], $execOptions = []) { $batchOptions = []; foreach (['bypassDocumentValidation'] as $name) { @@ -571,7 +505,7 @@ public function delete($collectionName, $condition, $options = []) $this->document = []; $this->addDelete($condition, $options); - $result = $this->executeBatch($collectionName, $batchOptions); + $result = $this->executeBatch($collectionName, $batchOptions, $execOptions); return $result['result']; } @@ -581,9 +515,10 @@ public function delete($collectionName, $condition, $options = []) * @param string $collectionName collection name * @param array $condition filter condition * @param array $options query options. + * @param array $execOptions -> goto Command::executeQuery() * @return \MongoDB\Driver\Cursor result cursor. */ - public function find($collectionName, $condition, $options = []) + public function find($collectionName, $condition, $options = [], $execOptions = []) { $queryBuilder = $this->db->getQueryBuilder(); @@ -612,7 +547,7 @@ public function find($collectionName, $condition, $options = []) } } - return $this->query($collectionName, $options); + return $this->query($collectionName, $options, $execOptions); } /** @@ -621,12 +556,13 @@ public function find($collectionName, $condition, $options = []) * @param array $condition query condition * @param array $update update criteria * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::execute() * @return array|null the original document, or the modified document when $options['new'] is set. */ - public function findAndModify($collectionName, $condition = [], $update = [], $options = []) + public function findAndModify($collectionName, $condition = [], $update = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->findAndModify($collectionName, $condition, $update, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -643,12 +579,13 @@ public function findAndModify($collectionName, $condition = [], $update = [], $o * @param string $fieldName field name to use. * @param array $condition query parameters. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions -> goto Command::execute() * @return array array of distinct values, or "false" on failure. */ - public function distinct($collectionName, $fieldName, $condition = [], $options = []) + public function distinct($collectionName, $fieldName, $condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->distinct($collectionName, $fieldName, $condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -672,12 +609,13 @@ public function distinct($collectionName, $fieldName, $condition = [], $options * @param array $options optional parameters to the group command. Valid options include: * - condition - criteria for including a document in the aggregation. * - finalize - function called once per unique key that takes the final output of the reduce function. + * @param array $execOptions -> goto Command::execute() * @return array the result of the aggregation. */ - public function group($collectionName, $keys, $initial, $reduce, $options = []) + public function group($collectionName, $keys, $initial, $reduce, $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->group($collectionName, $keys, $initial, $reduce, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -705,12 +643,13 @@ public function group($collectionName, $keys, $initial, $reduce, $options = []) * - jsMode: bool, specifies whether to convert intermediate data into BSON format between the execution of the map and reduce functions. * - verbose: bool, specifies whether to include the timing information in the result information. * + * @param array $execOptions -> goto Command::execute() * @return string|array the map reduce output collection name or output results. */ - public function mapReduce($collectionName, $map, $reduce, $out, $condition = [], $options = []) + public function mapReduce($collectionName, $map, $reduce, $out, $condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->mapReduce($collectionName, $map, $reduce, $out, $condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -724,9 +663,10 @@ public function mapReduce($collectionName, $map, $reduce, $out, $condition = [], * @param string $collectionName collection name * @param array $pipelines list of pipeline operators. * @param array $options optional parameters. + * @param array $execOptions -> goto Command::execute() * @return array|\MongoDB\Driver\Cursor aggregation result. */ - public function aggregate($collectionName, $pipelines, $options = []) + public function aggregate($collectionName, $pipelines, $options = [], $execOptions = []) { if (empty($options['cursor'])) { $returnCursor = false; @@ -736,7 +676,7 @@ public function aggregate($collectionName, $pipelines, $options = []) } $this->document = $this->db->getQueryBuilder()->aggregate($collectionName, $pipelines, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); if ($returnCursor) { return $cursor; @@ -749,12 +689,13 @@ public function aggregate($collectionName, $pipelines, $options = []) * Return an explanation of the query, often useful for optimization and debugging. * @param string $collectionName collection name * @param array $query query document. + * @param array $execOptions -> goto Command::execute() * @return array explanation of the query. */ - public function explain($collectionName, $query) + public function explain($collectionName, $query, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->explain($collectionName, $query); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); return current($cursor->toArray()); } @@ -763,16 +704,17 @@ public function explain($collectionName, $query) * Returns the list of available databases. * @param array $condition filter condition. * @param array $options options list. + * @param array $execOptions -> goto Command::execute() * @return array database information */ - public function listDatabases($condition = [], $options = []) + public function listDatabases($condition = [], $options = [], $execOptions = []) { if ($this->databaseName === null) { $this->databaseName = 'admin'; } $this->document = $this->db->getQueryBuilder()->listDatabases($condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); if (empty($result['databases'])) { @@ -785,12 +727,13 @@ public function listDatabases($condition = [], $options = []) * Returns the list of available collections. * @param array $condition filter condition. * @param array $options options list. + * @param array $execOptions -> goto Command::execute() * @return array collections information. */ - public function listCollections($condition = [], $options = []) + public function listCollections($condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->listCollections($condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); return $cursor->toArray(); } @@ -839,4 +782,4 @@ protected function endProfile($token, $category) Yii::endProfile($token, $category); } } -} +} \ No newline at end of file diff --git a/src/Connection.php b/src/Connection.php index 590f29d82..294635f90 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -10,6 +10,9 @@ use MongoDB\Driver\Manager; use yii\base\Component; use yii\base\InvalidConfigException; +use \MongoDB\Driver\ReadConcern; +use \MongoDB\Driver\WriteConcern; +use \MongoDB\Driver\ReadPreference; use Yii; /** @@ -83,6 +86,26 @@ class Connection extends Component * @event Event an event that is triggered after a DB connection is established */ const EVENT_AFTER_OPEN = 'afterOpen'; + /** + * @event yii\base\Event an event that is triggered right before a mongo client session is started + */ + const EVENT_START_SESSION = 'startSession'; + /** + * @event yii\base\Event an event that is triggered right after a mongo client session is ended + */ + const EVENT_END_SESSION = 'endSession'; + /** + * @event yii\base\Event an event that is triggered right before a transaction is started + */ + const EVENT_START_TRANSACTION = 'startTransaction'; + /** + * @event yii\base\Event an event that is triggered right after a transaction is committed + */ + const EVENT_COMMIT_TRANSACTION = 'commitTransaction'; + /** + * @event yii\base\Event an event that is triggered right after a transaction is rolled back + */ + const EVENT_ROLLBACK_TRANSACTION = 'rollbackTransaction'; /** * @var string host:port @@ -155,6 +178,8 @@ class Connection extends Component */ public $fileStreamWrapperClass = 'yii\mongodb\file\StreamWrapper'; + public $globalExecOptions = []; + /** * @var string name of the MongoDB database to use by default. * If this field left blank, connection instance will attempt to determine it from @@ -412,6 +437,7 @@ public function createCommand($document = [], $databaseName = null) 'db' => $this, 'databaseName' => $databaseName, 'document' => $document, + 'globalExecOptions' => $this->globalExecOptions ]); } @@ -432,4 +458,176 @@ public function registerFileStreamWrapper($force = false) return $this->fileStreamProtocol; } + + /** + * set global execOptions for Command::execute() and Command::executeBatch() and Command::query() + * this options when set if internal $execOptions is not set. + * @param array $execOptions see docs of Command::execute() and Command::executeBatch() and Command::query() + * @return $this + */ + public function execOptions($execOptions){ + if(empty($execOptions)) + $this->globalExecOptions = []; + else + $this->globalExecOptions = array_merge_recursive($this->globalExecOptions, $execOptions); + return $this; + } + + /** + * preapare execOptions for some purpose + * @param array|object by reference + * convert string option to object + * ['readConcern' => 'snapshot'] > ['readConcern' => new \MongoDB\Driver\ReadConcern('snapshot')] + * ['writeConcern' => 'majority'] > ['writeConcern' => new \MongoDB\Driver\WriteConcern('majority')] + * ['writeConcern' => ['majority',true]] > ['writeConcern' => new \MongoDB\Driver\WriteConcern('majority',true)] + */ + public static function prepareExecOptions(&$execOptions){ + + #convert readConcern option + if(array_key_exists('readConcern', $execOptions) && is_string($execOptions['readConcern'])) + $execOptions['readConcern'] = new ReadConcern($execOptions['readConcern']); + + #convert writeConcern option + if(array_key_exists('writeConcern', $execOptions)){ + if(is_string($execOptions['writeConcern'])) + $execOptions['writeConcern'] = new WriteConcern($execOptions['writeConcern']); + elseif(is_array($execOptions['writeConcern'])) + $execOptions['writeConcern'] = (new \ReflectionClass('\MongoDB\Driver\WriteConcern'))->newInstanceArgs($execOptions['writeConcern']); + } + + #conver readPreference option + if(array_key_exists('readPreference', $execOptions)){ + if(is_string($execOptions['readPreference'])) + $execOptions['readPreference'] = new ReadPreference($execOptions['readPreference']); + elseif(is_array($execOptions['readPreference'])) + $execOptions['readPreference'] = (new \ReflectionClass('\MongoDB\Driver\ReadPreference'))->newInstanceArgs($execOptions['readPreference']); + } + + #convert session option + if(array_key_exists('session',$execOptions)) + $execOptions['session'] = $execOptions['session']->mongoSession; + + #convert defaultTransactionOptions for MongoDB\Driver\Manager::startSession + if( + array_key_exists('defaultTransactionOptions',$execOptions) && + array_key_exists('readConcern',$execOptions['defaultTransactionOptions']) && + is_string($execOptions['defaultTransactionOptions']['readConcern']) + ) + $execOptions['defaultTransactionOptions']['readConcern'] = new ReadConcern($execOptions['defaultTransactionOptions']['readConcern']); + + if(array_key_exists('defaultTransactionOptions',$execOptions) && array_key_exists('writeConcern',$execOptions['defaultTransactionOptions'])){ + if(is_string($execOptions['defaultTransactionOptions']['writeConcern'])) + $execOptions['defaultTransactionOptions']['writeConcern'] = new WriteConcern($execOptions['defaultTransactionOptions']['writeConcern']); + else if(is_array($execOptions['defaultTransactionOptions']['writeConcern'])) + $execOptions['defaultTransactionOptions']['writeConcern'] = (new \ReflectionClass('\MongoDB\Driver\WriteConcern'))->newInstanceArgs($execOptions['defaultTransactionOptions']['writeConcern']); + } + + if(array_key_exists('defaultTransactionOptions',$execOptions) && array_key_exists('readPreference',$execOptions['defaultTransactionOptions'])){ + if(is_string($execOptions['defaultTransactionOptions']['readPreference'])) + $execOptions['defaultTransactionOptions']['readPreference'] = new ReadPreference($execOptions['defaultTransactionOptions']['readPreference']); + else if(is_array($execOptions['defaultTransactionOptions']['readPreference'])) + $execOptions['defaultTransactionOptions']['readPreference'] = (new \ReflectionClass('\MongoDB\Driver\ReadPreference'))->newInstanceArgs($execOptions['defaultTransactionOptions']['readPreference']); + } + } + + /** + * start new session for current connection + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function startSession($sessionOptions = []){ + return ClientSession::start($this, $sessionOptions); + } + + /** + * check if current connection is in session + * return bool + */ + public function getInSession(){ + return array_key_exists('session',$this->globalExecOptions); + } + + /** + * return current session + * return ClientSession|null + */ + public function getSession(){ + return $this->getInSession() ? $this->globalExecOptions['session'] : null; + } + + /** + * start transaction with three step : + * - start new session + * - start transaction of new session + * - set new session to current command + * @param array $transactionOptions see doc of Transaction::start() + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function startTransaction($transactionOptions = [], $sessionOptions = []){ + $newClientSession = $this->startSession($sessionOptions); + $newClientSession->getTransaction()->start($transactionOptions); + $this->withSession($newClientSession); + return $newClientSession; + } + + /** + * commit transaction in current session + */ + public function commitTransaction(){ + if(!$this->getInSession()) + throw new Exception('You can\'t commit transaction because current connection is\'t in a session.'); + if(!$this->getSession()->getHasTransaction()) + throw new Exception('You can\'t commit transaction because transaction not started in current session.'); + $this->getSession()->transaction->commit(); + } + + /** + * commit transaction in current session + */ + public function rollBackTransaction(){ + if(!$this->getInSession()) + throw new Exception('You can\'t roll back transaction because current connection is\'t in a session.'); + if(!$this->getSession()->getHasTransaction()) + throw new Exception('You can\'t roll back transaction because transaction not started in current session.'); + $this->getSession()->transaction->rollBack(); + } + + /** + * change current session of command (or drop session) + * @param ClientSession|null $clientSession new instance of ClientSession for replace + * return $this + */ + public function withSession($clientSession){ + #drop session + if(empty($clientSession)) + unset($this->globalExecOptions['session']); + else + $this->globalExecOptions['session'] = $clientSession; + return $this; + } + + /** + * easy start and commit transaction + * @param callable $actions your block of code must be run after transaction started and before commit + * if $actions return false then transaction rolled back. + * @param array $transactionOptions see doc of Transaction::start() + * @param array $sessionOptions see doc of ClientSession::start() + * return $this + */ + public function transaction(callable $actions, $transactionOptions = [], $sessionOptions = []){ + $newClientSession = $this->startTransaction($transactionOptions, $sessionOptions); + try { + $result = call_user_func($actions, $newClientSession); + if($newClientSession->getTransaction()->getIsActive()) + if($result === false) + $newClientSession->getTransaction()->rollBack(); + else + $newClientSession->getTransaction()->commit(); + } catch (\Exception $e){ + if($newClientSession->getTransaction()->getIsActive()) + $newClientSession->getTransaction()->rollBack(); + throw $e; + } + } } diff --git a/src/Database.php b/src/Database.php index 395aac79e..0eba51b5a 100644 --- a/src/Database.php +++ b/src/Database.php @@ -115,35 +115,38 @@ public function createCommand($document = []) * you need to create collection with the specific options. * @param string $name name of the collection * @param array $options collection options in format: "name" => "value" + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. * @throws Exception on failure. */ - public function createCollection($name, $options = []) + public function createCollection($name, $options = [], $execOptions = []) { - return $this->createCommand()->createCollection($name, $options); + return $this->createCommand()->createCollection($name, $options, $execOptions); } /** * Drops specified collection. * @param string $name name of the collection + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. * @since 2.1 */ - public function dropCollection($name) + public function dropCollection($name, $execOptions = []) { - return $this->createCommand()->dropCollection($name); + return $this->createCommand()->dropCollection($name, $execOptions); } /** * Returns the list of available collections in this database. * @param array $condition filter condition. * @param array $options options list. + * @param array $execOptions -> goto Command::execute() * @return array collections information. * @since 2.1.1 */ - public function listCollections($condition = [], $options = []) + public function listCollections($condition = [], $options = [], $execOptions = []) { - return $this->createCommand()->listCollections($condition, $options); + return $this->createCommand()->listCollections($condition, $options, $execOptions); } /** diff --git a/src/Transaction.php b/src/Transaction.php new file mode 100644 index 000000000..9e46e8e7e --- /dev/null +++ b/src/Transaction.php @@ -0,0 +1,85 @@ + + */ +class Transaction extends \yii\base\BaseObject +{ + + /** + * @var MongoDB\Driver\Session class represents a client session and Commands, queries, and write operations may then be associated the session. + * @see https://www.php.net/manual/en/class.mongodb-driver-session.php + */ + public $clientSession; + + /** + * Returns a value indicating whether this transaction is active. + * @return bool whether this transaction is active. Only an active transaction + * can [[commit()]] or [[rollBack()]]. + */ + public function getIsActive(){ + return $this->clientSession->db->getIsActive() && $this->clientSession->mongoSession->isInTransaction(); + } + + /** + * Start a transaction if session is not in transaction process. + * @see https://www.php.net/manual/en/mongodb-driver-session.starttransaction.php + * @param array $transactionOptions Options can be passed as argument to this method. + * Each element in this options array overrides the corresponding option from the "sessionOptions" option, + * if set when starting the session with ClientSession::start(). + * @see https://www.php.net/manual/en/mongodb-driver-session.starttransaction.php#refsect1-mongodb-driver-session.starttransaction-parameters + */ + public function start($transactionOptions = []){ + Connection::prepareExecOptions($transactionOptions); + yii::trace('Starting mongodb transaction ...', __METHOD__); + if($this->clientSession->mongoSession->isInTransaction()) + throw new Exception('Nested transaction not supported'); + $this->clientSession->db->trigger(Connection::EVENT_START_TRANSACTION); + $this->clientSession->mongoSession->startTransaction($transactionOptions); + yii::trace('MongoDB transaction started.', __METHOD__); + } + + /** + * Commit a transaction. + * @see https://www.php.net/manual/en/mongodb-driver-session.committransaction.php + */ + public function commit(){ + yii::trace('Committing mongodb transaction ...', __METHOD__); + $this->clientSession->mongoSession->commitTransaction(); + yii::trace('Commit mongodb transaction.', __METHOD__); + $this->clientSession->db->trigger(Connection::EVENT_COMMIT_TRANSACTION); + } + + /** + * Rolls back a transaction. + * @see https://www.php.net/manual/en/mongodb-driver-session.aborttransaction.php + */ + public function rollBack(){ + yii::trace('Rolling back mongodb transaction ...', __METHOD__); + $this->clientSession->mongoSession->abortTransaction(); + yii::trace('Roll back mongodb transaction.', __METHOD__); + $this->clientSession->db->trigger(Connection::EVENT_ROLLBACK_TRANSACTION); + } +} \ No newline at end of file