From 8791cc3000e25de1dbdfb8f380251e94a39223f2 Mon Sep 17 00:00:00 2001 From: Ekaterina Dimitrova Date: Fri, 13 Sep 2024 14:34:21 -0400 Subject: [PATCH 1/2] Slow SAI queries logging Configuration/tooling needs to be finished; tests should be updated as per the comments. Any comment starting with KATE: is a placeholder for update to be done --- .../config/CassandraRelevantProperties.java | 8 +- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 14 ++ .../cassandra/db/monitoring/SaiSlowLog.java | 193 ++++++++++++++++++ .../db/monitoring/SaiSlowestQueriesQueue.java | 160 +++++++++++++++ .../cassandra/index/sai/QueryContext.java | 13 ++ .../index/sai/plan/QueryController.java | 5 +- .../plan/StorageAttachedIndexSearcher.java | 2 + .../db/monitoring/SaiSlowLogTest.java | 92 +++++++++ .../SaiSlowestQueriesQueueTest.java | 188 +++++++++++++++++ 10 files changed, 674 insertions(+), 2 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/monitoring/SaiSlowLog.java create mode 100644 src/java/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueue.java create mode 100644 test/unit/org/apache/cassandra/db/monitoring/SaiSlowLogTest.java create mode 100644 test/unit/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueueTest.java diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index ee59dfbf8965..7f92ec6283e9 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -497,7 +497,13 @@ public enum CassandraRelevantProperties * Allows custom implementation of {@link org.apache.cassandra.sensors.RequestSensorsFactory} to optionally create * and configure {@link org.apache.cassandra.sensors.RequestSensors} instances. */ - REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class"); + REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class"), + + /** + * SAI slow log parameters + */ + SLOW_SAI_QUERY_BUFFER_SIZE("cassandra.slow_sai_query_buffer_size", "8192"), + SLOW_SAI_QUERY_BUFFER_PROCESSING_DELAY_MS("cassandra.slow_sai_query_buffer_processing_delay_ms", "1000"); CassandraRelevantProperties(String key, String defaultVal) { diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 1b0152520e1b..f28da01a6e92 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -119,6 +119,7 @@ public class Config public boolean cross_node_timeout = true; public volatile long slow_query_log_timeout_in_ms = 500L; + public volatile int sai_slow_log_num_slowest_queries = 5; public volatile double phi_convict_threshold = 8.0; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3d1948df3f43..d602e3835dda 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -976,6 +976,8 @@ else if (conf.max_value_size_in_mb >= 2048) throw new ConfigurationException("aggregation_subpage_size_in_kb must be greater than 0"); setAggregationSubPageSize(getAggregationSubPageSize()); + + validateSaiSlowLogNumSlowestQueries(); } @VisibleForTesting @@ -1776,6 +1778,11 @@ public static long getSlowQueryTimeout(TimeUnit units) return units.convert(conf.slow_query_log_timeout_in_ms, MILLISECONDS); } + public static int getSaiSlowLogNumSlowestQueries() + { + return conf.sai_slow_log_num_slowest_queries; + } + /** * @return the minimum configured {read, write, range, truncate, misc} timeout */ @@ -3690,6 +3697,13 @@ public static void setAggregationSubPageSize(PageSize pageSize) conf.aggregation_subpage_size_in_kb = pageSize.bytes() / 1024; } + public static void validateSaiSlowLogNumSlowestQueries() + { + // KATE the error message to be corrected after we decide on configuration/tooling + if (conf.sai_slow_log_num_slowest_queries < 1) + throw new ConfigurationException("sai_slow_log_num_slowest_queries must be greater than 0. To disable the slow log - use the enable/disable flag instead."); + } + public static ParameterizedClass getDefaultCompaction() { return conf != null ? conf.default_compaction : null; diff --git a/src/java/org/apache/cassandra/db/monitoring/SaiSlowLog.java b/src/java/org/apache/cassandra/db/monitoring/SaiSlowLog.java new file mode 100644 index 000000000000..fe7e7e5b02af --- /dev/null +++ b/src/java/org/apache/cassandra/db/monitoring/SaiSlowLog.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.monitoring; + +import java.beans.ConstructorProperties; +import java.util.Comparator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.index.sai.QueryContext; +import org.apache.cassandra.tracing.Tracing; + +public class SaiSlowLog +{ + private static final Logger logger = LoggerFactory.getLogger(SaiSlowLog.class); + + private final AtomicInteger numSlowestQueries = new AtomicInteger(); + // a structure that keeps the N slowest queries - don't use getter/setter so that it's not exposed over JMX + private final SaiSlowestQueriesQueue slowestQueries; + // KATE: make this false by default and decide on yaml property, -D flag, or JMX to enable; hook setEnabled() to change + private final AtomicBoolean enabled = new AtomicBoolean(true); + + static final Comparator SLOW_SAI_QUERY_COMPARATOR = (SlowSaiQuery o1, SlowSaiQuery o2) -> Long.compare(o2.getDuration(), o1.getDuration()); + + @VisibleForTesting + static SaiSlowLog instance = new SaiSlowLog(); + + public SaiSlowLog() + { + numSlowestQueries.set(DatabaseDescriptor.getSaiSlowLogNumSlowestQueries()); + slowestQueries = new SaiSlowestQueriesQueue(numSlowestQueries.get()); + } + + public boolean isEnabled() + { + return enabled.get(); + } + + public synchronized void setEnabled(boolean newValue) + { + boolean oldValue = this.enabled.get(); + + if (oldValue != newValue) + { + this.enabled.set(newValue); + logger.info("SAI Slow Log is now {}", newValue ? "enabled" : "disabled"); + } + } + + // KATE below two will be used to set and get the number of slowest queries; make it configurable through property/nodetool? + public int getNumSlowestQueries() + { + return numSlowestQueries.get(); + } + + public void setNumSlowestQueries(int numSlowestQueries) + { + int oldValue = this.numSlowestQueries.get(); + if (oldValue != numSlowestQueries) + { + if (numSlowestQueries < 1) + { + throw new IllegalArgumentException("sai_slow_log_num_slowest_queries must be > 0"); + } + this.numSlowestQueries.set(numSlowestQueries); + resizeMinMaxHeap(); + logger.info("sai_slow_log_num_slowest_queries set to {} (was {}).", numSlowestQueries, oldValue); + } + } + + private void resizeMinMaxHeap() + { + slowestQueries.resize(numSlowestQueries.get()); + } + + public List retrieveRecentSlowestCqlQueries() + { + List queries = slowestQueries.getAndReset(); + // there is no ordering guarantee and we want the slowest queries first + queries.sort(SLOW_SAI_QUERY_COMPARATOR); + return queries; + } + + /** + * Check if we should record a given operation in the SAI slow log. If so, extract + * the necessary detail from the query context and log it. + * @param queryContext operation to log + */ + public static void maybeRecord(QueryContext queryContext) + { + if (!instance.isEnabled()) + return; + + // KATE: Remove below line to avoid logging every query; it is used just during development/testing + logger.debug("Recording slow SAI query"); + UUID tracingSessionId = Tracing.isTracing() ? Tracing.instance.getSessionId() : null; + + instance.slowestQueries.addAsync(new SaiSlowLog.SlowSaiQuery(queryContext.totalQueryTimeNs(), + queryContext.optimizedPlan(), + null == tracingSessionId ? "" : tracingSessionId.toString())); + } + + public static final class SlowSaiQuery implements Comparable + { + private final long duration; + private final String optimizedPlan; + private final String tracingSessionId; + + @ConstructorProperties({ "duration", "optimizedPlan", "tracingSessionId" }) + public SlowSaiQuery(long duration, String optimizedPlan, String tracingSessionId) + { + this.duration = duration; + this.optimizedPlan = optimizedPlan; + this.tracingSessionId = tracingSessionId; + } + + public long getDuration() + { + return duration; + } + + // KATE: probably will be needed for testing? + public String getOptimizedPlan() + { + return optimizedPlan; + } + + // KATE: maybe needed for testing? + public String getTracingSessionId() + { + return tracingSessionId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SlowSaiQuery that = (SlowSaiQuery) o; + return duration == that.duration && + Objects.equal(optimizedPlan, that.optimizedPlan) && + Objects.equal(tracingSessionId, that.tracingSessionId); + } + + @Override + public int hashCode() + { + return Objects.hashCode(duration, optimizedPlan, tracingSessionId); + } + + @Override + public int compareTo(SlowSaiQuery other) + { + return Long.compare(this.duration, other.duration); + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder("SlowSaiQuery{\n"); + sb.append("duration=").append(duration).append("ns\n"); + sb.append("\noptimizedPlan:\n").append(optimizedPlan); + sb.append("\ntracingSessionId=").append(tracingSessionId); + sb.append('}'); + return sb.toString(); + } + } +} diff --git a/src/java/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueue.java b/src/java/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueue.java new file mode 100644 index 000000000000..d9d56ee2a02b --- /dev/null +++ b/src/java/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueue.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.monitoring; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.ThreadSafe; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.MinMaxPriorityQueue; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.jctools.queues.MpscArrayQueue; + +import static org.apache.cassandra.db.monitoring.SaiSlowLog.SLOW_SAI_QUERY_COMPARATOR; + +/** + * A thread-safe class for recording the N slowest SAI queries + *

+ * Allows an asynchronous, lock-free recording of new queries and + * blocking retrieval and resize. + *

+ * Internally it is backed by a {@link MinMaxPriorityQueue} ordered by + * {@link SaiSlowLog#SLOW_SAI_QUERY_COMPARATOR} that is configured to keep + * the N slowest queries and a buffer {@link MpscArrayQueue} for fast and + * lock-free recording of new slow queries. + *

+ * The priority queue is not synchronized. Instead, it is processed only in + * a single thread. The thread is responsible for periodically polling the buffer + * to update the priority queue and for executing additional operations + * like retrieval of the slow queries and resize of the priority queue + */ +@ThreadSafe +public class SaiSlowestQueriesQueue +{ + private static final int RECORDED_QUERIES_BUFFER_CAPACITY = CassandraRelevantProperties.SLOW_SAI_QUERY_BUFFER_SIZE.getInt(); + private static final int RECORDED_QUERIES_PROCESSING_DELAY_MS = CassandraRelevantProperties.SLOW_SAI_QUERY_BUFFER_PROCESSING_DELAY_MS.getInt(); + private static final int RECORDED_QUERIES_OPERATION_HARD_DEADLINE_MINUTES = 10; + + private final MpscArrayQueue recordedQueriesBuffer; + private Queue slowestQueries; + private final ScheduledExecutorService queueProcessor; + + public SaiSlowestQueriesQueue(int maxSize) + { + this(maxSize, RECORDED_QUERIES_PROCESSING_DELAY_MS); + } + + SaiSlowestQueriesQueue(int maxSize, int consumptionDelayMilliseconds) + { + recordedQueriesBuffer = new MpscArrayQueue<>(RECORDED_QUERIES_BUFFER_CAPACITY); + slowestQueries = MinMaxPriorityQueue.orderedBy(SLOW_SAI_QUERY_COMPARATOR) + .maximumSize(maxSize) + .create(); + queueProcessor = Executors.newSingleThreadScheduledExecutor(); + queueProcessor.scheduleWithFixedDelay(this::processNewlyAddedSlowQueries, consumptionDelayMilliseconds, consumptionDelayMilliseconds, TimeUnit.MILLISECONDS); + } + + /** + * Record a new slow sai query. The query will not be instantly available for retrieval. + */ + public void addAsync(SaiSlowLog.SlowSaiQuery slowSaiQuery) + { + // if the buffer is full - tough luck; we loose some slowSaiQueries + // if that's a problem we can tweak recorded queries-related system properties + recordedQueriesBuffer.offer(slowSaiQuery); + } + + /** + * Resize the priority queue. It will now hold up to newMaxSize slow queries + * @param newMaxSize the new max size of the priority queue + */ + void resize(int newMaxSize) + { + Future resizeFuture = queueProcessor.submit(() -> doResize(newMaxSize)); + try + { + resizeFuture.get(RECORDED_QUERIES_OPERATION_HARD_DEADLINE_MINUTES, TimeUnit.MINUTES); + } + catch (InterruptedException | ExecutionException | TimeoutException e) + { + throw new RuntimeException(e); + } + } + + /** + * Extracts all slow queries from the priority queue and returns them in a List. + * List order is unspecified + * + * @return list of the slowest sai queries + */ + List getAndReset() + { + Future> slowQueries = queueProcessor.submit(this::doGetAndReset); + try + { + return slowQueries.get(RECORDED_QUERIES_OPERATION_HARD_DEADLINE_MINUTES, TimeUnit.MINUTES); + } + catch (InterruptedException | ExecutionException | TimeoutException e) + { + throw new RuntimeException(e); + } + } + + private void processNewlyAddedSlowQueries() + { + for (SaiSlowLog.SlowSaiQuery slowQuery = recordedQueriesBuffer.poll(); + slowQuery != null; + slowQuery = recordedQueriesBuffer.poll()) + { + slowestQueries.add(slowQuery); + } + } + + private List doGetAndReset() + { + ArrayList queriesList = new ArrayList<>(slowestQueries.size()); + queriesList.addAll(slowestQueries); + slowestQueries.clear(); + return queriesList; + } + + private void doResize(int newMaxSize) + { + Queue oldQueries = slowestQueries; + slowestQueries = MinMaxPriorityQueue.orderedBy(SLOW_SAI_QUERY_COMPARATOR) + .maximumSize(newMaxSize) + .create(oldQueries); + } + + @VisibleForTesting + boolean bufferIsEmpty() throws ExecutionException, InterruptedException + { + // isEmpty is valid only in consumer thread + return queueProcessor.submit(recordedQueriesBuffer::isEmpty).get(); + } +} diff --git a/src/java/org/apache/cassandra/index/sai/QueryContext.java b/src/java/org/apache/cassandra/index/sai/QueryContext.java index 30f12b84d5ec..eb5e5aae1e5b 100644 --- a/src/java/org/apache/cassandra/index/sai/QueryContext.java +++ b/src/java/org/apache/cassandra/index/sai/QueryContext.java @@ -71,6 +71,8 @@ public class QueryContext // Null means the query execution order hasn't been decided yet. private FilterSortOrder filterSortOrder = null; + private String optimizedPlan; + @VisibleForTesting public QueryContext() { @@ -81,6 +83,7 @@ public QueryContext(long executionQuotaMs) { this.executionQuotaNano = TimeUnit.MILLISECONDS.toNanos(executionQuotaMs); this.queryStartTimeNanos = System.nanoTime(); + this.optimizedPlan = ""; } public long totalQueryTimeNs() @@ -88,6 +91,11 @@ public long totalQueryTimeNs() return System.nanoTime() - queryStartTimeNanos; } + public String optimizedPlan() + { + return optimizedPlan; + } + // setters public void addSstablesHit(long val) { @@ -150,6 +158,11 @@ public void setFilterSortOrder(FilterSortOrder filterSortOrder) this.filterSortOrder = filterSortOrder; } + public void setOptimizedPlan(String optimizedPlan) + { + this.optimizedPlan = optimizedPlan; + } + // getters public long sstablesHit() diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 72d6cd359807..0b9ae4ccac83 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -60,6 +60,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.db.monitoring.SaiSlowLog; import org.apache.cassandra.db.rows.BaseRowIterator; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -691,12 +692,14 @@ private static void releaseQuietly(SSTableIndex index) } /** - * Used to release all resources and record metrics when query finishes. + * Used to release all resources, record metrics, and update SAI slow query log when a query finishes. */ public void finish() { closeUnusedIterators(); if (tableQueryMetrics != null) tableQueryMetrics.record(queryContext); + + SaiSlowLog.maybeRecord(queryContext); } private void closeUnusedIterators() diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index 117ac94b822b..2e6401a00232 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -127,6 +127,7 @@ public UnfilteredPartitionIterator search(ReadExecutionController executionContr // Can't check for `command.isTopK()` because the planner could optimize sorting out if (plan.ordering() != null) { + queryContext.setOptimizedPlan(plan.toStringRecursive()); // TopK queries require a consistent view of the sstables and memtables in order to validate overwritten // rows. Acquire the view before building any of the iterators. try (var queryView = new QueryViewBuilder(cfs, controller.getOrderer(), controller.mergeRange(), queryContext).build()) @@ -144,6 +145,7 @@ public UnfilteredPartitionIterator search(ReadExecutionController executionContr } else { + queryContext.setOptimizedPlan(""); Iterator keysIterator = controller.buildIterator(plan); assert keysIterator instanceof RangeIterator; return new ResultRetriever((RangeIterator) keysIterator, filterTree, controller, executionController, queryContext); diff --git a/test/unit/org/apache/cassandra/db/monitoring/SaiSlowLogTest.java b/test/unit/org/apache/cassandra/db/monitoring/SaiSlowLogTest.java new file mode 100644 index 000000000000..858a93616e03 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/monitoring/SaiSlowLogTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.monitoring; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.index.sai.SAITester; + +import static org.junit.Assert.assertEquals; + +public class SaiSlowLogTest extends SAITester +{ + @Before + public void setup() throws Throwable + { + requireNetwork(); + + createTable(CREATE_TABLE_TEMPLATE); + createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1")); + createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2")); + + if (!execute("SELECT * FROM %s").isEmpty()) + { + return; + } + + for (int i = 0; i < 100; ++i) + { + execute("INSERT INTO %s(id1,v1,v2) VALUES (?, ?, ?)", i, i, Integer.toString(i % 5)); + } + flush(); + + execute("SELECT * FROM %s WHERE v1 >= 0 AND v1 < 10000"); + execute("SELECT * FROM %s WHERE v2 = '0'"); + + SaiSlowLog.instance = new SaiSlowLog(); + } + + @After + public void cleanUp() + { + SaiSlowLog.instance.retrieveRecentSlowestCqlQueries(); + } + + // very basic test to verify that the slowest queries are returned + // Need to add more tests for: + // verification of the log format - toString() + // logLongRunningSelect + // logLongRunningSelectWithTracingEnabled + // verify slowest queries are returned, correctly ordered + // test things are handled right when we try invalid num slowest queries + // verify when we decrease/increase the number of slowest queries the results + // overall test all edge cases for all new configurations work as expected + + @Test + public void verifySlowestQueriesAreReturned() throws Throwable + { + performQueries(10); + // KATE - this is a bit of a hack to ensure the slowest queries are processed; We should use something more + // robust than Thread.sleep; some polling mechanism maybe + Thread.sleep(2000); + + assertEquals(5, SaiSlowLog.instance.retrieveRecentSlowestCqlQueries().size()); + } + + private void performQueries(int queriesToWrite) + { + // we need to reset the most recent slowest queries + SaiSlowLog.instance.retrieveRecentSlowestCqlQueries(); + + for (int i = 0; i < queriesToWrite; i++) + executeNet("SELECT * FROM %s WHERE v1 >= 0 AND v1 < 10000"); + } +} diff --git a/test/unit/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueueTest.java b/test/unit/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueueTest.java new file mode 100644 index 000000000000..712f78bfb6ed --- /dev/null +++ b/test/unit/org/apache/cassandra/db/monitoring/SaiSlowestQueriesQueueTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.monitoring; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Test; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +import static java.lang.Thread.sleep; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; + +public class SaiSlowestQueriesQueueTest +{ + @Test + public void shouldMaintainMaxSize() throws InterruptedException, ExecutionException + { + // given + int maxSize = 16; + SaiSlowestQueriesQueue q = new SaiSlowestQueriesQueue(maxSize, 1); + + // when + for (int duration = 0; duration < maxSize * 10; duration++) + { + q.addAsync(slowQuery(duration)); + } + waitUntilNewQueriesAreProcessed(q); + + // then + assertThat(q.getAndReset(), hasSize(maxSize)); + } + + @Test + public void shouldEmptyOnGetAndReset() throws InterruptedException, ExecutionException + { + // given + SaiSlowestQueriesQueue q = new SaiSlowestQueriesQueue(10, 1); + + for (int duration = 0; duration < 100; duration++) + { + q.addAsync(slowQuery(duration)); + } + waitUntilNewQueriesAreProcessed(q); + + // when + q.getAndReset(); + + // then + assertThat(q.getAndReset(), hasSize(0)); + } + + @Test + public void shouldKeepSlowestQueries() throws InterruptedException, ExecutionException + { + // given + int maxSize = 16; + int maxDuration = 100; + SaiSlowestQueriesQueue q = new SaiSlowestQueriesQueue(maxSize, 1); + + ArrayList durations = IntStream.range(0, maxDuration) + .boxed().collect(Collectors.toCollection(ArrayList::new)); + Collections.shuffle(durations); + + // when + durations.forEach(duration -> q.addAsync(slowQuery(duration))); + waitUntilNewQueriesAreProcessed(q); + + // then + List slowest = q.getAndReset(); + for (int duration = maxDuration - maxSize; duration < maxDuration; duration++) + { + assertThat(slowest, hasItem(queryWithDuration(duration))); + } + } + + @Test + public void shouldRemoveFastestQueriesOnResize() throws InterruptedException, ExecutionException + { + // given + int maxSize = 16; + int resizedSize = 5; + SaiSlowestQueriesQueue q = new SaiSlowestQueriesQueue(maxSize, 1); + + for (int duration = 0; duration < maxSize; duration++) + { + q.addAsync(slowQuery(duration)); + } + waitUntilNewQueriesAreProcessed(q); + + // when + q.resize(resizedSize); + + // then + List slowest = q.getAndReset(); + for (int duration = maxSize - resizedSize; duration < maxSize; duration++) + { + assertThat(slowest, hasItem(queryWithDuration(duration))); + } + } + + @Test + public void shouldPreserveExistingQueriesOnResizeToBigger() throws InterruptedException, ExecutionException + { + // given + int maxSize = 16; + SaiSlowestQueriesQueue q = new SaiSlowestQueriesQueue(maxSize, 1); + + for (int duration = 0; duration < maxSize; duration++) + { + q.addAsync(slowQuery(duration)); + } + waitUntilNewQueriesAreProcessed(q); + + // when + q.resize(25); + + // then + List slowest = q.getAndReset(); + for (int duration = 0; duration < maxSize; duration++) + { + assertThat(slowest, hasItem(queryWithDuration(duration))); + } + } + + private void waitUntilNewQueriesAreProcessed(SaiSlowestQueriesQueue q) throws InterruptedException, ExecutionException + { + while (!q.bufferIsEmpty()) + sleep(1); + } + + private SaiSlowLog.SlowSaiQuery slowQuery(int i) + { + return new SaiSlowLog.SlowSaiQuery(i, + "optimizedPlan", + "tracingSessionId"); + } + + private Matcher queryWithDuration(long duration) + { + return new QueryWithDuration(duration); + } + + private static class QueryWithDuration extends TypeSafeMatcher + { + private final long duration; + + private QueryWithDuration(long duration) + { + this.duration = duration; + } + + protected boolean matchesSafely(SaiSlowLog.SlowSaiQuery slowSaiQuery) + { + return slowSaiQuery.getDuration() == duration; + } + + public void describeTo(Description description) + { + description.appendText("query with duration " + duration); + } + } +} From e9f120a6771d204afe01ba7746923eba132b2c61 Mon Sep 17 00:00:00 2001 From: Ekaterina Dimitrova Date: Wed, 18 Sep 2024 19:01:49 -0400 Subject: [PATCH 2/2] WIP redaction --- .../apache/cassandra/db/filter/RowFilter.java | 92 +++++++++++++++++-- .../cassandra/index/sai/plan/Expression.java | 45 +++++++-- .../apache/cassandra/index/sai/plan/Plan.java | 59 ++++++++---- .../index/sai/plan/QueryController.java | 3 +- .../plan/StorageAttachedIndexSearcher.java | 2 +- test/conf/logback-test.xml | 6 +- .../cassandra/index/sai/plan/PlanTest.java | 36 +++++++- .../sai/plan/QueryPlanRedactionTest.java | 62 +++++++++++++ 8 files changed, 266 insertions(+), 39 deletions(-) create mode 100644 test/unit/org/apache/cassandra/index/sai/plan/QueryPlanRedactionTest.java diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index d6ecb1f46828..51d50ad3fe10 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -250,6 +250,11 @@ public String toString() return root.toString(); } + public String toStringRedacted() + { + return root.toStringRedacted(); + } + private void warnIfFilterIsATree() { if (!root.children.isEmpty()) @@ -477,20 +482,38 @@ public boolean isSatisfiedBy(TableMetadata table, DecoratedKey key, Row row) @Override public String toString() + { + return toStringInternal(false); + } + + public String toStringRedacted() + { + return toStringInternal(true); + } + + private String toStringInternal(boolean redacted) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < expressions.size(); i++) { if (sb.length() > 0) sb.append(isDisjunction ? " OR " : " AND "); - sb.append(expressions.get(i)); + + if (!redacted) + sb.append(expressions.get(i)); + else + sb.append(expressions.get(i).toStringRedacted()); + } for (int i = 0; i < children.size(); i++) { if (sb.length() > 0) sb.append(isDisjunction ? " OR " : " AND "); sb.append("("); - sb.append(children.get(i)); + if (!redacted) + sb.append(children.get(i)); + else + sb.append(children.get(i).toStringRedacted()); sb.append(")"); } return sb.toString(); @@ -785,6 +808,16 @@ public boolean equals(Object o) && Objects.equal(this.value, that.value); } + public String toString() + { + return ""; + } + + public String toStringRedacted() + { + return ""; + } + @Override public int hashCode() { @@ -1046,8 +1079,8 @@ private boolean containsKey(TableMetadata metadata, DecoratedKey partitionKey, R } } - @Override - public String toString() + + private String toStringInternal(boolean redacted) { AbstractType type = column.type; switch (operator) @@ -1074,12 +1107,32 @@ public String toString() default: break; } - var valueString = type.getString(value); - if (valueString.length() > 9) - valueString = valueString.substring(0, 6) + "..."; + + String valueString; + if(redacted) + valueString = ""; + else + { + valueString = type.getString(value); + if (valueString.length() > 9) + valueString = valueString.substring(0, 6) + "..."; + } + return String.format("%s %s %s", column.name, operator, valueString); } + @Override + public String toString() + { + return toStringInternal(false); + } + + @Override + public String toStringRedacted() + { + return toStringInternal(true); + } + @Override protected Kind kind() { @@ -1185,9 +1238,24 @@ private boolean isSatisfiedByEq(TableMetadata metadata, DecoratedKey partitionKe @Override public String toString() + { + return toStringInternal(false); + } + + @Override + public String toStringRedacted() + { + return toStringInternal(true); + } + + private String toStringInternal(boolean redacted) { MapType mt = (MapType)column.type; - return String.format("%s[%s] %s %s", column.name, mt.nameComparator().getString(key), operator, mt.valueComparator().getString(value)); + + if(!redacted) + return String.format("%s[%s] %s %s", column.name, mt.nameComparator().getString(key), operator, mt.valueComparator().getString(value)); + + return String.format("%s[%s] %s %s", column.name, mt.nameComparator().getString(key), operator, ""); } @Override @@ -1362,6 +1430,13 @@ public String toString() distanceOperator, FloatType.instance.getString(distance)); } + @Override + public String toStringRedacted() + { + return String.format("GEO_DISTANCE(%s, %s) %s %s", column.name, "", + distanceOperator, ""); + } + @Override public boolean equals(Object o) { @@ -1432,6 +1507,7 @@ public ByteBuffer getValue() return value; } + @Override public String toString() { return String.format("expr(%s, %s)", diff --git a/src/java/org/apache/cassandra/index/sai/plan/Expression.java b/src/java/org/apache/cassandra/index/sai/plan/Expression.java index 2c0ca4d88742..e567a1e311c7 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/Expression.java +++ b/src/java/org/apache/cassandra/index/sai/plan/Expression.java @@ -236,7 +236,7 @@ public Expression add(Operator op, ByteBuffer value) case GT: operation = Op.RANGE; if (context.getDefinition().isReversedType()) - upper = new Bound(value, validator, upperInclusive); + upper = new Bound(value, validator, upperInclusive); else lower = new Bound(value, validator, lowerInclusive); break; @@ -345,6 +345,7 @@ public boolean isSatisfiedBy(ByteBuffer columnValue) /** * Returns the lower bound of the expression as a ByteComparable with an encoding based on the version and the * validator. + * * @param version the version of the index * @return */ @@ -363,6 +364,7 @@ public ByteComparable getEncodedLowerBoundByteComparable(Version version) /** * Returns the upper bound of the expression as a ByteComparable with an encoding based on the version and the * validator. + * * @param version the version of the index * @return */ @@ -395,6 +397,7 @@ private ByteComparable getBoundByteComparable(ByteBuffer unencodedBound, Version * {@link ByteBuffer}, but it does not apply the validator's encoding. We do this because we apply * {@link TypeUtil#encode(ByteBuffer, AbstractType)} before we find the min/max on an index and this method is * exposed publicly for determining if a bound is within an index's min/max. + * * @param version * @return */ @@ -408,6 +411,7 @@ public ByteBuffer getPartiallyEncodedLowerBound(Version version) * {@link ByteBuffer}, but it does not apply the validator's encoding. We do this because we apply * {@link TypeUtil#encode(ByteBuffer, AbstractType)} before we find the min/max on an index and this method is * exposed publicly for determining if a bound is within an index's min/max. + * * @param version * @return */ @@ -528,18 +532,41 @@ public float getEuclideanSearchThreshold() return boundedAnnEuclideanDistanceThreshold; } + public String toStringRedacted() + { + return toStringInternal(true); + } + public String toString() { - return String.format("Expression{name: %s, op: %s, lower: (%s, %s), upper: (%s, %s), exclusions: %s}", - context.getColumnName(), - operation, - lower == null ? "null" : validator.getString(lower.value.raw), - lower != null && lower.inclusive, - upper == null ? "null" : validator.getString(upper.value.raw), - upper != null && upper.inclusive, - Iterators.toString(Iterators.transform(exclusions.iterator(), validator::getString))); + return toStringInternal(false); + } + + private String toStringInternal(boolean redacted) + { + if(redacted) + { + return String.format("Expression{name: %s, op: %s, lower: %s, upper: %s, exclusions: %s}", + context.getColumnName(), + operation, + lower == null ? "null" : "", + upper == null ? "null" : "", + Iterators.toString(Iterators.transform(exclusions.iterator(), element -> ""))); + } + else { + return String.format("Expression{name: %s, op: %s, lower: (%s, %s), upper: (%s, %s), exclusions: %s}", + context.getColumnName(), + operation, + lower == null ? "null" : validator.getString(lower.value.raw), + lower != null && lower.inclusive, + upper == null ? "null" : validator.getString(upper.value.raw), + upper != null && upper.inclusive, + Iterators.toString(Iterators.transform(exclusions.iterator(), validator::getString))); + } } + + public String getIndexName() { return context.getIndexName(); diff --git a/src/java/org/apache/cassandra/index/sai/plan/Plan.java b/src/java/org/apache/cassandra/index/sai/plan/Plan.java index d86d7a328f1d..e6dc77911716 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/Plan.java +++ b/src/java/org/apache/cassandra/index/sai/plan/Plan.java @@ -44,7 +44,6 @@ import org.apache.cassandra.index.sai.utils.RangeUnionIterator; import org.apache.cassandra.index.sai.utils.TreeFormatter; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.tracing.Tracing; import static java.lang.Math.max; import static java.lang.Math.min; @@ -296,13 +295,34 @@ public final String toStringRecursive() return formatter.format(this); } + /** + * Formats the whole plan as a pretty tree with redacted information for slow query logging + */ + public final String toStringRecursiveRedacted() + { + TreeFormatter formatter = new TreeFormatter<>(Plan::toStringRedacted, Plan::subplans); + return formatter.format(this); + } + /** * Returns the string representation of this node only */ public final String toString() { - String title = title(); - String description = description(); + String title = title(false); + String description = description(false); + return (title.isEmpty()) + ? String.format("%s (%s)\n%s", getClass().getSimpleName(), cost(), description).stripTrailing() + : String.format("%s %s (%s)\n%s", getClass().getSimpleName(), title, cost(), description).stripTrailing(); + } + + /** + * Returns the redacted string representation of this node only (used in slow query logging) + */ + public final String toStringRedacted() + { + String title = title(true); + String description = description(true); return (title.isEmpty()) ? String.format("%s (%s)\n%s", getClass().getSimpleName(), cost(), description).stripTrailing() : String.format("%s %s (%s)\n%s", getClass().getSimpleName(), title, cost(), description).stripTrailing(); @@ -311,9 +331,9 @@ public final String toString() /** * Returns additional information specific to the node displayed in the first line. * The information is included in the output of {@link #toString()} and {@link #toStringRecursive()}. - * It is up to subclasses to implement it. + * It is up to subclasses to implement it. The redacted version is used in slow query logging. */ - protected String title() + protected String title(boolean redacted) { return ""; } @@ -321,9 +341,9 @@ protected String title() /** * Returns additional information specific to the node, displayed below the title. * The information is included in the output of {@link #toString()} and {@link #toStringRecursive()}. - * It is up to subclasses to implement it. + * It is up to subclasses to implement it. The redacted version is used in slow query logging. */ - protected String description() + protected String description(boolean redacted) { return ""; } @@ -337,8 +357,10 @@ protected String description() */ public final Plan optimize() { + // KATE: revert below, it was just for dirty testing and printing in the logs; we should keep on using here + // toStringRecursive(), all three places below if (logger.isTraceEnabled()) - logger.trace("Optimizing plan:\n{}", this.toStringRecursive()); + logger.trace("Optimizing plan:\n{}", this.toStringRecursiveRedacted()); Plan bestPlanSoFar = this; List leaves = nodesOfType(Leaf.class); @@ -349,14 +371,14 @@ public final Plan optimize() { Plan candidate = bestPlanSoFar.removeRestriction(leaf.id); if (logger.isTraceEnabled()) - logger.trace("Candidate query plan:\n{}", candidate.toStringRecursive()); + logger.trace("Candidate query plan:\n{}", candidate.toStringRecursiveRedacted()); if (candidate.fullCost() <= bestPlanSoFar.fullCost()) bestPlanSoFar = candidate; } if (logger.isTraceEnabled()) - logger.trace("Optimized plan:\n{}", bestPlanSoFar.toStringRecursive()); + logger.trace("Optimized plan:\n{}", bestPlanSoFar.toStringRecursiveRedacted()); return bestPlanSoFar; } @@ -771,20 +793,23 @@ public IndexScan(Factory factory, int id, Expression predicate, long matchingKey } @Override - protected final String title() + protected final String title(boolean redacted) { return String.format("of %s (sel: %.9f, step: %.1f)", getIndexName(), selectivity(), access.meanDistance()); } @Override - protected String description() + protected String description(boolean redacted ) { StringBuilder sb = new StringBuilder(); if (predicate != null) { sb.append("predicate: "); - sb.append(predicate); + if (redacted) + sb.append(predicate.toStringRedacted()); + else + sb.append(predicate); sb.append('\n'); } if (ordering != null) @@ -1524,9 +1549,11 @@ protected Filter withAccess(Access access) } @Override - protected String title() + protected String title(boolean redacted) { - return String.format("%s (sel: %.9f)", filter, selectivity() / source.get().selectivity()); + if (!redacted) + return String.format("%s (sel: %.9f)", filter.toString(), selectivity() / source.get().selectivity()); + return String.format("%s (sel: %.9f)", filter.toStringRedacted(), selectivity() / source.get().selectivity()); } } @@ -1592,7 +1619,7 @@ protected RowsIteration withAccess(Access access) } @Override - protected String title() + protected String title(boolean redacted) { return "" + limit; } diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 0b9ae4ccac83..a3936ee856d1 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -365,8 +365,9 @@ Plan buildPlan() if (optimizedPlan.contains(node -> node instanceof Plan.KeysSort)) queryContext.setFilterSortOrder(QueryContext.FilterSortOrder.SEARCH_THEN_ORDER); + // KATE: revert below, it was just for dirty testing and printing in the logs, we should not redact here if (logger.isTraceEnabled()) - logger.trace("Query execution plan:\n" + optimizedPlan.toStringRecursive()); + logger.trace("Query execution plan:\n" + optimizedPlan.toStringRecursiveRedacted()); if (Tracing.isTracing()) { diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index 2e6401a00232..8cb5c3953102 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -127,7 +127,7 @@ public UnfilteredPartitionIterator search(ReadExecutionController executionContr // Can't check for `command.isTopK()` because the planner could optimize sorting out if (plan.ordering() != null) { - queryContext.setOptimizedPlan(plan.toStringRecursive()); + queryContext.setOptimizedPlan(plan.toStringRecursiveRedacted()); // TopK queries require a consistent view of the sstables and memtables in order to validate overwritten // rows. Acquire the view before building any of the iterators. try (var queryView = new QueryViewBuilder(cfs, controller.getOrderer(), controller.mergeRange(), queryContext).build()) diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml index 6fd8bc59d02a..41d72c060bfa 100644 --- a/test/conf/logback-test.xml +++ b/test/conf/logback-test.xml @@ -48,7 +48,7 @@ %-5level [%thread] %date{ISO8601} %F:%L - %msg%n - DEBUG + TRACE @@ -59,7 +59,7 @@ - + @@ -72,7 +72,7 @@ - + diff --git a/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java b/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java index c2001a51c471..7c121869dc8a 100644 --- a/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java +++ b/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java @@ -97,6 +97,7 @@ private static Expression saiPred(String column, Expression.Op operation, boolea { Expression pred = Mockito.mock(Expression.class); Mockito.when(pred.toString()).thenReturn(operation.toString() + '(' + column + ')'); + Mockito.when(pred.toStringRedacted()).thenReturn(operation.toString() + '(' + column + ')'); Mockito.when(pred.getIndexName()).thenReturn(column + "_idx"); Mockito.when(pred.getOp()).thenReturn(operation); Mockito.when(pred.isLiteral()).thenReturn(isLiteral); @@ -107,6 +108,7 @@ private static RowFilter.Expression filerPred(String column, Operator operation) { RowFilter.Expression pred = Mockito.mock(RowFilter.Expression.class); Mockito.when(pred.toString()).thenReturn(column + ' ' + operation + " X"); + Mockito.when(pred.toStringRedacted()).thenReturn(column + ' ' + operation + " "); Mockito.when(pred.operator()).thenReturn(operation); return pred; } @@ -576,6 +578,34 @@ public void prettyPrint() " predicate: RANGE(pred4)\n", prettyStr); } + @Test + public void prettyPrintRedacted() + { + Plan.KeysIteration s1 = factory.indexScan(saiPred1, (long) (0.5 * factory.tableMetrics.rows)); + Plan.KeysIteration s2 = factory.indexScan(saiPred2, (long) (0.002 * factory.tableMetrics.rows)); + Plan.KeysIteration s3 = factory.indexScan(saiPred4, (long) (0.001 * factory.tableMetrics.rows)); + Plan.KeysIteration union = factory.union(Lists.newArrayList(factory.intersection(Lists.newArrayList(s1, s2)), s3)); + Plan.KeysIteration sort = factory.sort(union, ordering); + Plan.RowsIteration fetch = factory.fetch(sort); + Plan.RowsIteration filter = factory.recheckFilter(RowFilter.builder().add(pred1).add(pred2).add(pred4).build(), fetch); + Plan.RowsIteration limit = factory.limit(filter, 3); + + String prettyStr = limit.toStringRecursiveRedacted(); + + assertEquals("Limit 3 (rows: 3.0, cost/row: 3895.8, cost: 56001.1..67688.5)\n" + + " └─ Filter pred1 < AND pred2 < AND pred4 < (sel: 1.000000000) (rows: 3.0, cost/row: 3895.8, cost: 56001.1..67688.5)\n" + + " └─ Fetch (rows: 3.0, cost/row: 3895.8, cost: 56001.1..67688.5)\n" + + " └─ KeysSort (keys: 3.0, cost/key: 3792.4, cost: 56001.1..67378.2)\n" + + " └─ Union (keys: 1999.0, cost/key: 14.8, cost: 13500.0..43001.3)\n" + + " ├─ Intersection (keys: 1000.0, cost/key: 29.4, cost: 9000.0..38401.3)\n" + + " │ ├─ NumericIndexScan of pred2_idx (sel: 0.002000000, step: 1.0) (keys: 2000.0, cost/key: 0.1, cost: 4500.0..4700.0)\n" + + " │ │ predicate: RANGE(pred2)\n" + + " │ └─ NumericIndexScan of pred1_idx (sel: 0.500000000, step: 250.0) (keys: 2000.0, cost/key: 14.6, cost: 4500.0..33701.3)\n" + + " │ predicate: RANGE(pred1)\n" + + " └─ LiteralIndexScan of pred4_idx (sel: 0.001000000, step: 1.0) (keys: 1000.0, cost/key: 0.1, cost: 4500.0..4600.0)\n" + + " predicate: RANGE(pred4)\n", prettyStr); + } + @Test public void removeNeedlessIntersections() { @@ -867,6 +897,8 @@ private void testIntersectionsUnderLimit(Plan.TableMetrics metrics, List List resultIndexScans = optimizedPlan.nodesOfType(Plan.IndexScan.class); assertTrue("original:\n" + origPlan.toStringRecursive() + "optimized:\n" + optimizedPlan.toStringRecursive(), expectedIndexScanCount.contains(resultIndexScans.size())); + assertTrue("original:\n" + origPlan.toStringRecursiveRedacted() + "optimized:\n" + optimizedPlan.toStringRecursiveRedacted(), + expectedIndexScanCount.contains(resultIndexScans.size())); } @Test @@ -996,6 +1028,8 @@ private void testIntersectionsUnderAnnSort(Plan.TableMetrics metrics, List resultIndexScans = optimizedPlan.nodesOfType(Plan.IndexScan.class); assertTrue("original:\n" + origPlan.toStringRecursive() + "optimized:\n" + optimizedPlan.toStringRecursive(), expectedIndexScanCount.contains(resultIndexScans.size())); + assertTrue("original:\n" + origPlan.toStringRecursiveRedacted() + "optimized:\n" + optimizedPlan.toStringRecursiveRedacted(), + expectedIndexScanCount.contains(resultIndexScans.size())); } @Test @@ -1034,7 +1068,7 @@ public void testLazyAccessPropagation() Mockito.when(indexScan1.withAccess(Mockito.any())).thenReturn(indexScan1); Mockito.when(indexScan1.estimateCost()).thenReturn(new Plan.KeysIterationCost(20,0.0, 0.5)); Mockito.when(indexScan1.estimateSelectivity()).thenReturn(0.001); - Mockito.when(indexScan1.title()).thenReturn(""); + Mockito.when(indexScan1.title(false)).thenReturn(""); Plan.KeysIteration indexScan2 = factory.indexScan(saiPred2, (long) (0.01 * factory.tableMetrics.rows)); Plan.KeysIteration indexScan3 = factory.indexScan(saiPred3, (long) (0.5 * factory.tableMetrics.rows)); diff --git a/test/unit/org/apache/cassandra/index/sai/plan/QueryPlanRedactionTest.java b/test/unit/org/apache/cassandra/index/sai/plan/QueryPlanRedactionTest.java new file mode 100644 index 000000000000..1c9438265ad4 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/plan/QueryPlanRedactionTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.plan; + +import org.junit.Before; +import org.junit.Test; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.cassandra.index.sai.SAITester; +import org.apache.cassandra.transport.ProtocolVersion; + +public class QueryPlanRedactionTest extends SAITester +{ + @Before + public void setup() + { + requireNetwork(); + } + + private static final String KEYSPACE = "prepared_stmt_cleanup"; + private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE + + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"; + + @Test + public void testQuery() + { + String selectCQL2 = "SELECT * FROM t WHERE val : 'missing' OR (val : ? AND val : ?)"; + Session session = getSession(ProtocolVersion.V5); + session.execute(createKsStatement); + session.execute("Use prepared_stmt_cleanup"); + session.execute("CREATE TABLE t (id text PRIMARY KEY, val text)"); + session.execute("CREATE CUSTOM INDEX ON t(val) USING 'StorageAttachedIndex' WITH OPTIONS = {" + + "'index_analyzer': '{\n" + + "\t\"tokenizer\":{\"name\":\"ngram\", \"args\":{\"minGramSize\":\"2\", \"maxGramSize\":\"3\"}}," + + "\t\"filters\":[{\"name\":\"lowercase\"}]\n" + + "}'," + + "'query_analyzer': '{\n" + + "\t\"tokenizer\":{\"name\":\"whitespace\"},\n" + + "\t\"filters\":[{\"name\":\"porterstem\"}]\n" + + "}'};"); + + PreparedStatement preparedSelect2 = session.prepare(selectCQL2); + session.execute(preparedSelect2.bind("'quick'", "'dog'")).all().size(); + } +}