Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make rate limit error code configurable #765

Merged
merged 1 commit into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,4 @@ preprocessorConfig:
rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
rateLimitExceededErrorCode: ${KALDB_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE:-400}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.slack.kaldb.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
Expand Down Expand Up @@ -33,18 +33,25 @@ public class BulkIngestApi {
private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
private final int rateLimitExceededErrorCode;

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry) {
MeterRegistry meterRegistry,
int rateLimitExceededErrorCode) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
this.meterRegistry = meterRegistry;
this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
this.rateLimitExceededErrorCode = 400;
} else {
this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
}
}

@Post("/_bulk")
Expand Down Expand Up @@ -77,7 +84,8 @@ public HttpResponse addDocument(String bulkRequest) {
final String index = indexDocs.getKey();
if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
future.complete(
HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), response));
return HttpResponse.of(future);
}
}
Expand Down
6 changes: 5 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,11 @@ private static Set<Service> getServices(
services.add(datasetRateLimitingService);

BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
preprocessorConfig.getRateLimitExceededErrorCode());
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
Expand Down
6 changes: 6 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,10 @@ message PreprocessorConfig {
// we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
KafkaConfig kafka_config = 9;
bool use_bulk_api = 10;

// Make the rate limit exceeded error code configurable
// We default to 400 to prioritize fresh logs and drop excess logs
// Set this to 429 for clients to retry the request after a delay
// Only used when we use the bulk API
int32 rate_limit_exceeded_error_code = 11;
}
103 changes: 56 additions & 47 deletions kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
import com.slack.kaldb.bulkIngestApi.BulkIngestResponse;
import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.metadata.core.CuratorBuilder;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.testlib.MetricsUtil;
import com.slack.kaldb.testlib.TestKafkaServer;
import com.slack.kaldb.util.JsonUtil;
import com.slack.kaldb.util.TestingZKServer;
import com.slack.service.murron.trace.Trace;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
Expand All @@ -31,8 +34,8 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -67,7 +70,7 @@ public void bootstrapCluster() throws Exception {
Tracing.newBuilder().build();
meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

zkServer = new TestingServer();
zkServer = TestingZKServer.createTestingServer();
KaldbConfigs.ZookeeperConfig zkConfig =
KaldbConfigs.ZookeeperConfig.newBuilder()
.setZkConnectString(zkServer.getConnectString())
Expand Down Expand Up @@ -122,7 +125,8 @@ public void bootstrapCluster() throws Exception {
datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);

bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
bulkApi =
new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, 400);
}

