Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-11070: Limited backport of CASSANDRA-19534 #1393

Open
wants to merge 8 commits into
base: cc-main-migration-release
Choose a base branch
from
9 changes: 9 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,15 @@ truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations.
# Lowest acceptable value is 10 ms.
request_timeout_in_ms: 10000
# Upper bound for how long any request received via native transport
# should be considered live and serviceable by the system. This is
# currently considered at two points: when the message is dequeued and
# executed by the NATIVE_TRANSPORT_REQUESTS stage, and when the message
# is dequeued and executed by an async stage if NATIVE_TRANSPORT_ASYNC_READ_WRITE_ENABLED
# is set to true. If the request is not completed within this time, an
# OverloadedException is thrown.
native_transport_timeout_in_ms: 12000


# Defensive settings for protecting Cassandra from true network partitions.
# See (CASSANDRA-14358) for details.
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class Config

public volatile long repair_prepare_message_timeout_in_ms = 10000L;

public volatile long native_transport_timeout_in_ms = 12000L;

public Integer streaming_connections_per_host = 1;
public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes

Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3706,4 +3706,14 @@ public static void setAnnBruteForceExpenseFactor(double factor)
Preconditions.checkArgument(factor <= StorageAttachedIndexOptions.MAXIMUM_ANN_BRUTE_FORCE_FACTOR, "ANN brute force expense factor must be at most " + StorageAttachedIndexOptions.MAXIMUM_ANN_BRUTE_FORCE_FACTOR);
conf.sai_options.ann_brute_force_factor = factor;
}

public static long getNativeTransportTimeout(TimeUnit timeUnit)
{
return timeUnit.convert(conf.native_transport_timeout_in_ms, TimeUnit.MILLISECONDS);
}

public static void setNativeTransportTimeout(long timeout, TimeUnit timeUnit)
{
conf.native_transport_timeout_in_ms = timeUnit.toMillis(timeout);
}
}
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.tracing.Tracing;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;

