-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1722 from betagouv/master
MEP
- Loading branch information
Showing
15 changed files
with
271 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package tasks.report | ||
|
||
import cats.implicits.toFunctorOps | ||
import config.TaskConfiguration | ||
import org.apache.pekko.Done | ||
import org.apache.pekko.actor.ActorSystem | ||
import org.apache.pekko.stream.Materializer | ||
import org.apache.pekko.stream.scaladsl.Sink | ||
import repositories.asyncfiles.AsyncFileRepositoryInterface | ||
import repositories.tasklock.TaskRepositoryInterface | ||
import services.S3ServiceInterface | ||
import tasks.ScheduledTask | ||
import tasks.model.TaskSettings.DailyTaskSettings | ||
import utils.Logs.RichLogger | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.Future | ||
|
||
class OldReportExportDeletionTask( | ||
actorSystem: ActorSystem, | ||
reportFileRepository: AsyncFileRepositoryInterface, | ||
s3Service: S3ServiceInterface, | ||
taskConfiguration: TaskConfiguration, | ||
taskRepository: TaskRepositoryInterface | ||
)(implicit val executionContext: ExecutionContext, mat: Materializer) | ||
extends ScheduledTask(6, "old_report_export_deletion_task", taskRepository, actorSystem, taskConfiguration) { | ||
|
||
override val taskSettings = DailyTaskSettings(startTime = taskConfiguration.oldReportExportDeletion.startTime) | ||
|
||
override def runTask(): Future[Unit] = | ||
reportFileRepository.streamOldReportExports | ||
.mapAsync(parallelism = 1) { r => | ||
logger.debug(s"Deleting file ${r.storageFilename}") | ||
for { | ||
_ <- r.storageFilename match { | ||
case Some(storageFilename) => | ||
s3Service | ||
.delete(storageFilename) | ||
case None => Future.successful(Done) | ||
} | ||
_ = logger.debug(s"File ${r.storageFilename} deleted from s3") | ||
// _ <- reportFileRepository.delete(r.id) | ||
// _ = logger.debug(s"Export File ${r.storageFilename} deleted from signal conso database") | ||
} yield () | ||
} | ||
.recover { case ex => | ||
logger.errorWithTitle("old_report_export_deletion_task", "error deleting old report export file", ex) | ||
} | ||
.runWith(Sink.ignore) | ||
.as(()) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
129 changes: 129 additions & 0 deletions
129
test/tasks/report/OldReportExportDeletionTaskSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package tasks.report | ||
|
||
import models.AsyncFileKind.ReportedPhones | ||
import models.AsyncFileKind.ReportedWebsites | ||
import models.AsyncFileKind.Reports | ||
import models.report.Report | ||
import models.AsyncFile | ||
import models.AsyncFileKind | ||
import org.specs2.concurrent.ExecutionEnv | ||
import org.specs2.matcher.FutureMatchers | ||
import org.specs2.mutable.Specification | ||
import play.api.mvc.Results | ||
import play.api.test.WithApplication | ||
import repositories.event.EventFilter | ||
import utils.Constants.ActionEvent.ActionEventValue | ||
import utils.AppSpec | ||
import utils.Fixtures | ||
import utils.TestApp | ||
|
||
import java.time.OffsetDateTime | ||
import java.util.UUID | ||
import scala.concurrent.Await | ||
import scala.concurrent.Future | ||
import scala.concurrent.duration.Duration | ||
|
||
class OldReportExportDeletionTaskSpec(implicit ee: ExecutionEnv) | ||
extends Specification | ||
with AppSpec | ||
with Results | ||
with FutureMatchers { | ||
|
||
val (app, components, queue) = TestApp.buildAppWithS3Queue() | ||
|
||
lazy val asyncFileRepository = components.asyncFileRepository | ||
lazy val userRepository = components.userRepository | ||
lazy val fileDeletionTask = components.oldReportExportDeletionTask | ||
lazy val eventRepository = components.eventRepository | ||
|
||
val creationDate = OffsetDateTime.parse("2020-01-01T00:00:00Z") | ||
val taskRunDate = OffsetDateTime.parse("2020-06-01T00:00:00Z") | ||
val dateInThePast = taskRunDate.minusDays(5) | ||
val dateInTheFuture = taskRunDate.plusDays(5) | ||
|
||
def genUser() = | ||
Fixtures.genUser.sample.get | ||
|
||
def genAsyncFile(userId: UUID, creationDate: OffsetDateTime, kind: AsyncFileKind) = | ||
AsyncFile( | ||
UUID.randomUUID(), | ||
userId, | ||
creationDate = creationDate, | ||
Some(UUID.randomUUID().toString), | ||
kind, | ||
Some(UUID.randomUUID().toString) | ||
) | ||
|
||
def hasEvent(report: Report, action: ActionEventValue): Future[Boolean] = | ||
eventRepository | ||
.getEvents(report.id, EventFilter(action = Some(action))) | ||
.map(_.nonEmpty) | ||
|
||
"OldReportExportDeletionTask should delete old files" >> { | ||
|
||
val user = genUser() | ||
val recentReportedWebsitesExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), ReportedWebsites) | ||
val oldReportedWebsitesExport = | ||
genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), ReportedWebsites) | ||
|
||
val recentReportedPhonesExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), ReportedPhones) | ||
val oldReportedPhonesExport = | ||
genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), ReportedPhones) | ||
|
||
val recentReportsExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now(), Reports) | ||
val oldReportsExport = genAsyncFile(user.id, creationDate = OffsetDateTime.now().minusYears(3), Reports) | ||
|
||
def setup(): Future[Unit] = | ||
for { | ||
_ <- userRepository.create(user) | ||
_ = queue.add(recentReportedWebsitesExport.storageFilename.get) | ||
_ = queue.add(oldReportedWebsitesExport.storageFilename.get) | ||
_ = queue.add(recentReportedPhonesExport.storageFilename.get) | ||
_ = queue.add(oldReportedPhonesExport.storageFilename.get) | ||
_ = queue.add(recentReportsExport.storageFilename.get) | ||
_ = queue.add(oldReportsExport.storageFilename.get) | ||
|
||
_ <- asyncFileRepository.create(recentReportedWebsitesExport) | ||
_ <- asyncFileRepository.create(oldReportedWebsitesExport) | ||
_ <- asyncFileRepository.create(recentReportedPhonesExport) | ||
_ <- asyncFileRepository.create(oldReportedPhonesExport) | ||
_ <- asyncFileRepository.create(recentReportsExport) | ||
_ <- asyncFileRepository.create(oldReportsExport) | ||
|
||
_ <- userRepository.get(user.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(recentReportedWebsitesExport.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(oldReportedWebsitesExport.id) map (_.isDefined must beTrue) | ||
} yield () | ||
|
||
def check(): Future[Unit] = | ||
for { | ||
_ <- userRepository.get(user.id) map (_.isDefined must beTrue) | ||
|
||
_ <- asyncFileRepository.get(recentReportedWebsitesExport.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(oldReportedWebsitesExport.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(recentReportedPhonesExport.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(oldReportedPhonesExport.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(recentReportsExport.id) map (_.isDefined must beTrue) | ||
_ <- asyncFileRepository.get(oldReportsExport.id) map (_.isDefined must beTrue) | ||
|
||
_ = queue.contains(recentReportedWebsitesExport.storageFilename.get) must beTrue | ||
_ = queue.contains(oldReportedWebsitesExport.storageFilename.get) must beFalse | ||
_ = queue.contains(recentReportedPhonesExport.storageFilename.get) must beTrue | ||
_ = queue.contains(oldReportedPhonesExport.storageFilename.get) must beFalse | ||
_ = queue.contains(recentReportsExport.storageFilename.get) must beTrue | ||
_ = queue.contains(oldReportsExport.storageFilename.get) must beFalse | ||
|
||
} yield () | ||
|
||
new WithApplication(app) { | ||
Await.result( | ||
for { | ||
_ <- setup() | ||
_ <- fileDeletionTask.runTask() | ||
_ <- check() | ||
} yield (), | ||
Duration.Inf | ||
) | ||
} | ||
} | ||
} |
Oops, something went wrong.