diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 9d5621a8283..05dd7fda907 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -105,6 +105,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin this.operationException = opEx } + def getOperationJobProgress: TProgressUpdateResp = operationJobProgress def setOperationJobProgress(opJobProgress: TProgressUpdateResp): Unit = { this.operationJobProgress = opJobProgress } diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/KyuubiOperationEvent.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/KyuubiOperationEvent.java index 13c40eecf78..ec583954216 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/KyuubiOperationEvent.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/KyuubiOperationEvent.java @@ -51,6 +51,8 @@ public class KyuubiOperationEvent { private Map metrics; + private OperationProgress progress; + public KyuubiOperationEvent() {} public KyuubiOperationEvent( @@ -68,7 +70,8 @@ public KyuubiOperationEvent( String sessionUser, String sessionType, String kyuubiInstance, - Map metrics) { + Map metrics, + OperationProgress progress) { this.statementId = statementId; this.remoteId = remoteId; this.statement = statement; @@ -84,6 +87,7 @@ public KyuubiOperationEvent( this.sessionType = sessionType; this.kyuubiInstance = kyuubiInstance; this.metrics = metrics; + this.progress = progress; } public static KyuubiOperationEvent.KyuubiOperationEventBuilder builder() { @@ -121,6 +125,8 @@ public static class KyuubiOperationEventBuilder { private Map metrics; + private OperationProgress progress; + public KyuubiOperationEventBuilder() {} public KyuubiOperationEvent.KyuubiOperationEventBuilder statementId(final String statementId) { @@ -201,6 +207,12 @@ public KyuubiOperationEvent.KyuubiOperationEventBuilder metrics( return this; } + public KyuubiOperationEvent.KyuubiOperationEventBuilder progress( + final OperationProgress progress) { + this.progress = progress; + return this; + } + public KyuubiOperationEvent build() { return new KyuubiOperationEvent( statementId, @@ -217,7 +229,8 @@ public KyuubiOperationEvent build() { sessionUser, sessionType, kyuubiInstance, - metrics); + metrics, + progress); } } @@ -340,4 +353,12 @@ public Map getMetrics() { public void setMetrics(Map metrics) { this.metrics = metrics; } + + public OperationProgress getProgress() { + return progress; + } + + public void setProgress(OperationProgress progress) { + this.progress = progress; + } } diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationData.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationData.java index dd3b302640b..8e5bafc6e28 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationData.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationData.java @@ -37,6 +37,7 @@ public class OperationData { private String sessionType; private String kyuubiInstance; private Map metrics; + private OperationProgress progress; public OperationData() {} @@ -53,7 +54,8 @@ public OperationData( String sessionUser, String sessionType, String kyuubiInstance, - Map metrics) { + Map metrics, + OperationProgress progress) { this.identifier = identifier; this.remoteId = remoteId; this.statement = statement; @@ -67,6 +69,7 @@ public OperationData( this.sessionType = sessionType; this.kyuubiInstance = kyuubiInstance; this.metrics = metrics; + this.progress = progress; } public String getIdentifier() { @@ -176,6 +179,14 @@ public void setMetrics(Map metrics) { this.metrics = metrics; } + public OperationProgress getProgress() { + return progress; + } + + public void setProgress(OperationProgress progress) { + this.progress = progress; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationProgress.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationProgress.java new file mode 100644 index 00000000000..8668f2f30f7 --- /dev/null +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/OperationProgress.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.client.api.v1.dto; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class OperationProgress { + private List headerNames; + private List> rows; + private double progressedPercentage; + private String status; + private String footerSummary; + private long startTime; + + public OperationProgress() {} + + public OperationProgress( + List headerNames, + List> rows, + double progressedPercentage, + String status, + String footerSummary, + long startTime) { + this.headerNames = headerNames; + this.rows = rows; + this.progressedPercentage = progressedPercentage; + this.status = status; + this.footerSummary = footerSummary; + this.startTime = startTime; + } + + public List getHeaderNames() { + if (headerNames == null) { + return Collections.emptyList(); + } + return headerNames; + } + + public void setHeaderNames(List headerNames) { + this.headerNames = headerNames; + } + + public List> getRows() { + if (rows == null) { + return Collections.emptyList(); + } + return rows; + } + + public void setRows(List> rows) { + this.rows = rows; + } + + public double getProgressedPercentage() { + return progressedPercentage; + } + + public void setProgressedPercentage(double progressedPercentage) { + this.progressedPercentage = progressedPercentage; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getFooterSummary() { + return footerSummary; + } + + public void setFooterSummary(String footerSummary) { + this.footerSummary = footerSummary; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OperationProgress that = (OperationProgress) o; + return Double.compare(getProgressedPercentage(), that.getProgressedPercentage()) == 0 + && getStartTime() == that.getStartTime() + && Objects.equals(getHeaderNames(), that.getHeaderNames()) + && Objects.equals(getRows(), that.getRows()) + && Objects.equals(getStatus(), that.getStatus()) + && Objects.equals(getFooterSummary(), that.getFooterSummary()); + } + + @Override + public int hashCode() { + return Objects.hash( + getHeaderNames(), + getRows(), + getProgressedPercentage(), + getStatus(), + getFooterSummary(), + getStartTime()); + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE); + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala index 04a12623575..49442160878 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala @@ -20,13 +20,35 @@ package org.apache.kyuubi.server.api import scala.collection.JavaConverters._ import org.apache.kyuubi.{Logging, Utils} -import org.apache.kyuubi.client.api.v1.dto.{OperationData, ServerData, SessionData} +import org.apache.kyuubi.client.api.v1.dto +import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress, ServerData, SessionData} import org.apache.kyuubi.events.KyuubiOperationEvent import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.KyuubiOperation import org.apache.kyuubi.session.KyuubiSession object ApiUtils extends Logging { + def sessionEvent(session: KyuubiSession): dto.KyuubiSessionEvent = { + session.getSessionEvent.map(event => + dto.KyuubiSessionEvent.builder() + .sessionId(event.sessionId) + .clientVersion(event.clientVersion) + .sessionType(event.sessionType) + .sessionName(event.sessionName) + .user(event.user) + .clientIp(event.clientIP) + .serverIp(event.serverIP) + .conf(event.conf.asJava) + .remoteSessionId(event.remoteSessionId) + .engineId(event.engineId) + .eventTime(event.eventTime) + .openedTime(event.openedTime) + .startTime(event.startTime) + .endTime(event.endTime) + .totalOperations(event.totalOperations) + .exception(event.exception.orNull) + .build()).orNull + } def sessionData(session: KyuubiSession): SessionData = { val sessionEvent = session.getSessionEvent @@ -45,6 +67,40 @@ object ApiUtils extends Logging { sessionEvent.map(_.engineId).getOrElse("")) } + private def operationProgress(operation: KyuubiOperation): OperationProgress = { + Option(operation.getOperationJobProgress).map { jobProgress => + new OperationProgress( + jobProgress.getHeaderNames, + jobProgress.getRows, + jobProgress.getProgressedPercentage, + jobProgress.getStatus.toString, + jobProgress.getFooterSummary, + jobProgress.getStartTime) + }.orNull + } + + def operationEvent(operation: KyuubiOperation): dto.KyuubiOperationEvent = { + val opEvent = KyuubiOperationEvent(operation) + dto.KyuubiOperationEvent.builder() + .statementId(opEvent.statementId) + .remoteId(opEvent.remoteId) + .statement(opEvent.statement) + .shouldRunAsync(opEvent.shouldRunAsync) + .state(opEvent.state) + .eventTime(opEvent.eventTime) + .createTime(opEvent.createTime) + .startTime(opEvent.startTime) + .completeTime(opEvent.completeTime) + .exception(opEvent.exception.orNull) + .sessionId(opEvent.sessionId) + .sessionUser(opEvent.sessionUser) + .sessionType(opEvent.sessionType) + .kyuubiInstance(opEvent.kyuubiInstance) + .metrics(opEvent.metrics.asJava) + .progress(operationProgress(operation)) + .build() + } + def operationData(operation: KyuubiOperation): OperationData = { val opEvent = KyuubiOperationEvent(operation) new OperationData( @@ -60,7 +116,8 @@ object ApiUtils extends Logging { opEvent.sessionUser, opEvent.sessionType, operation.getSession.asInstanceOf[KyuubiSession].connectionUrl, - operation.metrics.asJava) + operation.metrics.asJava, + operationProgress(operation)) } def serverData(nodeInfo: ServiceNodeInfo): ServerData = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala index d1ae01e7f27..e7a15ab9293 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala @@ -29,7 +29,6 @@ import io.swagger.v3.oas.annotations.tags.Tag import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.client.api.v1.dto._ -import org.apache.kyuubi.events.KyuubiOperationEvent import org.apache.kyuubi.operation.{FetchOrientation, KyuubiOperation, OperationHandle} import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils} import org.apache.kyuubi.shaded.hive.service.rpc.thrift._ @@ -54,7 +53,7 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging { try { val opHandle = OperationHandle(operationHandleStr) val operation = fe.be.sessionManager.operationManager.getOperation(opHandle) - KyuubiOperationEvent(operation.asInstanceOf[KyuubiOperation]) + ApiUtils.operationEvent(operation.asInstanceOf[KyuubiOperation]) } catch { case NonFatal(e) => val errorMsg = "Error getting an operation event" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala index a5f1ab95fb1..928bb207a1e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala @@ -69,26 +69,7 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging { @Path("{sessionHandle}") def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): dto.KyuubiSessionEvent = { try { - sessionManager.getSession(sessionHandleStr) - .asInstanceOf[KyuubiSession].getSessionEvent.map(event => - dto.KyuubiSessionEvent.builder - .sessionId(event.sessionId) - .clientVersion(event.clientVersion) - .sessionType(event.sessionType) - .sessionName(event.sessionName) - .user(event.user) - .clientIp(event.clientIP) - .serverIp(event.serverIP) - .conf(event.conf.asJava) - .remoteSessionId(event.remoteSessionId) - .engineId(event.engineId) - .eventTime(event.eventTime) - .openedTime(event.openedTime) - .startTime(event.startTime) - .endTime(event.endTime) - .totalOperations(event.totalOperations) - .exception(event.exception.orNull) - .build).get + ApiUtils.sessionEvent(sessionManager.getSession(sessionHandleStr).asInstanceOf[KyuubiSession]) } catch { case NonFatal(e) => val errorMsg = s"Invalid $sessionHandleStr" diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala index e94632a3d9e..c4d67ad6211 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala @@ -27,9 +27,9 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper} +import org.apache.kyuubi.client.api.v1.dto import org.apache.kyuubi.client.api.v1.dto._ import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.events.KyuubiOperationEvent import org.apache.kyuubi.operation.{ExecuteStatement, OperationState} import org.apache.kyuubi.operation.OperationState.{FINISHED, OperationState} import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2 @@ -205,6 +205,23 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper assert(logRowSet.getRowCount == 1) } + test("support to return operation progress for REST api") { + val sessionHandle = fe.be.openSession( + HIVE_CLI_SERVICE_PROTOCOL_V2, + "admin", + "123456", + "localhost", + Map(KyuubiConf.SESSION_PROGRESS_ENABLE.key -> "true")) + val op = fe.be.executeStatement(sessionHandle, "show tables", Map.empty, runAsync = true, 3000) + eventually(Timeout(5.seconds)) { + val response = webTarget.path(s"api/v1/operations/${op.identifier}/event") + .request(MediaType.APPLICATION_JSON_TYPE).get() + assert(response.getStatus === 200) + val operationEvent = response.readEntity(classOf[dto.KyuubiOperationEvent]) + assert(operationEvent.getProgress != null) + } + } + def getOpHandleStr(statement: String = "show tables"): String = { val sessionHandle = fe.be.openSession( HIVE_CLI_SERVICE_PROTOCOL_V2, @@ -228,8 +245,8 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper val response = webTarget.path(s"api/v1/operations/$opHandleStr/event") .request(MediaType.APPLICATION_JSON_TYPE).get() assert(response.getStatus === 200) - val operationEvent = response.readEntity(classOf[KyuubiOperationEvent]) - assert(operationEvent.state === state.name()) + val operationEvent = response.readEntity(classOf[dto.KyuubiOperationEvent]) + assert(operationEvent.getState === state.name()) } } }