Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List events #61

Merged
merged 11 commits into from
Dec 10, 2024
Merged
2 changes: 1 addition & 1 deletion .changeset/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"$schema": "https://unpkg.com/@changesets/[email protected]/schema.json",
"changelog": "@changesets/cli/changelog",
"commit": false,
"commit": ["@changesets/cli/commit", {}],
"fixed": [],
"linked": [],
"access": "public",
Expand Down
6 changes: 6 additions & 0 deletions cf-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# cf-eventhub

## 0.0.15

### Patch Changes

- e088cd6: New feature: list events

## 0.0.14

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion cf-eventhub/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "cf-eventhub",
"type": "module",
"version": "0.0.14",
"version": "0.0.15",
"keywords": [
"cloudflare"
],
Expand Down
18 changes: 17 additions & 1 deletion cf-eventhub/src/core/hub/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
type ResultedDispatch,
makeDispatchLost,
} from "../model";
import type { Repository } from "../repository";
import type { EventWithDispatches, Repository } from "../repository";
import type { EventPayload } from "../type";
import { type QueueMessage, enqueue } from "./queue";
import { type Config, findRoutes } from "./routing";
Expand Down Expand Up @@ -125,6 +125,22 @@ export class EventSink {
return result.value;
}

async listEvents(args?: {
maxItems?: number;
continuationToken?: string;
orderBy?: "CREATED_AT_ASC" | "CREATED_AT_DESC";
}): Promise<{ list: EventWithDispatches[]; continuationToken?: string }> {
const result = await this.repo.listEvents(
args?.maxItems || 10,
args?.continuationToken,
args?.orderBy,
);
if (result.isErr()) {
return Promise.reject(result.error);
}
return result.value;
}

