Skip to content

Commit

Permalink
#2180 - Queue Consumers Log improvements (#2296)
Browse files Browse the repository at this point in the history
Created the class `ProcessSummary` (based on the existing one) at the
generic library to allow the capture of logs alongside processes like
integrations and schedulers services. The goal is to allow the capture
of a set of logs to allow them to be logged at the same time for
convenience. For instance, the below logs are better interpreted if read
altogether.
This effort does not have the intention of removing the
`QueueProcessSummary` but it improves the logs one step further.
  • Loading branch information
andrewsignori-aot authored Sep 15, 2023
1 parent 598073f commit 12d9f30
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { LoggerService } from "@nestjs/common";
import { ProcessSummary } from "@sims/utilities/logger";

export interface QueueProcessSummaryResult {
/**
Expand Down Expand Up @@ -81,6 +82,27 @@ export class QueueProcessSummary {
errors: this.errors.length ? this.errors : undefined,
};
}

/**
* Writes all the log entries.
* @param processSummary process summary logs.
* @param logger optionally provides a job logger
* in case one is not present.
*/
async logProcessSummaryToJobLogger(
processSummary: ProcessSummary,
logger?: JobLogger,
): Promise<void> {
const jobLogger = logger ?? this.loggers?.jobLogger;
if (!jobLogger) {
throw new Error("No job logger was provided.");
}
for (const logEntry of processSummary.flattenLogs()) {
await jobLogger.log(
`${logEntry.level.toUpperCase()}: ${logEntry.message}`,
);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { Job, Queue } from "bull";
import { BaseScheduler } from "../base-scheduler";
import { QueueNames } from "@sims/utilities";
import { QueueNames, parseJSONError } from "@sims/utilities";
import { QueueService } from "@sims/services/queue";
import { QueueProcessSummary } from "../../models/processors.models";
import {
QueueProcessSummary,
QueueProcessSummaryResult,
} from "../../models/processors.models";
import { WorkflowEnqueuerService } from "../../../services";
import { InjectLogger, LoggerService } from "@sims/utilities/logger";
import { ProcessSummaryResult } from "@sims/integrations/models";
import {
InjectLogger,
LoggerService,
ProcessSummary,
} from "@sims/utilities/logger";

/**
* Search for assessments that have some pending operation, for instance,
Expand All @@ -31,20 +37,37 @@ export class AssessmentWorkflowEnqueuerScheduler extends BaseScheduler<void> {
@Process()
async enqueueAssessmentOperations(
job: Job<void>,
): Promise<ProcessSummaryResult[]> {
): Promise<QueueProcessSummaryResult> {
const summary = new QueueProcessSummary({
appLogger: this.logger,
jobLogger: job,
});
await summary.info(
"Checking application assessments to be queued for start.",
);
const result =
await this.workflowEnqueuerService.enqueueStartAssessmentWorkflows();
await summary.info("All application assessments queued.");
// Process summary to be populated by the enqueueStartAssessmentWorkflows.
// In case an unexpected error happen the finally block will still be able to
// output the partial information captured by the processSummary.
const processSummary = new ProcessSummary();
try {
await this.workflowEnqueuerService.enqueueStartAssessmentWorkflows(
processSummary,
);
await summary.info(
"All application assessments queued with success, check logs for further details.",
);
} catch (error: unknown) {
await summary.error(
`Unexpected error while executing the job check logs for further details. ${parseJSONError(
error,
)}`,
);
} finally {
this.logger.logProcessSummary(processSummary);
await summary.logProcessSummaryToJobLogger(processSummary);
}
await this.cleanSchedulerQueueHistory();
// TODO: logs improvement.
return [summary.getSummary(), result];
return summary.getSummary();
}

@InjectLogger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import { ApplicationService } from "..";
import { Queue } from "bull";
import { StartAssessmentQueueInDTO } from "@sims/services/queue";
import { InjectQueue } from "@nestjs/bull";
import { QueueNames, parseJSONError, processInParallel } from "@sims/utilities";
import { QueueNames, processInParallel } from "@sims/utilities";
import {
Application,
StudentAssessment,
StudentAssessmentStatus,
} from "@sims/sims-db";
import { DataSource } from "typeorm";
import { ProcessSummaryResult } from "@sims/integrations/models";
import { LogScopes, ProcessSummary } from "@sims/utilities/logger";

/**
* Manages the operations to search assessments that requires some
Expand All @@ -29,33 +29,37 @@ export class WorkflowEnqueuerService {
* Search applications with pending assessments to be processed by the assessment workflow.
* If no other assessment is being processed for that application the oldest pending
* assessment will be queue for start.
* @returns process summary log.
* @param summary process summary to group all the logs.
*/
async enqueueStartAssessmentWorkflows(): Promise<ProcessSummaryResult> {
const result = new ProcessSummaryResult();
async enqueueStartAssessmentWorkflows(
summary: ProcessSummary,
): Promise<void> {
try {
result.summary.push(
summary.info(
"Checking database for applications with assessments waiting to be triggered.",
LogScopes.Summary,
);
const applications =
await this.applicationService.getApplicationsToStartAssessments();
result.summary.push(`Found ${applications.length} applications.`);
summary.info(
`Found ${applications.length} applications.`,
LogScopes.Summary,
);
if (!applications.length) {
result.summary.push("No applications found");
return result;
return;
}
result.children = await processInParallel(
summary.children = await processInParallel(
(application: Application) => this.queueNextAssessment(application),
applications,
);
summary.info("All assessments were processed.", LogScopes.Summary);
} catch (error: unknown) {
result.errors.push(
`Error while enqueueing assessment workflows to be processed. ${parseJSONError(
error,
)}`,
summary.error(
"Error while enqueueing assessment workflows to be processed.",
error,
LogScopes.Summary,
);
}
return result;
}

/**
Expand All @@ -65,19 +69,19 @@ export class WorkflowEnqueuerService {
*/
private async queueNextAssessment(
application: Application,
): Promise<ProcessSummaryResult> {
const result = new ProcessSummaryResult();
): Promise<ProcessSummary> {
const summary = new ProcessSummary();
try {
result.summary.push(
summary.info(
`Queueing next pending assessment for application id ${application.id}.`,
);
const [nextAssessment] = application.studentAssessments;
result.summary.push(
summary.info(
`Found ${application.studentAssessments.length} pending assessment(s). Queueing assessment ${nextAssessment.id}.`,
);
// Update application and student assessment.
await this.dataSource.transaction(async (entityManager) => {
result.summary.push(
summary.info(
`Associating application currentProcessingAssessment as assessment id ${nextAssessment.id}.`,
);
const applicationUpdateResult = await entityManager
Expand All @@ -88,7 +92,7 @@ export class WorkflowEnqueuerService {
if (!applicationUpdateResult.affected) {
throw new Error("Application update did not affected any records.");
}
result.summary.push(
summary.info(
`Updating assessment status to ${StudentAssessmentStatus.Queued}.`,
);
const assessmentUpdateResults = await entityManager
Expand All @@ -102,18 +106,17 @@ export class WorkflowEnqueuerService {
);
}
});
result.summary.push(
summary.info(
`Adding assessment to queue ${QueueNames.StartApplicationAssessment}.`,
);
await this.startAssessmentQueue.add({ assessmentId: nextAssessment.id });
result.summary.push("Assessment queued for start.");
summary.info("Assessment queued for start.");
} catch (error: unknown) {
result.errors.push(
`Error while enqueueing assessment workflow to be processed for application id ${
application.id
}. ${parseJSONError(error)}`,
summary.error(
`Error while enqueueing assessment workflow to be processed for application id ${application.id}.`,
error,
);
}
return result;
return summary;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./logger-utils";
export * from "./logger.module";
export * from "./logger.service";
export * from "./process-summary";
Original file line number Diff line number Diff line change
@@ -1,7 +1,28 @@
import { Injectable, ConsoleLogger, Scope } from "@nestjs/common";
import { LogLevels, ProcessSummary } from "./process-summary";

/**
* Common log across entire solution.
*/
@Injectable({ scope: Scope.TRANSIENT })
export class LoggerService extends ConsoleLogger {}
export class LoggerService extends ConsoleLogger {
/**
* Writes all the log entries.
* @param processSummary process summary logs.
*/
logProcessSummary(processSummary: ProcessSummary): void {
for (const logEntry of processSummary.flattenLogs()) {
switch (logEntry.level) {
case LogLevels.Error:
this.error(logEntry.message);
break;
case LogLevels.Warn:
this.warn(logEntry.message);
break;
default:
this.log(logEntry.message);
break;
}
}
}
}
Loading

0 comments on commit 12d9f30

Please sign in to comment.