Skip to content

Commit

Permalink
The YarnClientTrait add clusterClient close (#2906)
Browse files Browse the repository at this point in the history
* The YarnClientTrait add clusterClient close

* Optimized code
  • Loading branch information
ChengJie1053 authored Jul 29, 2023
1 parent 99fcf29 commit b71e094
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ trait YarnClientTrait extends FlinkClientTrait {
request: R,
flinkConf: Configuration,
actionFunc: (JobID, ClusterClient[_]) => O): O = {
val jobID = getJobID(request.jobId)
val clusterClient = {

Utils.using {
flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
val clusterClientFactory = new YarnClusterClientFactory
val applicationId = clusterClientFactory.getClusterId(flinkConf)
Expand All @@ -53,15 +53,15 @@ trait YarnClientTrait extends FlinkClientTrait {
}
val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
clusterDescriptor.retrieve(applicationId).getClusterClient
} {
client =>
Try(actionFunc(getJobID(request.jobId), client)).recover {
case e =>
throw new FlinkException(
s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
s"detail: ${Utils.stringifyException(e)}");
}.get
}
Try {
actionFunc(jobID, clusterClient)
}.recover {
case e =>
throw new FlinkException(
s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
s"detail: ${Utils.stringifyException(e)}");
}.get
}

override def doTriggerSavepoint(
Expand Down

0 comments on commit b71e094

Please sign in to comment.