Skip to content

Commit

Permalink
Made unbundle async. Some times it can run for minutes.
Browse files Browse the repository at this point in the history
  • Loading branch information
cambell-prince committed Jun 29, 2012
1 parent c337113 commit 53d04ba
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 72 deletions.
18 changes: 12 additions & 6 deletions api/v02/src/BundleHelper.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
<?php

class BundleHelper {
private $_transactionId;
private $_basePath;

const State_Start = 'Start';
const State_MakingBundle = 'MakingBundle';
const State_Start = 'Start';
const State_Bundle = 'Bundle';
const State_Downloading = 'Downloading';
const State_Uploading = 'Uploading';
const State_Unbundle = 'Unbundle';

function __construct($id) {
private $_transactionId;
private $_basePath;

public function __construct($id) {
if(!BundleHelper::validateAlphaNumeric($id)) {
throw new Exception("ValidationException: transId $id did not validate as alpha numeric!");
}
Expand All @@ -33,7 +36,10 @@ public function exists() {
return file_exists($this->getBundleFileName());
}

function cleanUpFiles() {
/**
* Removes the bundle file and the meta file
*/
public function cleanUp() {
if (file_exists($this->getBundleFileName())) {
unlink($this->getBundleFileName());
}
Expand Down
128 changes: 83 additions & 45 deletions api/v02/src/HgResumeApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,46 +66,84 @@ function pushBundleChunk($repoId, $bundleSize, $offset, $data, $transId) {
// ------------------

$bundle = new BundleHelper($transId);

// if the data sent falls before the start of window, mark it as received and reply with correct startOfWindow
// Fail if there is overlap or a mismatch between the start of window and the data offset
$startOfWindow = $bundle->getOffset();
if ($offset != $startOfWindow) { // these are usually equal. It could be a client programming error if they are not
if ($offset < $startOfWindow) {
return new HgResumeResponse(HgResumeResponse::RECEIVED, array('sow' => $startOfWindow, 'Note' => 'server received duplicate data'));
} else {
return new HgResumeResponse(HgResumeResponse::FAIL, array('sow' => $startOfWindow, 'Error' => "data sent ($dataSize) with offset ($offset) falls after server's start of window ($startOfWindow)"));
}
}

// write chunk data to bundle file
$bundleFile = fopen($bundle->getBundleFileName(), "a");
fseek($bundleFile, $offset);
fwrite($bundleFile, $data);
fclose($bundleFile);

$newSow = $offset + $dataSize;
$bundle->setOffset($newSow);
// for the final chunk; assemble the bundle and apply the bundle
if ($newSow == $bundleSize) {
try {
$hg->unbundle($bundle->getBundleFileName()); // TODO make this async

$responseValues = array('transId' => $transId);
$response = new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues);
$this->finishPushBundle($transId); // clean up bundle assembly cache
} catch (Exception $e) {
$bundle->setOffset(0);
$responseValues = array('Error' => substr($e->getMessage(), 0, 1000));
$responseValues['transId'] = $transId;
$response = new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
//$this->finishPushBundle($transId); // clean up bundle assembly cache
}
return $response;
} else {
// received the chunk, but it's not the last one; we expect more chunks
$responseValues = array('transId' => $transId, 'sow' => $newSow);
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
switch ($bundle->getState()) {
case BundleHelper::State_Start:
$bundle->setState(BundleHelper::State_Uploading);
// Fall through to State_Uploading
case BundleHelper::State_Uploading:
// if the data sent falls before the start of window, mark it as received and reply with correct startOfWindow
// Fail if there is overlap or a mismatch between the start of window and the data offset
$startOfWindow = $bundle->getOffset();
if ($offset != $startOfWindow) { // these are usually equal. It could be a client programming error if they are not
if ($offset < $startOfWindow) {
return new HgResumeResponse(HgResumeResponse::RECEIVED, array('sow' => $startOfWindow, 'Note' => 'server received duplicate data'));
} else {
return new HgResumeResponse(HgResumeResponse::FAIL, array('sow' => $startOfWindow, 'Error' => "data sent ($dataSize) with offset ($offset) falls after server's start of window ($startOfWindow)"));
}
}
// write chunk data to bundle file
$bundleFile = fopen($bundle->getBundleFileName(), "a");
fseek($bundleFile, $offset);
fwrite($bundleFile, $data);
fclose($bundleFile);

$newSow = $offset + $dataSize;
$bundle->setOffset($newSow);

// for the final chunk; assemble the bundle and apply the bundle
if ($newSow == $bundleSize) {
$bundle->setState(BundleHelper::State_Unbundle);
try { // REVIEW Would be nice if the try / catch logic was universal. ie one policy for the api function. CP 2012-06
$bundleFilePath = $bundle->getBundleFileName();
$asyncRunner = new AsyncRunner($bundleFilePath);
$hg->unbundle($bundleFilePath, $asyncRunner);
for ($i = 0; $i < 4; $i++) {
if ($asyncRunner->isComplete()) {
if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) {
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
}
$bundle->cleanUp();
$asyncRunner->cleanUp();
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues);
}
sleep(1);
}
$responseValues = array('transId' => $transId, 'sow' => $newSow);
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
// REVIEW Not sure what returning 'RECEIVED' will do to the client here, we've got all the data but need to
} catch (Exception $e) {
echo $e->getMessage(); // FIXME
$bundle->setOffset(0);
$responseValues = array('Error' => substr($e->getMessage(), 0, 1000));
$responseValues['transId'] = $transId;
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
//$this->finishPushBundle($transId); // clean up bundle assembly cache
}
} else {
// received the chunk, but it's not the last one; we expect more chunks
$responseValues = array('transId' => $transId, 'sow' => $newSow);
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
}
break;
case BundleHelper::State_Unbundle:
$bundleFilePath = $bundle->getBundleFileName();
$asyncRunner = new AsyncRunner($bundleFilePath);
if ($asyncRunner->isComplete()) {
if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) {
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
}
$bundle->cleanUp();
$asyncRunner->cleanUp();
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues);
} else {
$responseValues = array('transId' => $transId, 'sow' => $newSow);
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
}
break;
}
}

Expand Down Expand Up @@ -179,12 +217,12 @@ function pullBundleChunkInternal($repoId, $baseHash, $offset, $chunkSize, $trans
}
$bundle->setProp("tip", $hg->getTip());
$bundle->setProp("repoId", $repoId);
$bundle->setState(BundleHelper::State_MakingBundle);
$bundle->setState(BundleHelper::State_Bundle);
}

