Skip to content

Commit

Permalink
Merge pull request #4976 from RenxuanW/6.3-another
Browse files Browse the repository at this point in the history
Replace lower priority txn request when limit is hit (release-6.3).
  • Loading branch information
RenxuanW authored Jun 17, 2021
2 parents dacff5a + 98b1b61 commit c3ccd7d
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 16 deletions.
12 changes: 12 additions & 0 deletions fdbclient/Schemas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"p95":0.0,
"p99":0.0,
"p99.9":0.0
},
"batch":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
}
},
"read_latency_statistics":{
Expand Down
65 changes: 56 additions & 9 deletions fdbserver/MasterProxyServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ struct ProxyStats {
LatencySample batchTxnGRVTimeInQueue;

LatencySample commitLatencySample;
LatencySample grvLatencySample;
LatencySample grvLatencySample; // GRV latency metric sample of default priority
LatencySample grvBatchLatencySample; // GRV latency metric sample of batched priority

LatencyBands commitLatencyBands;
LatencyBands grvLatencyBands;
Expand Down Expand Up @@ -179,6 +180,10 @@ struct ProxyStats {
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvBatchLatencySample("GRVBatchLatencyMetrics",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitBatchingWindowSize("CommitBatchingWindowSize",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
Expand Down Expand Up @@ -387,6 +392,31 @@ ACTOR Future<Void> getRate(UID myID,
}
}

// Respond with an unreadable version to the GetReadVersion request when the GRV limit is hit.
void proxyGRVThresholdExceeded(const GetReadVersionRequest* req, ProxyStats* stats) {
++stats->txnRequestErrors;
// FIXME: send an error instead of giving an unreadable version when the client can support the error:
// req.reply.sendError(proxy_memory_limit_exceeded());
GetReadVersionReply rep;
rep.version = 1;
rep.locked = true;
req->reply.send(rep);
if (req->priority == TransactionPriority::IMMEDIATE) {
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededSystem").suppressFor(60);
} else if (req->priority == TransactionPriority::DEFAULT) {
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededDefault").suppressFor(60);
} else {
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededBatch").suppressFor(60);
}
}

// Evicts the request at the head of `queue`: it is answered through proxyGRVThresholdExceeded()
// (which also updates `stats`) and then removed from the queue.
// The queue must not be empty; the call sites visible in this file check empty() first.
void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, ProxyStats* stats) {
	const GetReadVersionRequest& evicted = queue->front();
	proxyGRVThresholdExceeded(&evicted, stats);
	// Only pop after replying: `evicted` refers to the element that is about to be removed.
	queue->pop_front();
}

// Put a GetReadVersion request into the queue corresponding to its priority.
ACTOR Future<Void> queueTransactionStartRequests(Reference<AsyncVar<ServerDBInfo>> db,
Deque<GetReadVersionRequest>* systemQueue,
Deque<GetReadVersionRequest>* defaultQueue,
Expand All @@ -402,16 +432,30 @@ ACTOR Future<Void> queueTransactionStartRequests(Reference<AsyncVar<ServerDBInfo
loop choose {
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
// WARNING: this code is run at a high priority, so it needs to do as little work as possible
bool canBeQueued = true;
if (stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() >
SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE) {
++stats->txnRequestErrors;
// FIXME: send an error instead of giving an unreadable version when the client can support the error:
// req.reply.sendError(proxy_memory_limit_exceeded());
GetReadVersionReply rep;
rep.version = 1;
rep.locked = true;
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
// When the limit is hit, try to drop requests from the lower priority queues.
if (req.priority == TransactionPriority::BATCH) {
canBeQueued = false;
} else if (req.priority == TransactionPriority::DEFAULT) {
if (!batchQueue->empty()) {
dropRequestFromQueue(batchQueue, stats);
} else {
canBeQueued = false;
}
} else {
if (!batchQueue->empty()) {
dropRequestFromQueue(batchQueue, stats);
} else if (!defaultQueue->empty()) {
dropRequestFromQueue(defaultQueue, stats);
} else {
canBeQueued = false;
}
}
}
if (!canBeQueued) {
proxyGRVThresholdExceeded(&req, stats);
} else {
stats->addRequest(req.transactionCount);
// TODO: check whether this is reasonable to do in the fast path
Expand Down Expand Up @@ -1758,6 +1802,9 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
double end = g_network->timer();
for (GetReadVersionRequest const& request : requests) {
double duration = end - request.requestTime();
if (request.priority == TransactionPriority::BATCH) {
stats->grvBatchLatencySample.addMeasurement(duration);
}
if (request.priority == TransactionPriority::DEFAULT) {
stats->grvLatencySample.addMeasurement(duration);
}
Expand Down
28 changes: 21 additions & 7 deletions fdbserver/Status.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,21 @@ struct RolesInfo {
obj["id"] = iface.id().shortString();
obj["role"] = role;
try {
JsonBuilderObject priorityStats;

// GRV Latency metrics are grouped according to priority (currently batch or default).
// Other priorities can be added in the future.
TraceEventFields const& grvLatencyMetrics = metrics.at("GRVLatencyMetrics");
if (grvLatencyMetrics.size()) {
JsonBuilderObject priorityStats;
// We only report default priority now, but this allows us to add other priorities if we want them
priorityStats["default"] = addLatencyStatistics(grvLatencyMetrics);
}
TraceEventFields const& grvBatchMetrics = metrics.at("GRVBatchLatencyMetrics");
if (grvBatchMetrics.size()) {
priorityStats["batch"] = addLatencyStatistics(grvBatchMetrics);
}

// Add GRV Latency metrics (for all priorities) to parent node.
if (priorityStats.size()) {
obj["grv_latency_statistics"] = priorityStats;
}

Expand Down Expand Up @@ -1764,11 +1774,15 @@ ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetri
// Asks getServerMetrics() for a fixed list of named metric events for every proxy in
// db->get().client.proxies (using address_workers to reach them) and returns the resulting
// (MasterProxyInterface, EventMap) pairs unchanged.
ACTOR static Future<vector<std::pair<MasterProxyInterface, EventMap>>> getProxiesAndMetrics(
Reference<AsyncVar<ServerDBInfo>> db,
std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
// NOTE(review): `results` is declared twice in this span. This first statement looks like the
// pre-change form of the call (no "GRVBatchLatencyMetrics" in the list) -- confirm it is not
// present in the committed file.
vector<std::pair<MasterProxyInterface, EventMap>> results = wait(
getServerMetrics(db->get().client.proxies,
address_workers,
std::vector<std::string>{
"GRVLatencyMetrics", "CommitLatencyMetrics", "GRVLatencyBands", "CommitLatencyBands", "CommitBatchingWindowSize" }));
// Post-change form: same call, with "GRVBatchLatencyMetrics" added to the requested event names.
vector<std::pair<MasterProxyInterface, EventMap>> results =
wait(getServerMetrics(db->get().client.proxies,
address_workers,
std::vector<std::string>{ "GRVLatencyMetrics",
"GRVBatchLatencyMetrics",
"CommitLatencyMetrics",
"GRVLatencyBands",
"CommitLatencyBands",
"CommitBatchingWindowSize" }));

return results;
}
Expand Down

0 comments on commit c3ccd7d

Please sign in to comment.