Skip to content

Commit

Permalink
feat: Add new Bigquery Table for Computation Participant Stages (#1805)
Browse files Browse the repository at this point in the history
feat: Add new Bigquery Table for Computation Participant Stages
  • Loading branch information
tristanvuong2021 authored Nov 7, 2024
1 parent a09f087 commit edd90c9
Show file tree
Hide file tree
Showing 6 changed files with 929 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/main/k8s/dev/kingdom_gke.cue
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ kingdom: #Kingdom & {
"--latest-measurement-read-table=latest_measurement_read",
"--requisitions-table=requisitions",
"--latest-requisition-read-table=latest_requisition_read",
"--computation-participant-stages-table=computation_participant_stages",
"--latest-computation-read-table=latest_computation_read",
"--tls-cert-file=/var/run/secrets/files/kingdom_tls.pem",
"--tls-key-file=/var/run/secrets/files/kingdom_tls.key",
"--cert-collection-file=/var/run/secrets/files/kingdom_root.pem",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.google.protobuf.util.Timestamps
import com.google.rpc.Code
import io.grpc.StatusException
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.logging.Logger
import kotlinx.coroutines.flow.catch
import org.jetbrains.annotations.Blocking
Expand All @@ -38,21 +39,28 @@ import org.wfanet.measurement.api.v2alpha.MeasurementSpec
import org.wfanet.measurement.common.identity.apiIdToExternalId
import org.wfanet.measurement.common.identity.externalIdToApiId
import org.wfanet.measurement.common.toInstant
import org.wfanet.measurement.internal.kingdom.DuchyMeasurementLogEntry
import org.wfanet.measurement.internal.kingdom.Measurement
import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt
import org.wfanet.measurement.internal.kingdom.Requisition
import org.wfanet.measurement.internal.kingdom.RequisitionsGrpcKt
import org.wfanet.measurement.internal.kingdom.StreamMeasurementsRequestKt
import org.wfanet.measurement.internal.kingdom.StreamRequisitionsRequestKt
import org.wfanet.measurement.internal.kingdom.bigquerytables.ComputationParticipantStagesTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.LatestComputationReadTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.LatestMeasurementReadTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.LatestRequisitionReadTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.MeasurementType
import org.wfanet.measurement.internal.kingdom.bigquerytables.MeasurementsTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.RequisitionsTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.computationParticipantStagesTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.copy
import org.wfanet.measurement.internal.kingdom.bigquerytables.latestComputationReadTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.latestMeasurementReadTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.latestRequisitionReadTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.measurementsTableRow
import org.wfanet.measurement.internal.kingdom.bigquerytables.requisitionsTableRow
import org.wfanet.measurement.internal.kingdom.computationKey
import org.wfanet.measurement.internal.kingdom.copy
import org.wfanet.measurement.internal.kingdom.measurementKey
import org.wfanet.measurement.internal.kingdom.streamMeasurementsRequest
Expand All @@ -67,13 +75,16 @@ class OperationalMetricsExport(
private val datasetId: String,
private val latestMeasurementReadTableId: String,
private val latestRequisitionReadTableId: String,
private val latestComputationReadTableId: String,
private val measurementsTableId: String,
private val requisitionsTableId: String,
private val computationParticipantStagesTableId: String,
private val streamWriterFactory: StreamWriterFactory = StreamWriterFactoryImpl(),
) {
/**
 * Runs all operational-metrics exports sequentially: measurements, then requisitions,
 * then computation participant stages.
 */
suspend fun execute() {
  exportMeasurements()
  exportRequisitions()
  exportComputationParticipants()
}

private suspend fun exportMeasurements() {
Expand Down Expand Up @@ -434,6 +445,243 @@ class OperationalMetricsExport(
}
}

/**
 * Exports per-stage metrics for computation participants to BigQuery.
 *
 * Reads the most recently exported computation (if any) from the
 * `latestComputationReadTableId` table so this run resumes where the previous run stopped,
 * then streams SUCCEEDED/FAILED Measurements (computation view) from the Kingdom internal
 * server in batches of [BATCH_SIZE]. One row per completed stage per participant duchy is
 * appended to the `computationParticipantStagesTableId` table, and after every batch the
 * resume cursor is written back to the latest-computation-read table.
 */
private suspend fun exportComputationParticipants() {
  var computationsQueryResponseSize: Int

  // Resume point: latest (update_time, external_computation_id) recorded by a previous run.
  val query =
    """
    SELECT update_time, external_computation_id
    FROM `$datasetId.$latestComputationReadTableId`
    ORDER BY update_time DESC, external_computation_id DESC
    LIMIT 1
    """
      .trimIndent()

  val queryJobConfiguration: QueryJobConfiguration = QueryJobConfiguration.newBuilder(query).build()

  val results = bigQuery.query(queryJobConfiguration).iterateAll()
  logger.info("Retrieved latest computation read info from BigQuery")

  // Null on the very first export run (nothing recorded yet).
  val latestComputationReadFromPreviousJob: FieldValueList? = results.firstOrNull()

  var streamComputationsRequest = streamMeasurementsRequest {
    measurementView = Measurement.View.COMPUTATION_STATS
    limit = BATCH_SIZE
    filter =
      StreamMeasurementsRequestKt.filter {
        states += Measurement.State.SUCCEEDED
        states += Measurement.State.FAILED
        if (latestComputationReadFromPreviousJob != null) {
          // Start streaming strictly after the last computation the previous run exported.
          after =
            StreamMeasurementsRequestKt.FilterKt.after {
              updateTime =
                Timestamps.fromNanos(
                  latestComputationReadFromPreviousJob.get("update_time").longValue
                )
              computation = computationKey {
                externalComputationId =
                  latestComputationReadFromPreviousJob.get("external_computation_id").longValue
              }
            }
        }
      }
  }

  DataWriter(
      projectId = projectId,
      datasetId = datasetId,
      tableId = computationParticipantStagesTableId,
      client = bigQueryWriteClient,
      protoSchema =
        ProtoSchemaConverter.convert(ComputationParticipantStagesTableRow.getDescriptor()),
      streamWriterFactory = streamWriterFactory,
    )
    .use { computationParticipantStagesDataWriter ->
      DataWriter(
          projectId = projectId,
          datasetId = datasetId,
          tableId = latestComputationReadTableId,
          client = bigQueryWriteClient,
          protoSchema = ProtoSchemaConverter.convert(LatestComputationReadTableRow.getDescriptor()),
          streamWriterFactory = streamWriterFactory,
        )
        .use { latestComputationReadDataWriter ->
          do {
            computationsQueryResponseSize = 0

            val computationParticipantStagesProtoRowsBuilder: ProtoRows.Builder =
              ProtoRows.newBuilder()
            var latestComputation: Measurement = Measurement.getDefaultInstance()

            measurementsClient
              .streamMeasurements(streamComputationsRequest)
              .catch { e ->
                if (e is StatusException) {
                  logger.warning("Failed to retrieve Computations")
                }
                // Always rethrow: swallowing a non-Status exception here would silently
                // truncate the stream and then advance the resume cursor past unread data.
                throw e
              }
              .collect { measurement ->
                computationsQueryResponseSize++
                latestComputation = measurement

                // Direct measurements have no computation (ID 0) and therefore no stages.
                if (measurement.externalComputationId != 0L) {
                  val measurementType =
                    getMeasurementType(
                      measurement.details.measurementSpec,
                      measurement.details.apiVersion,
                    )

                  val measurementConsumerId =
                    externalIdToApiId(measurement.externalMeasurementConsumerId)
                  val measurementId = externalIdToApiId(measurement.externalMeasurementId)
                  val computationId = externalIdToApiId(measurement.externalComputationId)

                  // Fields shared by every stage row emitted for this Measurement.
                  val baseComputationParticipantStagesTableRow =
                    computationParticipantStagesTableRow {
                      this.measurementConsumerId = measurementConsumerId
                      this.measurementId = measurementId
                      this.computationId = computationId
                      this.measurementType = measurementType
                    }

                  // Map of ExternalDuchyId to log entries.
                  val logEntriesMap: Map<String, List<DuchyMeasurementLogEntry>> =
                    measurement.logEntriesList.groupBy { it.externalDuchyId }

                  for (computationParticipant in measurement.computationParticipantsList) {
                    val sortedStageLogEntries =
                      logEntriesMap[computationParticipant.externalDuchyId]?.sortedBy {
                        it.details.stageAttempt.stage
                      } ?: emptyList()

                    if (sortedStageLogEntries.isEmpty()) {
                      continue
                    }

                    // Every stage except the last is considered complete when the next
                    // stage started; its duration is the gap between the two start times.
                    sortedStageLogEntries.zipWithNext { logEntry, nextLogEntry ->
                      if (logEntry.details.stageAttempt.stageName.isNotBlank()) {
                        computationParticipantStagesProtoRowsBuilder.addSerializedRows(
                          baseComputationParticipantStagesTableRow
                            .copy {
                              duchyId = computationParticipant.externalDuchyId
                              result = ComputationParticipantStagesTableRow.Result.SUCCEEDED
                              stageName = logEntry.details.stageAttempt.stageName
                              stageStartTime = logEntry.details.stageAttempt.stageStartTime
                              completionDurationSeconds =
                                Duration.between(
                                    logEntry.details.stageAttempt.stageStartTime.toInstant(),
                                    nextLogEntry.details.stageAttempt.stageStartTime.toInstant(),
                                  )
                                  .seconds
                              completionDurationSecondsSquared =
                                completionDurationSeconds * completionDurationSeconds
                            }
                            .toByteString()
                        )
                      }
                    }

                    // The final stage ends when the Measurement itself reached a terminal
                    // state; its result mirrors the Measurement's state.
                    val lastLogEntry = sortedStageLogEntries.last()
                    if (lastLogEntry.details.stageAttempt.stageName.isBlank()) {
                      continue
                    }

                    val finalStageResult =
                      when (measurement.state) {
                        Measurement.State.SUCCEEDED ->
                          ComputationParticipantStagesTableRow.Result.SUCCEEDED
                        Measurement.State.FAILED ->
                          ComputationParticipantStagesTableRow.Result.FAILED
                        // Filter only requests SUCCEEDED/FAILED; anything else emits no row.
                        else -> null
                      }

                    if (finalStageResult != null) {
                      computationParticipantStagesProtoRowsBuilder.addSerializedRows(
                        baseComputationParticipantStagesTableRow
                          .copy {
                            duchyId = computationParticipant.externalDuchyId
                            result = finalStageResult
                            stageName = lastLogEntry.details.stageAttempt.stageName
                            stageStartTime = lastLogEntry.details.stageAttempt.stageStartTime
                            completionDurationSeconds =
                              Duration.between(
                                  lastLogEntry.details.stageAttempt.stageStartTime.toInstant(),
                                  measurement.updateTime.toInstant(),
                                )
                                .seconds
                            completionDurationSecondsSquared =
                              completionDurationSeconds * completionDurationSeconds
                          }
                          .toByteString()
                      )
                    }
                  }
                }
              }

            logger.info("Computations read from the Kingdom Internal Server")

            if (computationParticipantStagesProtoRowsBuilder.serializedRowsCount > 0) {
              computationParticipantStagesDataWriter.appendRows(
                computationParticipantStagesProtoRowsBuilder.build()
              )

              logger.info("Computation Participant Stages Metrics written to BigQuery")
              // Possible for there to be no stages because all measurements in response are
              // direct.
            } else if (computationsQueryResponseSize == 0) {
              logger.info("No more Computations to process")
              break
            }

            // Record the resume cursor even when no stage rows were written so a later run
            // does not reprocess this batch of computations.
            val latestComputationReadTableRow = latestComputationReadTableRow {
              updateTime = Timestamps.toNanos(latestComputation.updateTime)
              externalComputationId = latestComputation.externalComputationId
            }

            latestComputationReadDataWriter.appendRows(
              ProtoRows.newBuilder()
                .addSerializedRows(latestComputationReadTableRow.toByteString())
                .build()
            )

            // Advance the stream filter past the last computation processed in this batch.
            streamComputationsRequest =
              streamComputationsRequest.copy {
                filter =
                  filter.copy {
                    after =
                      StreamMeasurementsRequestKt.FilterKt.after {
                        updateTime = latestComputation.updateTime
                        computation = computationKey {
                          externalComputationId =
                            latestComputationReadTableRow.externalComputationId
                        }
                      }
                  }
              }
          } while (computationsQueryResponseSize == BATCH_SIZE)
        }
    }
}

companion object {
private val logger: Logger = Logger.getLogger(this::class.java.name)
private const val BATCH_SIZE = 3000
Expand Down Expand Up @@ -515,10 +763,19 @@ class OperationalMetricsExport(
break
}
} catch (e: AppendSerializationError) {
logger.warning("Logging serialization errors")
for (value in e.rowIndexToErrorMessage.values) {
logger.warning(value)
}
throw e
} catch (e: ExecutionException) {
if (e.cause is AppendSerializationError) {
logger.warning("Logging serialization errors")
for (value in (e.cause as AppendSerializationError).rowIndexToErrorMessage.values) {
logger.warning(value)
}
}
throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ private fun run(
val datasetId = operationalMetricsFlags.bigQueryDataSet
val measurementsTableId = operationalMetricsFlags.measurementsTable
val requisitionsTableId = operationalMetricsFlags.requisitionsTable
val computationParticipantStagesTableId =
operationalMetricsFlags.computationParticipantStagesTable
val latestMeasurementReadTableId = operationalMetricsFlags.latestMeasurementReadTable
val latestRequisitionReadTableId = operationalMetricsFlags.latestRequisitionReadTable
val latestComputationReadTableId = operationalMetricsFlags.latestComputationReadTable

BigQueryWriteClient.create().use { bigQueryWriteClient ->
val operationalMetricsExport =
Expand All @@ -89,8 +92,10 @@ private fun run(
datasetId = datasetId,
latestMeasurementReadTableId = latestMeasurementReadTableId,
latestRequisitionReadTableId = latestRequisitionReadTableId,
latestComputationReadTableId = latestComputationReadTableId,
measurementsTableId = measurementsTableId,
requisitionsTableId = requisitionsTableId,
computationParticipantStagesTableId = computationParticipantStagesTableId,
)

operationalMetricsExport.execute()
Expand Down Expand Up @@ -133,6 +138,14 @@ class OperationalMetricsFlags {
lateinit var requisitionsTable: String
private set

/** BigQuery table ID for the per-stage computation participant metrics table. */
@CommandLine.Option(
  names = ["--computation-participant-stages-table"],
  description = ["Computation Participant Stages table ID"],
  required = true,
)
lateinit var computationParticipantStagesTable: String
  private set

@CommandLine.Option(
names = ["--latest-measurement-read-table"],
description = ["Latest Measurement Read table ID"],
Expand All @@ -148,4 +161,12 @@ class OperationalMetricsFlags {
)
lateinit var latestRequisitionReadTable: String
private set

/** BigQuery table ID recording the latest computation already exported (resume cursor). */
@CommandLine.Option(
  names = ["--latest-computation-read-table"],
  description = ["Latest Computation Read table ID"],
  required = true,
)
lateinit var latestComputationReadTable: String
  private set
}
Loading

0 comments on commit edd90c9

Please sign in to comment.