diff --git a/README.md b/README.md index b0ac6c7..9837fae 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,39 @@ $client->ping(); // true ## Publish Subscribe ```php +// queue usage example +$queue = $client->subscribe('test_subject'); + +$client->publish('test_subject', 'hello'); +$client->publish('test_subject', 'world'); + +// optional message fetch +// if there are no updates null will be returned +$message1 = $queue->fetch(); +echo $message1->getPayload(); // hello + +// locks untill message is fetched from subject +// to limit lock timeout, pass optional timeout value +$message2 = $queue->next(); +echo $message2->getPayload(); // world + +$client->publish('test_subject', 'hello'); +$client->publish('test_subject', 'batching'); + +// batch message fetching, limit argument is optional +$messages = $queue->fetchAll(10); +echo count($messages); + +// fetch all messages that are published to the subject client connection +// queue will stop message fetching when another subscription receives a message +// in advance you can time limit batch fetching +$queue->setLimit(1); // limit to 1 second +$messages = $queue->fetchAll(); + +// reset subscription +$client->unsubscribe($queue); + +// callback hell example $client->subscribe('hello', function ($message) { var_dump('got message', $message); // tester }); @@ -192,6 +225,46 @@ $goodbyer // $goodbyer->interrupt(); }); +// consumer can be used via queue interface +$queue = $goodbyer->getQueue(); +while ($message = $queue->next()) { + if (rand(1, 10) % 2 == 0) { + mail($address, "See you later"); + $message->ack(); + } else { + // ack with 1 second timeout + $message->nack(1); + } + // stop processing + if (rand(1, 10) % 2 == 10) { + // don't forget to unsubscribe + $client->unsubscribe($queue); + break; + } +} + +// use fetchAll method to batch process messages +// let's set batch size to 50 +$queue = $goodbyer->setBatching(50)->create()->getQueue(); + +// fetching 100 messages provides 2 stream requests +// limit message fetching to 1 second +// it means no more that 100 messages would be fetched +$messages = $queue->setLimit(1)->fetchAll(100); + +$recipients = []; +foreach ($messages as $message) { + $recipients[] = (string) $message->payload; +} + +mail_to_all($recipients, "See you later"); + +// ack all messages +foreach ($messages as $message) { + $message->ack(); +} + + // you also can create ephemeral consumer // the only thing that ephemeral consumer is created as soon as object is created // you have to create full consumer configuration first diff --git a/src/Client.php b/src/Client.php index aae3527..13291da 100644 --- a/src/Client.php +++ b/src/Client.php @@ -122,18 +122,21 @@ public function request(string $name, mixed $payload, Closure $handler): self return $this; } - public function subscribe(string $name, Closure $handler): self + public function subscribe(string $name, ?Closure $handler = null): self|Queue { return $this->doSubscribe($name, null, $handler); } - public function subscribeQueue(string $name, string $group, Closure $handler) + public function subscribeQueue(string $name, string $group, ?Closure $handler = null): self|Queue { return $this->doSubscribe($name, $group, $handler); } - public function unsubscribe(string $name): self + public function unsubscribe(string|Queue $name): self { + if ($name instanceof Queue) { + $name = $name->subject; + } foreach ($this->subscriptions as $i => $subscription) { if ($subscription['name'] == $name) { unset($this->subscriptions[$i]); @@ -162,7 +165,7 @@ public function setLogger(?LoggerInterface $logger): self return $this; } - public function process(null|int|float $timeout = 0, bool $reply = true) + public function process(null|int|float $timeout = 0, bool $reply = true): mixed { $message = $this->connection->getMessage($timeout); @@ -173,20 +176,30 @@ public function process(null|int|float $timeout = 0, bool $reply = true) } throw new LogicException("No handler for message $message->sid"); } - $result = $this->handlers[$message->sid]($message->payload, $message->replyTo); - if ($reply && $message->replyTo) { - $this->publish($message->replyTo, $result); + $handler = $this->handlers[$message->sid]; + if ($handler instanceof Queue) { + $handler->handle($message); + return $handler; + } else { + $result = $handler($message->payload, $message->replyTo); + if ($reply && $message->replyTo) { + $message->reply($result); + } + return $result; } - return $result; } else { return $message; } } - private function doSubscribe(string $subject, ?string $group, Closure $handler): self + private function doSubscribe(string $subject, ?string $group, ?Closure $handler = null): self|Queue { $sid = bin2hex(random_bytes(4)); - $this->handlers[$sid] = $handler; + if ($handler == null) { + $this->handlers[$sid] = new Queue($this, $subject); + } else { + $this->handlers[$sid] = $handler; + } $this->connection->sendMessage(new Subscribe([ 'sid' => $sid, @@ -199,6 +212,9 @@ private function doSubscribe(string $subject, ?string $group, Closure $handler): 'sid' => $sid, ]; + if ($handler == null) { + return $this->handlers[$sid]; + } return $this; } diff --git a/src/Consumer/Consumer.php b/src/Consumer/Consumer.php index 4dc4ec0..34020a8 100644 --- a/src/Consumer/Consumer.php +++ b/src/Consumer/Consumer.php @@ -4,9 +4,12 @@ namespace Basis\Nats\Consumer; +use Basis\Nats\Client; +use Basis\Nats\Queue; use Basis\Nats\Message\Payload; +use Basis\Nats\Message\Publish; use Closure; -use Basis\Nats\Client; +use Throwable; class Consumer { @@ -96,9 +99,11 @@ public function getIterations(): int return $this->iterations; } - public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack = true): int + public function getQueue(): Queue { - $requestSubject = '$JS.API.CONSUMER.MSG.NEXT.' . $this->getStream() . '.' . $this->getName(); + $queueSubject = 'handler.' . bin2hex(random_bytes(4)); + $queue = $this->client->subscribe($queueSubject); + $args = [ 'batch' => $this->getBatching(), ]; @@ -111,62 +116,59 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack $args['no_wait'] = true; } - $handlerSubject = 'handler.' . bin2hex(random_bytes(4)); + $launcher = new Publish([ + 'payload' => Payload::parse($args), + 'replyTo' => $queue->subject, + 'subject' => '$JS.API.CONSUMER.MSG.NEXT.' . $this->getStream() . '.' . $this->getName(), + ]); - $runtime = new Runtime(); - - $this->create(); - - $this->client->subscribe($handlerSubject, function ($message, $replyTo) use ($handler, $runtime) { - if (!($message instanceof Payload)) { - return; - } - - $kv_operation = $message->getHeader('KV-Operation'); + $queue->setLauncher($launcher); + return $queue; + } - // Consuming deleted or purged messages must not stop processing messages as more - // messages might arrive after this. - if (!$message->isEmpty() || $kv_operation === 'DEL' || $kv_operation === 'PURGE') { - $runtime->empty = false; - $runtime->processed++; + public function handle(Closure $messageHandler, Closure $emptyHandler = null, bool $ack = true): int + { + $queue = $this->create()->getQueue(); + $iterations = $this->getIterations(); + $processed = 0; - if (!$message->isEmpty()) { - $handler($message, $replyTo); + while (!$this->interrupt && $iterations--) { + $messages = $queue->fetchAll($this->getBatching()); + foreach ($messages as $message) { + $processed++; + $payload = $message->payload; + if ($payload->isEmpty()) { + if ($emptyHandler && !in_array($payload->getHeader('KV-Operation'), ['DEL', 'PURGE'])) { + $emptyHandler($payload, $message->replyTo); + } + continue; } - } - }); - - $iteration = $this->getIterations(); - while ($iteration--) { - $this->client->publish($requestSubject, $args, $handlerSubject); - - foreach (range(1, $this->batch) as $_) { - $runtime->empty = true; - // expires request means that we should receive answer from stream - // consumer timeout can be more that client connection timeout - $this->client->process($this->expires ? PHP_INT_MAX : null, $ack); - - if ($runtime->empty) { - if ($emptyHandler) { - $emptyHandler(); + try { + $messageHandler($payload, $message->replyTo); + if ($ack) { + $message->ack(); } - break; + } catch (Throwable $e) { + if ($ack) { + $message->nack(); + } + throw $e; + } + if ($this->interrupt) { + $this->interrupt = false; + break 2; } } - - if ($this->interrupt) { - $this->interrupt = false; - break; - } - - if ($iteration && $runtime->empty && !$expires) { - usleep((int) floor($this->getDelay() * 1_000_000)); + if (!count($messages) && $emptyHandler) { + $emptyHandler(); + if ($iterations) { + usleep((int) floor($this->getDelay() * 1_000_000)); + } } } - $this->client->unsubscribe($handlerSubject); - - return $runtime->processed; + $this->client->unsubscribe($queue); + return $processed; } public function info() diff --git a/src/Message/Ack.php b/src/Message/Ack.php new file mode 100644 index 0000000..5e2c428 --- /dev/null +++ b/src/Message/Ack.php @@ -0,0 +1,19 @@ +payload ?: Payload::parse(''))->render(); + return "PUB $this->subject $this->command $payload"; + } +} diff --git a/src/Message/Msg.php b/src/Message/Msg.php index 1cc1bd5..1deb6f4 100644 --- a/src/Message/Msg.php +++ b/src/Message/Msg.php @@ -4,7 +4,9 @@ namespace Basis\Nats\Message; +use Basis\Nats\Client; use Exception; +use LogicException; class Msg extends Prototype { @@ -17,6 +19,8 @@ class Msg extends Prototype public ?int $timestampNanos = null; public ?string $replyTo = null; + public readonly Client $client; + public static function create(string $data): self { $args = explode(' ', $data, 5); @@ -59,9 +63,30 @@ public static function create(string $data): self return new self($values); } - public function __toString(): string + public function ack(): void { - return $this->payload->body; + $this->reply(new Ack([ + 'subject' => $this->replyTo + ])); + } + + public function nack(float $delay = 0): void + { + $this->reply(new Ack([ + 'command' => '-NAK', + 'subject' => $this->replyTo, + 'payload' => Payload::parse([ + 'delay' => $delay, + ]), + ])); + } + + public function progress(): void + { + $this->reply(new Ack([ + 'command' => '+WPI', + 'subject' => $this->replyTo, + ])); } public function parse($payload): self @@ -106,6 +131,28 @@ public function render(): string return 'MSG ' . json_encode($this); } + public function reply($data): void + { + if (!$this->replyTo) { + throw new LogicException("Invalid replyTo property"); + } + if ($data instanceof Prototype) { + $this->client->connection->sendMessage($data); + } else { + $this->client->publish($this->replyTo, $data); + } + } + + public function setClient($client): void + { + $this->client = $client; + } + + public function __toString(): string + { + return $this->payload->body; + } + private static function tryParseMessageTime(array $values): array { if ( diff --git a/src/Queue.php b/src/Queue.php new file mode 100644 index 0000000..4fed362 --- /dev/null +++ b/src/Queue.php @@ -0,0 +1,93 @@ +timeout = $client->configuration->timeout; + } + + public function fetch(): ?Msg + { + $messages = $this->fetchAll(1); + return array_shift($messages); + } + + public function fetchAll(int $limit = 0): array + { + if ($this->launcher) { + $this->client->connection->sendMessage($this->launcher); + } + $max = microtime(true) + $this->timeout; + while (true) { + $now = microtime(true); + if ($limit && count($this->queue) >= $limit) { + // optional limit reached + break; + } + + $now = microtime(true); + $processingTimeout = $this->timeout ? $max - $now : 0; + if ($processingTimeout < 0) { + // optional timeout reached + break; + } + + if ($this->client->process($processingTimeout) !== $this) { + // stop when clients got message for another handler + break; + } + } + + $result = []; + while (count($this->queue) && (!$limit || count($result) < $limit)) { + $message = array_shift($this->queue); + $result[] = $message; + } + + return $result; + } + + public function handle(Msg $message) + { + $this->queue[] = $message; + } + + public function next(float $timeout = 0): Msg + { + $start = microtime(true); + while (true) { + $message = $this->fetch(); + if ($message) { + return $message; + } + if ($timeout && ($start + $timeout < microtime(true))) { + throw new Exception("Subject $this->subject is empty"); + } + } + } + + public function setLauncher(Publish $message): void + { + $this->launcher = $message; + } + + public function setTimeout(float $value): void + { + $this->timeout = $value; + } +} diff --git a/tests/Functional/ClientTest.php b/tests/Functional/ClientTest.php index ec8af51..b82678b 100644 --- a/tests/Functional/ClientTest.php +++ b/tests/Functional/ClientTest.php @@ -17,14 +17,16 @@ public function test() public function testConnectionTimeout(): void { - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Socket read timeout'); - $client = $this->createClient([ 'reconnect' => false, ]); $this->assertTrue($client->ping()); + $property = new ReflectionProperty(Connection::class, 'socket'); + $property->setAccessible(true); + fclose($property->getValue($client->connection)); + + $this->expectExceptionMessage('supplied resource is not a valid stream resource'); $client->process(1); } diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index 77b7e61..9de10ce 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -4,8 +4,10 @@ namespace Tests\Functional; +use Basis\Nats\Consumer\AckPolicy; use Basis\Nats\Consumer\Configuration; use Basis\Nats\Consumer\Consumer; +use Basis\Nats\Consumer\ReplayPolicy; use Basis\Nats\Message\Payload; use Basis\Nats\Stream\RetentionPolicy; use Basis\Nats\Stream\StorageBackend; @@ -17,6 +19,48 @@ class StreamTest extends FunctionalTestCase private bool $empty; + public function testNack() + { + $client = $this->createClient(); + $stream = $client->getApi()->getStream('nacks'); + $stream->getConfiguration()->setSubjects(['nacks'])->setRetentionPolicy(RetentionPolicy::INTEREST); + $stream->create(); + + $consumer = $stream->getConsumer('nacks'); + $consumer->setExpires(5); + $consumer->getConfiguration() + ->setSubjectFilter('nacks') + ->setReplayPolicy(ReplayPolicy::INSTANT) + ->setAckPolicy(AckPolicy::EXPLICIT); + + $consumer->create(); + + $stream->publish('nacks', 'first'); + $stream->publish('nacks', 'second'); + + $this->assertSame(2, $consumer->info()->num_pending); + + $queue = $consumer->getQueue(); + $message = $queue->fetch(); + $this->assertNotNull($message); + $this->assertSame((string) $message->payload, 'first'); + $message->nack(1); + + $this->assertSame(1, $consumer->info()->num_ack_pending); + $this->assertSame(1, $consumer->info()->num_pending); + + $queue->setTimeout(1); + $messages = $queue->fetchAll(); + $this->assertCount(1, $messages); + [$message] = $messages; + $this->assertSame((string) $message->payload, 'second'); + $message->progress(); + $message->ack(); + + $this->assertSame(1, $consumer->info()->num_ack_pending); + $this->assertSame(0, $consumer->info()->num_pending); + } + public function testConsumerExpiration() { $client = $this->createClient(['timeout' => 0.2, 'delay' => 0.1]); @@ -117,6 +161,7 @@ public function testInterrupt() $consumer->setBatching(1)->setIterations(2) ->handle(function ($response) use ($consumer) { $consumer->interrupt(); + $this->logger?->info('interrupt!!'); }); $this->assertWrongNumPending($consumer, 3); @@ -124,6 +169,7 @@ public function testInterrupt() $consumer->setBatching(2)->setIterations(1) ->handle(function ($response) use ($consumer) { $consumer->interrupt(); + $this->logger?->info('interrupt!!'); }); $this->assertWrongNumPending($consumer, 1); diff --git a/tests/Functional/SubjectTest.php b/tests/Functional/SubjectTest.php index 7ac693c..bbc3e97 100644 --- a/tests/Functional/SubjectTest.php +++ b/tests/Functional/SubjectTest.php @@ -15,6 +15,51 @@ class SubjectTest extends FunctionalTestCase private int $responseCounter = 0; private $socket; + public function testQueue() + { + $client = $this->createClient(['timeout' => 0.1]); + + $queue = $client->subscribe('handler'); + $queue->setTimeout(0.1); + + $client->publish('handler', 'tester'); + $client->logger?->info('published'); + $message = $queue->fetch(1); + $this->assertNotNull($message); + $this->assertSame("$message->payload", 'tester'); + + $message = $queue->fetch(1); + $this->assertNull($message); + $this->assertCount(0, $queue->fetchAll(10)); + $this->assertCount(0, $queue->fetchAll(10)); + + $client->publish('handler', 'tester1'); + $client->publish('handler', 'tester2'); + $this->assertCount(1, $queue->fetchAll(1)); + $this->assertCount(1, $queue->fetchAll(1)); + $this->assertCount(0, $queue->fetchAll(1)); + + $client->publish('handler', 'tester3'); + $client->publish('handler', 'tester4'); + $this->assertCount(2, $queue->fetchAll(10)); + $this->assertCount(0, $queue->fetchAll(10)); + + $client->publish('handler', 'tester5'); + $this->assertNotNull($queue->next()); + + $this->expectExceptionMessage("Subject handler is empty"); + $queue->next(0.1); + } + + public function testQueueUnsubscribe() + { + $client = $this->createClient(['timeout' => 0.1]); + $queue = $client->subscribe('bazyaba'); + $this->assertCount(1, $client->getSubscriptions()); + $client->unsubscribe($queue); + $this->assertCount(0, $client->getSubscriptions()); + } + public function testPublishSubscribe() { $this->tested = false;