Skip to content

Commit

Permalink
Use miliseconds for statistics
Browse files Browse the repository at this point in the history
store waiting_time in seconds+microseconds

store handling_time in seconds+microseconds

store failing_time in seconds+microseconds

Fixed statistics computing

Handle micro/milli seconds in twig time filter

Tested retry behavior

Fixed waiting_time for messages with delay
  • Loading branch information
nikophil authored and bendavies committed Oct 4, 2022
1 parent 1746e5c commit 5648987
Show file tree
Hide file tree
Showing 36 changed files with 438 additions and 343 deletions.
14 changes: 0 additions & 14 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,4 @@
<directory name="vendor"/>
</ignoreFiles>
</projectFiles>
<issueHandlers>
<MissingPropertyType>
<errorLevel type="suppress">
<file name="src/Storage/Doctrine/StoredMessage.php"/>
</errorLevel>
</MissingPropertyType>
<PossiblyFalseArgument>
<errorLevel type="suppress">
<file name="src/Storage/Doctrine/EventListener/UpdateStoredMessageListener.php"/>
<file name="src/Storage/Doctrine/EventListener/SaveRetriedMessageListener.php"/>
<file name="src/Storage/Doctrine/StoredMessage.php"/>
</errorLevel>
</PossiblyFalseArgument>
</issueHandlers>
</psalm>
12 changes: 6 additions & 6 deletions src/Statistics/MetricsPerMessageType.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public function __construct(
private \DateTimeImmutable $toDate,
private string $class,
private int $messagesCountOnPeriod,
private float $averageWaitingTime,
private float $averageHandlingTime
private ?float $averageWaitingTime,
private ?float $averageHandlingTime
) {
}

Expand All @@ -39,14 +39,14 @@ public function getMessagesHandledPerHour(): float
return round($this->getMessagesCount() / $this->getNbHoursInPeriod(), 2);
}

public function getAverageWaitingTime(): float
public function getAverageWaitingTime(): ?float
{
return round($this->averageWaitingTime, 2);
return null !== $this->averageWaitingTime ? round($this->averageWaitingTime, 6) : null;
}

public function getAverageHandlingTime(): float
public function getAverageHandlingTime(): ?float
{
return round($this->averageHandlingTime, 2);
return null !== $this->averageHandlingTime ? round($this->averageHandlingTime, 6) : null;
}

private function getNbHoursInPeriod(): float
Expand Down
62 changes: 28 additions & 34 deletions src/Storage/Doctrine/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Doctrine\DBAL\Types\Types;
use SymfonyCasts\MessengerMonitorBundle\Statistics\MetricsPerMessageType;
use SymfonyCasts\MessengerMonitorBundle\Statistics\Statistics;
use SymfonyCasts\MessengerMonitorBundle\Storage\Doctrine\Driver\SQLDriverInterface;

/**
* @internal
Expand All @@ -20,7 +19,7 @@
*/
class Connection
{
public function __construct(private DBALConnection $driverConnection, private SQLDriverInterface $SQLDriver, private string $tableName)
public function __construct(private DBALConnection $driverConnection, private string $tableName)
{
}

Expand All @@ -40,10 +39,7 @@ public function saveMessage(StoredMessage $storedMessage): void
[
'message_uid' => $storedMessage->getMessageUid(),
'class' => $storedMessage->getMessageClass(),
'dispatched_at' => $storedMessage->getDispatchedAt(),
],
[
'dispatched_at' => Types::DATETIME_IMMUTABLE,
'dispatched_at' => (float) $storedMessage->getDispatchedAt()->format('U.u'),
]
);

