Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamlined logging done by bree jobs. This PR includes logging chang… #964

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions Tombolo/server/jobSchedular/job-scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,26 @@ class JobScheduler {
workerName = "File monitoring";

if (message === "done") {
logger.verbose(`${workerName} signaled 'done'`);
// Handle this is in Finally block
}
if (message?.level === "verbose") {
logger.verbose(`[${workerName}]:`);
logger.verbose(message.text);
}
if (message?.level === "info") {
logger.info(`[${workerName}]:`);
logger.info(message.text);

if (
message?.level === "warn" ||
message?.level === "info" ||
message?.level === "verbose" ||
message?.level === "debug" ||
message?.level === "silly"
) {
logger[message.level](message.text);
}

if (message?.level === "error") {
logger.error(`[${workerName}]:`);
logger.error(`${message.text}`, message.error);
}

if (message?.action === "remove") {
this.bree.remove(worker.name);
logger.info(`👷 JOB REMOVED: ${workerName}`);
logger.info(`Job removed: ${workerName}`);
}
if (message?.action == "scheduleNext") {
await this.scheduleCheckForJobsWithSingleDependency({
Expand Down Expand Up @@ -135,11 +138,12 @@ class JobScheduler {
await this.checkClusterReachability();
})();
}

//Bree related methods
logBreeJobs() {
return logBreeJobs.call(this);
}

createNewBreeJob({
uniqueJobName,
cron,
Expand Down Expand Up @@ -360,7 +364,7 @@ class JobScheduler {
return createOrbitMonitoringJob.call(this, { orbitMonitoring_id, cron });
}

checkClusterReachability(){
checkClusterReachability() {
return checkClusterReachability.call(this);
}
}
Expand Down
2 changes: 1 addition & 1 deletion Tombolo/server/jobSchedularMethods/apiKeys.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async function scheduleKeyCheck() {
});

this.bree.start(jobName);
logger.info("📺 KEY MONITORING STARTED ...");
logger.info("API key monitoring initialized ...");
} catch (err) {
logger.error(err);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async function checkClusterReachability() {
},
});
this.bree.start(jobName);
logger.info("🕗 CHECKING CLUSTER REACHABILITY ... ");
logger.info("Cluster reachability checker job initialized ...");
} catch (err) {
logger.error(err);
}
Expand Down
6 changes: 3 additions & 3 deletions Tombolo/server/jobSchedularMethods/clusterJobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const CLUSTER_USAGE_HISTORY_TRACKER = "submitClusterUsageTracker.js";
const SUBMIT_CLUSTER_MONITORING_JOB = "submitClusterMonitoring.js";

async function scheduleClusterTimezoneOffset() {
logger.info("☸ CLUSTER TIMEZONE OFFSET STARTED ...");
logger.info("Cluster timezone offset checker job initialized ...");
try {
let jobName = "cluster-timezone-offset-" + new Date().getTime();
this.bree.add({
Expand Down Expand Up @@ -39,7 +39,7 @@ async function createClusterUsageHistoryJob() {
};
this.bree.add(job);
this.bree.start(uniqueJobName);
logger.info("📈 CLUSTER USAGE HISTORY TRACKER JOB STARTED ...");
logger.info("Cluster usage monitoring job initialized ...");
}

function createClusterMonitoringBreeJob({ clusterMonitoring_id, cron }) {
Expand All @@ -58,7 +58,7 @@ function createClusterMonitoringBreeJob({ clusterMonitoring_id, cron }) {

async function scheduleClusterMonitoringOnServerStart() {
try {
logger.info("📺 CLUSTER MONITORING STARTED ...");
logger.info("Cluster monitoring initialized ...");
const clusterMonitoring = await ClusterMonitoring.findAll({ raw: true });
for (let monitoring of clusterMonitoring) {
const { id, cron, isActive } = monitoring;
Expand Down
4 changes: 2 additions & 2 deletions Tombolo/server/jobSchedularMethods/hpccFiles.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ function createSuperFileMonitoringBreeJob({ filemonitoring_id, cron }) {

async function scheduleSuperFileMonitoringOnServerStart() {
try {
logger.info("📺 SUPER FILE MONITORING STARTED ...");
logger.info("Super file monitoring initialized ...");
const superfileMonitoring = await filemonitoring_superfile.findAll({
raw: true,
});
Expand Down Expand Up @@ -164,7 +164,7 @@ async function scheduleFileMonitoringOnServerStart() {
}

async function scheduleFileMonitoring() {
logger.info("📂 FILE MONITORING STARTED ...");
logger.info("File monitoring initialized ...");
try {
let jobName = "file-monitoring-" + new Date().getTime();
this.bree.add({
Expand Down
2 changes: 1 addition & 1 deletion Tombolo/server/jobSchedularMethods/hpccJobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const logger = require("../config/logger");
const JOB_STATUS_POLLER = "statusPoller.js";

async function scheduleJobStatusPolling() {
logger.info("📢 STATUS POLLING SCHEDULER STARTED...");
logger.info("Status puller for dataflow jobs initialized ...");

try {
let jobName = "job-status-poller-" + new Date().getTime();
Expand Down
24 changes: 12 additions & 12 deletions Tombolo/server/jobSchedularMethods/jobMonitoring.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async function startJobMonitoring() {
let jobName = "job-monitoring" + new Date().getTime();
this.bree.add({
name: jobName,
// interval: "10s", // For development
interval: humanReadableIntervalForJobMonitoring,
interval: "10s", // For development
// interval: humanReadableIntervalForJobMonitoring,
path: path.join(
__dirname,
"..",
Expand All @@ -43,9 +43,9 @@ async function startJobMonitoring() {
},
});
this.bree.start(jobName);
logger.info("🕗 JOB MONITORING STARTED ");
logger.info("Job Monitoring initialized ...");
} catch (err) {
logger.error(err);
logger.error(err.message);
}
}

Expand All @@ -65,8 +65,8 @@ async function startIntermediateJobsMonitoring() {
let jobName = "intermediate-state-jobs-monitoring" + new Date().getTime();
this.bree.add({
name: jobName,
// interval: "20s", // For development
interval: humanReadableIntervalForIntermediateJobMonitoring,
interval: "20s", // For development
// interval: humanReadableIntervalForIntermediateJobMonitoring,
path: path.join(
__dirname,
"..",
Expand All @@ -82,9 +82,9 @@ async function startIntermediateJobsMonitoring() {
},
});
this.bree.start(jobName);
logger.info("🕗 INTERMEDIATE JOB MONITORING STARTED ");
logger.info("Intermediate job monitoring initialized ...");
} catch (err) {
logger.error(err);
logger.error(err.message);
}
}

Expand All @@ -103,8 +103,8 @@ async function startJobPunctualityMonitoring() {
let jobName = "job-punctuality-monitoring" + new Date().getTime();
this.bree.add({
name: jobName,
// interval: "10s", // For development
interval: humanReadableIntervalForJobPunctualityMonitoring,
interval: "10s", // For development
// interval: humanReadableIntervalForJobPunctualityMonitoring,
path: path.join(
__dirname,
"..",
Expand All @@ -120,10 +120,10 @@ async function startJobPunctualityMonitoring() {
},
});
this.bree.start(jobName);
logger.info("🕗 JOB PUNCTUALITY MONITORING STARTED ...");
logger.info("Job punctuality monitoring initialized ...");
}
catch (err) {
logger.error(err);
logger.error(err.message);
}
}

Expand Down
4 changes: 2 additions & 2 deletions Tombolo/server/jobSchedularMethods/notificationJobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async function scheduleEmailNotificationProcessing() {
});

this.bree.start(jobName);
logger.info("🔔 E-MAIL NOTIFICATION PROCESSING STARTED ...");
logger.info("E-mail Notification processing job initialized ...");
} catch (err) {
console.error(err);
}
Expand All @@ -42,7 +42,7 @@ async function scheduleTeamsNotificationProcessing() {
});

this.bree.start(jobName);
logger.info("🔔 TEAMS NOTIFICATION PROCESSING STARTED ...");
logger.info("Teams notification processing job initialized ...");
} catch (err) {
console.error(err);
}
Expand Down
4 changes: 2 additions & 2 deletions Tombolo/server/jobSchedularMethods/orbitJobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function createOrbitMegaphoneJob() {
};
this.bree.add(job);
this.bree.start(uniqueJobName);
logger.info("📈 ORBIT MEGAPHONE JOB STARTED ...");
logger.info("Orbit megaphone job initialized ...");
}

function createOrbitMonitoringJob({ orbitMonitoring_id, cron }) {
Expand All @@ -35,7 +35,7 @@ function createOrbitMonitoringJob({ orbitMonitoring_id, cron }) {

async function scheduleOrbitMonitoringOnServerStart() {
try {
logger.info("📺 ORBIT MONITORING STARTED ...");
logger.info("Orbit monitoring initialized ...");
const orbitMonitorings = await orbitMonitoring.findAll({ raw: true });
for (let monitoring of orbitMonitorings) {
const { id, cron, isActive } = monitoring;
Expand Down
46 changes: 28 additions & 18 deletions Tombolo/server/jobs/jobMonitoring/monitorIntermediateStateJobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const { WorkunitsService } = require("@hpcc-js/comms");
const _ = require("lodash");

// Local imports
const logger = require("../../config/logger");
const models = require("../../models");
const { decryptString } = require("../../utils/cipher");

Expand All @@ -26,6 +25,7 @@ const monitoring_types = models.monitoring_types;
const monitoring_logs = models.monitoring_logs;

(async () => {
parentPort && parentPort.postMessage({level: "info", text: "Intermediate state JM: Monitoring started ..."});
const now = new Date(); // UTC time

try {
Expand Down Expand Up @@ -85,9 +85,7 @@ const monitoring_logs = models.monitoring_logs;
clusterInfo.timezone_offset || 0
);
} catch (error) {
logger.error(
`Failed to decrypt hash for cluster ${clusterInfo.id}: ${error.message}`
);
parentPort && parentPort.postMessage({level: "error", text: `Intermediate State Job Monitoring: Failed to decrypt hash for cluster ${clusterInfo.id}: ${error.message}`});
}
});

Expand Down Expand Up @@ -143,7 +141,7 @@ const monitoring_logs = models.monitoring_logs;
try{
newWuDetails = (await wuService.WUInfo({ Wuid: wuData.Wuid })).Workunit;
}catch(err){
logger.error(`Intermediate JM : Error getting WU details for ${wuData.Wuid} on cluster ${clusterDetail.id}: ${err.message}`);
parentPort && parentPort.postMessage({level: "error", text: `Intermediate state JM : Error getting WU details for ${wuData.Wuid} on cluster ${clusterDetail.id}: ${err.message}`});
continue;
}

Expand Down Expand Up @@ -173,7 +171,8 @@ const monitoring_logs = models.monitoring_logs;
} else if (notificationConditionLowerCase.includes(currentStateLowerCase)) {
// Check if the job state is included in the notification condition
sendAlert = true;
notificationDescription = `job is in ${wuData.State} state.`;
wuData.State = currentWuState;
notificationDescription = `job is in ${currentWuState} state.`;
keepWu = false;
} else if (intermediateStates.includes(currentStateLowerCase)) {
// Check if the job state is in intermediate states
Expand All @@ -186,7 +185,11 @@ const monitoring_logs = models.monitoring_logs;
sendAlert = true;
keepWu = false;
} else {
logger.verbose(`Intermediate JM : ${wuData.Wuid} on cluster ${clusterDetail.id} is in intermediate state ${currentWuState} not covered by any condition`);
parentPort &&
parentPort.postMessage({
level: "verbose",
text: `Intermediate state JM : ${wuData.Wuid} on cluster ${clusterDetail.id} is in intermediate state ${currentWuState} not covered by any condition`,
});
}
}

Expand Down Expand Up @@ -255,6 +258,8 @@ const monitoring_logs = models.monitoring_logs;
"Discovered at": findLocalDateTimeAtCluster(
clusterDetail.timezone_offset
).toLocaleString(),
"Expected Start Time": expectedStartTime,
"Expected Completion Time": expectedCompletionTime,
},
notificationId: generateNotificationId({
notificationPrefix,
Expand Down Expand Up @@ -288,7 +293,7 @@ const monitoring_logs = models.monitoring_logs;
}
}
} catch (err) {
logger.error(`Monitoring Intermediate state jobs : ${err.message}`);
parentPort && parentPort.postMessage({level: "error", text: `Intermediate State Job Monitoring: ${err.message}`});
}
}

Expand All @@ -302,9 +307,11 @@ const monitoring_logs = models.monitoring_logs;
wuToStopMonitoring.length === 0 &&
Object.keys(wuWithNewIntermediateState).length === 0
) {
logger.debug(
"Intermediate state job Monitoring - No WU to remove from intermediate state and no intermediate workunit with updated state Exiting..."
);
parentPort &&
parentPort.postMessage({
level: "info",
text: "Intermediate state JM: No WU to remove or update. Exiting...",
});
return;
}

Expand Down Expand Up @@ -334,16 +341,19 @@ const monitoring_logs = models.monitoring_logs;
{ where: { id } }
);
} catch (error) {
logger.error(
`Intermediate State Jobs - Error updating log with id ${log.id}:`,
error
);
parentPort &&
parentPort.postMessage({
level: "error",
text: `Intermediate state JM: Error updating log with id ${log.id}: ${error.message}`,
});
}
}
} catch (err) {
logger.error(`Monitoring Intermediate state jobs : ${err.message}`);
parentPort && parentPort.postMessage({level: "error", text: `Intermediate state JM: ${err.message}`});
} finally {
if (parentPort) parentPort.postMessage("done");
else process.exit(0);
if (parentPort){
parentPort.postMessage({level: "info", text: `Intermediate state JM: Job completed successfully in ${new Date() - now} ms`});
}
else{ process.exit(0)};
}
})();
Loading
Loading