diff --git a/ci/config.json.ci b/ci/config.json.ci index 9a244d1b1..1989820ac 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -1,5 +1,6 @@ { "chainId": "aura-testnet-2", + "chainName": "aura", "networkPrefixAddress": "aura", "consensusPrefixAddress": "valcons", "validatorPrefixAddress": "valoper", @@ -256,6 +257,17 @@ "milisecondInterval": 60000 } }, + "uploadBlockRawLogToS3": { + "key": "uploadBlockRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100, + "overwriteS3IfFound": true + }, + "uploadTransactionRawLogToS3": { + "key": "uploadTransactionRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, "jobCreateConstraintInTransactionPartition": { "jobRepeatCheckNeedCreateConstraint": { "millisecondRepeatJob": 600000 diff --git a/config.json b/config.json index c4b4f58c5..ccc862566 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,6 @@ { "chainId": "aura-testnet", + "chainName": "aura", "networkPrefixAddress": "aura", "consensusPrefixAddress": "valcons", "validatorPrefixAddress": "valoper", @@ -271,6 +272,17 @@ }, "statementTimeout": 600000 }, + "uploadBlockRawLogToS3": { + "key": "uploadBlockRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100, + "overwriteS3IfFound": true + }, + "uploadTransactionRawLogToS3": { + "key": "uploadTransactionRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, "jobCreateConstraintInTransactionPartition": { "jobRepeatCheckNeedCreateConstraint": { "millisecondRepeatJob": 600000 diff --git a/src/common/constant.ts b/src/common/constant.ts index 17130f968..cb65e2111 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -109,6 +109,7 @@ export const BULL_JOB_NAME = { JOB_CHECK_TRANSACTION_CONSTRAINT: 'job:check-need-create-transaction-constraint', JOB_CREATE_EVENT_CONSTRAIN: 'job:create-event-constraint', + UPLOAD_BLOCK_RAW_LOG_TO_S3: 'job:upload-block-raw-log-to-s3', JOB_CREATE_TRANSACTION_CONSTRAINT: 'job:create-transaction-constraint', }; @@ -379,6 +380,10 @@ export const SERVICE = { path: 'v1.HoroscopeHandlerService.getData', }, }, + UploadBlockRawLogToS3: { + key: 'UploadBlockRawLogToS3', + path: 'v1.UploadBlockRawLogToS3', + }, }, }; diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index 48392a5b9..6bbd99985 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -1,6 +1,7 @@ import { fromBech32 } from '@cosmjs/encoding'; import _ from 'lodash'; import { SemVer } from 'semver'; +import AWS from 'aws-sdk'; export default class Utils { public static isValidAddress(address: string, length = -1) { @@ -215,4 +216,55 @@ export default class Utils { const semver = new SemVer(version1); return semver.compare(version2); } + + public static async uploadDataToS3( + id: string, + s3Client: AWS.S3, + fileName: string, + contentType: string, + data: Buffer, + bucketName: string, + s3Gateway: string, + overwrite = false + ) { + const foundS3Object = await s3Client + .headObject({ + Bucket: bucketName, + Key: fileName, + }) + .promise() + .catch((err) => { + if (err.name === 'NotFound') { + return null; + } + console.error(err); + return err; + }); + if (foundS3Object) { + const err = `This S3 key is found in S3: ${fileName}`; + console.warn(err); + if (!overwrite) { + throw new Error(err); + } + } + + // eslint-disable-next-line consistent-return + return s3Client + .upload({ + Key: fileName, + Body: data, + Bucket: bucketName, + ContentType: contentType, + }) + .promise() + .then( + (response: { Location: string; Key: string }) => ({ + key: s3Gateway ? s3Gateway + response.Key : response.Key, + id, + }), + (err: string) => { + throw new Error(err); + } + ); + } } diff --git a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts new file mode 100644 index 000000000..de0248a41 --- /dev/null +++ b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts @@ -0,0 +1,101 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import Utils from '../../common/utils/utils'; +import { Block, BlockCheckpoint } from '../../models'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { Config, BULL_JOB_NAME, SERVICE } from '../../common'; +import config from '../../../config.json' assert { type: 'json' }; +import knex from '../../common/utils/db_connection'; +import { S3Service } from '../../common/utils/s3'; + +const s3Client = S3Service.connectS3(); +@Service({ + name: SERVICE.V1.UploadBlockRawLogToS3.key, + version: 1, +}) +export default class UploadBlockRawLogToS3 extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + jobName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + }) + async uplodaBlockRawLogToS3() { + const [startBlock, endBlock, updateBlockCheckpoint] = + await BlockCheckpoint.getCheckpoint( + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + [BULL_JOB_NAME.CRAWL_BLOCK], + config.uploadBlockRawLogToS3.key + ); + if (startBlock > endBlock) { + return; + } + this.logger.info(`startBlock: ${startBlock} to endBlock: ${endBlock}`); + const listBlock = await Block.query() + .select('height', 'hash', 'data') + .where('height', '>', startBlock) + .andWhere('height', '<=', endBlock); + const resultUploadS3 = ( + await Promise.all( + listBlock.map((block) => + Utils.uploadDataToS3( + block.height.toString(), + s3Client, + `rawlog/${config.chainName}/${config.chainId}/block/${block.height}`, + 'application/json', + Buffer.from(JSON.stringify(block.data)), + Config.BUCKET, + Config.S3_GATEWAY, + config.uploadBlockRawLogToS3.overwriteS3IfFound + ) + ) + ).catch((err) => { + this.logger.error(err); + throw err; + }) + ).filter((e) => e !== undefined); + + const stringListUpdate = resultUploadS3.map( + (item) => + `(${item?.id}, '${JSON.stringify({ + linkS3: item?.key, + })}'::json)` + ); + + await knex.transaction(async (trx) => { + if (resultUploadS3.length > 0) { + await knex.raw( + `UPDATE block SET data = temp.data from (VALUES ${stringListUpdate}) as temp(height, data) where temp.height = block.height` + ); + } + + updateBlockCheckpoint.height = endBlock; + await BlockCheckpoint.query() + .insert(updateBlockCheckpoint) + .onConflict('job_name') + .merge() + .transacting(trx); + }); + } + + async _start(): Promise { + this.createJob( + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + {}, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.uploadBlockRawLogToS3.millisecondCrawl, + }, + } + ); + return super._start(); + } +}