Skip to content

Commit

Permalink
feat(deployment): implements managed wallet balances collection for t…
Browse files Browse the repository at this point in the history
…op up

As a part of this:
- implements allowance http service pagination limit
- also adds error handling for individual owner processing

refs #395
  • Loading branch information
ygrishajev committed Oct 31, 2024
1 parent 0b08cd3 commit 3149a61
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 54 deletions.
30 changes: 27 additions & 3 deletions apps/api/src/core/repositories/base.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,42 @@ export abstract class BaseRepository<
return this.toOutput(first(items));
}

async find(query?: Partial<Output>, select?: Array<keyof Output>) {
async find(query?: Partial<Output>, options?: { select?: Array<keyof Output>; limit?: number; offset?: number }) {
const params: DBQueryConfig<"many", true> = {
where: this.queryToWhere(query)
};

if (select) {
params.columns = select.reduce((acc, field) => ({ ...acc, [field]: true }), {});
if (options?.select) {
params.columns = options.select.reduce((acc, field) => ({ ...acc, [field]: true }), {});
}

if (options?.limit) {
params.limit = options.limit;
}

if (options?.offset) {
params.offset = options.offset;
}

return this.toOutputList(await this.queryCursor.findMany(params));
}

async paginate(options: { select?: Array<keyof Output>; limit?: number; query?: Partial<Output> }, cb: (page: Output[]) => Promise<void>) {
let offset = 0;
let hasNextPage = true;
const limit = options?.limit || 100;

while (hasNextPage) {
const items = await this.find(options.query, { select: options.select, offset, limit });
offset += items.length;
hasNextPage = items.length === limit;

if (items.length) {
await cb(items);
}
}
}

async updateById(id: Output["id"], payload: Partial<Input>, options?: MutationOptions): Promise<Output>;
async updateById(id: Output["id"], payload: Partial<Input>): Promise<void>;
async updateById(id: Output["id"], payload: Partial<Input>, options?: MutationOptions): Promise<void | Output> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { AllowanceHttpService, BalanceHttpService, DeploymentAllowance } from "@akashnetwork/http-sdk";
import { PromisePool } from "@supercharge/promise-pool";
import { singleton } from "tsyringe";

import { InjectWallet } from "@src/billing/providers/wallet.provider";
import { UserWalletOutput, UserWalletRepository } from "@src/billing/repositories";
import { MasterWalletService } from "@src/billing/services";
import { LoggerService } from "@src/core";
import { InjectSentry, Sentry } from "@src/core/providers/sentry.provider";
import { SentryEventService } from "@src/core/services/sentry-event/sentry-event.service";
import { DrainingDeploymentOutput, LeaseRepository } from "@src/deployment/repositories/lease/lease.repository";

interface Balances {
Expand All @@ -16,43 +16,55 @@ interface Balances {
isManaged: boolean;
}

type OwnerType = "CUSTODIAL" | "MANAGED";

@singleton()
export class TopUpDeploymentsService {
private readonly CONCURRENCY = 10;

private UNLIMITED_FEES_MANAGED_BALANCE = 1000000;

private readonly logger = new LoggerService({ context: TopUpDeploymentsService.name });

constructor(
private readonly userWalletRepository: UserWalletRepository,
private readonly allowanceHttpService: AllowanceHttpService,
private readonly balanceHttpService: BalanceHttpService,
@InjectWallet("MANAGED") private readonly managedMasterWalletService: MasterWalletService,
@InjectWallet("UAKT_TOP_UP") private readonly uaktMasterWalletService: MasterWalletService,
@InjectWallet("USDC_TOP_UP") private readonly usdtMasterWalletService: MasterWalletService,
private readonly leaseRepository: LeaseRepository
private readonly leaseRepository: LeaseRepository,
@InjectSentry() private readonly sentry: Sentry,
private readonly sentryEventService: SentryEventService
) {}

async topUpDeployments() {
const wallets = [this.uaktMasterWalletService, this.usdtMasterWalletService];

const topUpAllManagedDeployments = wallets.map(async wallet => {
const topUpAllCustodialDeployments = wallets.map(async wallet => {
const address = await wallet.getFirstAddress();
await this.allowanceHttpService.paginateDeploymentGrantsForGrantee(address, async grants => {
await PromisePool.withConcurrency(this.CONCURRENCY)
.for(grants)
.process(async grant => this.topUpForGrant(grant));
await this.allowanceHttpService.paginateDeploymentGrants({ side: "grantee", address, limit: this.CONCURRENCY }, async grants => {
await Promise.all(
grants.map(async grant => {
await this.execWithErrorHandler(() => this.topUpForGrant("CUSTODIAL", grant));
})
);
});
});
await Promise.all(topUpAllManagedDeployments);

await this.paginateManagedWallets(async userWallets => {
await Promise.all(userWallets.map(async userWallet => this.topUpForManagedWallet(userWallet)));
await Promise.all(topUpAllCustodialDeployments);

const address = await this.managedMasterWalletService.getFirstAddress();
await this.allowanceHttpService.paginateDeploymentGrants({ side: "granter", address, limit: this.CONCURRENCY }, async grants => {
await Promise.all(
grants.map(async grant => {
await this.execWithErrorHandler(() => this.topUpForGrant("MANAGED", grant));
})
);
});
}

private async topUpForGrant(grant: DeploymentAllowance) {
const balances = await this.collectCustodialWalletBalances(grant);
const owner = grant.granter;
private async topUpForGrant(ownerType: OwnerType, grant: DeploymentAllowance) {
const owner = ownerType === "CUSTODIAL" ? grant.granter : grant.grantee;
const balances = await this.collectWalletBalances(ownerType, grant);
this.logger.debug({ event: "BALANCES_COLLECTED", granter: owner, grantee: grant.grantee, balances });

const drainingDeployments = await this.retrieveDrainingDeployments(owner);
Expand All @@ -63,52 +75,37 @@ export class TopUpDeploymentsService {
});
}

private async collectCustodialWalletBalances(grant: DeploymentAllowance): Promise<Balances> {
private async collectWalletBalances(ownerType: OwnerType, grant: DeploymentAllowance): Promise<Balances> {
const denom = grant.authorization.spend_limit.denom;
const deploymentLimit = parseFloat(grant.authorization.spend_limit.amount);

const feesAllowance = await this.allowanceHttpService.getFeeAllowanceForGranterAndGrantee(grant.granter, grant.grantee);
const feesSpendLimit = feesAllowance.allowance.spend_limit.find(limit => limit.denom === denom);
const feesLimit = feesSpendLimit ? parseFloat(feesSpendLimit.amount) : 0;
const sides = [grant.granter, grant.grantee];

const isManaged = ownerType === "MANAGED";

if (isManaged) {
sides.reverse();
}

const { amount } = await this.balanceHttpService.getBalance(grant.granter, "uakt");
const feesLimit = isManaged ? this.UNLIMITED_FEES_MANAGED_BALANCE : await this.retrieveFeesLimit(sides[0], sides[1], denom);

const { amount } = await this.balanceHttpService.getBalance(sides[0], denom);
const balance = parseFloat(amount);

return {
denom,
feesLimit: feesLimit,
feesLimit,
deploymentLimit,
balance,
isManaged: false
isManaged
};
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
private async paginateManagedWallets(cb: (page: UserWalletOutput[]) => Promise<void>) {
this.logger.debug({ event: "PAGINATING_MANAGED_WALLETS", warning: "Not implemented yet" });
}

private async topUpForManagedWallet(userWallet: UserWalletOutput) {
const balances = await this.collectManagedWalletBalances(userWallet);
this.logger.debug({ event: "BALANCES_COLLECTED", wallet: userWallet, balances });

const drainingDeployments = await this.retrieveDrainingDeployments(userWallet.address);

drainingDeployments.map(async deployment => {
const topUpAmount = await this.calculateTopUpAmount(deployment);
this.validateTopUpAmount(topUpAmount, balances);
});
}
private async retrieveFeesLimit(granter: string, grantee: string, denom: string) {
const feesAllowance = await this.allowanceHttpService.getFeeAllowanceForGranterAndGrantee(granter, grantee);
const feesSpendLimit = feesAllowance.allowance.spend_limit.find(limit => limit.denom === denom);

private async collectManagedWalletBalances(userWallet: UserWalletOutput): Promise<Balances> {
this.logger.debug({ event: "CALCULATING_MANAGE_WALLET_BALANCES", userWallet, warning: "Not implemented yet" });
return {
denom: "usdc",
feesLimit: 0,
deploymentLimit: 0,
balance: 0,
isManaged: true
};
return feesSpendLimit ? parseFloat(feesSpendLimit.amount) : 0;
}

private async retrieveDrainingDeployments(owner: string): Promise<DrainingDeploymentOutput[]> {
Expand All @@ -132,4 +129,13 @@ export class TopUpDeploymentsService {
private async topUpManagedDeployment() {
this.logger.debug({ event: "TOPPING_UP_MANAGED_DEPLOYMENT", warning: "Not implemented yet" });
}

private async execWithErrorHandler(cb: () => Promise<void>) {
try {
await cb();
} catch (error) {
const sentryEventId = this.sentry.captureEvent(this.sentryEventService.toEvent(error));
this.logger.error({ event: "TOP_UP_FAILED", error: error.stack, sentryEventId });
}
}
}
9 changes: 6 additions & 3 deletions packages/http-sdk/src/allowance/allowance-http.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,18 @@ export class AllowanceHttpService extends HttpService {
return allowances.grants;
}

async paginateDeploymentGrantsForGrantee(address: string, cb: (page: DeploymentAllowanceResponse["grants"]) => Promise<void>) {
async paginateDeploymentGrants(
options: { side: "granter" | "grantee"; address: string; limit: number },
cb: (page: DeploymentAllowanceResponse["grants"]) => Promise<void>
) {
let nextPageKey: string | null;

do {
const response = this.extractData(
await this.get<DeploymentAllowanceResponse>(
`cosmos/authz/v1beta1/grants/grantee/${address}`,
`cosmos/authz/v1beta1/grants/${options.side}/${options.address}`,
nextPageKey && {
params: { "pagination.key": nextPageKey }
params: { "pagination.key": nextPageKey, "pagination.limit": options.limit }
}
)
);
Expand Down

0 comments on commit 3149a61

Please sign in to comment.