// I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and
Expand Down Expand Up @@ -195,7 +199,12 @@ public void testBulkApiBasic() throws Exception {
{ "index": {"_index": "testindex", "_id": "1"} }
{ "field1" : "value1" }
""";
updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length);
// use the way we calculate the throughput in the rate limiter to get the exact bytes
Map<String, List<Trace.Span>> docs =
BulkApiRequestParser.parseRequest(request1.getBytes(StandardCharsets.UTF_8));
int limit = PreprocessorRateLimiter.getSpanBytes(docs.get("testindex"));
// for some reason if we pass the exact limit, the rate limiter doesn't work as expected
updateDatasetThroughput(limit / 2);

// test with empty causes a parse exception
AggregatedHttpResponse response = bulkApi.addDocument("{}\n").aggregate().join();
Expand All @@ -208,49 +217,30 @@ public void testBulkApiBasic() throws Exception {

// test with request1 twice. first one should succeed, second one will fail because of rate
// limiter
CompletableFuture<AggregatedHttpResponse> response1 =
bulkApi
.addDocument(request1)
.aggregate()
.thenApply(
httpResponse -> {
assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
assertThat(httpResponse.status().code()).isEqualTo(OK.code());
BulkIngestResponse httpResponseObj = null;
try {
httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
} catch (IOException e) {
fail("", e);
}
assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
return httpResponse;
});

CompletableFuture<AggregatedHttpResponse> response2 =
bulkApi
.addDocument(request1)
.aggregate()
.thenApply(
httpResponse -> {
assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
BulkIngestResponse httpResponseObj = null;
try {
httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
} catch (IOException e) {
fail("", e);
}
assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
return httpResponse;
});

await().until(response1::isDone);
await().until(response2::isDone);
AggregatedHttpResponse httpResponse = bulkApi.addDocument(request1).aggregate().join();
assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
assertThat(httpResponse.status().code()).isEqualTo(OK.code());
try {
BulkIngestResponse httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
} catch (IOException e) {
fail("", e);
}

httpResponse = bulkApi.addDocument(request1).aggregate().join();
vthacker marked this conversation as resolved.
Show resolved Hide resolved
assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
assertThat(httpResponse.status().code()).isEqualTo(400);
try {
BulkIngestResponse httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
} catch (IOException e) {
fail("", e);
}

// test with multiple indexes
String request2 =
Expand All @@ -267,6 +257,25 @@ public void testBulkApiBasic() throws Exception {
assertThat(responseObj.totalDocs()).isEqualTo(0);
assertThat(responseObj.failedDocs()).isEqualTo(0);
assertThat(responseObj.errorMsg()).isEqualTo("request must contain only 1 unique index");

BulkIngestApi bulkApi2 =
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
TOO_MANY_REQUESTS.code());
httpResponse = bulkApi2.addDocument(request1).aggregate().join();
assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
try {
BulkIngestResponse httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
} catch (IOException e) {
fail("", e);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public void testParseKaldbJsonConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(400);

final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
config.getPreprocessorConfig().getKafkaConfig();
Expand Down Expand Up @@ -476,6 +477,7 @@ public void testParseKaldbYamlConfigFile() throws IOException {
assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");

assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(429);

final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
Expand Down
34 changes: 34 additions & 0 deletions kaldb/src/test/java/com/slack/kaldb/util/TestingZKServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.slack.kaldb.util;

import static org.assertj.core.api.Assertions.fail;

import org.apache.curator.test.TestingServer;

/**
 * Creates a ZooKeeper {@link TestingServer} for tests. The server is constructed on a separate
 * (virtual) thread so that pausing the main test thread at a breakpoint does not also freeze the
 * ZK server; a frozen server leads to session expiry, which makes Curator (the client) disconnect
 * and invoke the runtime halter.
 */
public class TestingZKServer {

  /**
   * Starts a fresh ZK testing server on a virtual thread and waits for it to come up.
   *
   * @return the started server, never {@code null}
   * @throws InterruptedException if the calling thread is interrupted while waiting for startup
   */
  public static TestingServer createTestingServer() throws InterruptedException {
    ZKTestingServer zkTestingServer = new ZKTestingServer();
    Thread.ofVirtual().start(zkTestingServer).join();
    // fail() inside run() throws on the spawned thread, where the AssertionError would be
    // silently swallowed; re-check here so a startup failure fails the calling test directly
    // instead of surfacing later as an obscure NullPointerException on the returned server.
    if (zkTestingServer.zkServer == null) {
      fail("Failed to start ZK server");
    }
    return zkTestingServer.zkServer;
  }

  /** Runnable that starts the TestingServer and records it (left null if startup fails). */
  private static class ZKTestingServer implements Runnable {
    // Written by run() on the spawned thread and read by createTestingServer() after join(),
    // which establishes the happens-before edge needed for safe publication.
    public TestingServer zkServer;

    @Override
    public void run() {
      try {
        zkServer = new TestingServer();
      } catch (Exception e) {
        // Record the failure; zkServer stays null and the caller re-checks it on its own thread.
        fail("Failed to start ZK server", e);
      }
    }
  }
}
3 changes: 2 additions & 1 deletion kaldb/src/test/resources/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@
"dataTransformer": "api_log",
"rateLimiterMaxBurstSeconds": 2,
"bootstrapServers": "localhost:9092",
"useBulkApi": false
"useBulkApi": false,
"rateLimitExceededErrorCode": 400
},
"clusterConfig": {
"clusterName": "test_kaldb_json_cluster",
Expand Down
1 change: 1 addition & 0 deletions kaldb/src/test/resources/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ preprocessorConfig:
rateLimiterMaxBurstSeconds: 2
bootstrapServers: localhost:9092
useBulkApi: true
rateLimitExceededErrorCode: 429

clusterConfig:
clusterName: "test_kaldb_cluster"
Expand Down
Loading