From e4e30f5c8b9697533362df1dc4c2caae30afd070 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Thu, 12 Dec 2024 19:04:47 -0800 Subject: [PATCH] Refactor ProcessorExecutionDetail to improve field handling Signed-off-by: Junwei Dai --- .../action/search/SearchResponse.java | 6 ++ .../action/search/SearchResponseSections.java | 3 +- .../pipeline/ProcessorExecutionDetail.java | 78 ++++++++++++++++--- 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 90d8232a82c8b..0d55fbf2e7f88 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -74,6 +74,7 @@ import java.util.function.Supplier; import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD; +import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -519,6 +520,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE extBuilders.add(searchExtBuilder); } } + } else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + while ((token = parser.nextToken()) != Token.END_ARRAY) { + ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser); + processorResult.add(detail); + } } else { parser.skipChildren(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java index fa4b0030148f5..5eb305d91ee04 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java @@ -66,6 +66,7 @@ public class SearchResponseSections implements ToXContentFragment { public static final ParseField EXT_FIELD = new ParseField("ext"); + public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results"); protected final SearchHits hits; protected final Aggregations aggregations; protected final Suggest suggest; @@ -181,7 +182,7 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) } if (!processorResult.isEmpty()) { - builder.field("processor_result", processorResult); + builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult); } return builder; } diff --git a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java index 3ad00c373c8b5..81d906b075097 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java +++ b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java @@ -8,16 +8,17 @@ package org.opensearch.search.pipeline; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -33,7 +34,10 @@ public class ProcessorExecutionDetail implements Writeable, ToXContentObject { private long durationMillis; private Object inputData; private Object outputData; - private static final Logger logger = LogManager.getLogger(ProcessorExecutionDetail.class); + public static final ParseField PROCESSOR_NAME_FIELD = new ParseField("processor_name"); + public static final ParseField DURATION_MILLIS_FIELD = new ParseField("duration_millis"); + public static final ParseField INPUT_DATA_FIELD = new ParseField("input_data"); + public static final ParseField OUTPUT_DATA_FIELD = new ParseField("output_data"); /** * Constructor for ProcessorExecutionDetail @@ -111,13 +115,10 @@ public void addTook(long durationMillis) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("processor_name", processorName); - builder.field("duration_millis", durationMillis); - - addFieldToXContent(builder, "input_data", inputData, params); - - addFieldToXContent(builder, "output_data", outputData, params); - + builder.field(PROCESSOR_NAME_FIELD.getPreferredName(), processorName); + builder.field(DURATION_MILLIS_FIELD.getPreferredName(), durationMillis); + addFieldToXContent(builder, INPUT_DATA_FIELD.getPreferredName(), inputData, params); + addFieldToXContent(builder, OUTPUT_DATA_FIELD.getPreferredName(), outputData, params); builder.endObject(); return builder; } @@ -161,6 +162,63 @@ public boolean equals(Object o) { && Objects.equals(outputData, that.outputData); } + public static ProcessorExecutionDetail fromXContent(XContentParser parser) throws IOException { + String processorName = null; + long durationMillis = 0; + Object inputData = null; + Object outputData = null; + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + if (PROCESSOR_NAME_FIELD.match(fieldName, parser.getDeprecationHandler())) { + processorName = parser.text(); + } else if (DURATION_MILLIS_FIELD.match(fieldName, parser.getDeprecationHandler())) { + durationMillis = parser.longValue(); + } else if (INPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) { + inputData = parseFieldFromXContent(parser); + } else if (OUTPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) { + outputData = parseFieldFromXContent(parser); + } else { + parser.skipChildren(); + } + } + + if (processorName == null) { + throw new IllegalArgumentException("Processor name is required"); + } + + return new ProcessorExecutionDetail(processorName, durationMillis, inputData, outputData); + } + + private static Object parseFieldFromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token == XContentParser.Token.VALUE_NULL) { + return null; + } else if (token == XContentParser.Token.START_ARRAY) { + return parseArrayFromXContent(parser); + } else if (token == XContentParser.Token.START_OBJECT) { + return parser.map(); + } else { + return parser.textOrNull(); + } + } + + private static List parseArrayFromXContent(XContentParser parser) throws IOException { + List list = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + list.add(parser.map()); + } else if (parser.currentToken() == XContentParser.Token.START_ARRAY) { + list.add(parseArrayFromXContent(parser)); + } else { + list.add(parser.textOrNull()); + } + } + return list; + } + @Override public int hashCode() { return Objects.hash(processorName, durationMillis, inputData, outputData);