Skip to content

Commit

Permalink
replace paginationMessageCid with just cursor, smae with pagination q…
Browse files Browse the repository at this point in the history
…uery object (#608)
  • Loading branch information
LiranCohen authored Nov 14, 2023
1 parent 947a372 commit 3f8b405
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 44 deletions.
2 changes: 1 addition & 1 deletion json-schemas/interface-methods/records-query.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"type": "number",
"minimum": 1
},
"messageCid": {
"cursor": {
"type": "string"
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/handlers/records-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ export class RecordsQueryHandler implements MethodHandler {
}

let recordsWrites: RecordsWriteMessageWithOptionalEncodedData[];
let paginationMessageCid: string|undefined;
let cursor: string|undefined;
// if this is an anonymous query and the filter supports published records, query only published records
if (RecordsQueryHandler.filterIncludesPublishedRecords(recordsQuery) && recordsQuery.author === undefined) {
const results = await this.fetchPublishedRecords(tenant, recordsQuery);
recordsWrites = results.messages as RecordsWriteMessageWithOptionalEncodedData[];
paginationMessageCid = results.paginationMessageCid;
cursor = results.cursor;
} else {
// authentication and authorization
try {
Expand All @@ -52,18 +52,18 @@ export class RecordsQueryHandler implements MethodHandler {
if (recordsQuery.author === tenant) {
const results = await this.fetchRecordsAsOwner(tenant, recordsQuery);
recordsWrites = results.messages as RecordsWriteMessageWithOptionalEncodedData[];
paginationMessageCid = results.paginationMessageCid;
cursor = results.cursor;
} else {
const results = await this.fetchRecordsAsNonOwner(tenant, recordsQuery);
recordsWrites = results.messages as RecordsWriteMessageWithOptionalEncodedData[];
paginationMessageCid = results.paginationMessageCid;
cursor = results.cursor;
}
}

return {
status : { code: 200, detail: 'OK' },
entries : recordsWrites,
paginationMessageCid
cursor
};
}

Expand Down Expand Up @@ -95,7 +95,7 @@ export class RecordsQueryHandler implements MethodHandler {
private async fetchRecordsAsOwner(
tenant: string,
recordsQuery: RecordsQuery
): Promise<{ messages: GenericMessage[], paginationMessageCid?: string }> {
): Promise<{ messages: GenericMessage[], cursor?: string }> {
const { dateSort, filter, pagination } = recordsQuery.message.descriptor;

// fetch all published records matching the query
Expand Down Expand Up @@ -131,7 +131,7 @@ export class RecordsQueryHandler implements MethodHandler {
*/
private async fetchRecordsAsNonOwner(
tenant: string, recordsQuery: RecordsQuery
): Promise<{ messages: GenericMessage[], paginationMessageCid?: string }> {
): Promise<{ messages: GenericMessage[], cursor?: string }> {
const { dateSort, pagination } = recordsQuery.message.descriptor;
const filters = [];

Expand Down Expand Up @@ -161,7 +161,7 @@ export class RecordsQueryHandler implements MethodHandler {
*/
private async fetchPublishedRecords(
tenant: string, recordsQuery: RecordsQuery
): Promise<{ messages: GenericMessage[], paginationMessageCid?: string }> {
): Promise<{ messages: GenericMessage[], cursor?: string }> {
const { dateSort, pagination } = recordsQuery.message.descriptor;
const filter = RecordsQueryHandler.buildPublishedRecordsFilter(recordsQuery);
const messageSort = this.convertDateSort(dateSort);
Expand Down
18 changes: 9 additions & 9 deletions src/store/message-store-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,26 @@ export class MessageStoreLevel implements MessageStore {
messageSort?: MessageSort,
pagination?: Pagination,
options?: MessageStoreOptions
): Promise<{ messages: GenericMessage[], paginationMessageCid?: string }> {
): Promise<{ messages: GenericMessage[], cursor?: string }> {
options?.signal?.throwIfAborted();

const messages: GenericMessage[] = [];
// note: injecting tenant into filters to allow querying with an "empty" filter.
// if there are no other filters present it will return all the messages the tenant.
const resultIds = await this.index.query(tenant, filters.map(f => ({ ...f, tenant })), options);

// as an optimization for large data sets, we are finding the message object which matches the paginationMessageCid here.
// as an optimization for large data sets, we are finding the message object which matches the cursor here.
// we can use this within the pagination function after sorting to determine the starting point of the array in a more efficient way.
let paginationMessage: GenericMessage | undefined;
for (const id of resultIds) {
const message = await this.get(tenant, id, options);
if (message) { messages.push(message); }
if (pagination?.messageCid && pagination.messageCid === id) {
if (pagination?.cursor && pagination.cursor === id) {
paginationMessage = message;
}
}

if (pagination?.messageCid !== undefined && paginationMessage === undefined) {
if (pagination?.cursor !== undefined && paginationMessage === undefined) {
return { messages: [] }; //if paginationMessage is not found, do not return any results
}

Expand All @@ -120,7 +120,7 @@ export class MessageStoreLevel implements MessageStore {
messages: GenericMessage[],
paginationMessage?: GenericMessage,
pagination: Pagination = { }
): Promise<{ messages: GenericMessage[], paginationMessageCid?: string } > {
): Promise<{ messages: GenericMessage[], cursor?: string } > {
const { limit } = pagination;
if (paginationMessage === undefined && limit === undefined) {
return { messages }; // return all without pagination pointer.
Expand All @@ -136,16 +136,16 @@ export class MessageStoreLevel implements MessageStore {
const end = limit === undefined ? undefined : start + limit;
const results = messages.slice(start, end);

// we only return a paginationMessageCid cursor if there are more results
// we only return a cursor cursor if there are more results
const hasMoreResults = end !== undefined && end < messages.length;
let paginationMessageCid: string|undefined;
let cursor: string|undefined;
if (hasMoreResults) {
// we extract the cid of the last message in the result set.
const lastMessage = results.at(-1);
paginationMessageCid = await Message.getCid(lastMessage!);
cursor = await Message.getCid(lastMessage!);
}

return { messages: results, paginationMessageCid };
return { messages: results, cursor };
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/types/message-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export interface MessageStore {
messageSort?: MessageSort,
pagination?: Pagination,
options?: MessageStoreOptions
): Promise<{ messages: GenericMessage[], paginationMessageCid?: string }>;
): Promise<{ messages: GenericMessage[], cursor?: string }>;

/**
* Deletes the message associated with the id provided.
Expand Down
9 changes: 7 additions & 2 deletions src/types/message-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ export type Filter = {
[property: string]: EqualFilter | OneOfFilter | RangeFilter
};

/**
* Pagination Options for querying messages.
*
* The cursor is the messageCid of the message you would like to pagination from.
*/
export type Pagination = {
messageCid?: string
limit?: number
cursor?: string;
limit?: number;
};

export enum SortOrder {
Expand Down
2 changes: 1 addition & 1 deletion src/types/records-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export type RecordsQueryMessage = GenericMessage & {

export type RecordsQueryReply = GenericMessageReply & {
entries?: RecordsQueryReplyEntry[];
paginationMessageCid?: string;
cursor?: string;
};

export type RecordsReadMessage = {
Expand Down
30 changes: 15 additions & 15 deletions tests/handlers/records-query.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ export function testRecordsQueryHandler(): void {

const limit = 5;
const results: RecordsQueryReplyEntry[] = [];
let messageCid;
let cursor;
while (true) {
const pageQuery = await TestDataGenerator.generateRecordsQuery({
author : alice,
Expand All @@ -1143,24 +1143,24 @@ export function testRecordsQueryHandler(): void {
},
pagination: {
limit: limit,
messageCid,
cursor,
},
});

const pageReply = await dwn.processMessage(alice.did, pageQuery.message);
expect(pageReply.status.code).to.equal(200);
messageCid = pageReply.paginationMessageCid;
cursor = pageReply.cursor;
expect(pageReply.entries?.length).to.be.lte(limit);
results.push(...pageReply.entries!);
if (messageCid === undefined) {
if (cursor === undefined) {
break;
}
}
expect(results.length).to.equal(messages.length);
expect(messages.every(({ message }) => results.map(e => (e as RecordsWriteMessage).recordId).includes(message.recordId)));
});

it('paginationMessageCid should match the messageCid of the last entry in the returned query', async () => {
it('cursor should match the messageCid of the last entry in the returned query', async () => {
const alice = await DidKeyResolver.generate();

const messages = await Promise.all(Array(6).fill({}).map(_ => TestDataGenerator.generateRecordsWrite({
Expand All @@ -1186,10 +1186,10 @@ export function testRecordsQueryHandler(): void {
const pageReply = await dwn.processMessage(alice.did, pageQuery.message);
expect(pageReply.status.code).to.equal(200);
expect(pageReply.entries?.length).to.be.lte(limit);
expect(pageReply.paginationMessageCid).to.exist;
expect(pageReply.cursor).to.exist;
const lastMessageWithAuthorization = messages.find(m => m.message.recordId === pageReply.entries?.at(-1)!.recordId)!;
const messageCid = await Message.getCid(lastMessageWithAuthorization.message);
expect(pageReply.paginationMessageCid).to.equal(messageCid);
expect(pageReply.cursor).to.equal(messageCid);
});

it('should allow an anonymous unauthenticated query to return published records', async () => {
Expand Down Expand Up @@ -1420,32 +1420,32 @@ export function testRecordsQueryHandler(): void {
expect(results.status.code).to.equal(200);
expect(results.entries?.length).to.equal(10, 'alice page 1');
const page1PaginationLastMessage = await Message.getCid(sortedMessages.at(9)!); // get messageCid from message with authorization.
expect(results.paginationMessageCid).to.equal(page1PaginationLastMessage, 'alice page 1');
expect(results.cursor).to.equal(page1PaginationLastMessage, 'alice page 1');

// page2 alice
const aliceQueryMessageDataPage2 = await TestDataGenerator.generateRecordsQuery({
author : alice,
filter : { schema },
dateSort : DateSort.CreatedAscending,
pagination : { limit: 10, messageCid: results.paginationMessageCid },
pagination : { limit: 10, cursor: results.cursor },
});
results = await dwn.processMessage(alice.did, aliceQueryMessageDataPage2.message) ;
expect(results.status.code).to.equal(200);
expect(results.entries?.length).to.equal(10, 'alice page 2');
const page2PaginationLastMessage = await Message.getCid(sortedMessages.at(19)!); // get messageCid from message with authorization.
expect(results.paginationMessageCid).to.equal(page2PaginationLastMessage, 'alice page 2');
expect(results.cursor).to.equal(page2PaginationLastMessage, 'alice page 2');

// page3 alice
const aliceQueryMessageDataPage3 = await TestDataGenerator.generateRecordsQuery({
author : alice,
filter : { schema },
dateSort : DateSort.CreatedAscending,
pagination : { limit: 10, messageCid: results.paginationMessageCid },
pagination : { limit: 10, cursor: results.cursor },
});
results = await dwn.processMessage(alice.did, aliceQueryMessageDataPage3.message) ;
expect(results.status.code).to.equal(200);
expect(results.entries?.length).to.equal(5, 'alice page 3');
expect(results.paginationMessageCid).to.not.exist;
expect(results.cursor).to.not.exist;

const bobs = (m: RecordsWriteMessage): boolean => {
return m.descriptor.recipient === bob.did || m.descriptor.published === true || Message.getSigner(m) === bob.did;
Expand All @@ -1467,19 +1467,19 @@ export function testRecordsQueryHandler(): void {
expect(results.status.code).to.equal(200);
expect(results.entries?.length).to.equal(10, 'bob page 1');
const page1BobPaginationLastMessage = await Message.getCid(bobSorted.at(9)!);
expect(results.paginationMessageCid).to.equal(page1BobPaginationLastMessage, 'bob page 1');
expect(results.cursor).to.equal(page1BobPaginationLastMessage, 'bob page 1');
bobRetrieved.push(...results.entries!);

const bobQueryMessagePage2 = await TestDataGenerator.generateRecordsQuery({
author : bob,
filter : { schema },
dateSort : DateSort.CreatedAscending,
pagination : { limit: 10, messageCid: results.paginationMessageCid },
pagination : { limit: 10, cursor: results.cursor },
});
results = await dwn.processMessage(alice.did, bobQueryMessagePage2.message) ;
expect(results.status.code).to.equal(200);
expect(results.entries?.length).to.equal(10, 'bob page 2');
expect(results.paginationMessageCid).to.not.exist;
expect(results.cursor).to.not.exist;
bobRetrieved.push(...results.entries!);

const compareRecordId = (a: GenericMessage, b:GenericMessage): boolean => {
Expand Down
14 changes: 7 additions & 7 deletions tests/store/message-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,11 @@ export function testMessageStore(): void {

// get all of the records
const allRecords = await messageStore.query(alice.did, [{}], {}, { limit: 10 });
expect(allRecords.paginationMessageCid).to.not.exist;
expect(allRecords.cursor).to.not.exist;

// get only partial records
const partialRecords = await messageStore.query(alice.did, [{}], {}, { limit: 5 });
expect(partialRecords.paginationMessageCid).to.exist.and.to.not.be.undefined;
expect(partialRecords.cursor).to.exist.and.to.not.be.undefined;
});

it('should return all records from the cursor onwards when no limit is provided', async () => {
Expand All @@ -411,7 +411,7 @@ export function testMessageStore(): void {
const offset = 5;
const cursor = await Message.getCid(sortedRecords[offset - 1].message);

const { messages: limitQuery } = await messageStore.query(alice.did, [{}], {}, { messageCid: cursor });
const { messages: limitQuery } = await messageStore.query(alice.did, [{}], {}, { cursor });
expect(limitQuery.length).to.equal(sortedRecords.slice(offset).length);
for (let i = 0; i < limitQuery.length; i++) {
const offsetIndex = i + offset;
Expand All @@ -435,7 +435,7 @@ export function testMessageStore(): void {
const cursor = await Message.getCid(sortedRecords[offset - 1].message);
const limit = 3;

const { messages: limitQuery } = await messageStore.query(alice.did, [{}], {}, { messageCid: cursor, limit });
const { messages: limitQuery } = await messageStore.query(alice.did, [{}], {}, { cursor, limit });
expect(limitQuery.length).to.equal(limit);
for (let i = 0; i < limitQuery.length; i++) {
const offsetIndex = i + offset;
Expand All @@ -456,10 +456,10 @@ export function testMessageStore(): void {
const results = [];
let cursor: string | undefined;
while (true) {
const { messages: limitQuery, paginationMessageCid } = await messageStore.query(alice.did, [{}], {}, { messageCid: cursor, limit });
const { messages: limitQuery, cursor: queryCursor } = await messageStore.query(alice.did, [{}], {}, { cursor, limit });
expect(limitQuery.length).to.be.lessThanOrEqual(limit);
results.push(...limitQuery);
cursor = paginationMessageCid;
cursor = queryCursor;
if (cursor === undefined) {
break;
}
Expand All @@ -482,7 +482,7 @@ export function testMessageStore(): void {
}

const limit = 4;
const { messages: limitQuery } = await messageStore.query(alice.did, [{}], {}, { messageCid: 'some-cursor', limit });
const { messages: limitQuery } = await messageStore.query(alice.did, [{}], {}, { cursor: 'some-cursor', limit });
expect(limitQuery.length).to.be.equal(0);
});
});
Expand Down

0 comments on commit 3f8b405

Please sign in to comment.