Skip to content

Commit

Permalink
Merge pull request #143 from Netflix/2167-source-job-has-incorrect-su…
Browse files Browse the repository at this point in the history
…bscriptions

RTDI-2167 source job has incorrect subscriptions
  • Loading branch information
codyrioux authored Mar 10, 2022
2 parents d048a3d + 1f24186 commit de88e88
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ public class MantisSSEConstants {
public static final String ID = "id";

public static final String MQL = "mql";

public static final String TARGET_JOB = "sourceJobName";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import io.reactivx.mantis.operators.DropOperator;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -142,6 +143,43 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request,

String uniqueClientId = socketAddrStr;

Tag[] tags = new Tag[2];
final String clientId = Optional.ofNullable(uniqueClientId).orElse("none");
final String sockAddr = Optional.ofNullable(socketAddrStr).orElse("none");
tags[0] = new BasicTag("clientId", clientId);
tags[1] = new BasicTag("sockAddr", sockAddr);

Metrics sseSinkMetrics = new Metrics.Builder()
.id("ServerSentEventRequestHandler", tags)
.addCounter("processedCounter")
.addCounter("pingCounter")
.addCounter("errorCounter")
.addCounter("droppedCounter")
.addCounter("flushCounter")
.addCounter("sourceJobNameMismatchRejection")
.build();


final Counter msgProcessedCounter = sseSinkMetrics.getCounter("processedCounter");
final Counter pingCounter = sseSinkMetrics.getCounter("pingCounter");
final Counter errorCounter = sseSinkMetrics.getCounter("errorCounter");
final Counter droppedWrites = sseSinkMetrics.getCounter("droppedCounter");
final Counter flushCounter = sseSinkMetrics.getCounter("flushCounter");
final Counter sourceJobNameMismatchRejectionCounter = sseSinkMetrics.getCounter("sourceJobNameMismatchRejection");


if (queryParameters != null && queryParameters.containsKey(MantisSSEConstants.TARGET_JOB)) {
String targetJob = queryParameters.get(MantisSSEConstants.TARGET_JOB).get(0);
String currentJob = this.context.getWorkerInfo().getJobClusterName();
if (!currentJob.equalsIgnoreCase(targetJob)) {
LOG.info("Rejecting connection from {}. Client is targeting job {} but this is job {}.", uniqueClientId, targetJob, currentJob);
sourceJobNameMismatchRejectionCounter.increment();
response.setStatus(HttpResponseStatus.BAD_REQUEST);
response.writeStringAndFlush("data: " + MantisSSEConstants.TARGET_JOB + " is " + targetJob + " but this is " + currentJob + "." + TWO_NEWLINES);
return response.close();
}
}

if (queryParameters != null && queryParameters.containsKey(CLIENT_ID_PARAM)) {
// enablePings
uniqueClientId = queryParameters.get(CLIENT_ID_PARAM).get(0);
Expand Down Expand Up @@ -202,28 +240,6 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
? queryParameters.get(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER).get(0).getBytes()
: null;

Tag[] tags = new Tag[2];
final String clientId = Optional.ofNullable(uniqueClientId).orElse("none");
final String sockAddr = Optional.ofNullable(socketAddrStr).orElse("none");
tags[0] = new BasicTag("clientId", clientId);
tags[1] = new BasicTag("sockAddr", sockAddr);

Metrics sseSinkMetrics = new Metrics.Builder()
.id("ServerSentEventRequestHandler", tags)
.addCounter("processedCounter")
.addCounter("pingCounter")
.addCounter("errorCounter")
.addCounter("droppedCounter")
.addCounter("flushCounter")
.build();


final Counter msgProcessedCounter = sseSinkMetrics.getCounter("processedCounter");
final Counter pingCounter = sseSinkMetrics.getCounter("pingCounter");
final Counter errorCounter = sseSinkMetrics.getCounter("errorCounter");
final Counter droppedWrites = sseSinkMetrics.getCounter("droppedCounter");
final Counter flushCounter = sseSinkMetrics.getCounter("flushCounter");

// get predicate, defaults to return true for all T
Func1<T, Boolean> filterFunction = new Func1<T, Boolean>() {
@Override
Expand Down

0 comments on commit de88e88

Please sign in to comment.