Skip to content

Commit

Permalink
Add event-engine libs
Browse files Browse the repository at this point in the history
  • Loading branch information
codeliner committed May 4, 2023
1 parent 6d71723 commit a30f76e
Show file tree
Hide file tree
Showing 89 changed files with 5,937 additions and 357 deletions.
820 changes: 464 additions & 356 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@
},
"dependencies": {
"@swc/helpers": "~0.5.0",
"ajv": "^8.12.0",
"ajv-formats": "^2.1.1",
"axios": "^1.0.0",
"express": "~4.18.1",
"json-schema-to-ts": "^2.8.0",
"pg": "^8.10.0",
"pg-cursor": "^2.9.0",
"react": "18.2.0",
"react-dom": "18.2.0",
"react-router-dom": "6.8.1",
Expand Down
18 changes: 18 additions & 0 deletions packages/infrastructure/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
11 changes: 11 additions & 0 deletions packages/infrastructure/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# infrastructure

This library was generated with [Nx](https://nx.dev).

## Building

Run `nx build infrastructure` to build the library.

## Running unit tests

Run `nx test infrastructure` to execute the unit tests via [Jest](https://jestjs.io).
11 changes: 11 additions & 0 deletions packages/infrastructure/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* eslint-disable */
export default {
displayName: 'infrastructure',
preset: '../../jest.preset.js',
testEnvironment: 'node',
transform: {
'^.+\\.[tj]s$': ['ts-jest', { tsconfig: '<rootDir>/tsconfig.spec.json' }],
},
moduleFileExtensions: ['ts', 'js', 'html'],
coverageDirectory: '../../coverage/packages/infrastructure',
};
8 changes: 8 additions & 0 deletions packages/infrastructure/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "@event-engine/infrastructure",
"version": "0.0.1",
"dependencies": {
"pg": "^8.10.0",
"pg-cursor": "^2.9.0"
}
}
45 changes: 45 additions & 0 deletions packages/infrastructure/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"name": "infrastructure",
"$schema": "../../node_modules/nx/schemas/project-schema.json",
"sourceRoot": "packages/infrastructure/src",
"projectType": "library",
"targets": {
"build": {
"executor": "@nx/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/packages/infrastructure",
"tsConfig": "packages/infrastructure/tsconfig.lib.json",
"packageJson": "packages/infrastructure/package.json",
"main": "packages/infrastructure/src/index.ts",
"assets": ["packages/infrastructure/*.md"]
}
},
"publish": {
"command": "node tools/scripts/publish.mjs infrastructure {args.ver} {args.tag}",
"dependsOn": ["build"]
},
"lint": {
"executor": "@nx/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["packages/infrastructure/**/*.ts"]
}
},
"test": {
"executor": "@nx/jest:jest",
"outputs": ["{workspaceRoot}/coverage/{projectRoot}"],
"options": {
"jestConfig": "packages/infrastructure/jest.config.ts",
"passWithNoTests": true
},
"configurations": {
"ci": {
"ci": true,
"codeCoverage": true
}
}
}
},
"tags": []
}
Empty file.
197 changes: 197 additions & 0 deletions packages/infrastructure/src/lib/AggregateRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import {setMessageMetadata} from "@event-engine/messaging/message";
import {Event, EventVisibility, providesPublicEvent} from "@event-engine/messaging/event";
import {Command} from "@event-engine/messaging/command";
import {MultiModelStore} from "@event-engine/infrastructure/MultiModelStore";
import {asyncIteratorToArray} from "@event-engine/infrastructure/helpers/async-iterator-to-array";
import {MatchOperator, MetadataMatcher} from "@event-engine/infrastructure/EventStore";
import {NotFoundError} from "@event-engine/messaging/error/not-found-error";
import {Session} from "@event-engine/infrastructure/MultiModelStore/Session";

interface AggregateState {
[prop: string]: any;
}

interface AggregateStateDoc {
state: AggregateState;
version: number;
}

type AggregateStateFactory<S> = (state: object) => S;

export interface AggregateStateDocument<S = any> {
state: S;
version: number;
}

export interface EventMetadata {
aggregateType: string;
aggregateId: string;
aggregateVersion: number;
causationId: string;
causationName: string;
visibility: EventVisibility;
version: string;
}

export type ApplyFunction<S> = (aggregateState: S, event: Event) => S;

export const AggregateMeta = {
VERSION: "aggregateVersion",
TYPE: "aggregateType",
ID: "aggregateId",
}

