Skip to content

Commit

Permalink
EventsQuery interface and LevelIndex enhancements. (#625)
Browse files Browse the repository at this point in the history
This PR contains a new interface method, EventsQuery, which is primarily used to query for filtered events in order to enable SelectiveSync.

EventLogLevel:
EventLogLevel now takes an additional property indexes to it's append() function. These indexes are key-value indexes in the same format that we use withe the MessageStore index.

IndexLevel:
To enable this functionality IndexLevel has been enhanced to become a more general purpose LevelDB Index that supports sorting as well as a cursor pointer property.

* dwn selevtive sync wip, copy-pasta from index level

* first pass at POC for selective sync

* eventlog query with watermark

* EventsQuery interface

* update after rebase

* events query interfaces and handler

* index immutable RecordsWrite properties of a RecordsDelete

* additional indexing for event log/sync

* complete tests added to handler

* refactor and move tests

* events get coverage

* increase coverage clean up query functions

* range filter support, extract event within iterator

* additional event log indexing tests

* refactor indexes to use common methods, remove cidlog in favor of cidindex

* simply sync

* refactor before consolidation

* initial pass at abstraction for IndexStore within the sync log store

* abstracting functionality for index store

* refactor cleanup

* continue refactor and test effort

* continue refactoring effort

* test index level

* complete test coverage for index level

* migrate functionality from MessageStoreIndex to new IndexLevel

* increase test case coverage

* clean up filter object

* use new IndexLevel cursor functionality for MessageStore pagination

* clean up, comments

* remove selective sync specific tests, added to coverage and comments

* clean up tests and legibility

* improved code legibility and comments

* update after rebase

* add more testing for sort/cursor, finalized sorting logic

* add tests for sort/cursor when inserted out of order

* move events get schema into the interfaces folder

* export FilteredQuery

* export EventsQuery Message and Reply types, add processMessage overload

* sortProperty vs sort

* deletes all indexed keys

* review chanes

* key -> item ID rename where appropriate (#571)

* more item ID renames (#572)

* sync tests, need to make more clear

* review suggestions

* scenario test and review suggestions

* Disambiguating meaning of cursor (#575)

* Disambiguating meaning of cursor

* comment update

Co-authored-by:     Diane Huxley <[email protected]>

---------

Co-authored-by: Liran Cohen <[email protected]>
Co-authored-by: Diane Huxley <[email protected]>

* sort and pagination within the index level implementation

* rip out property indexing

* clean up index level and apply to message store and event log

* clean up tests and add comments

* cleanup and comments

* pagination bench tests

* add to scenario test

* update benchmark tests

* some cleanup

* change benchmark filters for pagination

* create sorted indexes for all indexed properties

* filter query for index

* support both query paths, update records query tests

* clean up and add some comments

* clean up, two paths

* cleanup after rebase, add unimplemented missing test

* remove circular deps

* filter selector/augmentor

* add index utils

* fix circular dep

* clean up

* update tests to use top-level query, will addd more tests

* fix EventsQuery/cursor check

* move filter selector into a class, renamed index utility to FilterUtility

* added tests for filter utility

* move FilterSelector class to util/filters, updated tests to cover level index, need to refactor tests a bit more

* moved index types into their own file, fixes circular deps, cleaner placement

* simplify index-level, remove external watermark using messageCid as a cursor for events

* filter selector tresting, needs some rules removed/changed

* events query for author across message types

* review suggestions

* replace watermark with cursor where necessary

* update match comments

* update watermark comments

* address PR comments

* Remove Nested Object Indexing (#621)

* only index strings, numbers, and booleans

* clean up any usages of uknown in filters

* clean up matchRange filter

* move encoding value/numbers into IndexLevel

* fix async test

* FilterIndex to KeyValues type

* remove faltten

* refactoring (#622)

* Renames (#626)

* review suggestions

* removed uneeeded indexing, remove refernces to Record type in favor of KeyValues when building indexes

* remove circular dep

* update tests

* update EventsFilter types, error handling, convertFilter/parse

* added tests for nonsensicle pobilshed filter

* update test suite

* allow indexing of empty strings

* scaffold events query scenario tests

* updated tests for filter reducer

* fix message store benchmark

* filter reducer coverage

* test for one of

* add testing, filter type needs work

* fix circular dep

* rename method

* Modified code such that in-memory-paging supports empty filter and array (#632)

* Modified code such that in-memory-paging supports empty filter and array

* Removed the need for FilterSelector

* remove author filter and tests

* test filter selector

* move encoding tests to index-level, add tests for convert range criterion

* protocol events tests

* more event filter tests

* more scenario test coverage

* contextId scenario testing, add comments

* updated tests to isFilterConcise

---------

Co-authored-by: Henry Tsai <[email protected]>
Co-authored-by: Diane Huxley <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2023
1 parent 066e74b commit 641888e
Show file tree
Hide file tree
Showing 64 changed files with 4,892 additions and 1,113 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ INDEX
# location for index specific levelDB data storage levelDB data storage for non-browser tests
TEST-INDEX
BENCHMARK-INDEX
BENCHMARK-BLOCK
# folders used by code coverage
.nyc_output/
coverage
Expand Down
27 changes: 19 additions & 8 deletions benchmarks/store/index/index-level.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { IndexLevel } from '../../../dist/esm/src/store/index-level.js';
import { v4 as uuid } from 'uuid';

const tenant = 'did:xyz:alice';

// create

const createStart = Date.now();
Expand All @@ -21,31 +23,40 @@ console.log('clear - before', clearBeforeEnd - clearBeforeStart);
// put

const putStart = Date.now();
await Promise.all(Array(10_000).fill().map(() => index.put(uuid(), {
test : 'foo',
number : Math.random()
})));
await Promise.all(Array(10_000).fill().map((_,i) => {
const id = uuid();
const doc = { test: 'foo', number: Math.random() };
return index.put(tenant, id, doc, doc, { index: i, number: Math.random(), id });
}));
const putEnd = Date.now();
console.log('put', putEnd - putStart);

// query - equal

const queryEqualStart = Date.now();
await index.query({
await index.query(tenant, [{
'test': 'foo'
});
}], { sortProperty: 'id' });
const queryEqualEnd = Date.now();
console.log('query - equal', queryEqualEnd - queryEqualStart);

// query - range

const queryRangeStart = Date.now();
await index.query({
await index.query(tenant, [{
'number': { gte: 0.5 }
});
}],{ sortProperty: 'id' });
const queryRangeEnd = Date.now();
console.log('query - range', queryRangeEnd - queryRangeStart);

const multipleRangeStart = Date.now();
await index.query(tenant, [
{ 'number': { lte: 0.1 } },
{ 'number': { gte: 0.5 } }
],{ sortProperty: 'id' });
const multipleRangeEnd = Date.now();
console.log('query - multiple range', multipleRangeEnd - multipleRangeStart);

// clear - after

const clearAfterStart = Date.now();
Expand Down
11 changes: 11 additions & 0 deletions benchmarks/store/index/search-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ await index.QUERY({ AND: [ {
const queryRangeEnd = Date.now();
console.log('query - range', queryRangeEnd - queryRangeStart);

const multipleRangeStart = Date.now();
await index.QUERY({ AND: [ {
FIELD : 'number',
VALUE : { LTE: '0.1' }
},{
FIELD : 'number',
VALUE : { GTE: '0.5' }
} ] });
const multipleRangeEnd = Date.now();
console.log('query - multiple range', multipleRangeEnd - multipleRangeStart);

// clear - after

const clearAfterStart = Date.now();
Expand Down
203 changes: 173 additions & 30 deletions benchmarks/store/message/message-store-level.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
import { MessageStoreLevel } from '../../../dist/esm/src/store/message-store-level.js';
import { SortDirection } from '../../../dist/esm/src/types/query-types.js';
import { TestDataGenerator } from '../../../dist/esm/tests/utils/test-data-generator.js';
import { Time } from '../../../dist/esm/src/utils/time.js';

const tenant = 'did:xyz:alice';
console.log('message store benchmarks');

const items = 10_000;

// pre-generate messages
const insertMessages = Array(10_000).fill().map((_,i) => {
const insertMessages = Array(items).fill().map((_,i) => {
// random schema from 1-5
const schemaId = Math.floor(Math.random() * 5) + 1;
const schema = `schema${schemaId}`;

//random protocol from 1-10
const protocolId = Math.floor(Math.random() * 9);
const protocol = `proto${protocolId}`;

const bobId = i % 25;
const recipient = `bob${bobId + 1}`;
const author = i % 50 === 0 ? 'bob1' : 'alice';
const published = i % 100 === 0 ? true : false;

let year;
const mod = i % 3;
let schema, year;
switch (mod) {
case 0:
schema = 'schema1';
year = 2022;
break;
case 1:
schema = 'schema2';
year = 2023;
break;
default:
schema = 'schema3';
year = 2024;
}

Expand All @@ -35,72 +48,202 @@ const insertMessages = Array(10_000).fill().map((_,i) => {
const indexes = {
...message.descriptor,
schema,
protocol,
dateCreated,
recipient,
author,
published,
};
return { message, indexes };
});

// create
const createStart = Date.now();
const messageStore = new MessageStoreLevel({
location: 'BENCHMARK-INDEX'
blockstoreLocation : 'BENCHMARK-BLOCK',
indexLocation : 'BENCHMARK-INDEX',
});
await messageStore.open();
const createEnd = Date.now();
console.log('\tcreate\t\t\t:', createEnd - createStart);
console.log('\tcreate\t\t\t\t:', createEnd - createStart, 'ms');

// clear - before

const clearBeforeStart = Date.now();
await messageStore.clear();
const clearBeforeEnd = Date.now();
console.log('\tclear - before\t\t:', clearBeforeEnd - clearBeforeStart);
console.log('\tclear - before\t\t\t:', clearBeforeEnd - clearBeforeStart, 'ms');

// put
const putStart = Date.now();
await Promise.all(insertMessages.map(({ message, indexes }) => messageStore.put(tenant, message, indexes)));
const putEnd = Date.now();
console.log('\tput\t\t\t:', putEnd - putStart);
console.log('\tput\t\t\t\t:', putEnd - putStart, 'ms');

const firstDayOf2024 = Time.createTimestamp({ year: 2024, month: 1, day: 1 });

// advanced query
const ascOrder = { messageTimestamp: SortDirection.Ascending };
const descOrder = { messageTimestamp: SortDirection.Descending };

// paginate 10 pages of 20 results for a specific schema
// note: published: true is a smaller subset so will perform better if index optimizes for equality filter
let page = 0;
let paginationMessageCid = undefined;
let messages = [];
let results = [];
const paginationStart = Date.now();
while (page < 10) {
page++;
({ messages, paginationMessageCid } = await messageStore.query(tenant, [
{ published: true, schema: 'schema2', protocol: 'proto6' }
], ascOrder, { limit: 20, paginationMessageCid } ));
results.push(...messages);
if (paginationMessageCid === undefined) {
break;
}
}
const paginationEnd = Date.now();
console.log('\tpagination small subset\t\t:', paginationEnd - paginationStart, 'ms', 'results ', results.length);

// query - equal
// descending order
results = [];
page = 0;
paginationMessageCid = undefined;
const paginationDescStart = Date.now();
while (page < 10) {
page++;
({ messages, paginationMessageCid } = await messageStore.query(tenant, [
{ published: true, schema: 'schema2', protocol: 'proto6' }
], descOrder, { limit: 20, paginationMessageCid } ));
results.push(...messages);
if (paginationMessageCid === undefined) {
break;
}
}
const paginationDescEnd = Date.now();
console.log('\tpagination small subset des\t:', paginationDescEnd - paginationDescStart, 'ms', ' results', results.length);

// filter for a larger result set.
results = [];
page = 0;
paginationMessageCid = undefined;
const paginationLargeStart = Date.now();
while (page < 10) {
page++;
({ messages, paginationMessageCid } = await messageStore.query(tenant, [
{ published: true, schema: 'schema2', protocol: 'proto6' },
{ published: false, schema: 'schema2', protocol: 'proto6' }
], ascOrder, { limit: 20, paginationMessageCid } ));
results.push(...messages);
if (paginationMessageCid === undefined) {
break;
}
}
const paginationLargeEnd = Date.now();
console.log('\tpagination large subset\t\t:', paginationLargeEnd - paginationLargeStart, 'ms', ' results', results.length);

// ascending multiple filters. similar to non-owner query
results = [];
page = 0;
paginationMessageCid = undefined;
const paginationNonOwnerStart = Date.now();
while (page < 10) {
page++;
({ messages, paginationMessageCid } = await messageStore.query(tenant, [
{ schema: 'schema2', published: false, author: 'bob1', protocol: 'proto6' },
{ schema: 'schema2', published: true, protocol: 'proto6' },
{ schema: 'schema2', published: false, recipient: 'bob1', protocol: 'proto6' },
], ascOrder, { limit: 20, paginationMessageCid } ));
results.push(...messages);
if (paginationMessageCid === undefined) {
break;
}
}
const paginationNonOwnerEnd = Date.now();
console.log('\tpagination non owner\t\t:', paginationNonOwnerEnd - paginationNonOwnerStart, 'ms', ' results', results.length);

// descending multiple filters. similar to non-owner query
results = [];
page = 0;
paginationMessageCid = undefined;
const paginationDescNonOwnerStart = Date.now();
while (page < 10) {
page++;
({ messages, paginationMessageCid } = await messageStore.query(tenant, [
{ schema: 'schema2', published: false, author: 'bob1', protocol: 'proto6' },
{ schema: 'schema2', published: true, protocol: 'proto6' },
{ schema: 'schema2', published: false, recipient: 'bob1', protocol: 'proto6' },
], descOrder, { limit: 20, paginationMessageCid } ));
results.push(...messages);
if (paginationMessageCid === undefined) {
break;
}
}
const paginationDescNonOwnerEnd = Date.now();
console.log('\tpagination desc non owner\t:', paginationDescNonOwnerEnd - paginationDescNonOwnerStart, 'ms', ' results', results.length);

const smallResultSetStart = Date.now();
({ messages } = await messageStore.query(tenant, [{ published: true, recipient: 'bob1' }]));
const smallResultSetEnd = Date.now();
console.log('\tquery asc - small set equal\t:', smallResultSetEnd - smallResultSetStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

const lastDayOf2022 = Time.createTimestamp({ year: 2022, month: 12, day: 31 });
const lastDayOf2023 = Time.createTimestamp({ year: 2023, month: 12, day: 31 });
const queryRangeStart = Date.now();
({ messages } = await messageStore.query(tenant, [{
dateCreated: { gt: lastDayOf2022, lt: lastDayOf2023 }
}]));
const queryRangeEnd = Date.now();
console.log('\tquery - range\t\t\t:', queryRangeEnd - queryRangeStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

// larger result set
const queryEqualStart = Date.now();
let { messages } = await messageStore.query(tenant, [{ schema: 'schema2' }]);
({ messages } = await messageStore.query(tenant, [{ schema: 'schema2' }]));
const queryEqualEnd = Date.now();
console.log('\tquery - equal\t\t:', queryEqualEnd - queryEqualStart);
console.log('\t\tresults count\t:', messages.length);
console.log('\tquery - equal\t\t\t:', queryEqualEnd - queryEqualStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

// query - equal multiple
// multiple queries
const multipleEqualStart = Date.now();
({ messages } = await messageStore.query(tenant, [{ schema: 'schema2' }, { schema: 'schema1' }]));
({ messages } = await messageStore.query(tenant, [{ schema: ['schema2', 'schema1'] }, { published: true }]));
const multipleEqualEnd = Date.now();
console.log('\tquery - multiple equal\t:', multipleEqualEnd - multipleEqualStart);
console.log('\t\tresults count\t:', messages.length);
console.log('\tquery - multiple equal\t\t:', multipleEqualEnd - multipleEqualStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

// query - range
const lastDayOf2022 = Time.createTimestamp({ year: 2022, month: 12, day: 31 });
const queryRangeStart = Date.now();
//range queries
// gt
const queryGTRangeStart = Date.now();
({ messages } = await messageStore.query(tenant, [{
dateCreated: { gt: lastDayOf2022 }
}]));
const queryRangeEnd = Date.now();
console.log('\tquery - range\t\t:', queryRangeEnd - queryRangeStart);
console.log('\t\tresults count\t:', messages.length);
const queryGTRangeEnd = Date.now();
console.log('\tquery - gt range\t\t:', queryGTRangeEnd - queryGTRangeStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

// lt
const queryLTRangeStart = Date.now();
({ messages } = await messageStore.query(tenant, [{
dateCreated: { lt: lastDayOf2022 }
}]));
const queryLTRangeEnd = Date.now();
console.log('\tquery - lt range\t\t:', queryLTRangeEnd - queryLTRangeStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

// query - range multiple
const multipleRangeStart = Date.now();
const firstDayOf2024 = Time.createTimestamp({ year: 2024, month: 1, day: 1 });
const lastDayOf2023 = Time.createTimestamp({ year: 2023, month: 12, day: 31 });
({ messages } = await messageStore.query(tenant, [
{ dateCreated: { gt: lastDayOf2022 } },
{ dateCreated: { lt: firstDayOf2024, gt: lastDayOf2023 } }
{ dateCreated: { lt: firstDayOf2024, gt: lastDayOf2023 } },
]));
const multipleRangeEnd = Date.now();
console.log('\tquery - multiple range\t:', multipleRangeEnd - multipleRangeStart);
console.log('\t\tresults count\t:', messages.length);

console.log('\tquery - multiple range\t\t:', multipleRangeEnd - multipleRangeStart, 'ms');
console.log('\t\tresults count\t\t:', messages.length);

// clear - after
const clearAfterStart = Date.now();
await messageStore.clear();
const clearAfterEnd = Date.now();
console.log('\tclear - after\t\t:', clearAfterEnd - clearAfterStart);
console.log('\tclear - after\t\t\t:', clearAfterEnd - clearAfterStart, 'ms');
6 changes: 5 additions & 1 deletion build/compile-validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import Authorization from '../json-schemas/authorization.json' assert { type: 'j
import AuthorizationDelegatedGrant from '../json-schemas/authorization-delegated-grant.json' assert { type: 'json' };
import AuthorizationOwner from '../json-schemas/authorization-owner.json' assert { type: 'json' };
import Definitions from '../json-schemas/definitions.json' assert { type: 'json' };
import EventsGet from '../json-schemas/events/events-get.json' assert { type: 'json' };
import EventsFilter from '../json-schemas/interface-methods/events-filter.json' assert { type: 'json' };
import EventsGet from '../json-schemas/interface-methods/events-get.json' assert { type: 'json' };
import EventsQuery from '../json-schemas/interface-methods/events-query.json' assert { type: 'json' };
import GeneralJwk from '../json-schemas/jwk/general-jwk.json' assert { type: 'json' };
import GeneralJws from '../json-schemas/general-jws.json' assert { type: 'json' };
import GenericSignaturePayload from '../json-schemas/signature-payloads/generic-signature-payload.json' assert { type: 'json' };
Expand Down Expand Up @@ -54,7 +56,9 @@ const schemas = {
RecordsQuery,
RecordsWrite,
RecordsWriteUnidentified,
EventsFilter,
EventsGet,
EventsQuery,
Definitions,
GeneralJwk,
GeneralJws,
Expand Down
Loading

0 comments on commit 641888e

Please sign in to comment.