Skip to content

Commit

Permalink
TRELLO-2517 : limit export files displayed + clean old ones
Browse files Browse the repository at this point in the history
  • Loading branch information
ssedoudbgouv committed Aug 26, 2024
1 parent 9093284 commit 878f4cd
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 14 deletions.
5 changes: 5 additions & 0 deletions app/config/TaskConfiguration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ case class TaskConfiguration(
subscription: SubscriptionTaskConfiguration,
reportClosure: ReportClosureTaskConfiguration,
orphanReportFileDeletion: OrphanReportFileDeletionTaskConfiguration,
oldReportExportDeletion: OldReportExportDeletionTaskConfiguration,
reportReminders: ReportRemindersTaskConfiguration,
inactiveAccounts: InactiveAccountsTaskConfiguration,
companyUpdate: CompanyUpdateTaskConfiguration,
Expand Down Expand Up @@ -39,6 +40,10 @@ case class OrphanReportFileDeletionTaskConfiguration(
startTime: LocalTime
)

case class OldReportExportDeletionTaskConfiguration(
startTime: LocalTime
)

case class ReportRemindersTaskConfiguration(
startTime: LocalTime,
intervalInHours: FiniteDuration,
Expand Down
16 changes: 13 additions & 3 deletions app/loader/SignalConsoApplicationLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ import tasks.account.InactiveAccountTask
import tasks.account.InactiveDgccrfAccountReminderTask
import tasks.account.InactiveDgccrfAccountRemoveTask
import tasks.company._
import tasks.report.FileDeletionTask
import tasks.report.OldReportExportDeletionTask
import tasks.report.OrphanReportFileDeletionTask
import tasks.report.ReportClosureTask
import tasks.report.ReportNotificationTask
import tasks.report.ReportRemindersTask
Expand Down Expand Up @@ -502,14 +503,22 @@ class SignalConsoComponents(
messagesApi
)

val fileDeletionTask = new FileDeletionTask(
val orphanReportFileDeletionTask = new OrphanReportFileDeletionTask(
actorSystem,
reportFileRepository,
s3Service,
taskConfiguration,
taskRepository
)

val oldReportExportDeletionTask = new OldReportExportDeletionTask(
actorSystem,
asyncFileRepository,
s3Service,
taskConfiguration,
taskRepository
)

val reportReminderTask = new ReportRemindersTask(
actorSystem,
reportRepository,
Expand Down Expand Up @@ -830,7 +839,8 @@ class SignalConsoComponents(
exportReportsToSFTPTask.schedule()
reportClosureTask.schedule()
reportReminderTask.schedule()
fileDeletionTask.schedule()
orphanReportFileDeletionTask.schedule()
oldReportExportDeletionTask.schedule()
if (applicationConfiguration.task.probe.active) {
probeOrchestrator.scheduleProbeTasks()
}
Expand Down
22 changes: 22 additions & 0 deletions app/repositories/asyncfiles/AsyncFileRepository.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package repositories.asyncfiles

import models._
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
import repositories.CRUDRepository
import repositories.PostgresProfile.api._
import repositories.asyncfiles.AsyncFilesColumnType._
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import slick.jdbc.ResultSetConcurrency
import slick.jdbc.ResultSetType

import java.time.OffsetDateTime
import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -34,11 +39,28 @@ class AsyncFileRepository(override val dbConfig: DatabaseConfig[JdbcProfile])(im
.filterOpt(kind) { case (table, kind) =>
table.kind === kind
}
.filter(_.creationDate >= OffsetDateTime.now().minusDays(7))
.sortBy(_.creationDate.desc)
.to[List]
.result
)

def streamOldReportExports: Source[AsyncFile, NotUsed] = Source
.fromPublisher(
db.stream(
table
.filter(_.creationDate <= OffsetDateTime.now().minusDays(10))
.result
.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10000
)
.transactionally
)
)
.log("user")

def deleteByUserId(userId: UUID): Future[Int] = db
.run(
table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package repositories.asyncfiles
import models.AsyncFile
import models.AsyncFileKind
import models.User
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
import repositories.CRUDRepositoryInterface

import java.util.UUID
Expand All @@ -14,4 +16,6 @@ trait AsyncFileRepositoryInterface extends CRUDRepositoryInterface[AsyncFile] {
def list(user: User, kind: Option[AsyncFileKind] = None): Future[List[AsyncFile]]

def deleteByUserId(userId: UUID): Future[Int]

def streamOldReportExports: Source[AsyncFile, NotUsed]
}
52 changes: 52 additions & 0 deletions app/tasks/report/OldReportExportDeletionTask.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package tasks.report

import cats.implicits.toFunctorOps
import config.TaskConfiguration
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Sink
import repositories.asyncfiles.AsyncFileRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import services.S3ServiceInterface
import tasks.ScheduledTask
import tasks.model.TaskSettings.DailyTaskSettings
import utils.Logs.RichLogger

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

class OldReportExportDeletionTask(
actorSystem: ActorSystem,
reportFileRepository: AsyncFileRepositoryInterface,
s3Service: S3ServiceInterface,
taskConfiguration: TaskConfiguration,
taskRepository: TaskRepositoryInterface
)(implicit val executionContext: ExecutionContext, mat: Materializer)
extends ScheduledTask(6, "old_report_export_deletion_task", taskRepository, actorSystem, taskConfiguration) {

override val taskSettings = DailyTaskSettings(startTime = taskConfiguration.oldReportExportDeletion.startTime)

override def runTask(): Future[Unit] =
reportFileRepository.streamOldReportExports
.mapAsync(parallelism = 1) { r =>
logger.debug(s"Deleting file ${r.storageFilename}")
for {
_ <- r.storageFilename match {
case Some(storageFilename) =>
s3Service
.delete(storageFilename)
case None => Future.successful(Done)
}
_ = logger.debug(s"File ${r.storageFilename} deleted from s3")
// _ <- reportFileRepository.delete(r.id)
// _ = logger.debug(s"Export File ${r.storageFilename} deleted from signal conso database")
} yield ()
}
.recover { case ex =>
logger.errorWithTitle("old_report_export_deletion_task", "error deleting old report export file", ex)
}
.runWith(Sink.ignore)
.as(())

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import utils.Logs.RichLogger
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

class FileDeletionTask(
class OrphanReportFileDeletionTask(
actorSystem: ActorSystem,
reportFileRepository: ReportFileRepositoryInterface,
s3Service: S3ServiceInterface,
taskConfiguration: TaskConfiguration,
taskRepository: TaskRepositoryInterface
)(implicit val executionContext: ExecutionContext, mat: Materializer)
extends ScheduledTask(5, "report_file_deletion_task", taskRepository, actorSystem, taskConfiguration) {
extends ScheduledTask(5, "orphan_report_file_deletion_task", taskRepository, actorSystem, taskConfiguration) {

override val taskSettings = DailyTaskSettings(startTime = taskConfiguration.orphanReportFileDeletion.startTime)

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ scalafmtOnCompile := true
scalacOptions ++= Seq(
"-explaintypes",
"-Ywarn-macros:after",
"-release:17",
"-release:21",
"-Wconf:cat=unused-imports&src=views/.*:s",
"-Wconf:cat=unused:info",
s"-Wconf:src=${target.value}/.*:s",
Expand Down
4 changes: 4 additions & 0 deletions conf/common/task.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ task {
start-time = "07:00:00"
start-time = ${?ORPHAN_REPORT_FILE_DELETION_TASK_START_TIME}
}
old-report-export-deletion {
start-time = "07:15:00"
start-time = ${?OLD_REPORT_EXPORT_DELETION_TASK_START_TIME}
}


}
129 changes: 129 additions & 0 deletions test/tasks/report/OldReportExportDeletionTaskSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package tasks.report

import models.AsyncFileKind.ReportedPhones
import models.AsyncFileKind.ReportedWebsites
import models.AsyncFileKind.Reports
import models.report.Report
import models.AsyncFile
import models.AsyncFileKind
import org.specs2.concurrent.ExecutionEnv
import org.specs2.matcher.FutureMatchers
import org.specs2.mutable.Specification
import play.api.mvc.Results
import play.api.test.WithApplication
import repositories.event.EventFilter
import utils.Constants.ActionEvent.ActionEventValue
import utils.AppSpec
import utils.Fixtures
import utils.TestApp

import java.time.OffsetDateTime
import java.util.UUID
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.Duration

class OldReportExportDeletionTaskSpec(implicit ee: ExecutionEnv)
extends Specification
with AppSpec
with Results
with FutureMatchers {

val (app, components, queue) = TestApp.buildAppWithS3Queue()

lazy val asyncFileRepository = components.asyncFileRepository
lazy val userRepository = components.userRepository
lazy val fileDeletionTask = components.oldReportExportDeletionTask
lazy val eventRepository = components.eventRepository

val creationDate = OffsetDateTime.parse("2020-01-01T00:00:00Z")
val taskRunDate = OffsetDateTime.parse("2020-06-01T00:00:00Z")
val dateInThePast = taskRunDate.minusDays(5)
val dateInTheFuture = taskRunDate.plusDays(5)

def genUser() =
Fixtures.genUser.sample.get

def genAsyncFile(userId: UUID, creationDate: OffsetDateTime, kind: AsyncFileKind) =
AsyncFile(
UUID.randomUUID(),
userId,
creationDate = creationDate,
Some(UUID.randomUUID().toString),
kind,
Some(UUID.randomUUID().toString)
)

def hasEvent(report: Report, action: ActionEventValue): Future[Boolean] =
eventRepository
.getEvents(report.id, EventFilter(action = Some(action)))
.map(_.nonEmpty)

"OldReportExportDeletionTask should delete old files" >> {

val user = genUser()
val recentReportedWebsitesExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), ReportedWebsites)
val oldReportedWebsitesExport =
genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), ReportedWebsites)

val recentReportedPhonesExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), ReportedPhones)
val oldReportedPhonesExport =
genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), ReportedPhones)

val recentReportsExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), Reports)
val oldReportsExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), Reports)

