From 94c62ce1ec41d410d63a82ce06c6b20c9768e570 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Thu, 11 Jan 2024 13:38:39 +0700 Subject: [PATCH 1/3] feat: upload raw log block to s3 --- ci/config.json.ci | 10 ++ config.json | 10 ++ src/common/constant.ts | 5 + src/common/utils/utils.ts | 48 +++++++++ .../upload_block_raw_log_to_s3.service.ts | 98 +++++++++++++++++++ 5 files changed, 171 insertions(+) create mode 100644 src/services/crawl-block/upload_block_raw_log_to_s3.service.ts diff --git a/ci/config.json.ci b/ci/config.json.ci index 84facb0fc..2dcba985c 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -255,5 +255,15 @@ "indexNamePattern": "%(tx_id|block_height)%", "milisecondInterval": 60000 } + }, + "uploadBlockRawLogToS3": { + "key": "uploadBlockRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, + "uploadTransactionRawLogToS3": { + "key": "uploadTransactionRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 } } diff --git a/config.json b/config.json index 9f8c9913b..f4d8b9fc1 100644 --- a/config.json +++ b/config.json @@ -270,5 +270,15 @@ "millisecondRepeatJob": 600000 }, "statementTimeout": 600000 + }, + "uploadBlockRawLogToS3": { + "key": "uploadBlockRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, + "uploadTransactionRawLogToS3": { + "key": "uploadTransactionRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 } } diff --git a/src/common/constant.ts b/src/common/constant.ts index 1df131576..fb627c8fa 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -107,6 +107,7 @@ export const BULL_JOB_NAME = { JOB_CHECK_NEED_CREATE_CONSTRAINT: 'job:check-need-create-constraint', JOB_CHECK_EVENT_CONSTRAINT: 'job:check-need-create-event-constraint', JOB_CREATE_EVENT_CONSTRAIN: 'job:create-event-constraint', + UPLOAD_BLOCK_RAW_LOG_TO_S3: 'job:upload-block-raw-log-to-s3', }; export const SERVICE = { @@ -372,6 +373,10 @@ export const SERVICE = { path: 'v1.HoroscopeHandlerService.getData', }, }, + UploadBlockRawLogToS3: { + key: 'UploadBlockRawLogToS3', + path: 'v1.UploadBlockRawLogToS3', + }, }, }; diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index 48392a5b9..f988c4161 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -1,6 +1,7 @@ import { fromBech32 } from '@cosmjs/encoding'; import _ from 'lodash'; import { SemVer } from 'semver'; +import AWS from 'aws-sdk'; export default class Utils { public static isValidAddress(address: string, length = -1) { @@ -215,4 +216,51 @@ export default class Utils { const semver = new SemVer(version1); return semver.compare(version2); } + + public static async uploadDataToS3( + id: string, + s3Client: AWS.S3, + fileName: string, + contentType: string, + data: Buffer, + bucketName: string, + s3Gateway: string, + overwrite = false + ) { + const foundS3Object = await s3Client + .headObject({ + Bucket: bucketName, + Key: fileName, + }) + .promise() + .catch((err) => { + if (err.name === 'NotFound') { + return null; + } + console.error(err); + return err; + }); + if (foundS3Object && !overwrite) { + return; + } + + // eslint-disable-next-line consistent-return + return s3Client + .upload({ + Key: fileName, + Body: data, + Bucket: bucketName, + ContentType: contentType, + }) + .promise() + .then( + (response: { Location: string; Key: string }) => ({ + key: s3Gateway ? s3Gateway + response.Key : response.Key, + id, + }), + (err: string) => { + throw new Error(err); + } + ); + } } diff --git a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts new file mode 100644 index 000000000..e85165607 --- /dev/null +++ b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts @@ -0,0 +1,98 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import Utils from '../../common/utils/utils'; +import { Block, BlockCheckpoint } from '../../models'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { Config, BULL_JOB_NAME, SERVICE } from '../../common'; +import config from '../../../config.json' assert { type: 'json' }; +import knex from '../../common/utils/db_connection'; +import { S3Service } from '../../common/utils/s3'; + +const s3Client = S3Service.connectS3(); +@Service({ + name: SERVICE.V1.UploadBlockRawLogToS3.key, + version: 1, +}) +export default class UploadBlockRawLogToS3 extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + jobName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + }) + async uplodaBlockRawLogToS3() { + const [startBlock, endBlock, updateBlockCheckpoint] = + await BlockCheckpoint.getCheckpoint( + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + [BULL_JOB_NAME.CRAWL_BLOCK], + config.uploadBlockRawLogToS3.key + ); + if (startBlock > endBlock) { + return; + } + this.logger.info(`startBlock: ${startBlock} to endBlock: ${endBlock}`); + const listBlock = await Block.query() + .select('height', 'hash', 'data') + .where('height', '>', startBlock) + .andWhere('height', '<=', endBlock); + const resultUploadS3 = ( + await Promise.all( + listBlock.map((block) => + Utils.uploadDataToS3( + block.height.toString(), + s3Client, + `rawlog/${config.chainId}/block/${block.height}`, + 'application/json', + Buffer.from(JSON.stringify(block.data)), + Config.BUCKET, + Config.S3_GATEWAY, + false + ) + ) + ) + ).filter((e) => e !== undefined); + + const stringListUpdate = resultUploadS3.map( + (item) => + `(${item?.id}, '${JSON.stringify({ + linkS3: item?.key, + })}'::json)` + ); + + await knex.transaction(async (trx) => { + if (resultUploadS3.length > 0) { + await knex.raw( + `UPDATE block SET data = temp.data from (VALUES ${stringListUpdate}) as temp(height, data) where temp.height = block.height` + ); + } + + updateBlockCheckpoint.height = endBlock; + await BlockCheckpoint.query() + .insert(updateBlockCheckpoint) + .onConflict('job_name') + .merge() + .transacting(trx); + }); + } + + async _start(): Promise { + this.createJob( + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + {}, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.uploadBlockRawLogToS3.millisecondCrawl, + }, + } + ); + return super._start(); + } +} From a7989796329d1ab8fbcbd7e66d06d832e6bd4a40 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Tue, 16 Jan 2024 16:05:34 +0700 Subject: [PATCH 2/3] feat: add console.warn when found key in S3 --- src/common/utils/utils.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index f988c4161..fd8ddee6d 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -241,6 +241,7 @@ export default class Utils { return err; }); if (foundS3Object && !overwrite) { + console.warn(`This S3 key is found in S3: ${fileName}`); return; } From 6e406c8c7308261b13c6e9a658301b4519391826 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Wed, 17 Jan 2024 10:31:15 +0700 Subject: [PATCH 3/3] feat: add chainName to config; throw error when found S3 key in utils --- ci/config.json.ci | 4 +++- config.json | 4 +++- src/common/utils/utils.ts | 9 ++++++--- .../crawl-block/upload_block_raw_log_to_s3.service.ts | 9 ++++++--- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/ci/config.json.ci b/ci/config.json.ci index 5297e5066..1989820ac 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -1,5 +1,6 @@ { "chainId": "aura-testnet-2", + "chainName": "aura", "networkPrefixAddress": "aura", "consensusPrefixAddress": "valcons", "validatorPrefixAddress": "valoper", @@ -259,7 +260,8 @@ "uploadBlockRawLogToS3": { "key": "uploadBlockRawLogToS3", "millisecondCrawl": 1000, - "blocksPerCall": 100 + "blocksPerCall": 100, + "overwriteS3IfFound": true }, "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", diff --git a/config.json b/config.json index 3a46b51fd..ccc862566 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,6 @@ { "chainId": "aura-testnet", + "chainName": "aura", "networkPrefixAddress": "aura", "consensusPrefixAddress": "valcons", "validatorPrefixAddress": "valoper", @@ -274,7 +275,8 @@ "uploadBlockRawLogToS3": { "key": "uploadBlockRawLogToS3", "millisecondCrawl": 1000, - "blocksPerCall": 100 + "blocksPerCall": 100, + "overwriteS3IfFound": true }, "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index fd8ddee6d..6bbd99985 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -240,9 +240,12 @@ export default class Utils { console.error(err); return err; }); - if (foundS3Object && !overwrite) { - console.warn(`This S3 key is found in S3: ${fileName}`); - return; + if (foundS3Object) { + const err = `This S3 key is found in S3: ${fileName}`; + console.warn(err); + if (!overwrite) { + throw new Error(err); + } } // eslint-disable-next-line consistent-return diff --git a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts index e85165607..de0248a41 100644 --- a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts +++ b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts @@ -44,15 +44,18 @@ export default class UploadBlockRawLogToS3 extends BullableService { Utils.uploadDataToS3( block.height.toString(), s3Client, - `rawlog/${config.chainId}/block/${block.height}`, + `rawlog/${config.chainName}/${config.chainId}/block/${block.height}`, 'application/json', Buffer.from(JSON.stringify(block.data)), Config.BUCKET, Config.S3_GATEWAY, - false + config.uploadBlockRawLogToS3.overwriteS3IfFound ) ) - ) + ).catch((err) => { + this.logger.error(err); + throw err; + }) ).filter((e) => e !== undefined); const stringListUpdate = resultUploadS3.map(