From 4c0072f62ede449d44136e8db87972ceb90bc760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Lenksj=C3=B6?= <5889538+lenkan@users.noreply.github.com> Date: Fri, 12 Apr 2024 09:47:51 +0200 Subject: [PATCH] feat: method waiting for long running operation (#236) --- .../test-setup-single-client.test.ts | 2 +- .../integration-scripts/utils/test-util.ts | 91 ++----- src/keri/app/coring.ts | 58 ++++- test/app/coring.test.ts | 233 +++++++++++++++--- 4 files changed, 284 insertions(+), 100 deletions(-) diff --git a/examples/integration-scripts/test-setup-single-client.test.ts b/examples/integration-scripts/test-setup-single-client.test.ts index ecbf4e81..cf2f941b 100644 --- a/examples/integration-scripts/test-setup-single-client.test.ts +++ b/examples/integration-scripts/test-setup-single-client.test.ts @@ -1,7 +1,7 @@ import { SignifyClient } from 'signify-ts'; import { getOrCreateClients, getOrCreateIdentifier } from './utils/test-setup'; import { resolveEnvironment } from './utils/resolve-env'; -import { assertOperations, waitOperation } from './utils/test-util'; +import { assertOperations } from './utils/test-util'; let client: SignifyClient; let name1_id: string, name1_oobi: string; diff --git a/examples/integration-scripts/utils/test-util.ts b/examples/integration-scripts/utils/test-util.ts index 4a08262b..95feb44f 100644 --- a/examples/integration-scripts/utils/test-util.ts +++ b/examples/integration-scripts/utils/test-util.ts @@ -15,8 +15,8 @@ export function sleep(ms: number): Promise { export async function assertOperations( ...clients: SignifyClient[] ): Promise { - for (let client of clients) { - let operations = await client.operations().list(); + for (const client of clients) { + const operations = await client.operations().list(); expect(operations).toHaveLength(0); } } @@ -30,9 +30,9 @@ export async function assertOperations( export async function assertNotifications( ...clients: SignifyClient[] ): Promise { - for (let client of clients) { - let res = await client.notifications().list(); - let notes = res.notes.filter((i: any) => i.r === false); + for (const client of clients) { + const res = await client.notifications().list(); + const notes = res.notes.filter((i: { r: boolean }) => i.r === false); expect(notes).toHaveLength(0); } } @@ -46,9 +46,9 @@ export async function warnNotifications( ...clients: SignifyClient[] ): Promise { let count = 0; - for (let client of clients) { - let res = await client.notifications().list(); - let notes = res.notes.filter((i: any) => i.r === false); + for (const client of clients) { + const res = await client.notifications().list(); + const notes = res.notes.filter((i: { r: boolean }) => i.r === false); if (notes.length > 0) { count += notes.length; console.warn('notifications', notes); @@ -57,30 +57,6 @@ export async function warnNotifications( expect(count).toBeGreaterThan(0); // replace warnNotifications with assertNotifications } -/** - * Get status of operation. - * If parameter recurse is set then also checks status of dependent operations. - */ -async function getOperation( - client: SignifyClient, - name: string, - recurse?: boolean -): Promise> { - const result = await client.operations().get(name); - if (recurse === true) { - let i: Operation | undefined = result; - while (result.done && i?.metadata?.depends !== undefined) { - let depends: Operation = await client - .operations() - .get(i.metadata.depends.name); - result.done = result.done && depends.done; - i.metadata.depends = depends; - i = depends.metadata?.depends; - } - } - return result; -} - /** * Poll for operation to become completed. * Removes completed operation @@ -88,42 +64,29 @@ async function getOperation( export async function waitOperation( client: SignifyClient, op: Operation | string, - options: RetryOptions = {} + signal?: AbortSignal ): Promise> { - const ctrl = new AbortController(); - options.signal?.addEventListener('abort', (e: Event) => { - const s = e.target as AbortSignal; - ctrl.abort(s.reason); - }); - let name: string; if (typeof op === 'string') { - name = op; - } else if (typeof op === 'object' && 'name' in op) { - name = op.name; - } else { - throw new Error(); + op = await client.operations().get(op); } - const result: Operation = await retry(async () => { - let t: Operation; - try { - t = await getOperation(client, name, true); - } catch (e) { - ctrl.abort(e); - throw e; - } - if (t.done !== true) { - throw new Error(`Operation ${name} not done`); - } - console.log('DONE', name); - return t; - }, options); - let i: Operation | undefined = result; - while (i !== undefined) { - // console.log('DELETE', i.name); - await client.operations().delete(i.name); - i = i.metadata?.depends; + + op = await client + .operations() + .wait(op, { signal: signal ?? AbortSignal.timeout(30000) }); + await deleteOperations(client, op); + + return op; +} + +async function deleteOperations( + client: SignifyClient, + op: Operation +) { + if (op.metadata?.depends) { + await deleteOperations(client, op.metadata.depends); } - return result; + + await client.operations().delete(op.name); } export async function resolveOobi( diff --git a/src/keri/app/coring.ts b/src/keri/app/coring.ts index fab67518..fd5a1161 100644 --- a/src/keri/app/coring.ts +++ b/src/keri/app/coring.ts @@ -72,18 +72,27 @@ export interface Operation { response?: T; } +export interface OperationsDeps { + fetch( + pathname: string, + method: string, + body: unknown, + headers?: Headers + ): Promise; +} + /** * Operations * @remarks * Operations represent the status and result of long running tasks performed by KERIA agent */ export class Operations { - public client: SignifyClient; + public client: OperationsDeps; /** * Operations * @param {SignifyClient} client */ - constructor(client: SignifyClient) { + constructor(client: OperationsDeps) { this.client = client; } @@ -128,6 +137,51 @@ export class Operations { const method = 'DELETE'; await this.client.fetch(path, method, data); } + + /** + * Poll for operation to become completed. + */ + async wait( + op: Operation, + options: { + signal?: AbortSignal; + minSleep?: number; + maxSleep?: number; + increaseFactor?: number; + } = {} + ): Promise> { + const minSleep = options.minSleep ?? 10; + const maxSleep = options.maxSleep ?? 10000; + const increaseFactor = options.increaseFactor ?? 50; + + if (op.metadata?.depends?.done === false) { + await this.wait(op.metadata.depends, options); + } + + if (op.done === true) { + return op; + } + + let retries = 0; + + // eslint-disable-next-line no-constant-condition + while (true) { + op = await this.get(op.name); + + const delay = Math.max( + minSleep, + Math.min(maxSleep, 2 ** retries * increaseFactor) + ); + retries++; + + if (op.done === true) { + return op; + } + + await new Promise((resolve) => setTimeout(resolve, delay)); + options.signal?.throwIfAborted(); + } + } } /** diff --git a/test/app/coring.test.ts b/test/app/coring.test.ts index 2ebf6185..f25c8be0 100644 --- a/test/app/coring.test.ts +++ b/test/app/coring.test.ts @@ -1,11 +1,17 @@ import { strict as assert } from 'assert'; import libsodium from 'libsodium-wrappers-sumo'; -import { randomPasscode, randomNonce } from '../../src/keri/app/coring'; +import { + randomPasscode, + randomNonce, + Operations, + OperationsDeps, +} from '../../src/keri/app/coring'; import { SignifyClient } from '../../src/keri/app/clienting'; import { Authenticater } from '../../src/keri/core/authing'; import { Salter, Tier } from '../../src/keri/core/salter'; import fetchMock from 'jest-fetch-mock'; import 'whatwg-fetch'; +import { randomUUID } from 'crypto'; fetchMock.enableMocks(); @@ -202,38 +208,6 @@ describe('Coring', () => { assert.deepEqual(lastBody.oobialias, 'witness'); }); - it('Operations', async () => { - await libsodium.ready; - const bran = '0123456789abcdefghijk'; - - const client = new SignifyClient(url, bran, Tier.low, boot_url); - - await client.boot(); - await client.connect(); - - const ops = client.operations(); - - await ops.get('operationName'); - let lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]!; - assert.equal(lastCall[0]!, url + '/operations/operationName'); - assert.equal(lastCall[1]!.method, 'GET'); - - await ops.list(); - lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]!; - assert.equal(lastCall[0]!, url + '/operations?'); - assert.equal(lastCall[1]!.method, 'GET'); - - await ops.list('witness'); - lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]!; - assert.equal(lastCall[0]!, url + '/operations?type=witness'); - assert.equal(lastCall[1]!.method, 'GET'); - - await ops.delete('operationName'); - lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]!; - assert.equal(lastCall[0]!, url + '/operations/operationName'); - assert.equal(lastCall[1]!.method, 'DELETE'); - }); - it('Events and states', async () => { await libsodium.ready; const bran = '0123456789abcdefghijk'; @@ -294,3 +268,196 @@ describe('Coring', () => { ); }); }); + +describe('Operations', () => { + class MockClient implements OperationsDeps { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + fetch = jest.fn, [string, string, any]>(); + + constructor() {} + + operations() { + return new Operations(this); + } + + getLastMockRequest() { + const [pathname, method, body] = this.fetch.mock.lastCall ?? []; + + return { + path: pathname, + method: method, + body: body, + }; + } + } + + let client: MockClient; + beforeEach(async () => { + await libsodium.ready; + client = new MockClient(); + }); + + it('Can get operation by name', async () => { + await libsodium.ready; + + client.fetch.mockResolvedValue( + new Response(JSON.stringify({ name: randomUUID() }), { + status: 200, + }) + ); + await client.operations().get('operationName'); + const lastCall = client.getLastMockRequest(); + assert.equal(lastCall.path, '/operations/operationName'); + assert.equal(lastCall.method, 'GET'); + }); + + it('Can list operations', async () => { + client.fetch.mockResolvedValue( + new Response(JSON.stringify([]), { + status: 200, + }) + ); + await client.operations().list(); + const lastCall = client.getLastMockRequest(); + assert.equal(lastCall.path, '/operations?'); + assert.equal(lastCall.method, 'GET'); + }); + + it('Can list operations by type', async () => { + client.fetch.mockResolvedValue( + new Response(JSON.stringify([]), { + status: 200, + }) + ); + await client.operations().list('witness'); + const lastCall = client.getLastMockRequest(); + assert.equal(lastCall.path, '/operations?type=witness'); + assert.equal(lastCall.method, 'GET'); + }); + + it('Can delete operation by name', async () => { + client.fetch.mockResolvedValue( + new Response(JSON.stringify({}), { + status: 200, + }) + ); + await client.operations().delete('operationName'); + const lastCall = client.getLastMockRequest(); + assert.equal(lastCall.path, '/operations/operationName'); + assert.equal(lastCall.method, 'DELETE'); + }); + + describe('wait', () => { + it('does not wait for operation that is already "done"', async () => { + const name = randomUUID(); + client.fetch.mockResolvedValue( + new Response(JSON.stringify({ name }), { + status: 200, + }) + ); + + const op = { name, done: true }; + const result = await client.operations().wait(op); + assert.equal(client.fetch.mock.calls.length, 0); + assert.equal(op, result); + }); + + it('returns when operation is done after first call', async () => { + const name = randomUUID(); + client.fetch.mockResolvedValue( + new Response(JSON.stringify({ name, done: true }), { + status: 200, + }) + ); + + const op = { name, done: false }; + await client.operations().wait(op); + assert.equal(client.fetch.mock.calls.length, 1); + }); + + it('returns when operation is done after second call', async () => { + const name = randomUUID(); + client.fetch.mockResolvedValueOnce( + new Response(JSON.stringify({ name, done: false }), { + status: 200, + }) + ); + + client.fetch.mockResolvedValueOnce( + new Response(JSON.stringify({ name, done: true }), { + status: 200, + }) + ); + + const op = { name, done: false }; + await client.operations().wait(op, { maxSleep: 10 }); + assert.equal(client.fetch.mock.calls.length, 2); + }); + + it('throw if aborted', async () => { + const name = randomUUID(); + client.fetch.mockImplementation( + async () => + new Response(JSON.stringify({ name, done: false }), { + status: 200, + }) + ); + + const op = { name, done: false }; + + const controller = new AbortController(); + const promise = client + .operations() + .wait(op, { signal: controller.signal }) + .catch((e) => e); + + const abortError = new Error('Aborted'); + controller.abort(abortError); + + const error = await promise; + + assert.equal(error, abortError); + }); + + it('returns when child operation is also done', async () => { + const name = randomUUID(); + const nestedName = randomUUID(); + const depends = { name: nestedName, done: false }; + const op = { name, done: false, depends }; + + client.fetch.mockResolvedValueOnce( + new Response(JSON.stringify({ ...op, done: false }), { + status: 200, + }) + ); + + client.fetch.mockResolvedValueOnce( + new Response( + JSON.stringify({ + ...op, + depends: { ...depends, done: true }, + }), + { + status: 200, + } + ) + ); + + client.fetch.mockResolvedValueOnce( + new Response( + JSON.stringify({ + ...op, + done: true, + depends: { ...depends, done: true }, + }), + { + status: 200, + } + ) + ); + + await client.operations().wait(op, { maxSleep: 10 }); + assert.equal(client.fetch.mock.calls.length, 3); + }); + }); +});