async getEvent(eventId: string): Promise<Event | null> {
const result = await this.repo.getEvent(eventId);
if (result.isErr()) {
Expand Down
21 changes: 21 additions & 0 deletions cf-eventhub/src/core/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import type {
OngoingDispatch,
} from "./model";

export type EventWithDispatches = Event & {
readonly dispatches: readonly Dispatch[];
};

export interface Repository {
/**
* Enter transactional scope and call `fn`.
Expand Down Expand Up @@ -87,4 +91,21 @@ export interface Repository {
"INTERNAL_SERVER_ERROR" | "INVALID_CONTINUATION_TOKEN"
>
>;

/**
* Get events and these dispatches.
* @param maxItems maximum number of items to be fetched
* @param continuationToken token returned in last call
* @param orderBy
*/
listEvents(
maxItems: number,
continuationToken?: string,
orderBy?: "CREATED_AT_ASC" | "CREATED_AT_DESC",
): Promise<
Result<
{ list: EventWithDispatches[]; continuationToken?: string },
"INTERNAL_SERVER_ERROR" | "INVALID_CONTINUATION_TOKEN"
>
>;
}
43 changes: 40 additions & 3 deletions cf-eventhub/src/dev/repository.node.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { beforeAll, describe, test } from "vitest";
import { assert, beforeAll, describe, test } from "vitest";

import type { CreatedEvent } from "../core/model";
import {
nextDate,
testRepositoryListEventDispatches,
testRepositoryListEventsAsc,
testRepositoryListEventsDesc,
testRepositoryListOngoingDispatches,
testRepositoryPersistsCompleteDispatch,
testRepositoryPersistsFailedDispatch,
Expand Down Expand Up @@ -43,10 +48,42 @@ describe("repositorytest", () => {
test("Rollback by exception", async () => {
await testRepositoryRollback(repo, "THROW");
});
});

describe("repositorytest.testRepositoryListOngoingDispatches", () => {
test("List ongoing dispatches", async () => {
await testRepositoryListOngoingDispatches(new DevRepository());
});

test("List events", async () => {
const repo = new DevRepository();
const result = await repo.createEvents([
{
payload: {
id: crypto.randomUUID(),
},
createdAt: await nextDate(),
},
{
payload: {
id: crypto.randomUUID(),
},
createdAt: await nextDate(),
},
{
payload: {
id: crypto.randomUUID(),
},
createdAt: await nextDate(),
},
]);
assert(result.isOk());
assert(result.value.length === 3);
const events = result.value as [CreatedEvent, CreatedEvent, CreatedEvent];

await testRepositoryListEventsAsc(repo, events);
await testRepositoryListEventsDesc(repo, events);
});

test("Dispatch and execution order of events", async () => {
await testRepositoryListEventDispatches(new DevRepository());
});
});
56 changes: 55 additions & 1 deletion cf-eventhub/src/dev/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
isNewDispatchExecution,
ongoingDispatch,
} from "../core/model";
import type { Repository } from "../core/repository";
import type { EventWithDispatches, Repository } from "../core/repository";

export class DevRepository implements Repository {
private mu: Mutex;
Expand Down Expand Up @@ -199,6 +199,60 @@ export class DevRepository implements Repository {
continuationToken: undefined,
});
}

async listEvents(
maxItems: number,
continuationToken?: string,
orderBy?: "CREATED_AT_ASC" | "CREATED_AT_DESC",
): Promise<
Result<
{ list: EventWithDispatches[]; continuationToken?: string },
"INTERNAL_SERVER_ERROR" | "INVALID_CONTINUATION_TOKEN"
>
> {
const order = orderBy || "CREATED_AT_ASC";
const values = [...this.events.values()];
if (order === "CREATED_AT_DESC") {
values.reverse();
}

let lastIndex = 0;
if (continuationToken) {
const lastId = decodeContinuationToken(continuationToken);
const last = values.findIndex((v) => v.id === lastId);
if (last < 0) {
return err("INVALID_CONTINUATION_TOKEN");
}
lastIndex = last;
}

const list = lastIndex ? values.slice(lastIndex + 1) : values;
const dispatches = [...this.dispatches.values()];
const result: EventWithDispatches[] = [];
for (const event of list) {
const eventDispatches = dispatches.filter((d) => d.eventId === event.id);
result.push({
...event,
dispatches: eventDispatches,
});
if (result.length > maxItems) {
break;
}
}

if (result.length > maxItems) {
return ok({
list: result.slice(0, -1),
continuationToken: encodeContinuationToken(
result[result.length - 2].id,
),
});
}
return ok({
list: result,
continuationToken: undefined,
});
}
}

const encodeContinuationToken = (id: string) => btoa(id);
Expand Down
19 changes: 18 additions & 1 deletion cf-eventhub/src/eventhub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { EventSink } from "./core/hub";
import { Config, type ConfigInput } from "./core/hub/routing";
import { DefaultLogger, type LogLevel, type Logger } from "./core/logger";
import type { Dispatch, Event, ResultedDispatch } from "./core/model";
import type { Repository } from "./core/repository";
import type { EventWithDispatches, Repository } from "./core/repository";
import type { EventPayload, RpcSerializable } from "./core/type";

export type RpcEnv = Record<string, unknown> & {
Expand Down Expand Up @@ -110,6 +110,23 @@ export abstract class RpcEventHub<
return this.sink.getEvent(eventId);
}

/**
* List events.
* @param args.maxItems Maximum number of events to list. Default is 10.
* @param args.continuationToken Continuation token for pagination.
* @param args.orderBy Sort order. Default is "CREATED_AT_ASC".
* @returns List of events and continuation token.
*/
async listEvents(args?: {
maxItems?: number;
continuationToken?: string;
orderBy?: "CREATED_AT_ASC" | "CREATED_AT_DESC";
}): Promise<
RpcSerializable<{ list: EventWithDispatches[]; continuationToken?: string }>
> {
return this.sink.listEvents(args);
}

/**
* Retry dispatch which is in any resulted status.
* @param args.dispatchId Dispatch ID to retry.
Expand Down
Loading
Loading