Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async decisions #372

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions lib/request-execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class RequestExecution {
this._connection = connection;
this._host = host;
} catch (err) {
return this._parent.handleNoHostAvailable(err, this);
this._parent.handleNoHostAvailable(err, this);
return;
}

// It could be a new connection from the pool, we should make sure it's in the correct keyspace.
Expand All @@ -126,7 +127,8 @@ class RequestExecution {
} catch (err) {
// When its a socket error, attempt to retry.
// Otherwise, rethrow the error to the user.
return this._handleError(err, RequestExecution._getErrorCode(err));
this._handleError(err, RequestExecution._getErrorCode(err));
return;
}
}

Expand Down Expand Up @@ -157,11 +159,12 @@ class RequestExecution {
}

if (errorCode !== errorCodes.none) {
return this._handleError(errorCode, err);
this._handleError(errorCode, err);
return;
}

if (response.schemaChange) {
return promiseUtils.toBackground(
promiseUtils.toBackground(
this._parent.client
.handleSchemaAgreementAndRefresh(this._connection, response.schemaChange)
.then(agreement => {
Expand All @@ -173,6 +176,7 @@ class RequestExecution {
this._parent.setCompleted(null, this._getResultSet(response, agreement));
})
);
return;
}

if (response.keyspaceSet) {
Expand Down Expand Up @@ -279,12 +283,13 @@ class RequestExecution {
return this._cancelled;
}

_handleError(errorCode, err) {
async _handleError(errorCode, err) {
this._parent.triedHosts[this._host.address] = err;
err['coordinator'] = this._host.address;

if (errorCode === errorCodes.serverErrorUnprepared) {
return this._prepareAndRetry(err.queryId);
this._prepareAndRetry(err.queryId);
return;
}

if (errorCode === errorCodes.socketError || errorCode === errorCodes.socketErrorBeforeRequestWritten) {
Expand All @@ -294,13 +299,18 @@ class RequestExecution {
this._host.checkHealth(this._connection);
}

const decisionInfo = this._getDecision(errorCode, err);
let decisionInfo = this._getDecision(errorCode, err);

if(decisionInfo.then) {
decisionInfo = await decisionInfo;
}

if (!decisionInfo || decisionInfo.decision === retry.RetryPolicy.retryDecision.rethrow) {
if (this._request instanceof requests.QueryRequest || this._request instanceof requests.ExecuteRequest) {
err['query'] = this._request.query;
}
return this._parent.setCompleted(err);
this._parent.setCompleted(err);
return;
}

const metrics = this._parent.client.metrics;
Expand All @@ -309,12 +319,13 @@ class RequestExecution {
metrics.onIgnoreError(err);

// Return an empty ResultSet
return this._parent.setCompleted(null, this._getResultSet(utils.emptyObject));
this._parent.setCompleted(null, this._getResultSet(utils.emptyObject));
return;
}

RequestExecution._invokeMetricsHandlerForRetry(errorCode, metrics, err);

return this._retry(decisionInfo.consistency, decisionInfo.useCurrentHost);
this._retry(decisionInfo.consistency, decisionInfo.useCurrentHost);
}

/**
Expand Down Expand Up @@ -436,10 +447,12 @@ class RequestExecution {
// All connections are busy (`BusyConnectionError`) or there isn't a ready connection in the pool (`Error`)
// The retry policy declared the intention to retry on the current host but its not available anymore.
// Use the next host
return promiseUtils.toBackground(this.restart());
promiseUtils.toBackground(this.restart());
return;
}

return this._sendOnConnection();
this._sendOnConnection();
return;
}

// Use the next host in the query plan to send the request in the background
Expand All @@ -462,15 +475,17 @@ class RequestExecution {
const info = this._parent.client.metadata.getPreparedById(queryId);

if (!info) {
return this._parent.setCompleted(new errors.DriverInternalError(
this._parent.setCompleted(new errors.DriverInternalError(
`Unprepared response invalid, id: 0x${queryId.toString('hex')}`));
return;
}

const version = this._connection.protocolVersion;

if (!types.protocolVersion.supportsKeyspaceInRequest(version) && info.keyspace && info.keyspace !== connection.keyspace) {
return this._parent.setCompleted(
this._parent.setCompleted(
new Error(`Query was prepared on keyspace ${info.keyspace}, can't execute it on ${connection.keyspace} (${info.query})`));
return;
}

const self = this;
Expand All @@ -483,7 +498,8 @@ class RequestExecution {

// There was a failure re-preparing on this connection.
// Execute the original request on the next connection and forget about the PREPARE-UNPREPARE flow.
return self._retry(undefined, false);
self._retry(undefined, false);
return;
}

// It's possible that when re-preparing we got new metadata (i.e. if schema changed), update cache.
Expand Down