Skip to content

Commit

Permalink
Prevent concurrent runs of the encoder by using a lock file with time…
Browse files Browse the repository at this point in the history
…stamp

- Added a lock file mechanism to prevent the `run` function from executing concurrently.
- The lock file includes a timestamp to ensure it is valid only for 10 seconds.
- If the lock file is older than 10 seconds, it will be removed before creating a new one.
- Ensured the lock file is removed before every return statement to avoid stale locks.
- This change helps avoid issues caused by concurrent executions of the encoder.

This commit addresses the need to ensure only one instance of the encoder function runs at any given time, improving stability and reliability.
  • Loading branch information
Daniel Neto committed Jul 18, 2024
1 parent b8b50a1 commit a899045
Showing 1 changed file with 56 additions and 28 deletions.
84 changes: 56 additions & 28 deletions objects/Encoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public static function downloadFile($queue_id)

if (!self::canDownloadNow()) {
_error_log("downloadFile: there is a file downloading");
if(self::areDownloaded()){
if (self::areDownloaded()) {
$obj->error = false;
}
return $obj;
Expand Down Expand Up @@ -1039,21 +1039,39 @@ public static function run($try = 0)
{
global $global;
$maxTries = 4;
$lockFile = sys_get_temp_dir() . '/encoder_run.lock';

// Check if the lock file exists
if (file_exists($lockFile)) {
$lockFileContent = file_get_contents($lockFile);
$lockFileTime = intval($lockFileContent);

// If the lock file is older than 10 seconds, remove it
if (time() - $lockFileTime > 10) {
unlink($lockFile);
} else {
_error_log("Encoder::run: Lock file exists, exiting to prevent duplicate run.");
return false;
}
}

// Create the lock file with the current time
file_put_contents($lockFile, time());

if ($try > $maxTries) {
unlink($lockFile); // Remove the lock file before returning
return false;
}

if (self::areDownloading(array(Encoder::$STATUS_DOWNLOADING))) {
_error_log("You have a video downloading now, please wait ");
unlink($lockFile); // Remove the lock file before returning
return false;
}

$try++;
$obj = new stdClass();
$obj->error = true;
// check if is encoding something
//_error_log("Encoder::run: try=($try)");
$rows = static::areEncoding();
$rowNext = static::getNext();
$obj->hasNext = !empty($rowNext);
Expand Down Expand Up @@ -1081,61 +1099,62 @@ public static function run($try = 0)
$encoder->setStatus(Encoder::$STATUS_QUEUE);
$encoder->setStatus_obs($msg);
$encoder->save();
unlink($lockFile); // Remove the lock file before returning
return false;
} else if ($try <= $maxTries) {
$msg = "Encoder::run: Trying again: [$try] => Could not download the file " . json_encode($objFile);
_error_log($msg);
$encoder->setStatus(Encoder::$STATUS_QUEUE);
$encoder->setStatus_obs($msg);
$encoder->save();
unlink($lockFile); // Remove the lock file before returning
return self::run($try);
} else {
$msg = "Encoder::run: Max tries reached {$objFile->msg}";
_error_log($msg);
$obj->msg = $objFile->msg;
self::setStatusError($rowNext['id'], $msg);
unlink($lockFile); // Remove the lock file before returning
return false;
}
} elseif (!$objFile->error && !empty($return_vars->videos_id)) {
$encoder->setStatus(Encoder::$STATUS_ENCODING);
$encoder->save();
// run to try to download next
self::run(0);
self::sendImages($objFile->pathFileName, $return_vars, $encoder);
//self::sendRawVideo($objFile->pathFileName, $return_vars, $encoder);
// get the encode code and convert it
$code = new Format($encoder->getFormats_id());
$resp = $code->run($objFile->pathFileName, $encoder->getId());
if (!empty($resp->error)) {
if (empty($resp->code) && empty($resp->addInQueueAgain)) {
_error_log("Encoder::run: Encoder Run the code is empty: " . json_encode($resp));
unlink($lockFile); // Remove the lock file before returning
return false;
} else if ($resp->error === -1) {
unlink($lockFile); // Remove the lock file before returning
return false;
} else if ($try < 4) {
$msg = "Encoder::run: Trying again: [$try] => Execute code error 1 " . json_encode($resp->msg) . " \n Code: {$resp->code}";
_error_log($msg);
$encoder->setStatus(Encoder::$STATUS_QUEUE);
$encoder->setStatus_obs($msg);
$encoder->save();
unlink($lockFile); // Remove the lock file before returning
return static::run($try);
} else {
$obj->msg = "Execute code error 2 [{$objFile->pathFileName}] [{$rowNext['id']} == " . $encoder->getId() . '] ' . json_encode($resp->msg) . " \n Code: {$resp->code}";
_error_log("Encoder::run: Encoder Run: " . json_encode($obj));
self::setStatusError($encoder->getId(), $obj->msg);
unlink($lockFile); // Remove the lock file before returning
return false;
}
} else {
// if is audio send the spectrum image as well
if ($encoder->getFormats_id() == 6) {
self::sendSpectrumFromMP3($objFile->pathFileName, $return_vars, $encoder);
}
$obj->error = false;
$obj->msg = $resp->code;
// notify AVideo it is done
$response = $encoder->send();
if (!$response->error) {
// update queue status
$encoder->setStatus(Encoder::$STATUS_DONE);
$config = new Configuration();
if (!empty($config->getAutodelete())) {
Expand All @@ -1149,6 +1168,7 @@ public static function run($try = 0)
$msg = "Encoder::run: Send message error = " . $response->msg;
_error_log($msg);
self::setStatusError($encoder->getId(), $msg, 1);
unlink($lockFile); // Remove the lock file before returning
return false;
}
}
Expand All @@ -1157,38 +1177,41 @@ public static function run($try = 0)
$setError = false;
if ($objFile->error && !empty($objFile->msg)) {
$errorMsg[] = $objFile->msg;
}else{
$errorMsg[] = 'objFile no error '.json_encode($objFile);
} else {
$errorMsg[] = 'objFile no error ' . json_encode($objFile);
}
if (empty($return_vars->videos_id)) {
$errorMsg[] = 'return_vars->videos_id is empty';
$obj = Encoder::getVideosId($rowNext['id']);
if(empty($obj->videos_id)){
if (empty($obj->videos_id)) {
$errorMsg[] = 'We could not get videos_id check the streamer logs';
self::setStatusError($encoder->getId(), "We could not get videos_id check the streamer logs", 1);
unlink($lockFile); // Remove the lock file before returning
return false;
}
}else{
$errorMsg[] = 'videos_id = '.$return_vars->videos_id;
} else {
$errorMsg[] = 'videos_id = ' . $return_vars->videos_id;
}
if(!self::canEncodeNow()){
if (!self::canEncodeNow()) {
$setError = false;
$errorMsg[] = 'Something is encoding now';
}else{
} else {
$errorMsg[] = 'There is nothing encoding';
}
if(self::areDownloading()){
if (self::areDownloading()) {
$setError = false;
$errorMsg[] = 'Something is downloading now';
}else{
} else {
$errorMsg[] = 'There is nothing downloading';
}
_error_log("try [{$try}] " . implode(', ', $errorMsg) . ' ' . json_encode($return_vars));
if($setError){
if ($setError) {
self::setStatusError($encoder->getId(), "try [{$try}] ", 1);
}
unlink($lockFile); // Remove the lock file before returning
return false;
}
unlink($lockFile); // Remove the lock file before returning
return static::run(0);
}
} else {
Expand All @@ -1211,9 +1234,13 @@ public static function run($try = 0)
$msg .= (count($rows) == 1) ? " is encoding" : " are encoding";
$obj->msg = $msg;
}

// Remove the lock file before returning
unlink($lockFile);
return $obj;
}


private function notifyVideoIsDone($fail = 0)
{
global $global;
Expand Down Expand Up @@ -2631,36 +2658,37 @@ public static function setStreamerLog($encoder_queue_id, $msg, $type)
return self::sendToStreamer($target, $postFields, $return_vars, $encoder);
}

public static function getVideosId($encoder_queue_id){
public static function getVideosId($encoder_queue_id)
{
$e = new Encoder($encoder_queue_id);

$rv = $e->getReturn_vars();
if(!empty($rv)){
if (!empty($rv)) {
$json = json_decode($rv);
}
if(!empty($json)){
if (!empty($json)) {
$obj = $json;
error_log("queue: Encoder::sendFile done line=".__LINE__.' '.json_encode($json));
}else{
error_log("queue: Encoder::sendFile done line=" . __LINE__ . ' ' . json_encode($json));
} else {
$obj = new stdClass();
}
if(!empty($obj->videos_id)){
if (!empty($obj->videos_id)) {
return $obj;
}
$obj->videos_id = 0;
$obj->video_id_hash = '';
$f = new Format($e->getFormats_id());
$format = $f->getExtension();
$response = Encoder::sendFile('', 0, $format, $e);
error_log("queue: Encoder::sendFile done line=".__LINE__).' '.json_encode($response);
error_log("queue: Encoder::sendFile done line=" . __LINE__) . ' ' . json_encode($response);
//var_dump($response);exit;
if (!empty($response->response->video_id)) {
$obj->videos_id = $response->response->video_id;
}
if (!empty($response->response->video_id_hash)) {
$obj->video_id_hash = $response->response->video_id_hash;
}

$e->setReturn_vars(json_encode($obj));
$e->save();
return $obj;
Expand Down

0 comments on commit a899045

Please sign in to comment.