public class MutationVerbHandler implements IVerbHandler<Mutation>
{
public static final MutationVerbHandler instance = new MutationVerbHandler();
Expand All @@ -43,6 +46,13 @@ private void failed()

public void doVerb(Message<Mutation> message)
{
if (approxTime.now() > message.expiresAtNanos())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have something simlar for other verbs? Maybe reads?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reads handle this much more extensively through Monitorable implementation (https://issues.apache.org/jira/browse/CASSANDRA-7392). CASSANDRA-19534 added this as a very minimal load-shedding point for expired mutations, but reads already have much more extensive infrastructure to handle this.

{
Tracing.trace("Discarding mutation from {} (timed out)", message.from());
MessagingService.instance().metrics.recordDroppedMessage(message, message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS);
return;
}

// Check if there were any forwarding headers in this message
InetAddressAndPort from = message.respondTo();
InetAddressAndPort respondToAddress;
Expand Down
71 changes: 69 additions & 2 deletions src/java/org/apache/cassandra/metrics/ClientMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.cassandra.metrics;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import org.apache.cassandra.transport.*;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
Expand All @@ -44,9 +47,17 @@ public final class ClientMetrics

@SuppressWarnings({ "unused", "FieldCanBeLocal" })
private Gauge<Integer> pausedConnectionsGauge;

private Meter connectionPaused;

private Meter requestDiscarded;

public Meter timedOutBeforeProcessing;
public Meter timedOutBeforeAsyncProcessing;
public Timer queueTime; // time between Message creation and execution on NTR
public Counter totalQueueTime; // total queue time (in nanoseconds) for use in histogram timer
public Timer asyncQueueTime; // time between Message creation and execution on an async stage. This includes the time recorded in queueTime metric.
public Counter totalAsyncQueueTime; // total async queue time (in nanoseconds) for use in histogram timer

private Meter protocolException;
private Meter unknownException;

Expand All @@ -64,7 +75,11 @@ public void markAuthFailure()
authFailure.mark();
}

public void pauseConnection() { pausedConnections.incrementAndGet(); }
public void pauseConnection() {
connectionPaused.mark();
pausedConnections.incrementAndGet();
}

public void unpauseConnection() { pausedConnections.decrementAndGet(); }

public void markRequestDiscarded() { requestDiscarded.mark(); }
Expand All @@ -79,6 +94,40 @@ public List<ConnectedClient> allConnectedClients()
return clients;
}

public void markTimedOutBeforeProcessing()
{
timedOutBeforeProcessing.mark();
}

public void markTimedOutBeforeAsyncProcessing()
{
timedOutBeforeAsyncProcessing.mark();
}

/**
* Record time between Message creation and execution on NTR.
* @param value time elapsed
* @param unit time unit
*/
public void recordQueueTime(long value, TimeUnit unit)
{
queueTime.update(value, unit);
totalQueueTime.inc(TimeUnit.NANOSECONDS.convert(value, unit));
}

/**
* Record time between Message creation and execution on an async stage, if present.
* Note that this includes the queue time previously recorded before execution on the NTR stage,
* so for a given request, asyncQueueTime >= queueTime.
* @param value time elapsed
* @param unit time unit
*/
public void recordAsyncQueueTime(long value, TimeUnit unit)
{
asyncQueueTime.update(value, unit);
totalAsyncQueueTime.inc(TimeUnit.NANOSECONDS.convert(value, unit));
}

public void markProtocolException()
{
protocolException.mark();
Expand Down Expand Up @@ -117,9 +166,17 @@ public long getCount()
authFailure = registerMeter("AuthFailure");

pausedConnections = new AtomicInteger();
connectionPaused = registerMeter("ConnectionPaused");
pausedConnectionsGauge = registerGauge("PausedConnections", pausedConnections::get);
requestDiscarded = registerMeter("RequestDiscarded");

timedOutBeforeProcessing = registerMeter("TimedOutBeforeProcessing");
timedOutBeforeAsyncProcessing = registerMeter("TimedOutBeforeAsyncProcessing");
queueTime = registerTimer("QueueTime");
totalQueueTime = registerCounter("TotalQueueTime");
asyncQueueTime = registerTimer("AsyncQueueTime");
totalAsyncQueueTime = registerCounter("TotalAsyncQueueTime");

protocolException = registerMeter("ProtocolException");
unknownException = registerMeter("UnknownException");

Expand Down Expand Up @@ -188,4 +245,14 @@ private Meter registerMeter(String name)
{
return Metrics.meter(factory.createMetricName(name));
}

private Timer registerTimer(String name)
{
return Metrics.timer(factory.createMetricName(name));
}

private Counter registerCounter(String name)
{
return Metrics.counter(factory.createMetricName(name));
}
}
27 changes: 27 additions & 0 deletions src/java/org/apache/cassandra/transport/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;

Expand All @@ -34,10 +35,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.UUIDGen;

/**
Expand Down Expand Up @@ -201,6 +206,7 @@ public String debugString()
public static abstract class Request extends Message
{
private boolean tracingRequested;
private final long creationTimeNanos = MonotonicClock.approxTime.now();
jkni marked this conversation as resolved.
Show resolved Hide resolved

protected Request(Type type)
{
Expand All @@ -215,10 +221,31 @@ protected boolean isTraceable()
return false;
}

/**
* Returns the time elapsed since this request was created. Note that this is the total lifetime of the request
* in the system, so we expect increasing returned values across multiple calls to elapsedTimeSinceCreation.
*
* @param timeUnit the time unit in which to return the elapsed time
* @return the time elapsed since this request was created
*/
protected long elapsedTimeSinceCreation(TimeUnit timeUnit)
{
return timeUnit.convert(MonotonicClock.approxTime.now() - creationTimeNanos, TimeUnit.NANOSECONDS);
}

protected abstract CompletableFuture<Response> maybeExecuteAsync(QueryState queryState, long queryStartNanoTime, boolean traceRequest);

public final CompletableFuture<Response> execute(QueryState queryState, long queryStartNanoTime)
{
// at the time of the check, this is approximately the time spent in the NTR stage's queue
long elapsedTimeSinceCreation = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So async queue time includes sync queue time? Could you please make the relation clear in the comments?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will do.

ClientMetrics.instance.recordQueueTime(elapsedTimeSinceCreation, TimeUnit.NANOSECONDS);
if (elapsedTimeSinceCreation > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
{
ClientMetrics.instance.markTimedOutBeforeProcessing();
return CompletableFuture.completedFuture(ErrorMessage.fromException(new OverloadedException("Query timed out before it could start")));
}

boolean shouldTrace = false;
UUID tracingSessionId = null;

Expand Down
26 changes: 24 additions & 2 deletions src/java/org/apache/cassandra/transport/messages/BatchMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;

import io.netty.buffer.ByteBuf;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.cql3.BatchQueryOptions;
import org.apache.cassandra.cql3.CQLStatement;
Expand All @@ -38,7 +40,9 @@
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
Expand Down Expand Up @@ -225,9 +229,27 @@ public CompletableFuture<Response> maybeExecuteAsync(QueryState state, long quer
Optional<Stage> asyncStage = Stage.fromStatement(batch);
if (asyncStage.isPresent())
{
BatchStatement finalStatement = batch;
List<QueryHandler.Prepared> finalPrepared = prepared;
return CompletableFuture.supplyAsync(() -> handleRequest(state, queryStartNanoTime, handler, batch, batchOptions, queries, statements, finalPrepared, requestStartMillisTime),
return CompletableFuture.supplyAsync(() ->
{
try
{
// at the time of the check, this includes the time spent in the NTR queue, basic query parsing/set up,
// and any time spent in the queue for the async stage
long elapsedTime = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS);
ClientMetrics.instance.recordAsyncQueueTime(elapsedTime, TimeUnit.NANOSECONDS);
if (elapsedTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
{
ClientMetrics.instance.markTimedOutBeforeAsyncProcessing();
throw new OverloadedException("Query timed out before it could start");
}
}
catch (Exception e)
{
return handleException(state, finalPrepared, e);
}
return handleRequest(state, queryStartNanoTime, handler, batch, batchOptions, queries, statements, finalPrepared, requestStartMillisTime);
},
asyncStage.get().executor());
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@

import io.netty.buffer.ByteBuf;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryEvents;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.ResultSet;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
Expand Down Expand Up @@ -160,7 +163,26 @@ protected CompletableFuture<Response> maybeExecuteAsync(QueryState queryState, l
if (asyncStage.isPresent())
{
QueryHandler.Prepared finalPrepared = prepared;
return CompletableFuture.supplyAsync(() -> handleRequest(queryState, queryStartNanoTime, handler, queryOptions, statement, finalPrepared, requestStartMillisTime),
return CompletableFuture.supplyAsync(() ->
{
try
{
// at the time of the check, this includes the time spent in the NTR queue, basic query parsing/set up,
// and any time spent in the queue for the async stage
long elapsedTime = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS);
ClientMetrics.instance.recordAsyncQueueTime(elapsedTime, TimeUnit.NANOSECONDS);
if (elapsedTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
{
ClientMetrics.instance.markTimedOutBeforeAsyncProcessing();
throw new OverloadedException("Query timed out before it could start");
}
}
catch (Exception e)
{
return handleException(queryState, finalPrepared, e);
}
return handleRequest(queryState, queryStartNanoTime, handler, queryOptions, statement, finalPrepared, requestStartMillisTime);
},
asyncStage.get().executor());
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;

import io.netty.buffer.ByteBuf;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryEvents;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
Expand Down Expand Up @@ -114,7 +118,26 @@ protected CompletableFuture<Response> maybeExecuteAsync(QueryState state, long q
if (asyncStage.isPresent())
{
CQLStatement finalStatement = statement;
return CompletableFuture.supplyAsync(() -> handleRequest(state, queryHandler, queryStartNanoTime, finalStatement, requestStartMillisTime),
return CompletableFuture.supplyAsync(() ->
{
try
{
// at the time of the check, this includes the time spent in the NTR queue, basic query parsing/set up,
// and any time spent in the queue for the async stage
long elapsedTime = elapsedTimeSinceCreation(TimeUnit.NANOSECONDS);
ClientMetrics.instance.recordAsyncQueueTime(elapsedTime, TimeUnit.NANOSECONDS);
if (elapsedTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
{
ClientMetrics.instance.markTimedOutBeforeAsyncProcessing();
throw new OverloadedException("Query timed out before it could start");
}
}
catch (Exception e)
{
return handleException(state, finalStatement, e);
}
return handleRequest(state, queryHandler, queryStartNanoTime, finalStatement, requestStartMillisTime);
},
asyncStage.get().executor());
}
else
Expand Down
Loading