Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(billing): implement balance wallets refill #278

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@
"@opentelemetry/instrumentation-pino": "^0.41.0",
"@opentelemetry/sdk-node": "^0.52.1",
"@sentry/node": "^7.55.2",
"@supercharge/promise-pool": "^3.2.0",
"async-sema": "^3.1.1",
"axios": "^1.7.2",
"commander": "^12.1.0",
"cosmjs-types": "^0.9.0",
"dataloader": "^2.2.2",
"date-fns": "^2.29.2",
"date-fns-tz": "^1.3.6",
"dotenv": "^12.0.4",
"drizzle-orm": "^0.31.2",
"hono": "3.12.0",
"http-assert": "^1.5.0",
"http-errors": "^2.0.0",
"human-interval": "^2.0.1",
"js-sha256": "^0.9.0",
"lodash": "^4.17.21",
Expand All @@ -82,6 +85,8 @@
"devDependencies": {
"@akashnetwork/dev-config": "*",
"@faker-js/faker": "^8.4.1",
"@types/http-assert": "^1.5.5",
"@types/http-errors": "^2.0.4",
"@types/jest": "^29.5.12",
"@types/lodash": "^4.17.0",
"@types/memory-cache": "^0.2.2",
Expand Down
7 changes: 6 additions & 1 deletion apps/api/src/billing/config/env.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ const envSchema = z.object({
TRIAL_DEPLOYMENT_ALLOWANCE_AMOUNT: z.number({ coerce: true }),
TRIAL_FEES_ALLOWANCE_AMOUNT: z.number({ coerce: true }),
TRIAL_ALLOWANCE_DENOM: z.string(),
GAS_SAFETY_MULTIPLIER: z.number({ coerce: true }).default(1.5)
GAS_SAFETY_MULTIPLIER: z.number({ coerce: true }).default(1.5),
FEE_ALLOWANCE_REFILL_THRESHOLD: z.number({ coerce: true }),
DEPLOYMENT_ALLOWANCE_REFILL_THRESHOLD: z.number({ coerce: true }),
FEE_ALLOWANCE_REFILL_AMOUNT: z.number({ coerce: true }),
DEPLOYMENT_ALLOWANCE_REFILL_AMOUNT: z.number({ coerce: true }),
ALLOWANCE_REFILL_BATCH_SIZE: z.number({ coerce: true }).default(10)
});

export const envConfig = envSchema.parse(process.env);
22 changes: 5 additions & 17 deletions apps/api/src/billing/controllers/wallet/wallet.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { EncodeObject } from "@cosmjs/proto-signing";
import { PromisePool } from "@supercharge/promise-pool";
import pick from "lodash/pick";
import { singleton } from "tsyringe";

Expand All @@ -8,8 +7,8 @@ import { UserWalletRepository } from "@src/billing/repositories";
import type { CreateWalletRequestInput, SignTxRequestInput, SignTxResponseOutput } from "@src/billing/routes";
import { GetWalletQuery } from "@src/billing/routes/get-wallet-list/get-wallet-list.router";
import { ManagedUserWalletService, WalletInitializerService } from "@src/billing/services";
import { RefillService } from "@src/billing/services/refill/refill.service";
import { TxSignerService } from "@src/billing/services/tx-signer/tx-signer.service";
import { WithTransaction } from "@src/core/services";

// TODO: authorize endpoints below
@singleton()
Expand All @@ -18,10 +17,10 @@ export class WalletController {
private readonly walletManager: ManagedUserWalletService,
private readonly userWalletRepository: UserWalletRepository,
private readonly walletInitializer: WalletInitializerService,
private readonly signerService: TxSignerService
private readonly signerService: TxSignerService,
private readonly refillService: RefillService
) {}

@WithTransaction()
async create({ data: { userId } }: CreateWalletRequestInput): Promise<WalletOutputResponse> {
return {
data: await this.walletInitializer.initialize(userId)
Expand All @@ -41,18 +40,7 @@ export class WalletController {
};
}

async refillAll() {
const wallets = await this.userWalletRepository.find();
const { results, errors } = await PromisePool.withConcurrency(2)
.for(wallets)
.process(async wallet => {
return await this.walletManager.refill(wallet);
});

if (errors) {
console.log("DEBUG errors", errors);
}

console.log("DEBUG results", results);
async refillWallets() {
await this.refillService.refillAll();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { and, eq } from "drizzle-orm";
import { and, eq, lte, or } from "drizzle-orm";
import first from "lodash/first";
import { singleton } from "tsyringe";

Expand Down Expand Up @@ -60,7 +60,7 @@ export class UserWalletRepository {

async find(query?: Partial<DbUserWalletOutput>) {
const fields = query && (Object.keys(query) as Array<keyof DbUserWalletOutput>);
const where = fields.length ? and(...fields.map(field => eq(this.userWallet[field], query[field]))) : undefined;
const where = fields?.length ? and(...fields.map(field => eq(this.userWallet[field], query[field]))) : undefined;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drizzle recently added a .if() method for conditional where like this. Not necessary to use it, just sharing since it's missing from the doc.

return this.toOutputList(
await this.cursor.query.userWalletSchema.findMany({
Expand All @@ -69,6 +69,15 @@ export class UserWalletRepository {
);
}

async findDrainingWallets(thresholds = { fee: 0, deployment: 0 }, options?: Pick<ListOptions, "limit">) {
return this.toOutputList(
await this.cursor.query.userWalletSchema.findMany({
where: or(lte(this.userWallet.deploymentAllowance, thresholds.deployment.toString()), lte(this.userWallet.feeAllowance, thresholds.fee.toString())),
limit: options?.limit || 10
})
);
}

async findByUserId(userId: UserWalletOutput["userId"]) {
return this.toOutput(await this.cursor.query.userWalletSchema.findFirst({ where: eq(this.userWallet.userId, userId) }));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { OfflineSigner } from "@cosmjs/proto-signing";
import { HttpEndpoint, SigningStargateClient, SigningStargateClientOptions } from "@cosmjs/stargate";
import { CometClient, connectComet } from "@cosmjs/tendermint-rpc";
import type { BroadcastTxSyncResponse } from "@cosmjs/tendermint-rpc/build/comet38";

export type { BroadcastTxSyncResponse };

export class BatchSigningStargateClient extends SigningStargateClient {
public static async connectWithSigner(
endpoint: string | HttpEndpoint,
signer: OfflineSigner,
options: SigningStargateClientOptions = {}
): Promise<BatchSigningStargateClient> {
const cometClient = await connectComet(endpoint);
return this.createWithSigner(cometClient, signer, options);
}

public static async createWithSigner(
cometClient: CometClient,
signer: OfflineSigner,
options: SigningStargateClientOptions = {}
): Promise<BatchSigningStargateClient> {
return new BatchSigningStargateClient(cometClient, signer, options);
}

protected constructor(
cometClient: CometClient | undefined,
private readonly localSigner: OfflineSigner,
options: SigningStargateClientOptions
) {
super(cometClient, localSigner, options);
}

public async tmBroadcastTxSync(tx: Uint8Array): Promise<BroadcastTxSyncResponse> {
return (await this.forceGetCometClient().broadcastTxSync({ tx })) as BroadcastTxSyncResponse;
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { AllowanceHttpService } from "@akashnetwork/http-sdk";
import { stringToPath } from "@cosmjs/crypto";
import { DirectSecp256k1HdWallet, EncodeObject } from "@cosmjs/proto-signing";
import { calculateFee, GasPrice } from "@cosmjs/stargate";
import { DirectSecp256k1HdWallet } from "@cosmjs/proto-signing";
import { IndexedTx } from "@cosmjs/stargate";
import add from "date-fns/add";
import { singleton } from "tsyringe";

import { BillingConfig, InjectBillingConfig } from "@src/billing/providers";
import { MasterSigningClientService } from "@src/billing/services/master-signing-client/master-signing-client.service";
import { MasterWalletService } from "@src/billing/services/master-wallet/master-wallet.service";
import { RpcMessageService } from "@src/billing/services/rpc-message-service/rpc-message.service";
import { RpcMessageService, SpendingAuthorizationMsgOptions } from "@src/billing/services/rpc-message-service/rpc-message.service";
import { LoggerService } from "@src/core";
import { InternalServerException } from "@src/core/exceptions/internal-server.exception";

interface SpendingAuthorizationOptions {
address: string;
limits: {
deployment: number;
fees: number;
};
expiration: Date;
expiration?: Date;
}

@singleton()
Expand All @@ -32,7 +32,8 @@ export class ManagedUserWalletService {
@InjectBillingConfig() private readonly config: BillingConfig,
private readonly masterWalletService: MasterWalletService,
private readonly masterSigningClientService: MasterSigningClientService,
private readonly rpcMessageService: RpcMessageService
private readonly rpcMessageService: RpcMessageService,
private readonly allowanceHttpService: AllowanceHttpService
) {}

async createAndAuthorizeTrialSpending({ addressIndex }: { addressIndex: number }) {
Expand Down Expand Up @@ -63,56 +64,44 @@ export class ManagedUserWalletService {
}

async authorizeSpending(options: SpendingAuthorizationOptions) {
try {
const messages = this.rpcMessageService.getAllGrantMsgs({
granter: await this.masterWalletService.getFirstAddress(),
grantee: options.address,
denom: this.config.TRIAL_ALLOWANCE_DENOM,
expiration: options.expiration,
limits: options.limits
});
const fee = await this.estimateFee(messages, this.config.TRIAL_ALLOWANCE_DENOM);
const txResult = await this.masterSigningClientService.signAndBroadcast(messages, fee);

if (txResult.code !== 0) {
this.logger.error({ event: "SPENDING_AUTHORIZATION_FAILED", address: options.address, txResult });
throw new InternalServerException("Failed to authorize spending for address");
}

this.logger.debug({ event: "SPENDING_AUTHORIZED", address: options.address });
} catch (error) {
error.message = `Failed to authorize spending for address ${options.address}: ${error.message}`;
this.logger.error(error);

if (error.message.includes("fee allowance already exists")) {
this.logger.debug({ event: "SPENDING_ALREADY_AUTHORIZED", address: options.address });
await this.revokeSpending(options.address);

await this.authorizeSpending(options);
} else {
throw error;
}
}
const masterWalletAddress = await this.masterWalletService.getFirstAddress();
const msgOptions = {
granter: masterWalletAddress,
grantee: options.address,
denom: this.config.TRIAL_ALLOWANCE_DENOM,
expiration: options.expiration
};

await Promise.all([
this.authorizeDeploymentSpending({
...msgOptions,
limit: options.limits.deployment
}),
this.authorizeFeeSpending({
...msgOptions,
limit: options.limits.fees
})
]);

this.logger.debug({ event: "SPENDING_AUTHORIZED", address: options.address });
}

private async revokeSpending(address: string) {
const revokeMessage = this.rpcMessageService.getRevokeAllowanceMsg({
granter: await this.masterWalletService.getFirstAddress(),
grantee: address
});
private async authorizeFeeSpending(options: SpendingAuthorizationMsgOptions) {
const feeAllowances = await this.allowanceHttpService.getFeeAllowancesForGrantee(options.grantee);
const feeAllowance = feeAllowances.find(allowance => allowance.granter === options.granter);
const results: Promise<IndexedTx>[] = [];

const fee = await this.estimateFee([revokeMessage], this.config.TRIAL_ALLOWANCE_DENOM);
await this.masterSigningClientService.signAndBroadcast([revokeMessage], fee);
}
if (feeAllowance) {
results.push(this.masterSigningClientService.executeTx([this.rpcMessageService.getRevokeAllowanceMsg(options)]));
}

private async estimateFee(messages: readonly EncodeObject[], denom: string) {
const gasEstimation = await this.masterSigningClientService.simulate(messages, "allowance grant");
const estimatedGas = Math.round(gasEstimation * this.config.GAS_SAFETY_MULTIPLIER);
results.push(this.masterSigningClientService.executeTx([this.rpcMessageService.getFeesAllowanceGrantMsg(options)]));

return calculateFee(estimatedGas, GasPrice.fromString(`0.025${denom}`));
return await Promise.all(results);
}

async refill(wallet: any) {
return wallet;
private async authorizeDeploymentSpending(options: SpendingAuthorizationMsgOptions) {
const deploymentAllowanceMsg = this.rpcMessageService.getDepositDeploymentGrantMsg(options);
return await this.masterSigningClientService.executeTx([deploymentAllowanceMsg]);
}
}
Loading
Loading