From a3e25a2ed1c042c02accbbd2c8f67bf6674bf0d1 Mon Sep 17 00:00:00 2001 From: Kevin Date: Fri, 21 Feb 2025 10:35:04 +0100 Subject: [PATCH] refactor(ImportData): refactors the notification of the Import Data Progress to the sockets. --- api/src/modules/events/app-events.module.ts | 20 ++---- .../import-progress.emitter.ts | 68 ------------------- .../import-progress.handler.ts | 20 ------ .../import-progress.tracker.factory.ts | 53 --------------- .../modules/geo-coding/geo-coding.module.ts | 4 +- .../modules/geo-coding/geo-coding.service.ts | 20 ++++-- .../geo-coding.progress-tracker.ts | 28 -------- api/src/modules/h3-data/h3-data.module.ts | 6 +- api/src/modules/impact/impact.module.ts | 8 ++- .../impact-calculation.progress-tracker.ts | 15 ++-- .../cqrs/import-data-progress.emitter.ts | 44 ++++++++++++ .../cqrs/import-data-progress.handler.ts | 66 ++++++++++++++++++ .../import-data-progress.object.ts} | 12 ++-- .../modules/import-data/import-data.module.ts | 16 +++-- .../import-data/import-data.service.ts | 14 ++-- .../validation.progress-tracker.ts | 28 -------- .../validation/excel-validator.service.ts | 35 ++++------ .../workers/import-data.consumer.ts | 53 +++++++++++++-- .../indicator-record.repository.ts | 6 +- .../indicator-records.module.ts | 4 +- .../services/impact-calculator.service.ts | 8 +-- .../sourcing-data.progress-tracker.ts | 31 --------- .../sourcing-location.repository.ts | 16 ++--- .../sourcing-locations.module.ts | 2 + 24 files changed, 250 insertions(+), 327 deletions(-) delete mode 100644 api/src/modules/events/import-data-progress/import-progress.emitter.ts delete mode 100644 api/src/modules/events/import-data-progress/import-progress.handler.ts delete mode 100644 api/src/modules/events/import-data-progress/import-progress.tracker.factory.ts delete mode 100644 api/src/modules/geo-coding/progress-tracker/geo-coding.progress-tracker.ts create mode 100644 api/src/modules/import-data/cqrs/import-data-progress.emitter.ts create mode 100644 api/src/modules/import-data/cqrs/import-data-progress.handler.ts rename api/src/modules/{events/import-data-progress/import-progress.event.ts => import-data/import-data-progress.object.ts} (80%) delete mode 100644 api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts delete mode 100644 api/src/modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker.ts diff --git a/api/src/modules/events/app-events.module.ts b/api/src/modules/events/app-events.module.ts index 02e15bd16..7ac5366e1 100644 --- a/api/src/modules/events/app-events.module.ts +++ b/api/src/modules/events/app-events.module.ts @@ -1,25 +1,13 @@ import { Global, Module } from '@nestjs/common'; import { CqrsModule } from '@nestjs/cqrs'; -import { ImportProgressHandler } from 'modules/events/import-data-progress/import-progress.handler'; -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; import { WebSocketsModule } from 'modules/notifications/websockets/websockets.module'; import { ImportProgressSocket } from 'modules/events/import-data-progress/import-progress.socket'; -import { ImportProgressTrackerFactory } from 'modules/events/import-data-progress/import-progress.tracker.factory'; +import { CacheModule } from '@nestjs/cache-manager'; @Global() @Module({ - imports: [CqrsModule, WebSocketsModule], - providers: [ - ImportProgressHandler, - ImportProgressEmitter, - ImportProgressSocket, - ImportProgressTrackerFactory, - ], - exports: [ - ImportProgressEmitter, - ImportProgressTrackerFactory, - ImportProgressSocket, - CqrsModule, - ], + imports: [CqrsModule, WebSocketsModule, CacheModule.register()], /// Might be problematic + providers: [ImportProgressSocket], + exports: [ImportProgressSocket, CqrsModule], }) export class AppEventsModule {} diff --git a/api/src/modules/events/import-data-progress/import-progress.emitter.ts b/api/src/modules/events/import-data-progress/import-progress.emitter.ts deleted file mode 100644 index a4c6af0ce..000000000 --- a/api/src/modules/events/import-data-progress/import-progress.emitter.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { EventBus } from '@nestjs/cqrs'; -import { ImportProgressUpdateEvent } from 'modules/events/import-data-progress/import-progress.event'; -import { ImportProgressSteps } from 'modules/events/import-data-progress/types'; - -/** - * @note: We use eventBus instead of commandBus because even tho broadcasting via websockets can be considered a command, it is not a command in the context of events. (apparently) - */ - -@Injectable() -export class ImportProgressEmitter { - public steps: Record = { - VALIDATING_DATA: 'VALIDATING_DATA', - IMPORTING_DATA: 'IMPORTING_DATA', - GEOCODING: 'GEOCODING', - CALCULATING_IMPACT: 'CALCULATING_IMPACT', - }; - - constructor(private readonly eventBus: EventBus) {} - - emitValidationProgress(importProgress: { - taskId?: string; - progress: number; - }): void { - this.eventBus.publish( - new ImportProgressUpdateEvent( - this.steps.VALIDATING_DATA, - importProgress.progress, - ), - ); - } - - emitImportProgress(importProgress: { - taskId?: string; - progress: number; - }): void { - this.eventBus.publish( - new ImportProgressUpdateEvent( - this.steps.IMPORTING_DATA, - importProgress.progress, - ), - ); - } - - emitGeocodingProgress(importProgress: { - taskId?: string; - progress: number; - }): void { - this.eventBus.publish( - new ImportProgressUpdateEvent( - this.steps.GEOCODING, - importProgress.progress, - ), - ); - } - - emitImpactCalculationProgress(importProgress: { - taskId?: string; - progress: number; - }): void { - this.eventBus.publish( - new ImportProgressUpdateEvent( - this.steps.CALCULATING_IMPACT, - importProgress.progress, - ), - ); - } -} diff --git a/api/src/modules/events/import-data-progress/import-progress.handler.ts b/api/src/modules/events/import-data-progress/import-progress.handler.ts deleted file mode 100644 index f126ffb37..000000000 --- a/api/src/modules/events/import-data-progress/import-progress.handler.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Logger } from '@nestjs/common'; -import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { ImportProgressUpdateEvent } from 'modules/events/import-data-progress/import-progress.event'; -import { ImportProgressSocket } from 'modules/events/import-data-progress/import-progress.socket'; -import { ImportProgressPayload } from 'modules/events/import-data-progress/types'; - -@EventsHandler(ImportProgressUpdateEvent) -export class ImportProgressHandler - implements IEventHandler -{ - logger: Logger = new Logger(ImportProgressHandler.name); - - constructor(private readonly importProgressSocket: ImportProgressSocket) {} - - handle(event: ImportProgressUpdateEvent): void { - this.logger.debug(`Handling event: ${JSON.stringify(event)}`); - const payload: ImportProgressPayload = event.payload; - this.importProgressSocket.emitProgressUpdateToSocket(payload); - } -} diff --git a/api/src/modules/events/import-data-progress/import-progress.tracker.factory.ts b/api/src/modules/events/import-data-progress/import-progress.tracker.factory.ts deleted file mode 100644 index 55748b428..000000000 --- a/api/src/modules/events/import-data-progress/import-progress.tracker.factory.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; -import { GeoCodingProgressTracker } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker'; -import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker'; -import { ValidationProgressTracker } from 'modules/import-data/progress-tracker/validation.progress-tracker'; -import { SourcingDataImportProgressTracker } from 'modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker'; - -// TODO: It would be nice to implement a starting point for all trackers so we have more freedom to place the progress - -@Injectable() -export class ImportProgressTrackerFactory { - constructor(public readonly importProgressEmitter: ImportProgressEmitter) { - this.importProgressEmitter = importProgressEmitter; - } - - createValidationProgressTracker(validationOptions: { - totalSteps: number; - }): ValidationProgressTracker { - return new ValidationProgressTracker(this.importProgressEmitter, { - totalSteps: validationOptions.totalSteps, - }); - } - - createGeoCodingTracker(geoCodeTrackingOptions: { - totalLocations: number; - }): GeoCodingProgressTracker { - return new GeoCodingProgressTracker( - this.importProgressEmitter, - geoCodeTrackingOptions, - ); - } - - createSourcingDataImportTracker(sourcingDataImportOptions: { - totalRecords: number; - totalChunks: number; - }): SourcingDataImportProgressTracker { - return new SourcingDataImportProgressTracker(this.importProgressEmitter, { - totalRecords: sourcingDataImportOptions.totalRecords, - totalChunks: sourcingDataImportOptions.totalChunks, - }); - } - - createImpactCalculationProgressTracker(impactCalculationOptions: { - totalRecords: number; - totalChunks: number; - startingPercentage?: number; - }): ImpactCalculationProgressTracker { - return new ImpactCalculationProgressTracker( - this.importProgressEmitter, - impactCalculationOptions, - ); - } -} diff --git a/api/src/modules/geo-coding/geo-coding.module.ts b/api/src/modules/geo-coding/geo-coding.module.ts index e0ef0be1d..2242ff21b 100644 --- a/api/src/modules/geo-coding/geo-coding.module.ts +++ b/api/src/modules/geo-coding/geo-coding.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { CacheModule } from '@nestjs/cache-manager'; import { AdminRegionsModule } from 'modules/admin-regions/admin-regions.module'; import { GeoRegionsModule } from 'modules/geo-regions/geo-regions.module'; @@ -18,6 +18,7 @@ import { import { GoogleMapsGeocoder } from 'modules/geo-coding/geocoders/google-maps.geocoder'; import * as redisStore from 'cache-manager-redis-store'; import * as config from 'config'; +import { ImportDataModule } from 'modules/import-data/import-data.module'; const geocodingCacheConfig: any = config.get('geocodingCache'); @@ -40,6 +41,7 @@ const geocodingCacheEnabled: boolean = db: geocodingCacheConfig.database, ttl: geocodingCacheTTL, }), + forwardRef(() => ImportDataModule), ], providers: [ { diff --git a/api/src/modules/geo-coding/geo-coding.service.ts b/api/src/modules/geo-coding/geo-coding.service.ts index c5affa5f4..b95622977 100644 --- a/api/src/modules/geo-coding/geo-coding.service.ts +++ b/api/src/modules/geo-coding/geo-coding.service.ts @@ -10,8 +10,7 @@ import { } from 'modules/sourcing-locations/sourcing-location.entity'; import { GeoCodingAbstractClass } from 'modules/geo-coding/geo-coding-abstract-class'; import { AdminRegionOfProductionService } from 'modules/geo-coding/strategies/admin-region-of-production.service'; -import { GeoCodingProgressTracker } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker'; -import { ImportProgressTrackerFactory } from '../events/import-data-progress/import-progress.tracker.factory'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; interface locationInfo { locationAddressInput?: string; @@ -32,7 +31,7 @@ export class GeoCodingService extends GeoCodingAbstractClass { protected readonly countryOfProductionService: CountryOfProductionGeoCodingStrategy, protected readonly unknownLocationService: UnknownLocationGeoCodingStrategy, protected readonly adminRegionOfProductionService: AdminRegionOfProductionService, - protected readonly progressTrackerFactory: ImportProgressTrackerFactory, + protected readonly importDataProgressEmitter: ImportDataProgressEmitter, ) { super(); } @@ -46,8 +45,7 @@ export class GeoCodingService extends GeoCodingAbstractClass { const geoCodedSourcingData: SourcingData[] = []; const errors: any[] = []; const totalLocations: number = sourcingData.length; - const progressTracker: GeoCodingProgressTracker = - this.progressTrackerFactory.createGeoCodingTracker({ totalLocations }); + let locationsSoFar: number = 0; for (let i: number = 0; i < totalLocations; i++) { const location: SourcingData = sourcingData[i]; this.logger.debug( @@ -90,7 +88,11 @@ export class GeoCodingService extends GeoCodingAbstractClass { await this.geoCodeAdminRegionOfProductionLocationType(location), ); } - progressTracker.trackProgress(); + + locationsSoFar++; + this.importDataProgressEmitter.emitGeocodingProgress( + (locationsSoFar / totalLocations) * 100, + ); } catch (e: any) { errors.push({ row: i + 5, @@ -99,7 +101,11 @@ export class GeoCodingService extends GeoCodingAbstractClass { sheet: 'sourcingData', column: null, }); - progressTracker.trackProgress(); + + locationsSoFar++; + this.importDataProgressEmitter.emitGeocodingProgress( + (locationsSoFar / totalLocations) * 100, + ); } } diff --git a/api/src/modules/geo-coding/progress-tracker/geo-coding.progress-tracker.ts b/api/src/modules/geo-coding/progress-tracker/geo-coding.progress-tracker.ts deleted file mode 100644 index 5a34be527..000000000 --- a/api/src/modules/geo-coding/progress-tracker/geo-coding.progress-tracker.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; - -export class GeoCodingProgressTracker { - totalLocations: number; - processedLocations: number = 0; - - constructor( - public readonly importProgressEmitter: ImportProgressEmitter, - trackingOptions: { - totalLocations: number; - }, - ) { - this.importProgressEmitter = importProgressEmitter; - this.totalLocations = trackingOptions.totalLocations; - } - - trackProgress(): void { - this.processedLocations++; - - this.importProgressEmitter.emitGeocodingProgress({ - progress: this.getProgress(), - }); - } - - private getProgress(): number { - return (this.processedLocations / this.totalLocations) * 100; - } -} diff --git a/api/src/modules/h3-data/h3-data.module.ts b/api/src/modules/h3-data/h3-data.module.ts index 74287e730..68db8d4de 100644 --- a/api/src/modules/h3-data/h3-data.module.ts +++ b/api/src/modules/h3-data/h3-data.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { H3DataController } from 'modules/h3-data/h3-data.controller'; import { H3DataService } from 'modules/h3-data/h3-data.service'; import { TypeOrmModule } from '@nestjs/typeorm'; @@ -24,10 +24,10 @@ import { AuthorizationModule } from 'modules/authorization/authorization.module' IndicatorsModule, UnitConversionsModule, SourcingRecordsModule, - AdminRegionsModule, + forwardRef(() => AdminRegionsModule), SuppliersModule, BusinessUnitsModule, - SourcingLocationsModule, + forwardRef(() => SourcingLocationsModule), AuthorizationModule, ], controllers: [H3DataController], diff --git a/api/src/modules/impact/impact.module.ts b/api/src/modules/impact/impact.module.ts index fc7c534b6..34fe79518 100644 --- a/api/src/modules/impact/impact.module.ts +++ b/api/src/modules/impact/impact.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { ImpactService } from 'modules/impact/impact.service'; import { ImpactController } from 'modules/impact/impact.controller'; import { IndicatorsModule } from 'modules/indicators/indicators.module'; @@ -17,18 +17,20 @@ import { BaseImpactService } from 'modules/impact/base-impact.service'; import { ImpactRepository } from 'modules/impact/impact.repository'; import { ImpactReportController } from 'modules/impact/impact-report.controller'; import { ImpactReportsModule } from 'modules/impact/reports/impact-reports.module'; +import { ImportDataModule } from 'modules/import-data/import-data.module'; @Module({ imports: [ IndicatorsModule, SourcingRecordsModule, BusinessUnitsModule, - AdminRegionsModule, + forwardRef(() => AdminRegionsModule), SuppliersModule, MaterialsModule, - SourcingLocationsModule, + forwardRef(() => SourcingLocationsModule), AuthorizationModule, ImpactReportsModule, + forwardRef(() => ImportDataModule), ], providers: [ ImpactRepository, diff --git a/api/src/modules/impact/progress-tracker/impact-calculation.progress-tracker.ts b/api/src/modules/impact/progress-tracker/impact-calculation.progress-tracker.ts index 716852d58..c14206f40 100644 --- a/api/src/modules/impact/progress-tracker/impact-calculation.progress-tracker.ts +++ b/api/src/modules/impact/progress-tracker/impact-calculation.progress-tracker.ts @@ -1,4 +1,4 @@ -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; export class ImpactCalculationProgressTracker { totalRecords: number; @@ -7,7 +7,7 @@ export class ImpactCalculationProgressTracker { private interval: NodeJS.Timer | null = null; constructor( - private readonly importProgressEmitter: ImportProgressEmitter, + private readonly importDataProgressEmitter: ImportDataProgressEmitter, private readonly importTrackInfo: { totalRecords: number; totalChunks: number; @@ -15,7 +15,6 @@ export class ImpactCalculationProgressTracker { estimatedTime?: number; }, ) { - this.importProgressEmitter = importProgressEmitter; this.totalRecords = importTrackInfo.totalRecords; const startingPercentage: number = importTrackInfo.startingPercentage ?? 0; this.progress = startingPercentage; @@ -26,9 +25,7 @@ export class ImpactCalculationProgressTracker { trackProgress(): void { this.progress += this.progressPerChunk; - this.importProgressEmitter.emitImpactCalculationProgress({ - progress: this.getProgress(), - }); + this.importDataProgressEmitter.emitImpactCalculationProgress(this.progress); } private getProgress(): number { @@ -43,9 +40,9 @@ export class ImpactCalculationProgressTracker { this.interval = setInterval(() => { this.progress += progressIncrement; this.progress = Math.min(this.progress, maxProgress); - this.importProgressEmitter.emitImpactCalculationProgress({ - progress: this.getProgress(), - }); + this.importDataProgressEmitter.emitImpactCalculationProgress( + this.progress, + ); if (this.progress >= maxProgress) { this.stopProgressInterval(); diff --git a/api/src/modules/import-data/cqrs/import-data-progress.emitter.ts b/api/src/modules/import-data/cqrs/import-data-progress.emitter.ts new file mode 100644 index 000000000..3b69c8da9 --- /dev/null +++ b/api/src/modules/import-data/cqrs/import-data-progress.emitter.ts @@ -0,0 +1,44 @@ +import { Injectable } from '@nestjs/common'; +import { EventBus } from '@nestjs/cqrs'; +import { ImportProgressSteps } from 'modules/events/import-data-progress/types'; +import { ImportDataProgressEvent } from 'modules/import-data/cqrs/import-data-progress.handler'; + +/** + * @note: We use eventBus instead of commandBus because even tho broadcasting via websockets can be considered a command, it is not a command in the context of events. (apparently) + */ + +@Injectable() +export class ImportDataProgressEmitter { + public steps: Record = { + VALIDATING_DATA: 'VALIDATING_DATA', + IMPORTING_DATA: 'IMPORTING_DATA', + GEOCODING: 'GEOCODING', + CALCULATING_IMPACT: 'CALCULATING_IMPACT', + }; + + constructor(private readonly eventBus: EventBus) {} + + emitValidationProgress(progress: number): void { + this.eventBus.publish( + new ImportDataProgressEvent(this.steps.VALIDATING_DATA, progress), + ); + } + + emitImportProgress(progress: number): void { + this.eventBus.publish( + new ImportDataProgressEvent(this.steps.IMPORTING_DATA, progress), + ); + } + + emitGeocodingProgress(progress: number): void { + this.eventBus.publish( + new ImportDataProgressEvent(this.steps.GEOCODING, progress), + ); + } + + emitImpactCalculationProgress(progress: number): void { + this.eventBus.publish( + new ImportDataProgressEvent(this.steps.CALCULATING_IMPACT, progress), + ); + } +} diff --git a/api/src/modules/import-data/cqrs/import-data-progress.handler.ts b/api/src/modules/import-data/cqrs/import-data-progress.handler.ts new file mode 100644 index 000000000..ce570e617 --- /dev/null +++ b/api/src/modules/import-data/cqrs/import-data-progress.handler.ts @@ -0,0 +1,66 @@ +import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; +import { Job, JobId, Queue } from 'bull'; + +import { ExcelImportJob } from 'modules/import-data/workers/import-data.producer'; +import { Inject, Logger } from '@nestjs/common'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { Cache } from 'cache-manager'; +import { InjectQueue } from '@nestjs/bull'; +import { importQueueName } from 'modules/import-data/workers/import-queue.name'; +import { ImportProgressSteps } from 'modules/events/import-data-progress/types'; +import { ImportDataProgressObject } from 'modules/import-data/import-data-progress.object'; + +export const IMPORT_JOB_CACHE_KEY: string = 'import-job-cache-key'; + +export class ImportDataProgressEvent { + constructor(public step: ImportProgressSteps, public progress: number) {} +} + +@EventsHandler(ImportDataProgressEvent) +export class ImportDataProgressEventHandler + implements IEventHandler +{ + logger: Logger = new Logger(ImportDataProgressEventHandler.name); + + constructor( + @Inject(CACHE_MANAGER) private readonly cacheManager: Cache, + @InjectQueue(importQueueName) private readonly importQueue: Queue, + ) {} + + /** + * Retrieves the bull job instance from the queue and updates the progress triggering an event + * @param event + */ + async handle(event: ImportDataProgressEvent): Promise { + const jobId: JobId | undefined = await this.cacheManager.get( + IMPORT_JOB_CACHE_KEY, + ); + + if (!jobId) { + this.logger.warn('Could not find job id in cache for import progress'); + return; + } + + const job: Job | null = await this.importQueue.getJob( + jobId, + ); + + if (!job) { + this.logger.warn('Could not find job in queue for import progress'); + return; + } + + /* + let currentProgress: ImportProgressObject | undefined = await job.progress(); + + if(!currentProgress){ + currentProgress = new ImportProgressObject(); + } + + */ + + await job.progress( + new ImportDataProgressObject(event.step, event.progress), + ); + } +} diff --git a/api/src/modules/events/import-data-progress/import-progress.event.ts b/api/src/modules/import-data/import-data-progress.object.ts similarity index 80% rename from api/src/modules/events/import-data-progress/import-progress.event.ts rename to api/src/modules/import-data/import-data-progress.object.ts index 74b2ae3ad..81025e3b0 100644 --- a/api/src/modules/events/import-data-progress/import-progress.event.ts +++ b/api/src/modules/import-data/import-data-progress.object.ts @@ -3,10 +3,10 @@ import { ImportProgressPayload, ImportProgressSequence, ImportProgressSteps, -} from './types'; +} from 'modules/events/import-data-progress/types'; -export class ImportProgressUpdateEvent implements IEvent { - stepOrder: ImportProgressSequence = [ +export class ImportDataProgressObject implements IEvent { + static stepOrder: ImportProgressSequence = [ 'VALIDATING_DATA', 'GEOCODING', 'IMPORTING_DATA', @@ -54,10 +54,12 @@ export class ImportProgressUpdateEvent implements IEvent { private setPreviousStepsAsCompleted(step: ImportProgressSteps): void { // Update all previous steps to 'completed' status and 100% progress - const currentStepIndex: number = this.stepOrder.indexOf(step); + const currentStepIndex: number = + ImportDataProgressObject.stepOrder.indexOf(step); for (let i: number = 0; i < currentStepIndex; i++) { - const previousStep: ImportProgressSteps = this.stepOrder[i]; + const previousStep: ImportProgressSteps = + ImportDataProgressObject.stepOrder[i]; this.payload[previousStep].status = 'completed'; this.payload[previousStep].progress = 100; } diff --git a/api/src/modules/import-data/import-data.module.ts b/api/src/modules/import-data/import-data.module.ts index 5af773ed8..ef3a4c14c 100644 --- a/api/src/modules/import-data/import-data.module.ts +++ b/api/src/modules/import-data/import-data.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { ImportDataController } from 'modules/import-data/import-data.controller'; import { MaterialsModule } from 'modules/materials/materials.module'; import { BusinessUnitsModule } from 'modules/business-units/business-units.module'; @@ -25,7 +25,10 @@ import { ImportMailService } from 'modules/import-data/import-mail/import-mail.s import { NotificationsModule } from 'modules/notifications/notifications.module'; import { ExcelValidatorService } from 'modules/import-data/sourcing-data/validation/excel-validator.service'; import { SourcingDataDbCleaner } from 'modules/import-data/sourcing-data/sourcing-data.db-cleaner'; -import { ImportDataEventHandler } from '../events/import-data-events/import-data.event-handler'; +import { ImportDataEventHandler } from 'modules/events/import-data-events/import-data.event-handler'; +import { CacheModule } from '@nestjs/cache-manager'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; +import { AppEventsModule } from 'modules/events/app-events.module'; // TODO: Move EUDR related stuff to EUDR modules @@ -48,14 +51,16 @@ import { ImportDataEventHandler } from '../events/import-data-events/import-data MaterialsModule, BusinessUnitsModule, SuppliersModule, - SourcingLocationsModule, - GeoCodingModule, + forwardRef(() => SourcingLocationsModule), + forwardRef(() => GeoCodingModule), IndicatorRecordsModule, TasksModule, IndicatorsModule, ImpactModule, WebSocketsModule, NotificationsModule, + CacheModule.register(), + AppEventsModule, ], providers: [ MulterConfigService, @@ -69,6 +74,7 @@ import { ImportDataEventHandler } from '../events/import-data-events/import-data ExcelValidatorService, SourcingDataDbCleaner, ImportDataEventHandler, + ImportDataProgressEmitter, { provide: 'FILE_UPLOAD_SIZE_LIMIT', useValue: config.get('fileUploads.sizeLimit'), @@ -83,6 +89,6 @@ import { ImportDataEventHandler } from '../events/import-data-events/import-data }, ], controllers: [ImportDataController], - exports: [ImportDataService, MulterConfigService], + exports: [ImportDataService, MulterConfigService, ImportDataProgressEmitter], }) export class ImportDataModule {} diff --git a/api/src/modules/import-data/import-data.service.ts b/api/src/modules/import-data/import-data.service.ts index d16b2fdea..a229bb311 100644 --- a/api/src/modules/import-data/import-data.service.ts +++ b/api/src/modules/import-data/import-data.service.ts @@ -4,9 +4,7 @@ import { ServiceUnavailableException, } from '@nestjs/common'; import { SourcingDataImportService } from 'modules/import-data/sourcing-data/sourcing-data-import.service'; -import { - ImportDataProducer -} from 'modules/import-data/workers/import-data.producer'; +import { ImportDataProducer } from 'modules/import-data/workers/import-data.producer'; import { Task } from 'modules/tasks/task.entity'; import { TasksService } from 'modules/tasks/tasks.service'; @@ -18,7 +16,7 @@ export class ImportDataService { private readonly importDataProducer: ImportDataProducer, private readonly sourcingDataImportService: SourcingDataImportService, private readonly tasksService: TasksService, - ) { } + ) {} async loadXlsxFile( userId: string, @@ -34,7 +32,8 @@ export class ImportDataService { return this.tasksService.serialize(task); } catch (error: any) { this.logger.error( - `Job for file: ${xlsxFileData.filename + `Job for file: ${ + xlsxFileData.filename } sent by user: ${userId} could not been added to queue: ${error.toString()}`, ); @@ -45,7 +44,10 @@ export class ImportDataService { } } - async processImportJob(taskId: string, xlsxFileData: Express.Multer.File): Promise { + async processImportJob( + taskId: string, + xlsxFileData: Express.Multer.File, + ): Promise { await this.sourcingDataImportService.importSourcingData( xlsxFileData.path, taskId, diff --git a/api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts b/api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts deleted file mode 100644 index 96d3015a0..000000000 --- a/api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; - -export class ValidationProgressTracker { - totalSteps: number; - progress: number = 0; - - constructor( - public readonly importProgressEmitter: ImportProgressEmitter, - trackingOptions: { - totalSteps: number; - }, - ) { - this.importProgressEmitter = importProgressEmitter; - this.totalSteps = trackingOptions.totalSteps; - } - - trackProgress(): void { - this.progress++; - - this.importProgressEmitter.emitValidationProgress({ - progress: this.getProgress(), - }); - } - - private getProgress(): number { - return (this.progress / this.totalSteps) * 100; - } -} diff --git a/api/src/modules/import-data/sourcing-data/validation/excel-validator.service.ts b/api/src/modules/import-data/sourcing-data/validation/excel-validator.service.ts index bb94fcc4a..94e35343e 100644 --- a/api/src/modules/import-data/sourcing-data/validation/excel-validator.service.ts +++ b/api/src/modules/import-data/sourcing-data/validation/excel-validator.service.ts @@ -10,9 +10,8 @@ import { SourcingDataSheetValidator } from './validators/sourcing-data.sheet-val import { IndicatorsSheetValidator } from './validators/indicators.sheet-validator'; import { plainToInstance } from 'class-transformer'; import { validate, ValidationError } from 'class-validator'; -import { ValidationProgressTracker } from '../../progress-tracker/validation.progress-tracker'; -import { ImportProgressTrackerFactory } from 'modules/events/import-data-progress/import-progress.tracker.factory'; -import { ImportTaskError } from '../../../tasks/types/import-task-error.type'; +import { ImportTaskError } from 'modules/tasks/types/import-task-error.type'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; export type SourcingDataSheet = { materials: MaterialSheetValidator[]; @@ -49,14 +48,19 @@ export class ExcelValidatorService { constructor( private readonly dtoProcessor: SourcingRecordsDtoProcessorService, - private readonly importProgressTrackerFactory: ImportProgressTrackerFactory, + private readonly importDataProgressEmitter: ImportDataProgressEmitter, ) {} async validate(sheet: SourcingDataSheet): Promise { - const progressTracker: ValidationProgressTracker = - this.getProgressTracker(sheet); const validationErrors: ImportTaskError[] = []; + let stepsSoFar: number = 0; + const totalSteps: number = + SHEET_NAMES.reduce( + (acc: number, sheetName: SheetName) => acc + sheet[sheetName].length, + 0, + ) + 1; + /* * Parse sourcing location rows and sourcing records from the sourcing data sheet */ @@ -75,7 +79,11 @@ export class ExcelValidatorService { if (errors.length) { this.handleErrors(errors, index, sheetName, validationErrors); } - progressTracker.trackProgress(); + + stepsSoFar++; + this.importDataProgressEmitter.emitValidationProgress( + (stepsSoFar / totalSteps) * 100, + ); } } @@ -92,19 +100,6 @@ export class ExcelValidatorService { return sheetName === 'sourcingData' ? index + 5 : index + 2; } - private getProgressTracker( - sheet: SourcingDataSheet, - ): ValidationProgressTracker { - const totalSteps: number = - SHEET_NAMES.reduce( - (acc: number, sheetName: SheetName) => acc + sheet[sheetName].length, - 0, - ) + 1; - return this.importProgressTrackerFactory.createValidationProgressTracker({ - totalSteps: totalSteps, - }); - } - private handleErrors( errors: ValidationError[], index: number, diff --git a/api/src/modules/import-data/workers/import-data.consumer.ts b/api/src/modules/import-data/workers/import-data.consumer.ts index 2c5d04a5c..06f30bc84 100644 --- a/api/src/modules/import-data/workers/import-data.consumer.ts +++ b/api/src/modules/import-data/workers/import-data.consumer.ts @@ -2,6 +2,7 @@ import { OnQueueCompleted, OnQueueError, OnQueueFailed, + OnQueueProgress, Process, Processor, } from '@nestjs/bull'; @@ -10,9 +11,13 @@ import { CommandBus, EventBus } from '@nestjs/cqrs'; import { Job } from 'bull'; import { ExcelImportJob } from 'modules/import-data/workers/import-data.producer'; import { importQueueName } from 'modules/import-data/workers/import-queue.name'; -import { HandleImportFailedCommand } from '../cqrs/import-data-failed.handler'; -import { StartImportProcessingCommand } from '../cqrs/import-data-processing.handler'; -import { HandleImportSuccessCommand } from '../cqrs/import-data-success.handler'; +import { HandleImportFailedCommand } from 'modules/import-data/cqrs/import-data-failed.handler'; +import { StartImportProcessingCommand } from 'modules/import-data/cqrs/import-data-processing.handler'; +import { HandleImportSuccessCommand } from 'modules/import-data/cqrs/import-data-success.handler'; +import { Cache } from 'cache-manager'; +import { ImportProgressSocket } from 'modules/events/import-data-progress/import-progress.socket'; +import { ImportDataProgressObject } from 'modules/import-data/import-data-progress.object'; +import { IMPORT_JOB_CACHE_KEY } from 'modules/import-data/cqrs/import-data-progress.handler'; @Processor(importQueueName) export class ImportDataConsumer { @@ -21,7 +26,9 @@ export class ImportDataConsumer { constructor( private readonly commandBus: CommandBus, private readonly eventBus: EventBus, - ) { } + private readonly importProgressSocket: ImportProgressSocket, + private readonly cache: Cache, + ) {} @OnQueueError() async onQueueError(error: Error): Promise { @@ -35,7 +42,7 @@ export class ImportDataConsumer { if (this.isJobStalled(err)) { return this.removeJob(job); } - + // Delegate the failure logic to a command await this.commandBus.execute( new HandleImportFailedCommand( @@ -54,11 +61,28 @@ export class ImportDataConsumer { ); } + @OnQueueProgress() + async onJobProgress( + job: Job, + progress: ImportDataProgressObject, + ): Promise { + this.logger.debug(`Job ${job.id} progress: ${JSON.stringify(progress)}`); + + // Emit the progress to the socket + this.importProgressSocket.emitProgressUpdateToSocket(progress.payload); + } + @Process('excel-import-job') async readImportDataJob(job: Job): Promise { const { taskId, xlsxFileData } = job.data; + + // set the job id in the cache so we can reference it later for tracking + await this.cache.set(IMPORT_JOB_CACHE_KEY, job.id); + // Delegate the processing logic to a command - await this.commandBus.execute(new StartImportProcessingCommand(taskId, xlsxFileData)); + await this.commandBus.execute( + new StartImportProcessingCommand(taskId, xlsxFileData), + ); } private isJobStalled(err: Error): boolean { @@ -68,4 +92,21 @@ export class ImportDataConsumer { private async removeJob(job: Job): Promise { return job.remove(); } + /* + registerJobEventHandler(job: Job): void { + if (!this.importProgressEventHandler) { + this.importProgressEventHandler = new ImportProgressTrackerEventHandler( + job, + ); + } + this.eventBus.register([new ImportProgressTrackerEventHandler(job)]); + } + + unregisterJobEventHandler(): void { + if (this.importProgressEventHandler) { + this.eventBus.([this.importProgressEventHandler]); + } + } + + */ } diff --git a/api/src/modules/indicator-records/indicator-record.repository.ts b/api/src/modules/indicator-records/indicator-record.repository.ts index cc08a1d13..5101b8181 100644 --- a/api/src/modules/indicator-records/indicator-record.repository.ts +++ b/api/src/modules/indicator-records/indicator-record.repository.ts @@ -5,8 +5,8 @@ import { AppBaseRepository } from 'utils/app-base.repository'; import { SaveOptions } from 'typeorm/repository/SaveOptions'; import { chunk } from 'lodash'; import { AppConfig } from 'utils/app.config'; -import { ImportProgressTrackerFactory } from 'modules/events/import-data-progress/import-progress.tracker.factory'; import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; const dbConfig: any = AppConfig.get('db'); const batchChunkSize: number = parseInt(`${dbConfig.batchChunkSize}`, 10); @@ -15,7 +15,7 @@ const batchChunkSize: number = parseInt(`${dbConfig.batchChunkSize}`, 10); export class IndicatorRecordRepository extends AppBaseRepository { constructor( protected dataSource: DataSource, - private readonly importProgressTrackerFactory: ImportProgressTrackerFactory, + protected importDataProgressEmitter: ImportDataProgressEmitter, ) { super(IndicatorRecord, dataSource.createEntityManager()); } @@ -33,7 +33,7 @@ export class IndicatorRecordRepository extends AppBaseRepository ImportDataModule), ], controllers: [IndicatorRecordsController], providers: [ diff --git a/api/src/modules/indicator-records/services/impact-calculator.service.ts b/api/src/modules/indicator-records/services/impact-calculator.service.ts index 0e4897e91..cae9ff5fb 100644 --- a/api/src/modules/indicator-records/services/impact-calculator.service.ts +++ b/api/src/modules/indicator-records/services/impact-calculator.service.ts @@ -28,9 +28,8 @@ import { CACHED_DATA_TYPE, CachedData, } from 'modules/cached-data/cached-data.entity'; -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker'; -import { ImportProgressTrackerFactory } from 'modules/events/import-data-progress/import-progress.tracker.factory'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; /** * @description: This is PoC (Proof of Concept) for the updated LG methodology v0.1 @@ -53,8 +52,7 @@ export class ImpactCalculator { private readonly dependencyManager: ImpactQueryBuilder, private readonly cachedDataService: CachedDataService, private readonly dataSource: DataSource, - private readonly importProgress: ImportProgressEmitter, - private readonly importProgressTrackerFactory: ImportProgressTrackerFactory, + private readonly importDataProgressEmitter: ImportDataProgressEmitter, ) {} async calculateImpactForAllSourcingRecords( @@ -64,7 +62,7 @@ export class ImpactCalculator { const halfTime: number = totalEstimatedTime / 2; // La mitad del tiempo para esta tarea const progressIncrement: number = 50 / (halfTime / 1000); // Cálculo para incrementar al 50% const tracker: ImpactCalculationProgressTracker = - this.importProgressTrackerFactory.createImpactCalculationProgressTracker({ + new ImpactCalculationProgressTracker(this.importDataProgressEmitter, { totalRecords: 1, totalChunks: 1, startingPercentage: 0, diff --git a/api/src/modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker.ts b/api/src/modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker.ts deleted file mode 100644 index d67ec2be0..000000000 --- a/api/src/modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { ImportProgressEmitter } from 'modules/events/import-data-progress/import-progress.emitter'; - -export class SourcingDataImportProgressTracker { - totalRecords: number; - progress: number = 0; - progressPerChunk: number; - - constructor( - private readonly importProgressEmitter: ImportProgressEmitter, - private readonly importTrackInfo: { - totalRecords: number; - totalChunks: number; - }, - ) { - this.importProgressEmitter = importProgressEmitter; - this.totalRecords = importTrackInfo.totalRecords; - this.progressPerChunk = (100 - 50) / importTrackInfo.totalChunks; - } - - trackProgress(): void { - this.progress += this.progressPerChunk; - - this.importProgressEmitter.emitImportProgress({ - progress: this.getProgress(), - }); - } - - private getProgress(): number { - return this.progress; - } -} diff --git a/api/src/modules/sourcing-locations/sourcing-location.repository.ts b/api/src/modules/sourcing-locations/sourcing-location.repository.ts index 409d8adeb..7ad23f168 100644 --- a/api/src/modules/sourcing-locations/sourcing-location.repository.ts +++ b/api/src/modules/sourcing-locations/sourcing-location.repository.ts @@ -10,9 +10,8 @@ import { Injectable, Logger, NotFoundException } from '@nestjs/common'; import { BaseQueryBuilder } from 'utils/base.query-builder'; import { SaveOptions } from 'typeorm/repository/SaveOptions'; import { chunk } from 'lodash'; -import { SourcingDataImportProgressTracker } from 'modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker'; -import { ImportProgressTrackerFactory } from 'modules/events/import-data-progress/import-progress.tracker.factory'; import { AppConfig } from 'utils/app.config'; +import { ImportDataProgressEmitter } from 'modules/import-data/cqrs/import-data-progress.emitter'; const dbConfig: any = AppConfig.get('db'); const batchChunkSize: number = parseInt(`${dbConfig.batchChunkSize}`, 10); @@ -23,7 +22,7 @@ export class SourcingLocationRepository extends Repository { constructor( protected dataSource: DataSource, - protected readonly trackerFactory: ImportProgressTrackerFactory, + protected readonly importDataProgressEmitter: ImportDataProgressEmitter, ) { super(SourcingLocation, dataSource.createEntityManager()); } @@ -60,11 +59,8 @@ export class SourcingLocationRepository extends Repository { const result: SourcingLocation[][] = []; const totalEntities: number = entities.length; const totalChunks: number = Math.ceil(totalEntities / batchChunkSize); - const tracker: SourcingDataImportProgressTracker = - this.trackerFactory.createSourcingDataImportTracker({ - totalRecords: entities.length, - totalChunks, - }); + const progressPerChunk: number = (100 - 50) / totalChunks; + let progressSoFar: number = 0; try { for (const [index, dataChunk] of chunk( @@ -79,7 +75,9 @@ export class SourcingLocationRepository extends Repository { ); const saved: SourcingLocation[] = await Promise.all(promises); result.push(saved); - tracker.trackProgress(); + + progressSoFar += progressPerChunk; + this.importDataProgressEmitter.emitImportProgress(progressSoFar); } // commit transaction if every chunk was saved successfully diff --git a/api/src/modules/sourcing-locations/sourcing-locations.module.ts b/api/src/modules/sourcing-locations/sourcing-locations.module.ts index 13e2bb931..0e22fc407 100644 --- a/api/src/modules/sourcing-locations/sourcing-locations.module.ts +++ b/api/src/modules/sourcing-locations/sourcing-locations.module.ts @@ -9,6 +9,7 @@ import { BusinessUnitsModule } from 'modules/business-units/business-units.modul import { SuppliersModule } from 'modules/suppliers/suppliers.module'; import { MaterialsModule } from 'modules/materials/materials.module'; import { SourcingLocationRepository } from 'modules/sourcing-locations/sourcing-location.repository'; +import { ImportDataModule } from 'modules/import-data/import-data.module'; @Module({ imports: [ @@ -17,6 +18,7 @@ import { SourcingLocationRepository } from 'modules/sourcing-locations/sourcing- forwardRef(() => BusinessUnitsModule), forwardRef(() => SuppliersModule), forwardRef(() => MaterialsModule), + forwardRef(() => ImportDataModule), ], controllers: [SourcingLocationsController], providers: [