diff --git a/app/config/TaskConfiguration.scala b/app/config/TaskConfiguration.scala index dc955c694..9e03af40f 100644 --- a/app/config/TaskConfiguration.scala +++ b/app/config/TaskConfiguration.scala @@ -10,6 +10,7 @@ case class TaskConfiguration( subscription: SubscriptionTaskConfiguration, reportClosure: ReportClosureTaskConfiguration, orphanReportFileDeletion: OrphanReportFileDeletionTaskConfiguration, + oldReportExportDeletion: OldReportExportDeletionTaskConfiguration, reportReminders: ReportRemindersTaskConfiguration, inactiveAccounts: InactiveAccountsTaskConfiguration, companyUpdate: CompanyUpdateTaskConfiguration, @@ -39,6 +40,10 @@ case class OrphanReportFileDeletionTaskConfiguration( startTime: LocalTime ) +case class OldReportExportDeletionTaskConfiguration( + startTime: LocalTime +) + case class ReportRemindersTaskConfiguration( startTime: LocalTime, intervalInHours: FiniteDuration, diff --git a/app/loader/SignalConsoApplicationLoader.scala b/app/loader/SignalConsoApplicationLoader.scala index 0bb0ff4b8..78ce382c5 100644 --- a/app/loader/SignalConsoApplicationLoader.scala +++ b/app/loader/SignalConsoApplicationLoader.scala @@ -103,7 +103,8 @@ import tasks.account.InactiveAccountTask import tasks.account.InactiveDgccrfAccountReminderTask import tasks.account.InactiveDgccrfAccountRemoveTask import tasks.company._ -import tasks.report.FileDeletionTask +import tasks.report.OldReportExportDeletionTask +import tasks.report.OrphanReportFileDeletionTask import tasks.report.ReportClosureTask import tasks.report.ReportNotificationTask import tasks.report.ReportRemindersTask @@ -500,7 +501,7 @@ class SignalConsoComponents( messagesApi ) - val fileDeletionTask = new FileDeletionTask( + val orphanReportFileDeletionTask = new OrphanReportFileDeletionTask( actorSystem, reportFileRepository, s3Service, @@ -508,6 +509,14 @@ class SignalConsoComponents( taskRepository ) + val oldReportExportDeletionTask = new OldReportExportDeletionTask( + actorSystem, + asyncFileRepository, + s3Service, + taskConfiguration, + taskRepository + ) + val reportReminderTask = new ReportRemindersTask( actorSystem, reportRepository, @@ -819,7 +828,8 @@ class SignalConsoComponents( exportReportsToSFTPTask.schedule() reportClosureTask.schedule() reportReminderTask.schedule() - fileDeletionTask.schedule() + orphanReportFileDeletionTask.schedule() + oldReportExportDeletionTask.schedule() if (applicationConfiguration.task.probe.active) { probeOrchestrator.scheduleProbeTasks() } diff --git a/app/repositories/asyncfiles/AsyncFileRepository.scala b/app/repositories/asyncfiles/AsyncFileRepository.scala index 52da794ec..2fd610718 100644 --- a/app/repositories/asyncfiles/AsyncFileRepository.scala +++ b/app/repositories/asyncfiles/AsyncFileRepository.scala @@ -1,12 +1,17 @@ package repositories.asyncfiles import models._ +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl.Source import repositories.CRUDRepository import repositories.PostgresProfile.api._ import repositories.asyncfiles.AsyncFilesColumnType._ import slick.basic.DatabaseConfig import slick.jdbc.JdbcProfile +import slick.jdbc.ResultSetConcurrency +import slick.jdbc.ResultSetType +import java.time.OffsetDateTime import java.util.UUID import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -34,11 +39,28 @@ class AsyncFileRepository(override val dbConfig: DatabaseConfig[JdbcProfile])(im .filterOpt(kind) { case (table, kind) => table.kind === kind } + .filter(_.creationDate >= OffsetDateTime.now().minusDays(1)) .sortBy(_.creationDate.desc) .to[List] .result ) + def streamOldReportExports: Source[AsyncFile, NotUsed] = Source + .fromPublisher( + db.stream( + table + .filter(_.creationDate <= OffsetDateTime.now().minusDays(10)) + .result + .withStatementParameters( + rsType = ResultSetType.ForwardOnly, + rsConcurrency = ResultSetConcurrency.ReadOnly, + fetchSize = 10000 + ) + .transactionally + ) + ) + .log("user") + def deleteByUserId(userId: UUID): Future[Int] = db .run( table diff --git a/app/repositories/asyncfiles/AsyncFileRepositoryInterface.scala b/app/repositories/asyncfiles/AsyncFileRepositoryInterface.scala index fe3b925f5..ee9c37347 100644 --- a/app/repositories/asyncfiles/AsyncFileRepositoryInterface.scala +++ b/app/repositories/asyncfiles/AsyncFileRepositoryInterface.scala @@ -2,6 +2,8 @@ package repositories.asyncfiles import models.AsyncFile import models.AsyncFileKind import models.User +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl.Source import repositories.CRUDRepositoryInterface import java.util.UUID @@ -14,4 +16,6 @@ trait AsyncFileRepositoryInterface extends CRUDRepositoryInterface[AsyncFile] { def list(user: User, kind: Option[AsyncFileKind] = None): Future[List[AsyncFile]] def deleteByUserId(userId: UUID): Future[Int] + + def streamOldReportExports: Source[AsyncFile, NotUsed] } diff --git a/app/tasks/report/OldReportExportDeletionTask.scala b/app/tasks/report/OldReportExportDeletionTask.scala new file mode 100644 index 000000000..4d5012a7d --- /dev/null +++ b/app/tasks/report/OldReportExportDeletionTask.scala @@ -0,0 +1,52 @@ +package tasks.report + +import cats.implicits.toFunctorOps +import config.TaskConfiguration +import org.apache.pekko.Done +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Sink +import repositories.asyncfiles.AsyncFileRepositoryInterface +import repositories.tasklock.TaskRepositoryInterface +import services.S3ServiceInterface +import tasks.ScheduledTask +import tasks.model.TaskSettings.DailyTaskSettings +import utils.Logs.RichLogger + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +class OldReportExportDeletionTask( + actorSystem: ActorSystem, + reportFileRepository: AsyncFileRepositoryInterface, + s3Service: S3ServiceInterface, + taskConfiguration: TaskConfiguration, + taskRepository: TaskRepositoryInterface +)(implicit val executionContext: ExecutionContext, mat: Materializer) + extends ScheduledTask(6, "old_report_export_deletion_task", taskRepository, actorSystem, taskConfiguration) { + + override val taskSettings = DailyTaskSettings(startTime = taskConfiguration.oldReportExportDeletion.startTime) + + override def runTask(): Future[Unit] = + reportFileRepository.streamOldReportExports + .mapAsync(parallelism = 1) { r => + logger.debug(s"Deleting file ${r.storageFilename}") + for { + _ <- r.storageFilename match { + case Some(storageFilename) => + s3Service + .delete(storageFilename) + case None => Future.successful(Done) + } + _ = logger.debug(s"File ${r.storageFilename} deleted from s3") +// _ <- reportFileRepository.delete(r.id) +// _ = logger.debug(s"Export File ${r.storageFilename} deleted from signal conso database") + } yield () + } + .recover { case ex => + logger.errorWithTitle("old_report_export_deletion_task", "error deleting old report export file", ex) + } + .runWith(Sink.ignore) + .as(()) + +} diff --git a/app/tasks/report/FileDeletionTask.scala b/app/tasks/report/OrphanReportFileDeletionTask.scala similarity index 91% rename from app/tasks/report/FileDeletionTask.scala rename to app/tasks/report/OrphanReportFileDeletionTask.scala index 565ad7a20..277b74999 100644 --- a/app/tasks/report/FileDeletionTask.scala +++ b/app/tasks/report/OrphanReportFileDeletionTask.scala @@ -15,14 +15,14 @@ import utils.Logs.RichLogger import scala.concurrent.ExecutionContext import scala.concurrent.Future -class FileDeletionTask( +class OrphanReportFileDeletionTask( actorSystem: ActorSystem, reportFileRepository: ReportFileRepositoryInterface, s3Service: S3ServiceInterface, taskConfiguration: TaskConfiguration, taskRepository: TaskRepositoryInterface )(implicit val executionContext: ExecutionContext, mat: Materializer) - extends ScheduledTask(5, "report_file_deletion_task", taskRepository, actorSystem, taskConfiguration) { + extends ScheduledTask(5, "orphan_report_file_deletion_task", taskRepository, actorSystem, taskConfiguration) { override val taskSettings = DailyTaskSettings(startTime = taskConfiguration.orphanReportFileDeletion.startTime) diff --git a/app/views/mails/consumer/reportAcknowledgment.scala.html b/app/views/mails/consumer/reportAcknowledgment.scala.html index 0d7d061c4..9647db2be 100644 --- a/app/views/mails/consumer/reportAcknowledgment.scala.html +++ b/app/views/mails/consumer/reportAcknowledgment.scala.html @@ -231,12 +231,13 @@
@Messages("ReportAckEmail.email", report.email)
+
@if(report.visibleToPro) { - @Messages("ReportAckEmail.contactAgreement", if(report.contactAgreement) { - Messages("ReportAckEmail.yes") + @if(report.contactAgreement) { + @Messages("ReportAckEmail.yes") } else { - Messages("ReportAckEmail.no") - }) + @Messages("ReportAckEmail.no") + }
} diff --git a/conf/common/task.conf b/conf/common/task.conf index cff3b9e92..18d18be04 100644 --- a/conf/common/task.conf +++ b/conf/common/task.conf @@ -54,6 +54,10 @@ task { start-time = "07:00:00" start-time = ${?ORPHAN_REPORT_FILE_DELETION_TASK_START_TIME} } + old-report-export-deletion { + start-time = "07:15:00" + start-time = ${?OLD_REPORT_EXPORT_DELETION_TASK_START_TIME} + } } \ No newline at end of file diff --git a/conf/messages.en b/conf/messages.en index 8b5a49006..aa298ef24 100644 --- a/conf/messages.en +++ b/conf/messages.en @@ -162,9 +162,8 @@ ReportAckEmail.consumer=Consumer ReportAckEmail.lastName=Last Name: {0} ReportAckEmail.firstName=First Name: {0} ReportAckEmail.email=Email: {0} -ReportAckEmail.contactAgreement=Consent for contact: {0} -ReportAckEmail.yes=yes -ReportAckEmail.no=no +ReportAckEmail.yes=As requested, your details will be sent to the company +ReportAckEmail.no=As requested, your details will not be sent to the company ReportAckEmail.understandSignalConso=Understanding SignalConso ReportAckEmail.yourReport=Your report ReportAckEmail.influencerName=Influencer: {0} diff --git a/conf/messages.fr b/conf/messages.fr index 7d6626b24..edc9992c6 100644 --- a/conf/messages.fr +++ b/conf/messages.fr @@ -164,9 +164,9 @@ ReportAckEmail.consumer=Consommateur ReportAckEmail.lastName=Nom : {0} ReportAckEmail.firstName=Prénom : {0} ReportAckEmail.email=Email : {0} -ReportAckEmail.contactAgreement=Accord pour contact : {0} -ReportAckEmail.yes=oui -ReportAckEmail.no=non + +ReportAckEmail.yes=Comme demandé, vos coordonnées seront transmise à l’entreprise +ReportAckEmail.no=Comme demandé, vos coordonnées ne seront pas transmise à l’entreprise ReportAckEmail.understandSignalConso=Comprendre SignalConso ReportAckEmail.yourReport=Votre signalement ReportAckEmail.influencerName=Influenceur: {0} diff --git a/test/tasks/report/OldReportExportDeletionTaskSpec.scala b/test/tasks/report/OldReportExportDeletionTaskSpec.scala new file mode 100644 index 000000000..fbcec5017 --- /dev/null +++ b/test/tasks/report/OldReportExportDeletionTaskSpec.scala @@ -0,0 +1,129 @@ +package tasks.report + +import models.AsyncFileKind.ReportedPhones +import models.AsyncFileKind.ReportedWebsites +import models.AsyncFileKind.Reports +import models.report.Report +import models.AsyncFile +import models.AsyncFileKind +import org.specs2.concurrent.ExecutionEnv +import org.specs2.matcher.FutureMatchers +import org.specs2.mutable.Specification +import play.api.mvc.Results +import play.api.test.WithApplication +import repositories.event.EventFilter +import utils.Constants.ActionEvent.ActionEventValue +import utils.AppSpec +import utils.Fixtures +import utils.TestApp + +import java.time.OffsetDateTime +import java.util.UUID +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +class OldReportExportDeletionTaskSpec(implicit ee: ExecutionEnv) + extends Specification + with AppSpec + with Results + with FutureMatchers { + + val (app, components, queue) = TestApp.buildAppWithS3Queue() + + lazy val asyncFileRepository = components.asyncFileRepository + lazy val userRepository = components.userRepository + lazy val fileDeletionTask = components.oldReportExportDeletionTask + lazy val eventRepository = components.eventRepository + + val creationDate = OffsetDateTime.parse("2020-01-01T00:00:00Z") + val taskRunDate = OffsetDateTime.parse("2020-06-01T00:00:00Z") + val dateInThePast = taskRunDate.minusDays(5) + val dateInTheFuture = taskRunDate.plusDays(5) + + def genUser() = + Fixtures.genUser.sample.get + + def genAsyncFile(userId: UUID, creationDate: OffsetDateTime, kind: AsyncFileKind) = + AsyncFile( + UUID.randomUUID(), + userId, + creationDate = creationDate, + Some(UUID.randomUUID().toString), + kind, + Some(UUID.randomUUID().toString) + ) + + def hasEvent(report: Report, action: ActionEventValue): Future[Boolean] = + eventRepository + .getEvents(report.id, EventFilter(action = Some(action))) + .map(_.nonEmpty) + + "OldReportExportDeletionTask should delete old files" >> { + + val user = genUser() + val recentReportedWebsitesExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), ReportedWebsites) + val oldReportedWebsitesExport = + genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), ReportedWebsites) + + val recentReportedPhonesExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), ReportedPhones) + val oldReportedPhonesExport = + genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), ReportedPhones) + + val recentReportsExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), Reports) + val oldReportsExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), Reports) + + def setup(): Future[Unit] = + for { + _ <- userRepository.create(user) + _ = queue.add(recentReportedWebsitesExport.storageFilename.get) + _ = queue.add(oldReportedWebsitesExport.storageFilename.get) + _ = queue.add(recentReportedPhonesExport.storageFilename.get) + _ = queue.add(oldReportedPhonesExport.storageFilename.get) + _ = queue.add(recentReportsExport.storageFilename.get) + _ = queue.add(oldReportsExport.storageFilename.get) + + _ <- asyncFileRepository.create(recentReportedWebsitesExport) + _ <- asyncFileRepository.create(oldReportedWebsitesExport) + _ <- asyncFileRepository.create(recentReportedPhonesExport) + _ <- asyncFileRepository.create(oldReportedPhonesExport) + _ <- asyncFileRepository.create(recentReportsExport) + _ <- asyncFileRepository.create(oldReportsExport) + + _ <- userRepository.get(user.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(recentReportedWebsitesExport.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(oldReportedWebsitesExport.id) map (_.isDefined must beTrue) + } yield () + + def check(): Future[Unit] = + for { + _ <- userRepository.get(user.id) map (_.isDefined must beTrue) + + _ <- asyncFileRepository.get(recentReportedWebsitesExport.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(oldReportedWebsitesExport.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(recentReportedPhonesExport.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(oldReportedPhonesExport.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(recentReportsExport.id) map (_.isDefined must beTrue) + _ <- asyncFileRepository.get(oldReportsExport.id) map (_.isDefined must beTrue) + + _ = queue.contains(recentReportedWebsitesExport.storageFilename.get) must beTrue + _ = queue.contains(oldReportedWebsitesExport.storageFilename.get) must beFalse + _ = queue.contains(recentReportedPhonesExport.storageFilename.get) must beTrue + _ = queue.contains(oldReportedPhonesExport.storageFilename.get) must beFalse + _ = queue.contains(recentReportsExport.storageFilename.get) must beTrue + _ = queue.contains(oldReportsExport.storageFilename.get) must beFalse + + } yield () + + new WithApplication(app) { + Await.result( + for { + _ <- setup() + _ <- fileDeletionTask.runTask() + _ <- check() + } yield (), + Duration.Inf + ) + } + } +} diff --git a/test/tasks/report/FileDeletionTaskSpec.scala b/test/tasks/report/OrphanReportFileDeletionTaskSpec.scala similarity index 95% rename from test/tasks/report/FileDeletionTaskSpec.scala rename to test/tasks/report/OrphanReportFileDeletionTaskSpec.scala index 95f039a37..28f6a2ac6 100644 --- a/test/tasks/report/FileDeletionTaskSpec.scala +++ b/test/tasks/report/OrphanReportFileDeletionTaskSpec.scala @@ -23,7 +23,7 @@ import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration.Duration -class FileDeletionTaskSpec(implicit ee: ExecutionEnv) +class OrphanReportFileDeletionTaskSpec(implicit ee: ExecutionEnv) extends Specification with AppSpec with Results @@ -33,7 +33,7 @@ class FileDeletionTaskSpec(implicit ee: ExecutionEnv) lazy val reportRepository = components.reportRepository lazy val reportFileRepository = components.reportFileRepository - lazy val fileDeletionTask = components.fileDeletionTask + lazy val fileDeletionTask = components.orphanReportFileDeletionTask lazy val eventRepository = components.eventRepository val creationDate = OffsetDateTime.parse("2020-01-01T00:00:00Z") @@ -67,7 +67,7 @@ class FileDeletionTaskSpec(implicit ee: ExecutionEnv) .getEvents(report.id, EventFilter(action = Some(action))) .map(_.nonEmpty) - "ReportClosureTask should close the reports that need to be closed" >> { + "OrphanReportFileDeletionTask should remove orphan reports" >> { val report = genReport() val recentReportFileLinkedToReport = genReportFile(reportId = report.id.some, creationDate = OffsetDateTime.now()) diff --git a/test/tasks/report/ReportRemindersTaskUnitSpec.scala b/test/tasks/report/ReportRemindersTaskUnitSpec.scala index 0ce764b57..1ebb19d0c 100644 --- a/test/tasks/report/ReportRemindersTaskUnitSpec.scala +++ b/test/tasks/report/ReportRemindersTaskUnitSpec.scala @@ -47,6 +47,7 @@ class ReportRemindersTaskUnitSpec extends Specification with FutureMatchers { subscription = null, reportClosure = null, orphanReportFileDeletion = null, + oldReportExportDeletion = null, reportReminders = ReportRemindersTaskConfiguration( startTime = LocalTime.of(2, 0), intervalInHours = 1.day, diff --git a/test/utils/AppSpec.scala b/test/utils/AppSpec.scala index abb0ffb9b..d5c24c212 100644 --- a/test/utils/AppSpec.scala +++ b/test/utils/AppSpec.scala @@ -32,6 +32,7 @@ import tasks.company.CompanySyncServiceInterface import java.io.File import java.time.LocalTime import java.time.format.DateTimeFormatter +import java.util.concurrent.ConcurrentLinkedQueue import scala.annotation.nowarn import scala.concurrent.ExecutionContext @@ -92,6 +93,18 @@ trait AppSpec extends BeforeAfterAll with Mockito { object TestApp { + def buildAppWithS3Queue(): ( + Application, + SignalConsoComponents, + ConcurrentLinkedQueue[String] + ) = { + val appEnv: play.api.Environment = play.api.Environment.simple(new File(".")) + val context: ApplicationLoader.Context = ApplicationLoader.Context.create(appEnv) + val s3Queue = new ConcurrentLinkedQueue[String]() + val loader = new DefaultApplicationLoader(None, s3Queue) + (loader.load(context), loader.components, s3Queue) + } + def buildApp( maybeConfiguration: Option[Configuration] = None ): ( @@ -100,7 +113,9 @@ object TestApp { ) = { val appEnv: play.api.Environment = play.api.Environment.simple(new File(".")) val context: ApplicationLoader.Context = ApplicationLoader.Context.create(appEnv) - val loader = new DefaultApplicationLoader(maybeConfiguration) + val s3Queue = new ConcurrentLinkedQueue[String]() + val loader = new DefaultApplicationLoader(maybeConfiguration, s3Queue) + (loader.load(context), loader.components) } @@ -113,7 +128,8 @@ object TestApp { } class DefaultApplicationLoader( - maybeConfiguration: Option[Configuration] = None + maybeConfiguration: Option[Configuration] = None, + s3Queue: ConcurrentLinkedQueue[String] ) extends ApplicationLoader with Mockito { var components: SignalConsoComponents = _ @@ -125,7 +141,7 @@ class DefaultApplicationLoader( override def load(context: ApplicationLoader.Context): Application = { components = new SignalConsoComponents(context) { - override def s3Service: S3ServiceInterface = new S3ServiceMock() + override def s3Service: S3ServiceInterface = new S3ServiceMock(s3Queue) override lazy val mailRetriesService: MailRetriesService = mailRetriesServiceMock diff --git a/test/utils/S3ServiceMock.scala b/test/utils/S3ServiceMock.scala index 706f22aa2..113c649d8 100644 --- a/test/utils/S3ServiceMock.scala +++ b/test/utils/S3ServiceMock.scala @@ -10,9 +10,11 @@ import org.apache.pekko.stream.connectors.s3.MultipartUploadResult import org.apache.pekko.stream.connectors.s3.ObjectMetadata import services.S3ServiceInterface +import java.util.concurrent.ConcurrentLinkedQueue import scala.concurrent.Future -class S3ServiceMock extends S3ServiceInterface { +class S3ServiceMock(atomicQueue: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()) + extends S3ServiceInterface { override def upload(bucketKey: String): Sink[ByteString, Future[MultipartUploadResult]] = ??? @@ -20,7 +22,10 @@ class S3ServiceMock extends S3ServiceInterface { override def downloadOnCurrentHost(bucketKey: String, filePath: String): Future[IOResult] = ??? - override def delete(bucketKey: String): Future[Done] = Future.successful(Done) + override def delete(bucketKey: String): Future[Done] = Future.successful { + atomicQueue.remove(bucketKey) + Done + } override def getSignedUrl(bucketKey: String, method: HttpMethod): String = ???