From 53d04ba72f3eb88ed719f2d88ccab72847c2499d Mon Sep 17 00:00:00 2001 From: Cambell Prince Date: Fri, 29 Jun 2012 17:24:05 +0700 Subject: [PATCH] Made unbundle async. Some times it can run for minutes. --- api/v02/src/BundleHelper.php | 18 ++-- api/v02/src/HgResumeApi.php | 128 +++++++++++++++++++---------- api/v02/src/HgRunner.php | 20 ++--- api/v02/test/BundleHelper_Test.php | 16 ++-- 4 files changed, 110 insertions(+), 72 deletions(-) diff --git a/api/v02/src/BundleHelper.php b/api/v02/src/BundleHelper.php index b8d6d81..144d33f 100644 --- a/api/v02/src/BundleHelper.php +++ b/api/v02/src/BundleHelper.php @@ -1,14 +1,17 @@ getBundleFileName()); } - function cleanUpFiles() { + /** + * Removes the bundle file and the meta file + */ + public function cleanUp() { if (file_exists($this->getBundleFileName())) { unlink($this->getBundleFileName()); } diff --git a/api/v02/src/HgResumeApi.php b/api/v02/src/HgResumeApi.php index 4749a59..c6a5b8b 100644 --- a/api/v02/src/HgResumeApi.php +++ b/api/v02/src/HgResumeApi.php @@ -66,46 +66,84 @@ function pushBundleChunk($repoId, $bundleSize, $offset, $data, $transId) { // ------------------ $bundle = new BundleHelper($transId); - - // if the data sent falls before the start of window, mark it as received and reply with correct startOfWindow - // Fail if there is overlap or a mismatch between the start of window and the data offset - $startOfWindow = $bundle->getOffset(); - if ($offset != $startOfWindow) { // these are usually equal. It could be a client programming error if they are not - if ($offset < $startOfWindow) { - return new HgResumeResponse(HgResumeResponse::RECEIVED, array('sow' => $startOfWindow, 'Note' => 'server received duplicate data')); - } else { - return new HgResumeResponse(HgResumeResponse::FAIL, array('sow' => $startOfWindow, 'Error' => "data sent ($dataSize) with offset ($offset) falls after server's start of window ($startOfWindow)")); - } - } - - // write chunk data to bundle file - $bundleFile = fopen($bundle->getBundleFileName(), "a"); - fseek($bundleFile, $offset); - fwrite($bundleFile, $data); - fclose($bundleFile); - - $newSow = $offset + $dataSize; - $bundle->setOffset($newSow); - // for the final chunk; assemble the bundle and apply the bundle - if ($newSow == $bundleSize) { - try { - $hg->unbundle($bundle->getBundleFileName()); // TODO make this async - - $responseValues = array('transId' => $transId); - $response = new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues); - $this->finishPushBundle($transId); // clean up bundle assembly cache - } catch (Exception $e) { - $bundle->setOffset(0); - $responseValues = array('Error' => substr($e->getMessage(), 0, 1000)); - $responseValues['transId'] = $transId; - $response = new HgResumeResponse(HgResumeResponse::RESET, $responseValues); - //$this->finishPushBundle($transId); // clean up bundle assembly cache - } - return $response; - } else { - // received the chunk, but it's not the last one; we expect more chunks - $responseValues = array('transId' => $transId, 'sow' => $newSow); - return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); + switch ($bundle->getState()) { + case BundleHelper::State_Start: + $bundle->setState(BundleHelper::State_Uploading); + // Fall through to State_Uploading + case BundleHelper::State_Uploading: + // if the data sent falls before the start of window, mark it as received and reply with correct startOfWindow + // Fail if there is overlap or a mismatch between the start of window and the data offset + $startOfWindow = $bundle->getOffset(); + if ($offset != $startOfWindow) { // these are usually equal. It could be a client programming error if they are not + if ($offset < $startOfWindow) { + return new HgResumeResponse(HgResumeResponse::RECEIVED, array('sow' => $startOfWindow, 'Note' => 'server received duplicate data')); + } else { + return new HgResumeResponse(HgResumeResponse::FAIL, array('sow' => $startOfWindow, 'Error' => "data sent ($dataSize) with offset ($offset) falls after server's start of window ($startOfWindow)")); + } + } + // write chunk data to bundle file + $bundleFile = fopen($bundle->getBundleFileName(), "a"); + fseek($bundleFile, $offset); + fwrite($bundleFile, $data); + fclose($bundleFile); + + $newSow = $offset + $dataSize; + $bundle->setOffset($newSow); + + // for the final chunk; assemble the bundle and apply the bundle + if ($newSow == $bundleSize) { + $bundle->setState(BundleHelper::State_Unbundle); + try { // REVIEW Would be nice if the try / catch logic was universal. ie one policy for the api function. CP 2012-06 + $bundleFilePath = $bundle->getBundleFileName(); + $asyncRunner = new AsyncRunner($bundleFilePath); + $hg->unbundle($bundleFilePath, $asyncRunner); + for ($i = 0; $i < 4; $i++) { + if ($asyncRunner->isComplete()) { + if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) { + $responseValues = array('transId' => $transId); + return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); + } + $bundle->cleanUp(); + $asyncRunner->cleanUp(); + $responseValues = array('transId' => $transId); + return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues); + } + sleep(1); + } + $responseValues = array('transId' => $transId, 'sow' => $newSow); + return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); + // REVIEW Not sure what returning 'RECEIVED' will do to the client here, we've got all the data but need to + } catch (Exception $e) { + echo $e->getMessage(); // FIXME + $bundle->setOffset(0); + $responseValues = array('Error' => substr($e->getMessage(), 0, 1000)); + $responseValues['transId'] = $transId; + return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); + //$this->finishPushBundle($transId); // clean up bundle assembly cache + } + } else { + // received the chunk, but it's not the last one; we expect more chunks + $responseValues = array('transId' => $transId, 'sow' => $newSow); + return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); + } + break; + case BundleHelper::State_Unbundle: + $bundleFilePath = $bundle->getBundleFileName(); + $asyncRunner = new AsyncRunner($bundleFilePath); + if ($asyncRunner->isComplete()) { + if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) { + $responseValues = array('transId' => $transId); + return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); + } + $bundle->cleanUp(); + $asyncRunner->cleanUp(); + $responseValues = array('transId' => $transId); + return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues); + } else { + $responseValues = array('transId' => $transId, 'sow' => $newSow); + return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); + } + break; } } @@ -179,12 +217,12 @@ function pullBundleChunkInternal($repoId, $baseHash, $offset, $chunkSize, $trans } $bundle->setProp("tip", $hg->getTip()); $bundle->setProp("repoId", $repoId); - $bundle->setState(BundleHelper::State_MakingBundle); + $bundle->setState(BundleHelper::State_Bundle); } $response = new HgResumeResponse(HgResumeResponse::SUCCESS); switch ($bundle->getState()) { - case BundleHelper::State_MakingBundle: + case BundleHelper::State_Bundle: if ($asyncRunner->isComplete()) { if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) { $response = new HgResumeResponse(HgResumeResponse::FAIL); @@ -306,7 +344,7 @@ function getRevisions($repoId, $offset, $quantity) { function finishPushBundle($transId) { $bundle = new BundleHelper($transId); - if ($bundle->cleanUpFiles()) { + if ($bundle->cleanUp()) { return new HgResumeResponse(HgResumeResponse::SUCCESS); } else { return new HgResumeResponse(HgResumeResponse::FAIL); @@ -321,12 +359,12 @@ function finishPullBundle($transId) { $hg = new HgRunner($repoPath); // check that the repo has not been updated, since a pull was started if ($bundle->getProp("tip") != $hg->getTip()) { - $bundle->cleanUpFiles(); + $bundle->cleanUp(); return new HgResumeResponse(HgResumeResponse::RESET); } } } - if ($bundle->cleanUpFiles()) { + if ($bundle->cleanUp()) { return new HgResumeResponse(HgResumeResponse::SUCCESS); } return new HgResumeResponse(HgResumeResponse::FAIL); diff --git a/api/v02/src/HgRunner.php b/api/v02/src/HgRunner.php index 66bdbe6..b501f60 100644 --- a/api/v02/src/HgRunner.php +++ b/api/v02/src/HgRunner.php @@ -26,21 +26,15 @@ private function logEvent($message) { file_put_contents($logFilename, "$message\n", FILE_APPEND | LOCK_EX); } - function unbundle($filepath) { - if (is_file($filepath)) { - chdir($this->repoPath); // NOTE: I tried with -R and it didn't work for me. CP 2012-06 - // TODO Make this async - $cmd = "hg unbundle $filepath"; - $this->logEvent("cmd: $cmd"); - exec(escapeshellcmd($cmd), $output, $returnval); - if ($returnval != 0) { - $this->logEvent("previous cmd failed with returnval '$returnval' and output " - . implode("|", $output)); - throw new Exception("command '$cmd' failed!"); - } - } else { + function unbundle($filepath, $asyncRunner) { + if (!is_file($filepath)) { throw new Exception("bundle file '$filepath' is not a file!"); } + chdir($this->repoPath); // NOTE: I tried with -R and it didn't work for me. CP 2012-06 + // TODO Make this async + $cmd = "hg unbundle $filepath"; + $this->logEvent("cmd: $cmd"); + $asyncRunner->run($cmd); } function update($revision = "") { diff --git a/api/v02/test/BundleHelper_Test.php b/api/v02/test/BundleHelper_Test.php index cf2c2bd..6968f4f 100644 --- a/api/v02/test/BundleHelper_Test.php +++ b/api/v02/test/BundleHelper_Test.php @@ -6,12 +6,12 @@ class TestOfBundleHelper extends UnitTestCase { - function testCleanUpFiles_BundleFileExists_DeletesBundleFile() { + function testcleanUp_BundleFileExists_DeletesBundleFile() { $bundle = new BundleHelper(__FUNCTION__); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); $bundleFilename = $bundle->getBundleFileName(); file_put_contents($bundleFilename, "bundle data"); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); $this->assertFalse(is_file($bundleFilename)); } @@ -27,14 +27,14 @@ function testConstructor_TransIdCodeInjection_ThrowsException() { function testGetOffset_Unset_ReturnsZero() { $transId = __FUNCTION__; $bundle = new BundleHelper($transId); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); $this->assertEqual(0, $bundle->getOffset()); } function testSetGetOffset_SetThenGet_GetReturnsValueThatWasSet() { $transId = __FUNCTION__; $bundle = new BundleHelper($transId); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); $sow = 5023; $bundle->setOffset($sow); $this->assertEqual($sow, $bundle->getOffset()); @@ -44,7 +44,7 @@ function testGetState_GetReturnsDefault() { $transId = __FUNCTION__; $bundle = new BundleHelper($transId); $this->assertEqual(BundleHelper::State_Start, $bundle->getState()); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); } function testSetGetState_GetReturnsSet() { @@ -52,13 +52,13 @@ function testSetGetState_GetReturnsSet() { $bundle = new BundleHelper($transId); $bundle->setState(BundleHelper::State_Downloading); $this->assertEqual(BundleHelper::State_Downloading, $bundle->getState()); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); } function testSetGetHasProp_SetMultipleProps_GetPropsOkAndVerifyHasPropsOk() { $transId = __FUNCTION__; $bundle = new BundleHelper($transId); - $bundle->cleanUpFiles(); + $bundle->cleanUp(); $this->assertFalse($bundle->hasProp("tip")); $bundle->setProp("tip", "7890"); $this->assertTrue($bundle->hasProp("tip"));