Commit 089cf84

make rate limit error code configurable

vthacker committed Feb 8, 2024
1 parent 42aa91c

Showing 5 changed files with 112 additions and 51 deletions.
14 changes: 11 additions & 3 deletions kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -1,9 +1,9 @@
package com.slack.kaldb.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
-import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
@@ -33,18 +33,25 @@ public class BulkIngestApi {
  private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
  private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
  private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
+ private final int rateLimitExceededErrorCode;

  public BulkIngestApi(
      BulkIngestKafkaProducer bulkIngestKafkaProducer,
      DatasetRateLimitingService datasetRateLimitingService,
-     MeterRegistry meterRegistry) {
+     MeterRegistry meterRegistry,
+     int rateLimitExceededErrorCode) {

    this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
    this.datasetRateLimitingService = datasetRateLimitingService;
    this.meterRegistry = meterRegistry;
    this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
    this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
    this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
+   if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
+     this.rateLimitExceededErrorCode = 400;
+   } else {
+     this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
+   }
  }

  @Post("/_bulk")
@@ -77,7 +84,8 @@ public HttpResponse addDocument(String bulkRequest) {
        final String index = indexDocs.getKey();
        if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
          BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
-         future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
+         future.complete(
+             HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), response));
          return HttpResponse.of(future);
        }
      }
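Since proto3 int32 fields default to 0 when unset, the constructor guard above is what turns a missing or out-of-range configuration value into a 400 response. A minimal standalone sketch of that fallback behavior (class and method names here are illustrative, not part of the commit):

import com.linecorp.armeria.common.HttpStatus;

class RateLimitStatusSketch {
  // Mirrors the guard in BulkIngestApi: any code outside 1..599 falls back to 400.
  static HttpStatus resolve(int configuredCode) {
    int code = (configuredCode <= 0 || configuredCode > 599) ? 400 : configuredCode;
    return HttpStatus.valueOf(code);
  }

  public static void main(String[] args) {
    System.out.println(resolve(0));   // unset proto field -> 400 Bad Request
    System.out.println(resolve(429)); // explicit config -> 429 Too Many Requests
    System.out.println(resolve(600)); // out of range -> falls back to 400
  }
}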
6 changes: 5 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -399,7 +399,11 @@ private static Set<Service> getServices(
      services.add(datasetRateLimitingService);

      BulkIngestApi openSearchBulkApiService =
-         new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
+         new BulkIngestApi(
+             bulkIngestKafkaProducer,
+             datasetRateLimitingService,
+             meterRegistry,
+             preprocessorConfig.getRateLimitExceededErrorCode());
      armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
    } else {
      PreprocessorService preprocessorService =
6 changes: 6 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
@@ -270,4 +270,10 @@ message PreprocessorConfig {
  // we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
  KafkaConfig kafka_config = 9;
  bool use_bulk_api = 10;
+
+ // Make the rate limit exceeded error code configurable.
+ // We default to 400 to prioritize fresh logs and drop excess logs.
+ // Set this to 429 for clients to retry the request after a delay.
+ // Only used when we use the bulk API.
+ int32 rate_limit_exceeded_error_code = 11;
}
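As a hedged illustration of how a deployment might opt into 429, assuming the standard protobuf-Java mapping of rate_limit_exceeded_error_code to setRateLimitExceededErrorCode (the wrapper class below is invented for the example):

import com.slack.kaldb.proto.config.KaldbConfigs;

class PreprocessorConfigSketch {
  static KaldbConfigs.PreprocessorConfig retryableRateLimits() {
    // proto3 int32 fields default to 0, which BulkIngestApi maps to 400;
    // set the field explicitly so rate-limited clients see 429 and retry later.
    return KaldbConfigs.PreprocessorConfig.newBuilder()
        .setUseBulkApi(true)
        .setRateLimitExceededErrorCode(429)
        .build();
  }
}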
103 changes: 56 additions & 47 deletions kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
@@ -15,14 +15,17 @@
import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
import com.slack.kaldb.bulkIngestApi.BulkIngestResponse;
import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
+import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.metadata.core.CuratorBuilder;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
+import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.testlib.MetricsUtil;
import com.slack.kaldb.testlib.TestKafkaServer;
import com.slack.kaldb.util.JsonUtil;
+import com.slack.kaldb.util.TestingZKServer;
import com.slack.service.murron.trace.Trace;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
@@ -31,8 +34,8 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -67,7 +70,7 @@ public void bootstrapCluster() throws Exception {
    Tracing.newBuilder().build();
    meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

-   zkServer = new TestingServer();
+   zkServer = TestingZKServer.createTestingServer();
    KaldbConfigs.ZookeeperConfig zkConfig =
        KaldbConfigs.ZookeeperConfig.newBuilder()
            .setZkConnectString(zkServer.getConnectString())
@@ -122,7 +125,8 @@ public void bootstrapCluster() throws Exception {
    datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
    bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);

-   bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
+   bulkApi =
+       new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, 400);
  }

  // I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and
@@ -195,7 +199,12 @@ public void testBulkApiBasic() throws Exception {
        { "index": {"_index": "testindex", "_id": "1"} }
        { "field1" : "value1" }
        """;
-   updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length);
+   // use the way we calculate the throughput in the rate limiter to get the exact bytes
+   Map<String, List<Trace.Span>> docs =
+       BulkApiRequestParser.parseRequest(request1.getBytes(StandardCharsets.UTF_8));
+   int limit = PreprocessorRateLimiter.getSpanBytes(docs.get("testindex"));
+   // for some reason if we pass the exact limit, the rate limiter doesn't work as expected
+   updateDatasetThroughput(limit / 2);

    // test with empty causes a parse exception
    AggregatedHttpResponse response = bulkApi.addDocument("{}\n").aggregate().join();
@@ -208,49 +217,30 @@

    // test with request1 twice. first one should succeed, second one will fail because of rate
    // limiter
-   CompletableFuture<AggregatedHttpResponse> response1 =
-       bulkApi
-           .addDocument(request1)
-           .aggregate()
-           .thenApply(
-               httpResponse -> {
-                 assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
-                 assertThat(httpResponse.status().code()).isEqualTo(OK.code());
-                 BulkIngestResponse httpResponseObj = null;
-                 try {
-                   httpResponseObj =
-                       JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
-                 } catch (IOException e) {
-                   fail("", e);
-                 }
-                 assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
-                 assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
-                 return httpResponse;
-               });
-
-   CompletableFuture<AggregatedHttpResponse> response2 =
-       bulkApi
-           .addDocument(request1)
-           .aggregate()
-           .thenApply(
-               httpResponse -> {
-                 assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
-                 assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
-                 BulkIngestResponse httpResponseObj = null;
-                 try {
-                   httpResponseObj =
-                       JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
-                 } catch (IOException e) {
-                   fail("", e);
-                 }
-                 assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
-                 assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
-                 assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
-                 return httpResponse;
-               });
-
-   await().until(response1::isDone);
-   await().until(response2::isDone);
+   AggregatedHttpResponse httpResponse = bulkApi.addDocument(request1).aggregate().join();
+   assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
+   assertThat(httpResponse.status().code()).isEqualTo(OK.code());
+   try {
+     BulkIngestResponse httpResponseObj =
+         JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+     assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
+     assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+   } catch (IOException e) {
+     fail("", e);
+   }
+
+   httpResponse = bulkApi.addDocument(request1).aggregate().join();
+   assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
+   assertThat(httpResponse.status().code()).isEqualTo(400);
+   try {
+     BulkIngestResponse httpResponseObj =
+         JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+     assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
+     assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+     assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
+   } catch (IOException e) {
+     fail("", e);
+   }

    // test with multiple indexes
    String request2 =
@@ -267,6 +257,25 @@
    assertThat(responseObj.totalDocs()).isEqualTo(0);
    assertThat(responseObj.failedDocs()).isEqualTo(0);
    assertThat(responseObj.errorMsg()).isEqualTo("request must contain only 1 unique index");
+
+   BulkIngestApi bulkApi2 =
+       new BulkIngestApi(
+           bulkIngestKafkaProducer,
+           datasetRateLimitingService,
+           meterRegistry,
+           TOO_MANY_REQUESTS.code());
+   httpResponse = bulkApi2.addDocument(request1).aggregate().join();
+   assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
+   assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
+   try {
+     BulkIngestResponse httpResponseObj =
+         JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+     assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
+     assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+     assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
+   } catch (IOException e) {
+     fail("", e);
+   }
  }

  @Test
34 changes: 34 additions & 0 deletions kaldb/src/test/java/com/slack/kaldb/util/TestingZKServer.java
@@ -0,0 +1,34 @@
+package com.slack.kaldb.util;
+
+import static org.assertj.core.api.Assertions.fail;
+
+import org.apache.curator.test.TestingServer;
+
+/**
+ * This class is responsible for creating a ZK server for testing purposes. We create the ZK
+ * server in a separate thread to avoid blocking the main thread. This improves the reliability of
+ * the tests, i.e., I can put a debug point while running a test and the ZK server won't get
+ * blocked. The ZK server getting blocked leads to a session expiry, which causes curator (the
+ * client) to disconnect and call the runtime halter.
+ */
+public class TestingZKServer {
+
+  public static TestingServer createTestingServer() throws InterruptedException {
+    ZKTestingServer zkTestingServer = new ZKTestingServer();
+    Thread.ofVirtual().start(zkTestingServer).join();
+    return zkTestingServer.zkServer;
+  }
+
+  private static class ZKTestingServer implements Runnable {
+    public TestingServer zkServer;
+
+    @Override
+    public void run() {
+      try {
+        zkServer = new TestingServer();
+      } catch (Exception e) {
+        fail("Failed to start ZK server", e);
+      }
+    }
+  }
+}
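The helper relies on virtual threads; Thread.ofVirtual().start(...) (Java 21+) creates and starts the thread in one call. A standalone sketch of the same construct-on-another-thread-and-join pattern, with names invented for illustration:

class VirtualThreadSetupSketch {
  public static void main(String[] args) throws InterruptedException {
    // Construct on a separate virtual thread and wait for it, as
    // createTestingServer does with the TestingServer above.
    Thread setup = Thread.ofVirtual().start(() -> System.out.println("blocking setup runs here"));
    setup.join();
  }
}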
