diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index d9ce9d5b19b8..933704d269a5 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -998,6 +998,15 @@ truncate_request_timeout_in_ms: 60000 # The default timeout for other, miscellaneous operations. # Lowest acceptable value is 10 ms. request_timeout_in_ms: 10000 +# Upper bound for how long any request received via native transport +# should be considered live and serviceable by the system. This is +# currently considered at two points: when the message is dequeued and +# executed by the NATIVE_TRANSPORT_REQUESTS stage, and when the message +# is dequeued and executed by an async stage if NATIVE_TRANSPORT_ASYNC_READ_WRITE_ENABLED +# is set to true. If the request is not completed within this time, an +# OverloadedException is thrown. +native_transport_timeout_in_ms: 12000 + # Defensive settings for protecting Cassandra from true network partitions. # See (CASSANDRA-14358) for details. diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 1b0152520e1b..b27f3bfc6c18 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -113,6 +113,8 @@ public class Config public volatile long repair_prepare_message_timeout_in_ms = 10000L; + public volatile long native_transport_timeout_in_ms = 12000L; + public Integer streaming_connections_per_host = 1; public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 36e2d8835f87..bed0c22250fd 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3706,4 +3706,14 @@ public static void setAnnBruteForceExpenseFactor(double factor) Preconditions.checkArgument(factor <= StorageAttachedIndexOptions.MAXIMUM_ANN_BRUTE_FORCE_FACTOR, "ANN brute force expense factor must be at most " + StorageAttachedIndexOptions.MAXIMUM_ANN_BRUTE_FORCE_FACTOR); conf.sai_options.ann_brute_force_factor = factor; } + + public static long getNativeTransportTimeout(TimeUnit timeUnit) + { + return timeUnit.convert(conf.native_transport_timeout_in_ms, TimeUnit.MILLISECONDS); + } + + public static void setNativeTransportTimeout(long timeout, TimeUnit timeUnit) + { + conf.native_transport_timeout_in_ms = timeUnit.toMillis(timeout); + } } diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index ddaf03788a38..22f90fa6b865 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -26,6 +26,9 @@ import org.apache.cassandra.net.ParamType; import org.apache.cassandra.tracing.Tracing; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.utils.MonotonicClock.approxTime; + public class MutationVerbHandler implements IVerbHandler { public static final MutationVerbHandler instance = new MutationVerbHandler(); @@ -43,6 +46,13 @@ private void failed() public void doVerb(Message message) { + if (approxTime.now() > message.expiresAtNanos()) + { + Tracing.trace("Discarding mutation from {} (timed out)", message.from()); + MessagingService.instance().metrics.recordDroppedMessage(message, message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS); + return; + } + // Check if there were any forwarding headers in this message InetAddressAndPort from = message.respondTo(); InetAddressAndPort respondToAddress; diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java index 29ae693c9308..5f642bb46b8c 100644 --- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java @@ -19,12 +19,15 @@ package org.apache.cassandra.metrics; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Timer; import org.apache.cassandra.transport.*; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -44,9 +47,17 @@ public final class ClientMetrics @SuppressWarnings({ "unused", "FieldCanBeLocal" }) private Gauge pausedConnectionsGauge; - + private Meter connectionPaused; + private Meter requestDiscarded; + public Meter timedOutBeforeProcessing; + public Meter timedOutBeforeAsyncProcessing; + public Timer queueTime; // time between Message creation and execution on NTR + public Counter totalQueueTime; // total queue time (in nanoseconds) for use in histogram timer + public Timer asyncQueueTime; // time between Message creation and execution on an async stage. This includes the time recorded in queueTime metric. + public Counter totalAsyncQueueTime; // total async queue time (in nanoseconds) for use in histogram timer + private Meter protocolException; private Meter unknownException; @@ -64,7 +75,11 @@ public void markAuthFailure() authFailure.mark(); } - public void pauseConnection() { pausedConnections.incrementAndGet(); } + public void pauseConnection() { + connectionPaused.mark(); + pausedConnections.incrementAndGet(); + } + public void unpauseConnection() { pausedConnections.decrementAndGet(); } public void markRequestDiscarded() { requestDiscarded.mark(); } @@ -79,6 +94,40 @@ public List allConnectedClients() return clients; } + public void markTimedOutBeforeProcessing() + { + timedOutBeforeProcessing.mark(); + } + + public void markTimedOutBeforeAsyncProcessing() + { + timedOutBeforeAsyncProcessing.mark(); + } + + /** + * Record time between Message creation and execution on NTR. + * @param value time elapsed + * @param unit time unit + */ + public void recordQueueTime(long value, TimeUnit unit) + { + queueTime.update(value, unit); + totalQueueTime.inc(TimeUnit.NANOSECONDS.convert(value, unit)); + } + + /** + * Record time between Message creation and execution on an async stage, if present. + * Note that this includes the queue time previously recorded before execution on the NTR stage, + * so for a given request, asyncQueueTime >= queueTime. + * @param value time elapsed + * @param unit time unit + */ + public void recordAsyncQueueTime(long value, TimeUnit unit) + { + asyncQueueTime.update(value, unit); + totalAsyncQueueTime.inc(TimeUnit.NANOSECONDS.convert(value, unit)); + } + public void markProtocolException() { protocolException.mark(); @@ -117,9 +166,17 @@ public long getCount() authFailure = registerMeter("AuthFailure"); pausedConnections = new AtomicInteger(); + connectionPaused = registerMeter("ConnectionPaused"); pausedConnectionsGauge = registerGauge("PausedConnections", pausedConnections::get); requestDiscarded = registerMeter("RequestDiscarded"); + timedOutBeforeProcessing = registerMeter("TimedOutBeforeProcessing"); + timedOutBeforeAsyncProcessing = registerMeter("TimedOutBeforeAsyncProcessing"); + queueTime = registerTimer("QueueTime"); + totalQueueTime = registerCounter("TotalQueueTime"); + asyncQueueTime = registerTimer("AsyncQueueTime"); + totalAsyncQueueTime = registerCounter("TotalAsyncQueueTime"); + protocolException = registerMeter("ProtocolException"); unknownException = registerMeter("UnknownException"); @@ -188,4 +245,14 @@ private Meter registerMeter(String name) { return Metrics.meter(factory.createMetricName(name)); } + + private Timer registerTimer(String name) + { + return Metrics.timer(factory.createMetricName(name)); + } + + private Counter registerCounter(String name) + { + return Metrics.counter(factory.createMetricName(name)); + } } diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 627e52342e2c..69cfcc3e84d6 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; @@ -34,10 +35,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.*; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.UUIDGen; /** @@ -201,6 +206,7 @@ public String debugString() public static abstract class Request extends Message { private boolean tracingRequested; + private final long creationTimeNanos = MonotonicClock.approxTime.now(); protected Request(Type type) { @@ -215,10 +221,31 @@ protected boolean isTraceable() return false; } + /** + * Returns the time elapsed since this request was created. Note that this is the total lifetime of the request + * in the system, so we expect increasing returned values across multiple calls to elapsedTimeSinceCreation. + * + * @param timeUnit the time unit in which to return the elapsed time + * @return the time elapsed since this request was created + */ + protected long elapsedTimeSinceCreation(TimeUnit timeUnit) + { + return timeUnit.convert(MonotonicClock.approxTime.now() - creationTimeNanos, TimeUnit.NANOSECONDS); + } + protected abstract CompletableFuture maybeExecuteAsync(QueryState queryState, long queryStartNanoTime, boolean traceRequest); public final CompletableFuture execute(QueryState queryState, long queryStartNanoTime) { + // at the time of the check, this is approximately the time spent in the NTR stage's queue + long elapsedTimeSinceCreation = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS); + ClientMetrics.instance.recordQueueTime(elapsedTimeSinceCreation, TimeUnit.NANOSECONDS); + if (elapsedTimeSinceCreation > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS)) + { + ClientMetrics.instance.markTimedOutBeforeProcessing(); + return CompletableFuture.completedFuture(ErrorMessage.fromException(new OverloadedException("Query timed out before it could start"))); + } + boolean shouldTrace = false; UUID tracingSessionId = null; diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 25a30102063b..be1b01d294ba 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -22,11 +22,13 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.Attributes; import org.apache.cassandra.cql3.BatchQueryOptions; import org.apache.cassandra.cql3.CQLStatement; @@ -38,7 +40,9 @@ import org.apache.cassandra.cql3.statements.BatchStatement; import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.OverloadedException; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; @@ -225,9 +229,27 @@ public CompletableFuture maybeExecuteAsync(QueryState state, long quer Optional asyncStage = Stage.fromStatement(batch); if (asyncStage.isPresent()) { - BatchStatement finalStatement = batch; List finalPrepared = prepared; - return CompletableFuture.supplyAsync(() -> handleRequest(state, queryStartNanoTime, handler, batch, batchOptions, queries, statements, finalPrepared, requestStartMillisTime), + return CompletableFuture.supplyAsync(() -> + { + try + { + // at the time of the check, this includes the time spent in the NTR queue, basic query parsing/set up, + // and any time spent in the queue for the async stage + long elapsedTime = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS); + ClientMetrics.instance.recordAsyncQueueTime(elapsedTime, TimeUnit.NANOSECONDS); + if (elapsedTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS)) + { + ClientMetrics.instance.markTimedOutBeforeAsyncProcessing(); + throw new OverloadedException("Query timed out before it could start"); + } + } + catch (Exception e) + { + return handleException(state, finalPrepared, e); + } + return handleRequest(state, queryStartNanoTime, handler, batch, batchOptions, queries, statements, finalPrepared, requestStartMillisTime); + }, asyncStage.get().executor()); } else diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index fd3278d789bd..20407d9294ac 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.QueryEvents; @@ -33,7 +34,9 @@ import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.ResultSet; import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.exceptions.OverloadedException; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; @@ -160,7 +163,26 @@ protected CompletableFuture maybeExecuteAsync(QueryState queryState, l if (asyncStage.isPresent()) { QueryHandler.Prepared finalPrepared = prepared; - return CompletableFuture.supplyAsync(() -> handleRequest(queryState, queryStartNanoTime, handler, queryOptions, statement, finalPrepared, requestStartMillisTime), + return CompletableFuture.supplyAsync(() -> + { + try + { + // at the time of the check, this includes the time spent in the NTR queue, basic query parsing/set up, + // and any time spent in the queue for the async stage + long elapsedTime = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS); + ClientMetrics.instance.recordAsyncQueueTime(elapsedTime, TimeUnit.NANOSECONDS); + if (elapsedTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS)) + { + ClientMetrics.instance.markTimedOutBeforeAsyncProcessing(); + throw new OverloadedException("Query timed out before it could start"); + } + } + catch (Exception e) + { + return handleException(queryState, finalPrepared, e); + } + return handleRequest(queryState, queryStartNanoTime, handler, queryOptions, statement, finalPrepared, requestStartMillisTime); + }, asyncStage.get().executor()); } else diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 300ce5cbe106..995fc70a83c3 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -19,17 +19,21 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryEvents; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.exceptions.OverloadedException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; @@ -114,7 +118,26 @@ protected CompletableFuture maybeExecuteAsync(QueryState state, long q if (asyncStage.isPresent()) { CQLStatement finalStatement = statement; - return CompletableFuture.supplyAsync(() -> handleRequest(state, queryHandler, queryStartNanoTime, finalStatement, requestStartMillisTime), + return CompletableFuture.supplyAsync(() -> + { + try + { + // at the time of the check, this includes the time spent in the NTR queue, basic query parsing/set up, + // and any time spent in the queue for the async stage + long elapsedTime = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS); + ClientMetrics.instance.recordAsyncQueueTime(elapsedTime, TimeUnit.NANOSECONDS); + if (elapsedTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS)) + { + ClientMetrics.instance.markTimedOutBeforeAsyncProcessing(); + throw new OverloadedException("Query timed out before it could start"); + } + } + catch (Exception e) + { + return handleException(state, finalStatement, e); + } + return handleRequest(state, queryHandler, queryStartNanoTime, finalStatement, requestStartMillisTime); + }, asyncStage.get().executor()); } else diff --git a/test/unit/org/apache/cassandra/transport/NativeTransportTimeoutTest.java b/test/unit/org/apache/cassandra/transport/NativeTransportTimeoutTest.java new file mode 100644 index 000000000000..58cc00965611 --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/NativeTransportTimeoutTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.OverloadedException; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.metrics.ClientMetrics; +import org.assertj.core.api.Assertions; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; + +import static org.apache.cassandra.utils.MonotonicClock.approxTime; + +@RunWith(BMUnitRunner.class) +public class NativeTransportTimeoutTest extends CQLTester +{ + static Semaphore EXECUTE_BARRIER; + static Semaphore WAIT_BARRIER; + + @Test + @BMRules(rules = { @BMRule(name = "Delay Message execution on NTR stage", + targetClass = "org.apache.cassandra.transport.Message$Request", + targetMethod = "execute", + targetLocation = "AT ENTRY", + condition = "$this.getCustomPayload() != null", + action = "org.apache.cassandra.transport.NativeTransportTimeoutTest.WAIT_BARRIER.release(); " + + "org.apache.cassandra.transport.NativeTransportTimeoutTest.EXECUTE_BARRIER.acquire(); " + + "flag(Thread.currentThread());"), + @BMRule(name = "Mock NTR timeout from Request.execute", + targetClass = "org.apache.cassandra.config.DatabaseDescriptor", + targetMethod = "getNativeTransportTimeout", + targetLocation = "AT ENTRY", + condition = "flagged(Thread.currentThread()) && callerEquals(\"Message$Request.execute\", true)", + action = "clear(Thread.currentThread()); " + + "return 10000000;") }) + public void testNativeTransportLoadShedding() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)"); + Statement statement = new SimpleStatement("SELECT * FROM " + KEYSPACE + '.' + currentTable()); + doTestLoadShedding(false, statement); + } + + @Test + @BMRules(rules = { @BMRule(name = "Delay elapsedTimeSinceCreationCheck from async stage", + targetClass = "org.apache.cassandra.transport.Message$Request", + targetMethod = "elapsedTimeSinceCreation", + targetLocation = "AT ENTRY", + condition = "$this.getCustomPayload() != null && !callerEquals(\"Message$Request.execute\", true)", + action = "org.apache.cassandra.transport.NativeTransportTimeoutTest.WAIT_BARRIER.release(); " + + "org.apache.cassandra.transport.NativeTransportTimeoutTest.EXECUTE_BARRIER.acquire(); " + + "flag(Thread.currentThread());"), + @BMRule(name = "Mock native transport timeout from async stage", + targetClass = "org.apache.cassandra.config.DatabaseDescriptor", + targetMethod = "getNativeTransportTimeout", + targetLocation = "AT ENTRY", + condition = "flagged(Thread.currentThread()) && callerMatches(\".*maybeExecuteAsync.*\", true)", + action = "clear(Thread.currentThread()); " + + "return 10000000;") }) + public void testAsyncStageLoadShedding() throws Throwable + { + CassandraRelevantProperties.NATIVE_TRANSPORT_ASYNC_READ_WRITE_ENABLED.setBoolean(true); + + try + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)"); + + Statement statement = new SimpleStatement("SELECT * FROM " + KEYSPACE + '.' + currentTable()); + doTestLoadShedding(true, statement); + + Statement insert1 = new SimpleStatement("INSERT INTO " + KEYSPACE + '.' + currentTable() + " (pk, v) VALUES (1, 'foo')"); + Statement insert2 = new SimpleStatement("INSERT INTO " + KEYSPACE + '.' + currentTable() + " (pk, v) VALUES (2, 'bar')"); + statement = new BatchStatement().add(insert1).add(insert2); + doTestLoadShedding(true, statement); + + PreparedStatement ps = sessionNet().prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable()); + doTestLoadShedding(true, ps.bind()); + } + finally + { + CassandraRelevantProperties.NATIVE_TRANSPORT_ASYNC_READ_WRITE_ENABLED.setBoolean(false); + } + } + + private void doTestLoadShedding(boolean useAsyncStages, Statement statement) throws InterruptedException + { + EXECUTE_BARRIER = new Semaphore(0); + WAIT_BARRIER = new Semaphore(0); + + Meter timedOutMeter; + Timer queueTimer; + + Session session = sessionNet(); + + // custom payload used to make detection of this statement easy early in byteman rules + statement.setOutgoingPayload(Collections.singletonMap("sentinel", ByteBuffer.wrap(new byte[0]))); + + if (useAsyncStages) + { + timedOutMeter = ClientMetrics.instance.timedOutBeforeAsyncProcessing; + queueTimer = ClientMetrics.instance.asyncQueueTime; + } + else + { + timedOutMeter = ClientMetrics.instance.timedOutBeforeProcessing; + queueTimer = ClientMetrics.instance.queueTime; + } + + long initialTimedOut = timedOutMeter.getCount(); + + ResultSetFuture rsf = session.executeAsync(statement); + WAIT_BARRIER.acquire(); + Thread.sleep(10 + TimeUnit.MILLISECONDS.convert(approxTime.error(), TimeUnit.NANOSECONDS) * 2); + EXECUTE_BARRIER.release(); + + Assertions.assertThatThrownBy(rsf::get).hasCauseInstanceOf(OverloadedException.class); + Assert.assertEquals(initialTimedOut + 1, timedOutMeter.getCount()); + Assert.assertTrue(queueTimer.getSnapshot().get999thPercentile() > TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS)); + } +}