$response = new HgResumeResponse(HgResumeResponse::SUCCESS);
switch ($bundle->getState()) {
case BundleHelper::State_MakingBundle:
case BundleHelper::State_Bundle:
if ($asyncRunner->isComplete()) {
if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) {
$response = new HgResumeResponse(HgResumeResponse::FAIL);
Expand Down Expand Up @@ -306,7 +344,7 @@ function getRevisions($repoId, $offset, $quantity) {

function finishPushBundle($transId) {
$bundle = new BundleHelper($transId);
if ($bundle->cleanUpFiles()) {
if ($bundle->cleanUp()) {
return new HgResumeResponse(HgResumeResponse::SUCCESS);
} else {
return new HgResumeResponse(HgResumeResponse::FAIL);
Expand All @@ -321,12 +359,12 @@ function finishPullBundle($transId) {
$hg = new HgRunner($repoPath);
// check that the repo has not been updated, since a pull was started
if ($bundle->getProp("tip") != $hg->getTip()) {
$bundle->cleanUpFiles();
$bundle->cleanUp();
return new HgResumeResponse(HgResumeResponse::RESET);
}
}
}
if ($bundle->cleanUpFiles()) {
if ($bundle->cleanUp()) {
return new HgResumeResponse(HgResumeResponse::SUCCESS);
}
return new HgResumeResponse(HgResumeResponse::FAIL);
Expand Down
20 changes: 7 additions & 13 deletions api/v02/src/HgRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,15 @@ private function logEvent($message) {
file_put_contents($logFilename, "$message\n", FILE_APPEND | LOCK_EX);
}

function unbundle($filepath) {
if (is_file($filepath)) {
chdir($this->repoPath); // NOTE: I tried with -R and it didn't work for me. CP 2012-06
// TODO Make this async
$cmd = "hg unbundle $filepath";
$this->logEvent("cmd: $cmd");
exec(escapeshellcmd($cmd), $output, $returnval);
if ($returnval != 0) {
$this->logEvent("previous cmd failed with returnval '$returnval' and output "
. implode("|", $output));
throw new Exception("command '$cmd' failed!");
}
} else {
function unbundle($filepath, $asyncRunner) {
if (!is_file($filepath)) {
throw new Exception("bundle file '$filepath' is not a file!");
}
chdir($this->repoPath); // NOTE: I tried with -R and it didn't work for me. CP 2012-06
// TODO Make this async
$cmd = "hg unbundle $filepath";
$this->logEvent("cmd: $cmd");
$asyncRunner->run($cmd);
}

function update($revision = "") {
Expand Down
16 changes: 8 additions & 8 deletions api/v02/test/BundleHelper_Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

class TestOfBundleHelper extends UnitTestCase {

function testCleanUpFiles_BundleFileExists_DeletesBundleFile() {
function testcleanUp_BundleFileExists_DeletesBundleFile() {
$bundle = new BundleHelper(__FUNCTION__);
$bundle->cleanUpFiles();
$bundle->cleanUp();
$bundleFilename = $bundle->getBundleFileName();
file_put_contents($bundleFilename, "bundle data");
$bundle->cleanUpFiles();
$bundle->cleanUp();
$this->assertFalse(is_file($bundleFilename));
}

Expand All @@ -27,14 +27,14 @@ function testConstructor_TransIdCodeInjection_ThrowsException() {
function testGetOffset_Unset_ReturnsZero() {
$transId = __FUNCTION__;
$bundle = new BundleHelper($transId);
$bundle->cleanUpFiles();
$bundle->cleanUp();
$this->assertEqual(0, $bundle->getOffset());
}

function testSetGetOffset_SetThenGet_GetReturnsValueThatWasSet() {
$transId = __FUNCTION__;
$bundle = new BundleHelper($transId);
$bundle->cleanUpFiles();
$bundle->cleanUp();
$sow = 5023;
$bundle->setOffset($sow);
$this->assertEqual($sow, $bundle->getOffset());
Expand All @@ -44,21 +44,21 @@ function testGetState_GetReturnsDefault() {
$transId = __FUNCTION__;
$bundle = new BundleHelper($transId);
$this->assertEqual(BundleHelper::State_Start, $bundle->getState());
$bundle->cleanUpFiles();
$bundle->cleanUp();
}

function testSetGetState_GetReturnsSet() {
$transId = __FUNCTION__;
$bundle = new BundleHelper($transId);
$bundle->setState(BundleHelper::State_Downloading);
$this->assertEqual(BundleHelper::State_Downloading, $bundle->getState());
$bundle->cleanUpFiles();
$bundle->cleanUp();
}

function testSetGetHasProp_SetMultipleProps_GetPropsOkAndVerifyHasPropsOk() {
$transId = __FUNCTION__;
$bundle = new BundleHelper($transId);
$bundle->cleanUpFiles();
$bundle->cleanUp();
$this->assertFalse($bundle->hasProp("tip"));
$bundle->setProp("tip", "7890");
$this->assertTrue($bundle->hasProp("tip"));
Expand Down

0 comments on commit 53d04ba

Please sign in to comment.