Expand All @@ -55,23 +51,18 @@ public function updateMessage(StoredMessage $storedMessage): void
$this->executeQuery(
$this->driverConnection->createQueryBuilder()
->update($this->tableName)
->set('received_at', ':received_at')
->set('waiting_time', ':waiting_time')
->set('receiver_name', ':receiver_name')
->set('handled_at', ':handled_at')
->set('failed_at', ':failed_at')
->set('handling_time', ':handling_time')
->set('failing_time', ':failing_time')
->where('id = :id')
->getSQL(),
[
'received_at' => $storedMessage->getReceivedAt(),
'handled_at' => $storedMessage->getHandledAt(),
'failed_at' => $storedMessage->getFailedAt(),
'waiting_time' => $storedMessage->getWaitingTime(),
'receiver_name' => $storedMessage->getReceiverName(),
'handling_time' => $storedMessage->getHandlingTime(),
'failing_time' => $storedMessage->getFailingTime(),
'id' => $storedMessage->getId(),
],
[
'received_at' => Types::DATETIME_IMMUTABLE,
'handled_at' => Types::DATETIME_IMMUTABLE,
'failed_at' => Types::DATETIME_IMMUTABLE,
]
);
}
Expand All @@ -93,15 +84,16 @@ public function findMessage(string $messageUid): ?StoredMessage
return null;
}

/** @psalm-suppress PossiblyFalseArgument */
return new StoredMessage(
$row['message_uid'],
$row['class'],
new \DateTimeImmutable($row['dispatched_at']),
\DateTimeImmutable::createFromFormat('U.u', sprintf('%.6f', $row['dispatched_at'])),
(int) $row['id'],
null !== $row['received_at'] ? new \DateTimeImmutable($row['received_at']) : null,
null !== $row['handled_at'] ? new \DateTimeImmutable($row['handled_at']) : null,
null !== $row['failed_at'] ? new \DateTimeImmutable($row['failed_at']) : null,
$row['receiver_name'] ?? null
null !== $row['waiting_time'] ? (float) $row['waiting_time'] : null,
$row['receiver_name'] ?? null,
null !== $row['handling_time'] ? (float) $row['handling_time'] : null,
null !== $row['failing_time'] ? (float) $row['failing_time'] : null
);
}

Expand All @@ -110,15 +102,17 @@ public function getStatistics(\DateTimeImmutable $fromDate, \DateTimeImmutable $
$statement = $this->executeQuery(
$this->driverConnection->createQueryBuilder()
->select('count(id) as count_messages_on_period, class')
->addSelect(sprintf('AVG(%s) AS average_waiting_time', $this->SQLDriver->getDateDiffInSecondsExpression('received_at', 'dispatched_at')))
->addSelect(sprintf('AVG(%s) AS average_handling_time', $this->SQLDriver->getDateDiffInSecondsExpression('handled_at', 'received_at')))
->addSelect('AVG(waiting_time) AS average_waiting_time')
->addSelect('AVG(handling_time) AS average_handling_time')
->from($this->tableName)
->where('handled_at >= :from_date')
->andWhere('handled_at <= :to_date')
->where('dispatched_at >= :from_date')
->andWhere('dispatched_at <= :to_date')
->groupBy('class')
->getSQL(),
['from_date' => $fromDate, 'to_date' => $toDate],
['from_date' => Types::DATETIME_IMMUTABLE, 'to_date' => Types::DATETIME_IMMUTABLE]
[
'from_date' => (float) $fromDate->format('U'),
'to_date' => (float) $toDate->format('U'),
]
);

$statistics = new Statistics($fromDate, $toDate);
Expand All @@ -129,8 +123,8 @@ public function getStatistics(\DateTimeImmutable $fromDate, \DateTimeImmutable $
$toDate,
$row['class'],
(int) $row['count_messages_on_period'],
(float) $row['average_waiting_time'],
(float) $row['average_handling_time']
$row['average_waiting_time'] ? (float) $row['average_waiting_time'] : null,
$row['average_handling_time'] ? (float) $row['average_handling_time'] : null
)
);
}
Expand Down Expand Up @@ -182,10 +176,10 @@ private function addTableToSchema(Schema $schema): void
$table->addColumn('id', Types::INTEGER)->setNotnull(true)->setAutoincrement(true);
$table->addColumn('message_uid', Types::GUID)->setNotnull(true);
$table->addColumn('class', Types::STRING)->setLength(255)->setNotnull(true);
$table->addColumn('dispatched_at', Types::DATETIME_IMMUTABLE)->setNotnull(true);
$table->addColumn('received_at', Types::DATETIME_IMMUTABLE)->setNotnull(false);
$table->addColumn('handled_at', Types::DATETIME_IMMUTABLE)->setNotnull(false);
$table->addColumn('failed_at', Types::DATETIME_IMMUTABLE)->setNotnull(false);
$table->addColumn('dispatched_at', Types::FLOAT)->setNotnull(true);
$table->addColumn('waiting_time', Types::FLOAT)->setNotnull(false);
$table->addColumn('handling_time', Types::FLOAT)->setNotnull(false);
$table->addColumn('failing_time', Types::FLOAT)->setNotnull(false);
$table->addColumn('receiver_name', Types::STRING)->setLength(255)->setNotnull(false);
$table->addIndex(['dispatched_at']);
$table->addIndex(['class']);
Expand Down
11 changes: 1 addition & 10 deletions src/Storage/Doctrine/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\Persistence\ConnectionRegistry;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use SymfonyCasts\MessengerMonitorBundle\Storage\Doctrine\Driver\MySQLDriver;
use SymfonyCasts\MessengerMonitorBundle\Storage\Doctrine\Driver\PostgreSQLDriver;