export class AggregateRepository<T> {
public readonly aggregateType: string;
public readonly aggregateIdentifier: string;
protected readonly store: MultiModelStore;
protected readonly eventStream: string;
protected readonly aggregateCollection: string;
protected readonly applyFunctions: {[eventName: string]: ApplyFunction<T>};
protected readonly stateFactory: AggregateStateFactory<T>;
protected readonly publicStream: string;
protected nextSession: Session | undefined;

constructor(
store: MultiModelStore,
eventStream: string,
aggregateCollection: string,
aggregateType: string,
aggregateIdentifier: string,
applyFunctions: {[eventName: string]: ApplyFunction<T>},
stateFactory: AggregateStateFactory<T>,
publicStream = "public_stream"
) {
this.store = store;
this.eventStream = eventStream;
this.aggregateCollection = aggregateCollection;
this.aggregateType = aggregateType;
this.aggregateIdentifier = aggregateIdentifier;
this.applyFunctions = applyFunctions;
this.stateFactory = stateFactory;
this.publicStream = publicStream;
}

public useSessionForNextSave(session: Session): void {
this.nextSession = session;
}

public async save(events: Event[], aggregateState: Readonly<AggregateState>, expectedVersion: number, command: Command): Promise<boolean> {
// eslint-disable-next-line no-prototype-builtins
if(!aggregateState.hasOwnProperty(this.aggregateIdentifier)) {
throw Error(`Missing aggregate identifier "${this.aggregateIdentifier}" in aggregate state: ` + JSON.stringify(aggregateState));
}

const arId = aggregateState[this.aggregateIdentifier];
let aggregateEventsCount = 0;

const writeModelEvents: Event[] = [];
const publicEvents: Event[] = [];

events.forEach((evt, index) => {
if(evt.meta.visibility === "service") {
evt = setMessageMetadata(evt, AggregateMeta.ID, arId);
evt = setMessageMetadata(evt, AggregateMeta.TYPE, this.aggregateType);
evt = setMessageMetadata(evt, AggregateMeta.VERSION, expectedVersion + aggregateEventsCount + 1);
aggregateEventsCount++;
}
evt = setMessageMetadata(evt, 'causationId', command.uuid);
evt = setMessageMetadata(evt, 'causationName', command.name);

for (const k in command.meta) {
evt = setMessageMetadata(evt, k, command.meta[k]);
}

if(evt.meta.visibility === "service") {
writeModelEvents.push(evt);

if(providesPublicEvent(evt)) {
publicEvents.push(evt.toPublicEvent());
}
} else {
publicEvents.push(evt);
}

events[index] = evt;
});

const session = this.nextSession || this.store.beginSession();

this.nextSession = undefined;

if(writeModelEvents.length) {
session.appendEventsTo(this.eventStream, writeModelEvents, {
'aggregateId': arId,
'aggregateType': this.aggregateType,
}, expectedVersion)

session.upsertDocument(this.aggregateCollection, arId, {
state: aggregateState,
version: expectedVersion + events.length,
});
}

if(publicEvents.length) {
session.appendEventsTo(this.publicStream, publicEvents);
}

return this.store.commitSession(session);
}

public async loadState(aggregateId: string, untilVersion?: number): Promise<[T, number]> {
let maybeVersionMatcher: MetadataMatcher = {};
let aggregateState: AggregateState = this.stateFactory({});
let aggregateVersion = 0;

if(!untilVersion) {
const doc = await this.store.loadDoc<AggregateStateDoc>(this.aggregateCollection, aggregateId);

if(doc) {
aggregateState = {...aggregateState, ...doc.state};
aggregateVersion = doc.version;
maybeVersionMatcher = {'aggregateVersion': {op: MatchOperator.GT, val: aggregateVersion}};
}
} else {
maybeVersionMatcher = {'aggregateVersion': {op: MatchOperator.LTE, val: untilVersion}};
}

return new Promise<[T, number]>((resolve, reject) => {
this.store.loadEvents<any, EventMetadata>(this.eventStream, {
'aggregateId': aggregateId,
'aggregateType': this.aggregateType,
...maybeVersionMatcher
}).then(async (eventsItr) => {
const events = await asyncIteratorToArray(eventsItr);

const [finalState, finalVersion] = this.applyEvents(aggregateState as T, aggregateVersion, events);

if(finalVersion === 0) {
reject(new NotFoundError(`Aggregate of type ${this.aggregateType} with id: ${aggregateId} not found.`));
return;
}

resolve([finalState, finalVersion]);
});
})
}

public applyEvents(arState: T, arVersion: number, events: Iterable<Event<any, EventMetadata>>): [T, number] {
for (const evt of events) {
if(evt.meta.visibility !== "service") {
continue;
}

// eslint-disable-next-line no-prototype-builtins
if(!this.applyFunctions.hasOwnProperty(evt.name)) {
throw Error(`Missing aggregate apply function for event ${evt.name}`);
}

const applyFunc: ApplyFunction<T> = this.applyFunctions[evt.name];

arState = applyFunc(arState, evt) as T;
arVersion = evt.meta.aggregateVersion;
}

return [arState, arVersion];
}
}
35 changes: 35 additions & 0 deletions packages/infrastructure/src/lib/DocumentStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {Filter} from "./DocumentStore/Filter";
import {Index} from "@event-engine/infrastructure/DocumentStore/Index";

