From 75f82e8decda153648e2d58d9ace65cce0161a62 Mon Sep 17 00:00:00 2001 From: Akamig Date: Tue, 21 Nov 2023 09:10:58 +0900 Subject: [PATCH 1/3] Add JobExecutionStore In Monitor/Observer --- src/index.ts | 6 ++++++ src/monitors/assets-transferred-monitor.ts | 4 +++- src/monitors/garage-unload-monitor.ts | 3 ++- src/monitors/ninechronicles-block-monitor.ts | 7 +++++-- src/observers/asset-downstream-observer.ts | 2 ++ src/observers/asset-transferred-observer.ts | 1 + 6 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/index.ts b/src/index.ts index 3f91a72..14d76d7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,8 +6,10 @@ import { AssetBurner } from "./asset-burner"; import { AssetTransfer } from "./asset-transfer"; import { getRequiredEnv } from "./env"; import { HeadlessGraphQLClient } from "./headless-graphql-client"; +import { IJobExecutionStore } from "./interfaces/job-execution-store"; import { IMonitorStateStore } from "./interfaces/monitor-state-store"; import { isBackgroundSyncTxpool } from "./interfaces/txpool"; +import { JobExecutionStore } from "./job-execution-store"; import { Minter } from "./minter"; import { getMonitorStateHandler } from "./monitor-state-handler"; import { AssetsTransferredMonitor } from "./monitors/assets-transferred-monitor"; @@ -44,6 +46,7 @@ const slackBot = new SlackBot( await Sqlite3MonitorStateStore.open( getRequiredEnv("MONITOR_STATE_STORE_PATH"), ); + const jobExecutionStore: IJobExecutionStore = new JobExecutionStore(); const upstreamAssetsTransferredMonitorMonitor = new AssetsTransferredMonitor( @@ -51,6 +54,7 @@ const slackBot = new SlackBot( monitorStateStore, "upstreamAssetTransferMonitor", ), + jobExecutionStore, upstreamGQLClient, Address.fromHex(getRequiredEnv("NC_VAULT_ADDRESS")), ); @@ -60,6 +64,7 @@ const slackBot = new SlackBot( monitorStateStore, "downstreamAssetTransferMonitor", ), + jobExecutionStore, downstreamGQLClient, Address.fromHex(getRequiredEnv("NC_VAULT_ADDRESS")), ); @@ -68,6 +73,7 @@ const slackBot = new SlackBot( monitorStateStore, "upstreamGarageUnloadMonitor", ), + jobExecutionStore, upstreamGQLClient, Address.fromHex(getRequiredEnv("NC_VAULT_ADDRESS")), Address.fromHex(getRequiredEnv("NC_VAULT_AVATAR_ADDRESS")), diff --git a/src/monitors/assets-transferred-monitor.ts b/src/monitors/assets-transferred-monitor.ts index c42d5bc..1e84dfb 100644 --- a/src/monitors/assets-transferred-monitor.ts +++ b/src/monitors/assets-transferred-monitor.ts @@ -1,4 +1,5 @@ import { Address } from "@planetarium/account"; +import { Job } from "@prisma/client"; import { IHeadlessGraphQLClient } from "../interfaces/headless-graphql-client"; import { IMonitorStateHandler } from "../interfaces/monitor-state-handler"; import { AssetTransferredEvent } from "../types/asset-transferred-event"; @@ -10,10 +11,11 @@ export class AssetsTransferredMonitor extends NineChroniclesMonitor extends Monitor< > { private readonly _monitorStateHandler: IMonitorStateHandler; private readonly _delayMilliseconds: number; + protected readonly _jobExecutionStore: IJobExecutionStore; protected readonly _headlessGraphQLClient: IHeadlessGraphQLClient; private latestBlockNumber: number | undefined; constructor( monitorStateHandler: IMonitorStateHandler, + jobExecutionStore: IJobExecutionStore, headlessGraphQLClient: IHeadlessGraphQLClient, delayMilliseconds: number = 15 * 1000, ) { @@ -24,12 +28,12 @@ export abstract class NineChroniclesMonitor extends Monitor< this._monitorStateHandler = monitorStateHandler; this._headlessGraphQLClient = headlessGraphQLClient; + this._jobExecutionStore = jobExecutionStore; this._delayMilliseconds = delayMilliseconds; } async *loop(): AsyncIterableIterator<{ blockHash: BlockHash; - planetID: string; events: (TEventData & TransactionLocation)[]; }> { const nullableLatestBlockHash = await this._monitorStateHandler.load(); @@ -58,7 +62,6 @@ export abstract class NineChroniclesMonitor extends Monitor< yield { blockHash, - planetID, events: await this.getEvents(blockIndex), }; diff --git a/src/observers/asset-downstream-observer.ts b/src/observers/asset-downstream-observer.ts index 213607a..a93e88b 100644 --- a/src/observers/asset-downstream-observer.ts +++ b/src/observers/asset-downstream-observer.ts @@ -2,6 +2,7 @@ import { Address } from "@planetarium/account"; import { IObserver } from "."; import { IAssetBurner } from "../interfaces/asset-burner"; import { IAssetTransfer } from "../interfaces/asset-transfer"; +import { IJobExecutionStore } from "../interfaces/job-execution-store"; import { ISlackMessageSender } from "../slack"; import { SlackBot } from "../slack/bot"; import { BridgeErrorEvent } from "../slack/messages/bridge-error-event"; @@ -17,6 +18,7 @@ export class AssetDownstreamObserver events: (AssetTransferredEvent & TransactionLocation)[]; }> { + private readonly _jobExecutionStore: IJobExecutionStore; private readonly _slackbot: ISlackMessageSender; private readonly _transfer: IAssetTransfer; private readonly _burner: IAssetBurner; diff --git a/src/observers/asset-transferred-observer.ts b/src/observers/asset-transferred-observer.ts index e1408db..0ea11ae 100644 --- a/src/observers/asset-transferred-observer.ts +++ b/src/observers/asset-transferred-observer.ts @@ -1,5 +1,6 @@ import { FungibleAssetValue } from "@planetarium/tx"; import { IObserver } from "."; +import { IJobExecutionStore } from "../interfaces/job-execution-store"; import { IMinter } from "../interfaces/minter"; import { ISlackMessageSender } from "../slack"; import { SlackBot } from "../slack/bot"; From 8fecbbb9831acdb576525e38f0c49198079bc943 Mon Sep 17 00:00:00 2001 From: Akamig Date: Tue, 21 Nov 2023 11:54:53 +0900 Subject: [PATCH 2/3] Job Execution Store Implementation --- src/interfaces/job-execution-store.ts | 15 ++++++ src/job-execution-store.ts | 74 +++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 src/interfaces/job-execution-store.ts create mode 100644 src/job-execution-store.ts diff --git a/src/interfaces/job-execution-store.ts b/src/interfaces/job-execution-store.ts new file mode 100644 index 0000000..e461e71 --- /dev/null +++ b/src/interfaces/job-execution-store.ts @@ -0,0 +1,15 @@ +import { Job, PrismaClient, ActionType } from "@prisma/client"; +import { GarageUnloadEvent } from "../types/garage-unload-event"; +import { IJobExecutionStore } from "./interfaces/job-execution-store"; +import { AssetTransferredEvent } from "../types/asset-transferred-event"; +import { TransactionLocation } from "../types/transaction-location"; + +export interface IJobExecutionStore { + putAssetTransferReq( + event: AssetTransferredEvent & TransactionLocation + ): Promise; + putGarageUnloadReq( + event: GarageUnloadEvent & TransactionLocation + ): Promise; + putJobExec(reqTxId: string, resTxId: string, dstPlanet: string, actionType: ActionType); +} diff --git a/src/job-execution-store.ts b/src/job-execution-store.ts new file mode 100644 index 0000000..890eac9 --- /dev/null +++ b/src/job-execution-store.ts @@ -0,0 +1,74 @@ +import { $Enums, PrismaClient } from "@prisma/client"; +import { IJobExecutionStore } from "./interfaces/job-execution-store"; +import { AssetTransferredEvent } from "./types/asset-transferred-event"; +import { GarageUnloadEvent } from "./types/garage-unload-event"; +import { TransactionLocation } from "./types/transaction-location"; + +export class JobExecutionStore implements IJobExecutionStore { + private _prisma: PrismaClient; + + constructor() { + this._prisma = new PrismaClient(); + } + async putAssetTransferReq( + event: AssetTransferredEvent & TransactionLocation + ) { + await this._prisma.request.create({ + data: { + req_tx_id: event.txId, + src_planet: event.planetID, + req_type: "TRANSFER_ASSETS", + timestamp: new Date(event.timestamp), + transfer: { + create: { + sender: event.sender.toString(), + recipient: event.memo, + ticker: "NCG", //...need to be adjusted in case of non-NCG bridging? + amount: event.amount.rawValue.toString(), + }, + }, + job: { + create: { + startedAt: new Date(event.timestamp) + } + } + }, + }); + } + async putGarageUnloadReq(event: GarageUnloadEvent & TransactionLocation) { + const parsed = JSON.parse(event.memo); + + await this._prisma.request.create({ + data: { + req_tx_id: event.txId, + src_planet: event.planetID, + req_type: "UNLOAD_GARAGE", + timestamp: new Date(event.timestamp), + garage: { + create: { + sender: event.signer, + recipient: parsed[0], + recipientAvatar: parsed[1], + FungibleAssetValues: JSON.stringify(event.fungibleAssetValues), + FungibleItems: JSON.stringify(event.fungibleItems), + }, + }, + job: { + create: { + startedAt: new Date(event.timestamp), + } + } + }, + }); + } + async putJobExec(reqTxId: string, resTxId: string, dstPlanet: string, actionType: $Enums.ActionType) { + await this._prisma.jobExecution.create({ + data: { + jobId: reqTxId, + dstPlanetId: dstPlanet, + transactionId: resTxId, + actionType: actionType, + } + }) + } +} From 54f7efbb6ec88e6634bda2a7c2c3bc0375091fcc Mon Sep 17 00:00:00 2001 From: Akamig Date: Mon, 20 Nov 2023 22:26:03 +0900 Subject: [PATCH 3/3] Use Prisma --- package.json | 2 ++ prisma/schema.prisma | 66 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 prisma/schema.prisma diff --git a/package.json b/package.json index 909b957..6c301f4 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "@types/jest": "^29.5.7", "@types/node": "^20.5.9", "jest": "^29.7.0", + "prisma": "^5.6.0", "ts-jest": "^29.1.1", "typescript": "^5.2.2" }, @@ -27,6 +28,7 @@ "@planetarium/account-aws-kms": "^3.7.0", "@planetarium/bencodex": "^0.2.2", "@planetarium/tx": "^3.7.0", + "@prisma/client": "5.6.0", "@sentry/node": "^7.76.0", "@slack/web-api": "^6.10.0", "@urql/core": "^4.2.0", diff --git a/prisma/schema.prisma b/prisma/schema.prisma new file mode 100644 index 0000000..0b74a6a --- /dev/null +++ b/prisma/schema.prisma @@ -0,0 +1,66 @@ +// This is your Prisma schema file, +// learn more about it in the docs: https://pris.ly/d/prisma-schema + +generator client { + provider = "prisma-client-js" +} + +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +enum ActionType { + BURN + MINT + TRANSFER +} + +enum TxResult { + INVALID + STAGING + SUCCESS + FAILURE +} + +model Job { + id String @id + sender String + recipient String + src_planet String + ticker String + amount String + createdAt DateTime @default(now()) + startedAt DateTime? + processedAt DateTime? + updatedAt DateTime @updatedAt + + executions JobExecution[] +} + +model Transaction { + txId String @id @unique + nonce BigInt @unique + raw Bytes + lastStatus TxResult? + statusUpdatedAt DateTime @default(now()) + + executions JobExecution[] +} + +model JobExecution { + jobId String + job Job @relation(fields: [jobId], references: [id]) + + transactionId String + transaction Transaction @relation(fields: [transactionId], references: [txId]) + + actionType ActionType + retries Int @default(0) + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@id([jobId, transactionId]) + @@unique([jobId, retries]) +}