Skip to content

Commit

Permalink
Merge pull request #54 from betagouv/main
Browse files Browse the repository at this point in the history
MEP [TRELLO-2604] Migrate to new Insee Sirene API (#53)
  • Loading branch information
charlescd authored Oct 8, 2024
2 parents 19c55ad + bce73a2 commit a456919
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 61 deletions.
11 changes: 11 additions & 0 deletions app/Main.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import _root_.controllers._
import actors.InseeTokenActor
import clients.GeoApiClient
import clients.InseeClient
import clients.InseeClientImpl
Expand All @@ -23,6 +24,9 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import config.SignalConsoConfiguration.HashedTokenReader
import org.apache.pekko.actor.typed
import org.apache.pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import org.apache.pekko.util.Timeout
import org.flywaydb.core.Flyway
class Main extends ApplicationLoader {
var components: SignalConsoComponents = _
Expand Down Expand Up @@ -75,8 +79,15 @@ class SignalConsoComponents(

val geoApiClient = new GeoApiClient()

val inseeAuthTokenActor: typed.ActorRef[InseeTokenActor.Command] = actorSystem.spawn(
InseeTokenActor(inseeClient),
"insee-auth-token-actor"
)

implicit val timeout: Timeout = 30.seconds
val etablissementService =
new EtablissementImportService(
inseeAuthTokenActor,
inseeClient,
geoApiClient,
companyDataRepository,
Expand Down
103 changes: 103 additions & 0 deletions app/actors/InseeTokenActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package actors

import actors.InseeTokenActor.FetchToken
import actors.InseeTokenActor.FetchTokenFailed
import actors.InseeTokenActor.FetchTokenSuccess
import actors.InseeTokenActor.GetToken
import actors.InseeTokenActor.GotToken
import actors.InseeTokenActor.RenewToken
import actors.InseeTokenActor.TokenError
import clients.InseeClient
import models.insee.token.InseeTokenResponse
import org.apache.pekko.actor.typed.ActorRef
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.scaladsl._

import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
import org.apache.pekko.actor.typed.scaladsl.adapter._
import org.apache.pekko.actor.Scheduler
import org.apache.pekko.pattern.retry
import play.api.Logger

object InseeTokenActor {
sealed trait Command
case class GetToken(replyTo: ActorRef[Reply]) extends Command
case class RenewToken(replyTo: ActorRef[Reply]) extends Command
private case class FetchToken(replyTo: ActorRef[Reply]) extends Command
private case class FetchTokenSuccess(replyTo: ActorRef[Reply], token: InseeTokenResponse) extends Command
private case class FetchTokenFailed(error: Throwable, replyTo: ActorRef[Reply]) extends Command

sealed trait Reply
case class GotToken(token: InseeTokenResponse) extends Reply
case class TokenError(error: Throwable) extends Reply

def apply(inseeClient: InseeClient): Behavior[Command] =
Behaviors.withStash(100) { buffer =>
Behaviors.setup[Command] { context =>
new InseeTokenActor(context, buffer, inseeClient).noToken()
}
}
}

class InseeTokenActor(
context: ActorContext[InseeTokenActor.Command],
buffer: StashBuffer[InseeTokenActor.Command],
inseeClient: InseeClient
) {

val logger: Logger = Logger(this.getClass)

import context.executionContext
implicit private val scheduler: Scheduler = context.system.scheduler.toClassic

private def noToken(): Behaviors.Receive[InseeTokenActor.Command] =
Behaviors.receiveMessagePartial { case GetToken(replyTo) =>
logger.debug("Requesting a token while no token in cache, switch to fetching state")
context.self ! FetchToken(replyTo)
fetchToken()

}

private def fetchToken(): Behaviors.Receive[InseeTokenActor.Command] = Behaviors.receiveMessage {
case FetchToken(replyTo) =>
logger.debug("Fetching a fresh new token")
val authenticateWithRetry = retry(() => inseeClient.generateToken(), 2, 500.milliseconds)
context.pipeToSelf(authenticateWithRetry) {
case Success(value) => FetchTokenSuccess(replyTo, value)
case Failure(error) => FetchTokenFailed(error, replyTo)
}
fetchedToken()
case other =>
buffer.stash(other): Unit
Behaviors.same
}

private def fetchedToken(): Behaviors.Receive[InseeTokenActor.Command] = Behaviors.receiveMessage {
case FetchTokenSuccess(replyTo, value) =>
logger.debug("Successfully fetched token")
replyTo ! GotToken(value)
buffer.unstashAll(hasToken(value))
case FetchTokenFailed(error, replyTo) =>
logger.debug(s"Fail to fetch token", error)
replyTo ! TokenError(error)
buffer.unstashAll(noToken())
case other =>
buffer.stash(other): Unit
Behaviors.same
}

private def hasToken(token: InseeTokenResponse): Behaviors.Receive[InseeTokenActor.Command] =
Behaviors.receiveMessagePartial {
case GetToken(replyTo) =>
logger.debug("Token requested")
replyTo ! GotToken(token)
Behaviors.same
case RenewToken(replyTo) =>
logger.debug("Renew token requested, switch to fetching state")
context.self ! FetchToken(replyTo)
fetchToken()
}

}
95 changes: 50 additions & 45 deletions app/clients/InseeClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package clients
import cats.syntax.either._
import clients.InseeClient.EtablissementPageSize
import clients.InseeClient.InitialCursor
import clients.InseeClient.InseeClientError
import clients.InseeClient.LastModifiedField
import clients.InseeClient.TokenExpired
import clients.InseeClient.WildCardPeriod
import config.InseeTokenConfiguration
import controllers.error.InseeEtablissementError
Expand Down Expand Up @@ -40,9 +42,10 @@ trait InseeClient {

def generateToken(): Future[InseeTokenResponse]
def getEtablissement(
token: InseeTokenResponse,
query: InseeEtablissementQuery,
cursor: Option[String] = None
): Future[InseeEtablissementResponse]
cursor: Option[String]
): Future[Either[InseeClientError, InseeEtablissementResponse]]
}

class InseeClientImpl(inseeConfiguration: InseeTokenConfiguration)(implicit ec: ExecutionContext) extends InseeClient {
Expand All @@ -58,11 +61,16 @@ class InseeClientImpl(inseeConfiguration: InseeTokenConfiguration)(implicit ec:
override def generateToken(): Future[InseeTokenResponse] = {
val response: Future[Response[Either[ResponseException[String, JsError], InseeTokenResponse]]] =
basicRequest
.post(uri"https://api.insee.fr/token")
.body("grant_type=client_credentials")
.contentType("")
.auth
.basic(inseeConfiguration.key, inseeConfiguration.secret)
.post(uri"https://auth.insee.net/auth/realms/apim-gravitee/protocol/openid-connect/token")
.body(
Map(
"grant_type" -> "password",
"client_id" -> inseeConfiguration.clientId,
"client_secret" -> inseeConfiguration.clientSecret,
"username" -> inseeConfiguration.username,
"password" -> inseeConfiguration.password
)
)
.response(asJson[InseeTokenResponse])
.send(backend)

Expand All @@ -78,23 +86,19 @@ class InseeClientImpl(inseeConfiguration: InseeTokenConfiguration)(implicit ec:
}

override def getEtablissement(
token: InseeTokenResponse,
query: InseeEtablissementQuery,
cursor: Option[String]
): Future[InseeEtablissementResponse] = {
): Future[Either[InseeClientError, InseeEtablissementResponse]] = {

val req: RequestT[Identity, Either[String, String], Any] = basicRequest
.get(buildUri(query.beginPeriod, cursor, query.endPeriod, query.siret, query.disclosedStatus))
.auth
.bearer(query.token.accessToken.value)
.bearer(token.accessToken.value)

logger.debug(req.toCurl(Set("Authorization")))

val response: Future[Response[Either[ResponseException[String, JsError], InseeEtablissementResponse]]] =
sendRequest(req)

response
.map(_.body)
.flatMap(r => r.liftTo[Future])
sendRequest(req)
}

private def buildUri(
Expand All @@ -117,7 +121,7 @@ class InseeClientImpl(inseeConfiguration: InseeTokenConfiguration)(implicit ec:
.getOrElse("")}${siret.map(s => s" AND siret:$s").getOrElse("")}"""
)

uri"https://api.insee.fr/entreprises/sirene/V3.11/siret"
uri"https://api.insee.fr/api-sirene/prive/3.11/siret"
.addQuerySegment(searchQueryParam)
.addQuerySegment(cursorQueryParam)
.addQuerySegment(sortQueryParam)
Expand All @@ -126,41 +130,39 @@ class InseeClientImpl(inseeConfiguration: InseeTokenConfiguration)(implicit ec:

def sendRequest(
req: RequestT[Identity, Either[String, String], Any]
): Future[Response[Either[ResponseException[String, JsError], InseeEtablissementResponse]]] = {

def response(): Future[Response[Either[ResponseException[String, JsError], InseeEtablissementResponse]]] = req
): Future[Either[InseeClientError, InseeEtablissementResponse]] =
req
.response(asJson[InseeEtablissementResponse])
.send(backend)

response().flatMap { r =>
logger.info(s"Response : ${r.show(includeBody = false)}")
logger.trace(s"Response : ${r.show()}")
if (r.isSuccess) {
Future.successful(r)
} else {
r.code match {
case StatusCode.TooManyRequests =>
logger.debug("Reaching API threshold (30 request/min) , waiting a bit to recover")
Thread.sleep(60000)
response()
case StatusCode.NotFound =>
Future.failed(
InseeEtablissementError(
s"No result found : ${r.body.swap.map(_.getMessage)}"
.flatMap { r =>
logger.info(s"Response : ${r.show(includeBody = false)}")
logger.trace(s"Response : ${r.show()}")
if (r.isSuccess) {
r.body.liftTo[Future].map(Right(_))
} else {
r.code match {
case StatusCode.TooManyRequests =>
logger.debug("Reaching API threshold (30 request/min) , waiting a bit to recover")
Thread.sleep(60000)
sendRequest(req)
case StatusCode.NotFound =>
Future.failed(
InseeEtablissementError(
s"No result found : ${r.body.swap.map(_.getMessage)}"
)
)
)
case failedStatusCode =>
logger.error(s" Failed status $failedStatusCode error ${r.show()}")
Future.failed(
InseeEtablissementError(
s"Etablissement call failed with status code : ${r.show()}"
case StatusCode.Unauthorized | StatusCode.Forbidden =>
Future.successful(Left(TokenExpired))
case failedStatusCode =>
logger.error(s" Failed status $failedStatusCode error ${r.show()}")
Future.failed(
InseeEtablissementError(
s"Etablissement call failed with status code : ${r.show()}"
)
)
)
}
}
}
}

}
}

object InseeClient {
Expand All @@ -170,4 +172,7 @@ object InseeClient {
val LastModifiedField = "dateDernierTraitementEtablissement"
val WildCardPeriod = "*"

sealed trait InseeClientError
case object TokenExpired extends InseeClientError
case object RateLimitExceeded extends InseeClientError
}
2 changes: 1 addition & 1 deletion app/config/SignalConsoConfiguration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ case class SignalConsoConfiguration(
filterNonDisclosed: Boolean
)

case class InseeTokenConfiguration(key: String, secret: String)
case class InseeTokenConfiguration(clientId: String, clientSecret: String, username: String, password: String)

object SignalConsoConfiguration {

Expand Down
1 change: 0 additions & 1 deletion app/models/insee/token/InseeEtablissementQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import models.insee.etablissement.DisclosedStatus
import java.time.OffsetDateTime

case class InseeEtablissementQuery(
token: InseeTokenResponse,
beginPeriod: Option[OffsetDateTime] = None,
endPeriod: Option[OffsetDateTime] = None,
siret: Option[SIRET] = None,
Expand Down
Loading

0 comments on commit a456919

Please sign in to comment.