export type Sort = 'asc' | 'desc';
export type PartialSelect = Array<string|[string, string]>;
export type SortOrder = Array<{
prop: string;
sort: Sort;
}>

export interface DocumentStore {
addCollection: (collectionName: string, index?: Index) => Promise<void>;
hasCollection: (collectionName: string) => Promise<boolean>;
dropCollection: (collectionName: string) => Promise<void>;

addCollectionIndex: (collectionName: string, index: Index) => Promise<void>;
hasCollectionIndex: (collectionName: string, index: Index) => Promise<boolean>;
dropCollectionIndex: (collectionName: string, index: Index) => Promise<void>;

addDoc: (collectionName: string, docId: string, doc: object, metadata?: object) => Promise<void>;
updateDoc: (collectionName: string, docId: string, docOrSubset: object, metadata?: object) => Promise<void>;
upsertDoc: (collectionName: string, docId: string, docOrSubset: object, metadata?: object) => Promise<void>;
replaceDoc: (collectionName: string, docId: string, doc: object, metadata?: object) => Promise<void>;
getDoc: <D extends object>(collectionName: string, docId: string) => Promise<D | null>;
getPartialDoc: <D extends object>(collectionName: string, docId: string, partialSelect: PartialSelect) => Promise<D | null>;
deleteDoc: (collectionName: string, docId: string) => Promise<void>;

updateMany: (collectionName: string, filter: any, docOrSubset: object, metadata?: object) => Promise<void>;
replaceMany: (collectionName: string, filter: any, doc: object, metadata?: object) => Promise<void>;
deleteMany: (collectionName: string, filter: Filter) => Promise<void>;
findDocs: <D extends object>(collectionName: string, filter: Filter, skip?: number, limit?: number, orderBy?: SortOrder) => Promise<AsyncIterable<[string, D]>>;
findPartialDocs: <D extends object>(collectionName: string, partialSelect: PartialSelect, filter: Filter, skip?: number, limit?: number, orderBy?: SortOrder) => Promise<AsyncIterable<[string, D]>>;
findDocIds: (collectionName: string, filter: Filter, skip?: number, limit?: number, orderBy?: SortOrder) => Promise<string[]>;
countDocs: (collectionName: string, filter: Filter) => Promise<number>;
}
6 changes: 6 additions & 0 deletions packages/infrastructure/src/lib/DocumentStore/Filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {FilterProcessor} from "@event-engine/infrastructure/DocumentStore/FilterProcessor";

export interface Filter {
processWith: (processor: FilterProcessor) => any;
}

14 changes: 14 additions & 0 deletions packages/infrastructure/src/lib/DocumentStore/Filter/AndFilter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {Filter} from "../Filter";
import {FilterProcessor} from "@event-engine/infrastructure/DocumentStore/FilterProcessor";

export class AndFilter implements Filter {
public readonly internalFilters: Filter[];

constructor(aFilter: Filter, bFilter: Filter, ...otherFilters: Filter[]) {
this.internalFilters = [aFilter, bFilter, ...otherFilters];
}

processWith(processor: FilterProcessor): any {
return processor.processAndFilter(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {Filter} from "../Filter";
import {FilterProcessor} from "@event-engine/infrastructure/DocumentStore/FilterProcessor";

export class AnyFilter implements Filter {
processWith(processor: FilterProcessor): any {
return processor.processAnyFilter(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {Filter} from "../Filter";
import {FilterProcessor} from "@event-engine/infrastructure/DocumentStore/FilterProcessor";

export class AnyOfDocIdFilter implements Filter {
public readonly valList: any[];

constructor(valList: any[]) {
this.valList = valList;
}

processWith(processor: FilterProcessor): any {
return processor.processAnyOfDocIdFilter(this);
}
}
Loading

0 comments on commit a30f76e

Please sign in to comment.