Skip to content

Commit

Permalink
Merge pull request #3919 from Agnul97/fix-dataStoreApi
Browse files Browse the repository at this point in the history
FIX - LimitExceeded value on Data Metric/Message/Client/Channel GET APIs when more than 10k documents stored
  • Loading branch information
Coduz authored Dec 11, 2023
2 parents 25e12b2 + c6c0bd5 commit 469aee2
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,20 @@ Feature: Datastore tests
Then The message list "MessageInfo" have limitExceed value false
And I delete all indices

Scenario: Create more than 10k messages and verify that the limitExceeded flag is reported correctly when querying

Given I login as user with name "kapua-sys" and password "kapua-password"
And I select account "kapua-sys"
When I store 10000 messages in bulk mode to the index "1-data-message-2023-1"
And I refresh all indices
When I query for the current account messages with limit 10000 and offset 0 and store them as "MessageInfo"
Then The message list "MessageInfo" have limitExceed value false
When I store 1 messages in bulk mode to the index "1-data-message-2023-1"
And I refresh all indices
When I query for the current account messages with limit 10000 and offset 0 and store them as "MessageInfo"
Then The message list "MessageInfo" have limitExceed value true
And I delete all indices

@teardown
Scenario: Stop full docker environment
Given Stop full docker environment
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class ResultList<T> {

private final List<T> result;
private final long totalCount;
private boolean totalHitsExceedsCount; // true iff Elasticsearch reports that there are actually more than 10k matching hits (i.e. the total-hits count was capped)

/**
* Constructor.
Expand All @@ -48,6 +49,14 @@ public void add(T object) {
result.add(object);
}

/**
 * Sets whether Elasticsearch reported more matching hits than the (capped) total count.
 *
 * @param totalHitsExceedsCount {@code true} when the real hit count exceeds the reported total.
 */
public void setTotalHitsExceedsCount(boolean totalHitsExceedsCount) {
    this.totalHitsExceedsCount = totalHitsExceedsCount;
}

/**
 * Gets whether Elasticsearch reported more matching hits than the (capped) total count.
 *
 * @return {@code true} when the real hit count exceeds the reported total.
 */
public boolean getTotalHitsExceedsCount() {
    return totalHitsExceedsCount;
}

/**
* Gets the {@link List} of results.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ private ElasticsearchKeywords() {
static final String KEY_HITS = "hits";
static final String KEY_TOTAL = "total";
static final String KEY_VALUE = "value";
static final String KEY_RELATION = "relation";
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,18 @@ public <T> ResultList<T> query(TypeDescriptor typeDescriptor, Object query, Clas

long totalCount = 0;
ArrayNode resultsNode = null;
String totalRelation = null;

Request request = new Request(ElasticsearchKeywords.ACTION_GET, ElasticsearchResourcePaths.search(typeDescriptor));
request.setJsonEntity(json);
Response queryResponse = restCallTimeoutHandler(() -> getClient().performRequest(request), typeDescriptor.getIndex(), "QUERY");

if (isRequestSuccessful(queryResponse)) {
JsonNode responseNode = readResponseAsJsonNode(queryResponse);

JsonNode hitsNode = responseNode.path(ElasticsearchKeywords.KEY_HITS);

totalCount = hitsNode.path(ElasticsearchKeywords.KEY_TOTAL).path(ElasticsearchKeywords.KEY_VALUE).asLong();
totalRelation = hitsNode.path(ElasticsearchKeywords.KEY_TOTAL).path(ElasticsearchKeywords.KEY_RELATION).asText();
if (totalCount > Integer.MAX_VALUE) {
throw new ClientException(ClientErrorCodes.ACTION_ERROR, CLIENT_HITS_MAX_VALUE_EXCEEDED);
}
Expand All @@ -262,6 +265,9 @@ public <T> ResultList<T> query(TypeDescriptor typeDescriptor, Object query, Clas
}

ResultList<T> resultList = new ResultList<>(totalCount);
if (totalRelation != null) {
resultList.setTotalHitsExceedsCount(!totalRelation.equals("eq"));
}
Object queryFetchStyle = getModelConverter().getFetchStyle(query);
if (resultsNode != null && !resultsNode.isEmpty()) {
for (JsonNode result : resultsNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ protected ElasticsearchClient<?> getElasticsearchClient() throws ClientUnavailab
return DatastoreClientFactory.getElasticsearchClient();
}

protected <T extends Storable> void setLimitExceed(StorableQuery query, StorableListResult<T> list) {
protected <T extends Storable> void setLimitExceed(StorableQuery query, boolean hitsExceedsTotalCount, StorableListResult<T> list) {
int offset = query.getOffset() != null ? query.getOffset() : 0;
if (query.getLimit() != null && list.getTotalCount() > offset + query.getLimit()) {
list.setLimitExceeded(true);
if (query.getLimit() != null) {
if (hitsExceedsTotalCount || //pre-condition: there are more than 10k documents in ES && query limit is <= 10k
list.getTotalCount() > offset + query.getLimit()) {
list.setLimitExceeded(true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.kapua.service.datastore.model.ChannelInfoListResult;
import org.eclipse.kapua.service.datastore.model.query.ChannelInfoQuery;
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
Expand Down Expand Up @@ -193,8 +194,9 @@ public ChannelInfoListResult query(ChannelInfoQuery query) throws KapuaIllegalAr

String indexName = SchemaUtil.getChannelIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
ChannelInfoListResult result = new ChannelInfoListResultImpl(getElasticsearchClient().query(typeDescriptor, query, ChannelInfo.class));
setLimitExceed(query, result);
ResultList<ChannelInfo> rl = getElasticsearchClient().query(typeDescriptor, query, ChannelInfo.class);
ChannelInfoListResult result = new ChannelInfoListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.kapua.service.datastore.model.ClientInfoListResult;
import org.eclipse.kapua.service.datastore.model.query.ClientInfoQuery;
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
Expand Down Expand Up @@ -188,8 +189,9 @@ public ClientInfoListResult query(ClientInfoQuery query) throws KapuaIllegalArgu

String indexName = SchemaUtil.getClientIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ClientInfoSchema.CLIENT_TYPE_NAME);
ClientInfoListResultImpl result = new ClientInfoListResultImpl(getElasticsearchClient().query(typeDescriptor, query, ClientInfo.class));
setLimitExceed(query, result);
ResultList<ClientInfo> rl = getElasticsearchClient().query(typeDescriptor, query, ClientInfo.class);
ClientInfoListResult result = new ClientInfoListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,9 @@ public MessageListResult query(MessageQuery query)

String dataIndexName = SchemaUtil.getDataIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(dataIndexName, MessageSchema.MESSAGE_TYPE_NAME);
MessageListResult result = new MessageListResultImpl(getElasticsearchClient().query(typeDescriptor, query, DatastoreMessage.class));
setLimitExceed(query, result);
ResultList<DatastoreMessage> rl = getElasticsearchClient().query(typeDescriptor, query, DatastoreMessage.class);
MessageListResult result = new MessageListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.BulkUpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.BulkUpdateResponse;
import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
Expand Down Expand Up @@ -257,8 +258,9 @@ public MetricInfoListResult query(MetricInfoQuery query) throws KapuaIllegalArgu

String indexNme = SchemaUtil.getMetricIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(indexNme, MetricInfoSchema.METRIC_TYPE_NAME);
MetricInfoListResult result = new MetricInfoListResultImpl(getElasticsearchClient().query(typeDescriptor, query, MetricInfo.class));
setLimitExceed(query, result);
ResultList<MetricInfo> rl = getElasticsearchClient().query(typeDescriptor, query, MetricInfo.class);
MetricInfoListResult result = new MetricInfoListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.eclipse.kapua.service.elasticsearch.client.ElasticsearchClient;
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.IndexRequest;
import org.eclipse.kapua.service.elasticsearch.client.rest.ElasticsearchResourcePaths;
import org.eclipse.kapua.service.storable.model.StorableListResult;
import org.eclipse.kapua.service.storable.model.id.StorableId;
import org.eclipse.kapua.service.storable.model.id.StorableIdFactory;
Expand All @@ -104,11 +105,14 @@
import org.eclipse.kapua.service.storable.model.query.predicate.AndPredicate;
import org.eclipse.kapua.service.storable.model.query.predicate.RangePredicate;
import org.eclipse.kapua.service.storable.model.query.predicate.TermPredicate;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.DateFormat;
Expand Down Expand Up @@ -686,6 +690,21 @@ public void insertRandomMessagesIntoDatastoreWithCurrentTimestamps(String msgLis
stepData.put(idListKey, tmpList);
}

// Inserts a hard-coded message N times via the Elasticsearch Bulk API, to speed up
// the insertion of large document counts in tests.
@When("I store {int} messages in bulk mode to the index {string}")
public void bulkInsert(int nMessages, String index) throws IOException {
    // NDJSON bulk payload: one action line ("index" with no explicit id) followed by the
    // document source, repeated per message. The two lines are constant, so build them once.
    final String actionLine = "{ \"index\":{}}\n";
    final String documentLine = "{\"device_id\":\"6782593496741240747\",\"channel_parts\":[\"genericMetric\"],\"scope_id\":\"1\",\"channel\":\"genericMetric\",\"received_on\":\"2018-10-01T16:43:04.115Z\",\"ip_address\":\"127.0.0.1\",\"metrics\":{\"metric\":{\"int\":2}},\"sent_on\":null,\"body\":null,\"captured_on\":null,\"client_id\":\"samSulekBulkingHeavy\",\"timestamp\":\"2018-10-01T13:48:36.946Z\",\"sort\":[1701784116946]}\n";
    // Presize to avoid repeated internal reallocations when nMessages is large (e.g. 10k).
    StringBuilder body = new StringBuilder(nMessages * (actionLine.length() + documentLine.length()));
    for (int i = 0; i < nMessages; i++) {
        body.append(actionLine).append(documentLine);
    }
    Request request = new Request("POST", index + ElasticsearchResourcePaths.getBulkPath());
    request.setJsonEntity(body.toString());
    RestClient restClient = (RestClient) elasticsearchClient.getClient();
    restClient.performRequest(request);
}


@Given("I set the database to device timestamp indexing")
public void setDatabaseToDeviceTimestampIndexing() throws KapuaException {
Account account = (Account) stepData.get(LAST_ACCOUNT);
Expand Down

0 comments on commit 469aee2

Please sign in to comment.