diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index a180d27b..84643583 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -59,6 +59,16 @@ class Resque_Worker */ private $child = null; + /** + * @var int How many to process per child. + */ + private $perChild = 1; + + /** + * @var int How many jobs processed since last fork. + */ + private $processedInChild = 0; + /** * Return all workers known to Resque as instantiated instances. */ @@ -126,8 +136,9 @@ public function setId($workerId) * this method. * * @param string|array $queues String with a single queue name, array with multiple. + * @param int $perChild How many jobs to perform per child */ - public function __construct($queues) + public function __construct($queues, $perChild = 1) { if(!is_array($queues)) { $queues = array($queues); @@ -142,6 +153,8 @@ public function __construct($queues) } $this->hostname = $hostname; $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); + + $this->perChild = $perChild; } /** @@ -191,19 +204,18 @@ public function work($interval = 5) $this->child = $this->fork(); - // Forked and we're the child. Run the job. + // We're the child. Run the job. if($this->child === 0 || $this->child === false) { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->log($status, self::LOG_VERBOSE); $this->perform($job); - if($this->child === 0) { - exit(0); - } + $this->doneWorking(); } if($this->child > 0) { // Parent process, sit and wait + $job = null; // we forget the job, because it might change $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); $this->updateProcLine($status); $this->log($status, self::LOG_VERBOSE); @@ -212,19 +224,33 @@ public function work($interval = 5) pcntl_wait($status); $exitStatus = pcntl_wexitstatus($status); if($exitStatus !== 0) { - $job->fail(new Resque_Job_DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); + $this->fail($exitStatus); + $this->doneWorking(); } + $this->child = null; } - $this->child = null; - $this->doneWorking(); } $this->unregisterWorker(); } + /** + * Fail the current job + * + * @param int $exitStatus the exit code + */ + private function fail($exitStatus) + { + // error, grab the job from redis and fail it + $jobData = Resque::redis()->get('worker:' . (string)$this); + $job = new Resque_Job($jobData['queue'], $jobData['payload']); + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + + } + /** * Process a single job. * @@ -304,12 +330,28 @@ private function fork() return false; } - $pid = pcntl_fork(); - if($pid === -1) { - throw new RuntimeException('Unable to fork child worker.'); - } - - return $pid; + // if we are the child + if ($this->child === 0 || $this->child === false) { + + // if we're meant to carry on in the same fork + if ($this->processedInChild > 0 && $this->processedInChild < $this->perChild) { + $this->processedInChild++; + return false; // tells work to move on as if we forked + } + + // otherwise we die and let the parent deal with the situation below + exit(0); + } else { + // if we pass test above, fork and reset + $this->processedInChild = 1; // theoretically this is unnecessary, since it's only incremented in the child which doesn't affect the parent + + $pid = pcntl_fork(); + if($pid === -1) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; + } } /** @@ -580,4 +622,4 @@ public function getStat($stat) return Resque_Stat::get($stat . ':' . $this); } } -?> \ No newline at end of file +?> diff --git a/resque.php b/resque.php index d85a70ea..ee7a759e 100644 --- a/resque.php +++ b/resque.php @@ -44,6 +44,12 @@ $count = $COUNT; } +$perChild = 1; +$PERCHILD = getenv('PERCHILD'); +if(!empty($PERCHILD) && $PERCHILD > 1) { + $perChild = $PERCHILD; +} + if($count > 1) { for($i = 0; $i < $count; ++$i) { $pid = pcntl_fork(); @@ -53,7 +59,7 @@ // Child, start the worker else if(!$pid) { $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); + $worker = new Resque_Worker($queues, $perChild); $worker->logLevel = $logLevel; fwrite(STDOUT, '*** Starting worker '.$worker."\n"); $worker->work($interval); @@ -64,7 +70,7 @@ // Start a single worker else { $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); + $worker = new Resque_Worker($queues, $perChild); $worker->logLevel = $logLevel; $PIDFILE = getenv('PIDFILE');