/**
* @internal
Expand All @@ -24,15 +22,8 @@ public function __invoke(): Connection
try {
/** @var DBALConnection $driverConnection */
$driverConnection = $this->registry->getConnection($this->connectionName);
$databasePlatform = $driverConnection->getDatabasePlatform()->getName();

$driver = match ($databasePlatform) {
'mysql' => new MySQLDriver(),
'postgresql' => new PostgreSQLDriver(),
default => throw new InvalidConfigurationException(sprintf('Doctrine platform "%s" is not supported', $databasePlatform))
};

return new Connection($driverConnection, $driver, $this->tableName);
return new Connection($driverConnection, $this->tableName);
} catch (\InvalidArgumentException) {
throw new InvalidConfigurationException(sprintf('Doctrine connection with name "%s" does not exist', $this->connectionName));
}
Expand Down
16 changes: 0 additions & 16 deletions src/Storage/Doctrine/Driver/MySQLDriver.php

This file was deleted.

16 changes: 0 additions & 16 deletions src/Storage/Doctrine/Driver/PostgreSQLDriver.php

This file was deleted.

13 changes: 0 additions & 13 deletions src/Storage/Doctrine/Driver/SQLDriverInterface.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ public function __construct(private Connection $doctrineConnection)

public function onMessageRetried(MessageRetriedByUserEvent $retriedMessageEvent): void
{
/** @psalm-suppress PossiblyFalseArgument */
$this->doctrineConnection->saveMessage(
new StoredMessage(
$retriedMessageEvent->getMessageUid(),
$retriedMessageEvent->getMessageClass(),
\DateTimeImmutable::createFromFormat('U', (string) time())
\DateTimeImmutable::createFromFormat('0.u00 U', microtime())
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use SymfonyCasts\MessengerMonitorBundle\Storage\Doctrine\Connection;
use SymfonyCasts\MessengerMonitorBundle\Storage\Doctrine\StoredMessageProvider;

Expand All @@ -28,7 +29,10 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void
return;
}

$storedMessage->setReceivedAt(\DateTimeImmutable::createFromFormat('U', (string) time()));
/** @var DelayStamp $delayStamp */
$delayStamp = $event->getEnvelope()->last(DelayStamp::class) ?? new DelayStamp(0);

$storedMessage->updateWaitingTime($delayStamp->getDelay() / 1000);
$storedMessage->setReceiverName($event->getReceiverName());

$this->doctrineConnection->updateMessage($storedMessage);
Expand All @@ -42,7 +46,7 @@ public function onMessageHandled(WorkerMessageHandledEvent $event): void
return;
}

$storedMessage->setHandledAt(\DateTimeImmutable::createFromFormat('U', (string) time()));
$storedMessage->updateHandlingTime();
$this->doctrineConnection->updateMessage($storedMessage);
}

Expand All @@ -54,7 +58,7 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
return;
}

$storedMessage->setFailedAt(\DateTimeImmutable::createFromFormat('U', (string) time()));
$storedMessage->updateFailingTime();
$this->doctrineConnection->updateMessage($storedMessage);
}

Expand Down
Loading

0 comments on commit 5648987

Please sign in to comment.