Skip to content

Commit

Permalink
Merge pull request wso2#2083 from malakaganga/fix_passthrough_rem_dis…
Browse files Browse the repository at this point in the history
…card_master

Set New Buffer for Error Responses in Pipe
  • Loading branch information
malakaganga authored May 10, 2023
2 parents 0e7d377 + 1abf4eb commit 0656fd0
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.apache.synapse.transport.http.conn.SynapseDebugInfoHolder;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.apache.synapse.transport.passthru.config.PassThroughConfiguration;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.util.RelayUtils;

Expand All @@ -66,6 +67,8 @@ public class ClientWorker implements Runnable {
/** the axis2 message context of the request */
private MessageContext requestMessageContext;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response) {
this(targetConfiguration, outMsgCtx, response, Collections.emptyList());
}
Expand Down Expand Up @@ -239,7 +242,11 @@ public void run() {
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
}
try {

// If an error has happened in the request processing, consumes the data in pipe completely and discard it
// If the consumeAndDiscard property is set to true
if (response.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) {
RelayUtils.discardRequestMessage(requestMessageContext);
}
if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
if(cType == null){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SourceContext {

private SourceConfiguration sourceConfiguration;

private boolean isPipeMarkedToBeConsumed = false;
private boolean isSourceRequestMarkedToBeDiscarded = false;

private ProtocolState state = ProtocolState.REQUEST_READY;

Expand Down Expand Up @@ -72,12 +72,12 @@ public ProtocolState getState() {
return state;
}

public boolean isPipeMarkedToBeConsumed() {
return isPipeMarkedToBeConsumed;
public boolean isSourceRequestMarkedToBeDiscarded() {
return isSourceRequestMarkedToBeDiscarded;
}

public void setPipeMarkedToBeConsumed(boolean isPipeDiscarded) {
this.isPipeMarkedToBeConsumed = isPipeDiscarded;
public void setIsSourceRequestMarkedToBeDiscarded(boolean isPipeDiscarded) {
this.isSourceRequestMarkedToBeDiscarded = isPipeDiscarded;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void inputReady(NHttpServerConnection conn,
// inputReady is already called prior to suspendInput method is called in TargetHandler.
SourceContext sourceContext = (SourceContext)
conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);
if (sourceContext != null && sourceContext.isPipeMarkedToBeConsumed()) {
if (sourceContext != null && sourceContext.isSourceRequestMarkedToBeDiscarded()) {
return;
}
handleInvalidState(conn, "Request message body data received");
Expand Down Expand Up @@ -368,18 +368,23 @@ private void dropSourceConnection(NHttpServerConnection conn) {
public void responseReady(NHttpServerConnection conn) {
try {
ProtocolState protocolState = SourceContext.getState(conn);
if (protocolState.compareTo(ProtocolState.REQUEST_DONE) < 0) {
return;
}
SourceContext sourceContext = (SourceContext)
conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);

if (protocolState.compareTo(ProtocolState.CLOSING) >= 0) {
informWriterError(conn);
return;
}

if (protocolState != ProtocolState.REQUEST_DONE) {
handleInvalidState(conn, "Writing a response");
return;
if (sourceContext != null && sourceContext.isSourceRequestMarkedToBeDiscarded() && protocolState != ProtocolState.REQUEST_READY) {
SourceContext.updateState(conn, ProtocolState.REQUEST_DONE);
} else {
if (protocolState.compareTo(ProtocolState.REQUEST_DONE) < 0) {
return;
}
if (protocolState != ProtocolState.REQUEST_DONE) {
handleInvalidState(conn, "Writing a response");
return;
}
}

// because the duplex nature of http core we can reach hear without a actual response
Expand Down Expand Up @@ -443,21 +448,21 @@ public void outputReady(NHttpServerConnection conn,
if(protocolState == ProtocolState.WSDL_RESPONSE_DONE){
//decrement request count for wsdl responses
metrics.requestServed();
// we need to shut down if the shutdown flag is set
HttpContext context = conn.getContext();
ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(
// we need to shut down if the shutdown flag is set
HttpContext context = conn.getContext();
ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(
"synapse.response-source-buffer");
int bytesWritten = outBuf.produceContent(encoder);
int bytesWritten = outBuf.produceContent(encoder);
if (metrics != null && bytesWritten > 0) {
metrics.incrementBytesSent(bytesWritten);
}

conn.requestInput();
if(outBuf instanceof SimpleOutputBuffer && !((SimpleOutputBuffer)outBuf).hasData()){
sourceConfiguration.getSourceConnections().releaseConnection(conn);
sourceConfiguration.getSourceConnections().releaseConnection(conn);
}
endTransaction(conn);
return;
return;
}


Expand Down Expand Up @@ -509,7 +514,7 @@ public void outputReady(NHttpServerConnection conn,
} else {
bytesSent = response.write(conn, encoder);
}
if (encoder.isCompleted()) {
if (encoder.isCompleted()) {
HttpContext context = conn.getContext();
long departure = System.currentTimeMillis();
context.setAttribute(PassThroughConstants.RES_TO_CLIENT_WRITE_END_TIME,departure);
Expand All @@ -519,8 +524,8 @@ public void outputReady(NHttpServerConnection conn,
logCorrelationRoundTrip(context, request);
}
updateMetricsView(context);
}
endTransaction(conn);
}
endTransaction(conn);
metrics.incrementBytesSent(bytesSent);
} catch (IOException e) {
logIOException(conn, e);
Expand Down Expand Up @@ -573,7 +578,7 @@ else if (e.getMessage() != null) {
}

public void timeout(NHttpServerConnection conn) {
boolean isTimeoutOccurred = false;
boolean isTimeoutOccurred = false;
ProtocolState state = SourceContext.getState(conn);
Map<String, String> logDetails = getLoggingInfo(conn, state);

Expand Down Expand Up @@ -615,7 +620,7 @@ public void timeout(NHttpServerConnection conn) {
}
} else if (state == ProtocolState.REQUEST_DONE) {
informWriterError(conn);
isTimeoutOccurred = true;
isTimeoutOccurred = true;
metrics.timeoutOccured();
log.warn(
"STATE_DESCRIPTION = Socket Timeout occurred after accepting the request headers and the request "
Expand All @@ -634,9 +639,9 @@ public void timeout(NHttpServerConnection conn) {
SourceContext.updateState(conn, ProtocolState.CLOSED);

sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
if (isTimeoutOccurred) {
rollbackTransaction(conn);
}
if (isTimeoutOccurred) {
rollbackTransaction(conn);
}
}

public void closed(NHttpServerConnection conn) {
Expand All @@ -649,7 +654,7 @@ public void closed(NHttpServerConnection conn) {
getConnectionLoggingInfo(conn));
}
} else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) {
isFault = true;
isFault = true;
informReaderError(conn);
log.warn("STATE_DESCRIPTION = Connection closed while server accepting request headers but prior to "
+ "finish reading the request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails
Expand All @@ -662,7 +667,7 @@ public void closed(NHttpServerConnection conn) {
logHttpRequestErrorInCorrelationLog(conn, "Connection Closed in " + state.name());
}
} else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) {
isFault = true;
isFault = true;
informWriterError(conn);
log.warn("STATE_DESCRIPTION = Connection closed while server writing the response headers or body, "
+ "INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", "
Expand All @@ -673,7 +678,7 @@ public void closed(NHttpServerConnection conn) {
logHttpRequestErrorInCorrelationLog(conn, "Connection Closed in " + state.name());
}
} else if (state == ProtocolState.REQUEST_DONE) {
isFault = true;
isFault = true;
informWriterError(conn);
log.warn("STATE_DESCRIPTION = Connection closed after server accepting the request headers and the "
+ "request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", "
Expand All @@ -700,7 +705,7 @@ public void endOfInput(NHttpServerConnection conn) throws IOException {
}

public void exception(NHttpServerConnection conn, Exception ex) {
boolean isFault = false;
boolean isFault = false;
if (ex instanceof IOException) {
/*
* If the flow is SSE we have already set references to target connection and targetConnections
Expand Down Expand Up @@ -780,10 +785,10 @@ public void exception(NHttpServerConnection conn, Exception ex) {
sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
isFault = true;
}
if (isFault) {
rollbackTransaction(conn);
}

if (isFault) {
rollbackTransaction(conn);
}
}

private Map<String, String> getLoggingInfo(NHttpServerConnection conn, ProtocolState state) {
Expand Down Expand Up @@ -947,42 +952,42 @@ public SourceRequest getSourceRequest(NHttpServerConnection conn) throws IOExcep
metrics.incrementMessagesReceived();
return request;
}
private void rollbackTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.rollbackTransaction(false,
serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private void endTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.endTransaction(false, serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private String getConnectionLoggingInfo(NHttpServerConnection conn) {

private void rollbackTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.rollbackTransaction(false,
serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private void endTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.endTransaction(false, serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private String getConnectionLoggingInfo(NHttpServerConnection conn) {
if (conn instanceof LoggingNHttpServerConnection) {
IOSession session = ((LoggingNHttpServerConnection) conn).getIOSession();
if (session != null) {
return " Remote Address : " + session.getRemoteAddress();
}
}
return "";
return "";
}

private String getClientConnectionInfo(NHttpServerConnection conn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.connections.HostConnections;
import org.apache.synapse.transport.passthru.jmx.PassThroughTransportMetricsCollector;
import org.apache.synapse.transport.passthru.util.RelayUtils;

import java.io.IOException;
import java.net.SocketAddress;
Expand Down Expand Up @@ -424,9 +423,8 @@ public void responseReceived(NHttpClientConnection conn) {
sourceConn.suspendInput();
SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);
if (sourceContext != null) {
sourceContext.setPipeMarkedToBeConsumed(true);
sourceContext.setIsSourceRequestMarkedToBeDiscarded(true);
}
SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE);
SourceContext.get(sourceConn).setShutDown(true);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ public interface PassThroughConfigPNames {
*/
public String DISABLE_KEEPALIVE = "http.connection.disable.keepalive";


/**
* Define property to mark If an error has happened in the request processing,
* should consumes the data in pipe completely and discard.
*/
public String CONSUME_AND_DISCARD = "consume_and_discard";

/**
* Defines the time interval for idle connection removal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class PassThroughConfiguration {
private static final int DEFAULT_CONNECTION_GRACE_TIME = 10000;
private Boolean isKeepAliveDisabled = null;

private Boolean isConsumeAndDiscard = true;

//additional rest dispatch handlers
private static final String REST_DISPATCHER_SERVICE="rest.dispatcher.service";
// URI configurations that determine if it requires custom rest dispatcher
Expand Down Expand Up @@ -121,6 +123,13 @@ public boolean isKeepAliveDisabled() {
return isKeepAliveDisabled;
}

public boolean isConsumeAndDiscard() {
isConsumeAndDiscard =
ConfigurationBuilderUtil.getBooleanProperty(PassThroughConfigPNames.CONSUME_AND_DISCARD,
true, props);
return isConsumeAndDiscard;
}

public int getMaxActiveConnections() {
return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.C_MAX_ACTIVE, DEFAULT_MAX_ACTIVE_CON,
props);
Expand Down
Loading

0 comments on commit 0656fd0

Please sign in to comment.