-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: upload raw log block to s3 #591
Changes from 3 commits
94c62ce
1191463
a798979
6e406c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* eslint-disable no-await-in-loop */ | ||
import { Service } from '@ourparentcenter/moleculer-decorators-extended'; | ||
import { ServiceBroker } from 'moleculer'; | ||
import Utils from '../../common/utils/utils'; | ||
import { Block, BlockCheckpoint } from '../../models'; | ||
import BullableService, { QueueHandler } from '../../base/bullable.service'; | ||
import { Config, BULL_JOB_NAME, SERVICE } from '../../common'; | ||
import config from '../../../config.json' assert { type: 'json' }; | ||
import knex from '../../common/utils/db_connection'; | ||
import { S3Service } from '../../common/utils/s3'; | ||
|
||
const s3Client = S3Service.connectS3(); | ||
@Service({ | ||
name: SERVICE.V1.UploadBlockRawLogToS3.key, | ||
version: 1, | ||
}) | ||
export default class UploadBlockRawLogToS3 extends BullableService { | ||
public constructor(public broker: ServiceBroker) { | ||
super(broker); | ||
} | ||
|
||
@QueueHandler({ | ||
queueName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, | ||
jobName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, | ||
}) | ||
async uplodaBlockRawLogToS3() { | ||
const [startBlock, endBlock, updateBlockCheckpoint] = | ||
await BlockCheckpoint.getCheckpoint( | ||
BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, | ||
[BULL_JOB_NAME.CRAWL_BLOCK], | ||
peara marked this conversation as resolved.
Show resolved
Hide resolved
|
||
config.uploadBlockRawLogToS3.key | ||
); | ||
if (startBlock > endBlock) { | ||
return; | ||
} | ||
this.logger.info(`startBlock: ${startBlock} to endBlock: ${endBlock}`); | ||
const listBlock = await Block.query() | ||
.select('height', 'hash', 'data') | ||
.where('height', '>', startBlock) | ||
.andWhere('height', '<=', endBlock); | ||
const resultUploadS3 = ( | ||
await Promise.all( | ||
listBlock.map((block) => | ||
Utils.uploadDataToS3( | ||
block.height.toString(), | ||
s3Client, | ||
`rawlog/${config.chainId}/block/${block.height}`, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thêm đc network name sau There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lại phải tạo thêm biến trong config.json nên em không thêm network name đâu There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thêm vào đi cho đẹp, nhỡ có chain nào dở hơi nó để tên trùng There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
'application/json', | ||
Buffer.from(JSON.stringify(block.data)), | ||
Config.BUCKET, | ||
Config.S3_GATEWAY, | ||
false | ||
) | ||
) | ||
) | ||
).filter((e) => e !== undefined); | ||
peara marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
const stringListUpdate = resultUploadS3.map( | ||
(item) => | ||
`(${item?.id}, '${JSON.stringify({ | ||
linkS3: item?.key, | ||
})}'::json)` | ||
); | ||
|
||
await knex.transaction(async (trx) => { | ||
if (resultUploadS3.length > 0) { | ||
await knex.raw( | ||
`UPDATE block SET data = temp.data from (VALUES ${stringListUpdate}) as temp(height, data) where temp.height = block.height` | ||
); | ||
} | ||
|
||
updateBlockCheckpoint.height = endBlock; | ||
await BlockCheckpoint.query() | ||
.insert(updateBlockCheckpoint) | ||
.onConflict('job_name') | ||
.merge() | ||
.transacting(trx); | ||
}); | ||
} | ||
|
||
async _start(): Promise<void> { | ||
this.createJob( | ||
BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, | ||
BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, | ||
{}, | ||
{ | ||
removeOnComplete: true, | ||
removeOnFail: { | ||
count: 3, | ||
}, | ||
repeat: { | ||
every: config.uploadBlockRawLogToS3.millisecondCrawl, | ||
}, | ||
} | ||
); | ||
return super._start(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
chỗ này nên log là bị duplicate ra, vì chắc là có vấn đề mới duplicate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
chỗ này nằm ngoài service nên ko có logger à?
vậy thì nên throw rồi để hàm gọi nó tự xử lý
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log như này có hiện ra ko ko biết
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw ra ngoài thì chỗ Promise.all có thể fail luôn cả job đó anh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done