From f58714ffa08a7f281853e4085c9bd6d2b6a862dd Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Tue, 6 Feb 2024 14:33:21 -0800 Subject: [PATCH] Index the `_id` field with encoding to be searchable (#762) * add a new ID field * typo --- .../opensearch/OpenSearchAdapter.java | 2 ++ .../SchemaAwareLogDocumentBuilderImpl.java | 9 +++++- .../kaldb/metadata/schema/FieldType.java | 29 ++++++++++++++++++- .../opensearch/OpenSearchAdapterTest.java | 15 ++++++++++ .../kaldb/logstore/schema/DropPolicyTest.java | 4 +-- .../search/LogIndexSearcherImplTest.java | 16 ++++++++++ 6 files changed, 71 insertions(+), 4 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java index 9c5f43887f..07d684a6d1 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java @@ -220,6 +220,8 @@ public void reloadSchema() { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "text")); } else if (entry.getValue().fieldType == FieldType.STRING) { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword")); + } else if (entry.getValue().fieldType == FieldType.ID) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword")); } else if (entry.getValue().fieldType == FieldType.INTEGER) { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "integer")); } else if (entry.getValue().fieldType == FieldType.LONG) { diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java index 16edce5ea5..d98dd18b2e 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java +++ 
b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java @@ -59,7 +59,6 @@ public static ImmutableMap getDefaultLuceneFieldDefiniti } String[] fieldsAsString = { - LogMessage.SystemField.ID.fieldName, LogMessage.SystemField.INDEX.fieldName, LogMessage.ReservedField.TYPE.fieldName, LogMessage.ReservedField.HOSTNAME.fieldName, @@ -78,6 +77,14 @@ public static ImmutableMap getDefaultLuceneFieldDefiniti fieldName, new LuceneFieldDef(fieldName, FieldType.STRING.name, false, true, true)); } + String[] fieldsAsIds = { + LogMessage.SystemField.ID.fieldName, + }; + for (String fieldName : fieldsAsIds) { + fieldDefBuilder.put( + fieldName, new LuceneFieldDef(fieldName, FieldType.ID.name, false, true, true)); + } + String[] fieldsAsLong = { LogMessage.ReservedField.DURATION_MS.fieldName, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java b/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java index f119ef2103..76fb1388da 100644 --- a/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java +++ b/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java @@ -28,6 +28,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TermQuery; import org.apache.lucene.util.BytesRef; +import org.opensearch.index.mapper.Uid; /** The FieldType enum describes the types of fields in a chunk. 
*/ public enum FieldType { @@ -81,6 +82,32 @@ public Analyzer getAnalyzer(boolean quoted) { return KEYWORD_ANALYZER; } }, + ID("id") { /* Document-id field type: index terms and doc values hold Uid.encodeId(value); a StoredField holds the raw string. */ + @Override + public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { + BytesRef id = Uid.encodeId((String) value); + if (fieldDef.isIndexed) { + doc.add(new StringField(name, id, getStoreEnum(fieldDef.isStored))); + } + if (fieldDef.isStored) { + doc.add(new StoredField(name, (String) value)); + } + if (fieldDef.storeDocValue) { + doc.add(new SortedDocValuesField(name, id)); + } + } + + @Override + public Query termQuery(String field, String queryText, Analyzer analyzer) { + /* addField indexes Uid.encodeId(value), so the query text must be encoded the same way or the term can never match. */ + return new TermQuery(new Term(field, Uid.encodeId(queryText))); + } + + @Override + public Analyzer getAnalyzer(boolean quoted) { + return KEYWORD_ANALYZER; + } + }, INTEGER("integer") { @Override public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { @@ -373,7 +400,7 @@ private static Field.Store getStoreEnum(boolean isStored) { // Aliased Field Types are FieldTypes that can be considered as same type from a field conflict // detection perspective public static final List<Set<FieldType>> ALIASED_FIELD_TYPES = - ImmutableList.of(ImmutableSet.of(FieldType.STRING, FieldType.TEXT)); + ImmutableList.of(ImmutableSet.of(FieldType.STRING, FieldType.TEXT, FieldType.ID)); public static boolean areTypeAliasedFieldTypes(FieldType type1, FieldType type2) { for (Set<FieldType> s : ALIASED_FIELD_TYPES) { diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java index 38a5502316..58e71f83b7 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java @@ -30,8 +30,10 @@ import org.apache.lucene.search.IndexSortSortedNumericDocValuesRangeQuery; import
org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.index.mapper.Uid; import org.opensearch.search.aggregations.AbstractAggregationBuilder; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.InternalAggregation; @@ -404,6 +406,19 @@ public void handlesDateHistogramExtendedBoundsMinDocEdgeCases() throws IOExcepti .isThrownBy(() -> collectorManager.newCollector()); } + @Test + public void shouldParseIdFieldSearch() throws Exception { /* The query built for "_id:<value>" must contain the Uid-encoded id bytes, not the raw text. */ + String idField = "_id"; + String idValue = "1"; + IndexSearcher indexSearcher = logStoreAndSearcherRule.logStore.getSearcherManager().acquire(); + Query idQuery = + openSearchAdapter.buildQuery("foo", STR."\{idField}:\{idValue}", null, null, indexSearcher); + BytesRef queryStrBytes = Uid.encodeId(idValue); /* use the returned BytesRef as-is: its offset/length bound the encoded id, the raw backing array does not */ + // idQuery.toString="#_id:([fe 1f])" + // queryStrBytes.toString="[fe 1f]" + assertThat(idQuery.toString()).contains(queryStrBytes.toString()); + } + @Test public void shouldExcludeDateFilterWhenNullTimestamps() throws Exception { IndexSearcher indexSearcher = logStoreAndSearcherRule.logStore.getSearcherManager().acquire(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java index e393f29348..10448628aa 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java @@ -50,7 +50,7 @@ public void testBasicDocumentCreation() throws IOException { assertThat(MetricsUtil.getCount(CONVERT_FIELD_VALUE_COUNTER, meterRegistry)).isZero(); assertThat(MetricsUtil.getCount(CONVERT_AND_DUPLICATE_FIELD_COUNTER, meterRegistry)).isZero(); // Only string fields have doc values not text fields.
- assertThat(docBuilder.getSchema().get("_id").fieldType.name).isEqualTo(FieldType.STRING.name); + assertThat(docBuilder.getSchema().get("_id").fieldType.name).isEqualTo(FieldType.ID.name); assertThat( testDocument.getFields().stream() .filter( @@ -96,7 +96,7 @@ public void testBasicDocumentCreationWithoutFullTextSearch() throws IOException assertThat(MetricsUtil.getCount(CONVERT_FIELD_VALUE_COUNTER, meterRegistry)).isZero(); assertThat(MetricsUtil.getCount(CONVERT_AND_DUPLICATE_FIELD_COUNTER, meterRegistry)).isZero(); // Only string fields have doc values not text fields. - assertThat(docBuilder.getSchema().get("_id").fieldType.name).isEqualTo(FieldType.STRING.name); + assertThat(docBuilder.getSchema().get("_id").fieldType.name).isEqualTo(FieldType.ID.name); assertThat( testDocument.getFields().stream() .filter( diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java index 575fbc2ab3..966c030bc9 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java @@ -1725,4 +1725,20 @@ public void testConcurrentSearches() throws InterruptedException { assertThat(searchFailures.get()).isEqualTo(0); assertThat(successfulRuns.get()).isEqualTo(200); } + + @Test + public void testSearchById() { /* Searching "_id:1" over the loaded test data within a two-second window must return exactly one hit. */ + Instant time = Instant.ofEpochSecond(1593365471); + loadTestData(time); + SearchResult<LogMessage> searchResult = + strictLogStore.logSearcher.search( + TEST_DATASET_NAME, + "_id:1", + time.toEpochMilli(), + time.plusSeconds(2).toEpochMilli(), + 10, + new DateHistogramAggBuilder( + "1", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, "1s")); + assertThat(searchResult.hits.size()).isEqualTo(1); + } }