def setup(): Future[Unit] =
for {
_ <- userRepository.create(user)
_ = queue.add(recentReportedWebsitesExport.storageFilename.get)
_ = queue.add(oldReportedWebsitesExport.storageFilename.get)
_ = queue.add(recentReportedPhonesExport.storageFilename.get)
_ = queue.add(oldReportedPhonesExport.storageFilename.get)
_ = queue.add(recentReportsExport.storageFilename.get)
_ = queue.add(oldReportsExport.storageFilename.get)

_ <- asyncFileRepository.create(recentReportedWebsitesExport)
_ <- asyncFileRepository.create(oldReportedWebsitesExport)
_ <- asyncFileRepository.create(recentReportedPhonesExport)
_ <- asyncFileRepository.create(oldReportedPhonesExport)
_ <- asyncFileRepository.create(recentReportsExport)
_ <- asyncFileRepository.create(oldReportsExport)

_ <- userRepository.get(user.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(recentReportedWebsitesExport.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(oldReportedWebsitesExport.id) map (_.isDefined must beTrue)
} yield ()

def check(): Future[Unit] =
for {
_ <- userRepository.get(user.id) map (_.isDefined must beTrue)

_ <- asyncFileRepository.get(recentReportedWebsitesExport.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(oldReportedWebsitesExport.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(recentReportedPhonesExport.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(oldReportedPhonesExport.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(recentReportsExport.id) map (_.isDefined must beTrue)
_ <- asyncFileRepository.get(oldReportsExport.id) map (_.isDefined must beTrue)

_ = queue.contains(recentReportedWebsitesExport.storageFilename.get) must beTrue
_ = queue.contains(oldReportedWebsitesExport.storageFilename.get) must beFalse
_ = queue.contains(recentReportedPhonesExport.storageFilename.get) must beTrue
_ = queue.contains(oldReportedPhonesExport.storageFilename.get) must beFalse
_ = queue.contains(recentReportsExport.storageFilename.get) must beTrue
_ = queue.contains(oldReportsExport.storageFilename.get) must beFalse

} yield ()

new WithApplication(app) {
Await.result(
for {
_ <- setup()
_ <- fileDeletionTask.runTask()
_ <- check()
} yield (),
Duration.Inf
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.Duration

class FileDeletionTaskSpec(implicit ee: ExecutionEnv)
class OrphanReportFileDeletionTaskSpec(implicit ee: ExecutionEnv)
extends Specification
with AppSpec
with Results
Expand All @@ -33,7 +33,7 @@ class FileDeletionTaskSpec(implicit ee: ExecutionEnv)

lazy val reportRepository = components.reportRepository
lazy val reportFileRepository = components.reportFileRepository
lazy val fileDeletionTask = components.fileDeletionTask
lazy val fileDeletionTask = components.orphanReportFileDeletionTask
lazy val eventRepository = components.eventRepository

val creationDate = OffsetDateTime.parse("2020-01-01T00:00:00Z")
Expand Down Expand Up @@ -67,7 +67,7 @@ class FileDeletionTaskSpec(implicit ee: ExecutionEnv)
.getEvents(report.id, EventFilter(action = Some(action)))
.map(_.nonEmpty)

"ReportClosureTask should close the reports that need to be closed" >> {
"OrphanReportFileDeletionTask should remove orphan reports" >> {

val report = genReport()
val recentReportFileLinkedToReport = genReportFile(reportId = report.id.some, creationDate = OffsetDateTime.now())
Expand Down
1 change: 1 addition & 0 deletions test/tasks/report/ReportRemindersTaskUnitSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ReportRemindersTaskUnitSpec extends Specification with FutureMatchers {
subscription = null,
reportClosure = null,
orphanReportFileDeletion = null,
oldReportExportDeletion = null,
reportReminders = ReportRemindersTaskConfiguration(
startTime = LocalTime.of(2, 0),
intervalInHours = 1.day,
Expand Down
Loading

0 comments on commit 878f4cd

Please sign in to comment.