From ff20ec81646b65ee03430878dcff98d8e71def9c Mon Sep 17 00:00:00 2001 From: yadhap Dahal Date: Wed, 30 Oct 2024 15:50:05 -0400 Subject: [PATCH] Added fixes to JM not sending alerts; --- .../monitorIntermediateStateJobs.js | 465 +++++++----------- .../jobMonitoring/monitorJobPunctuality.js | 58 +-- .../server/jobs/jobMonitoring/monitorJobs.js | 96 ++-- .../jobs/jobMonitoring/monitorJobsUtil.js | 31 +- Tombolo/server/routes/app/read.js | 15 +- 5 files changed, 268 insertions(+), 397 deletions(-) diff --git a/Tombolo/server/jobs/jobMonitoring/monitorIntermediateStateJobs.js b/Tombolo/server/jobs/jobMonitoring/monitorIntermediateStateJobs.js index 8309a4475..f58936d13 100644 --- a/Tombolo/server/jobs/jobMonitoring/monitorIntermediateStateJobs.js +++ b/Tombolo/server/jobs/jobMonitoring/monitorIntermediateStateJobs.js @@ -24,8 +24,6 @@ const cluster = models.cluster; const notification_queue = models.notification_queue; const monitoring_types = models.monitoring_types; const monitoring_logs = models.monitoring_logs; -const IntegrationMapping = models.integration_mapping; -const Integrations = models.integrations; (async () => { const now = new Date(); // UTC time @@ -53,9 +51,19 @@ const Integrations = models.integrations; raw: true, }); + // Monitoring logs with intermediate state Wus -> metaData.wuInIntermediateState length is greater than 0 + const monitoringsWithIntermediateStateWus = monitoringLogs.filter( + (log) => + log.metaData && + log.metaData.wuInIntermediateState && + log.metaData.wuInIntermediateState.length > 0 + ); + // list of unique cluster IDs const clusterIds = [ - ...new Set(monitoringLogs.map((log) => log.cluster_id)), + ...new Set( + monitoringsWithIntermediateStateWus.map((log) => log.cluster_id) + ), ]; // Get cluster info for all unique clusters @@ -83,258 +91,166 @@ const Integrations = models.integrations; } }); - // Get array of monitoring logs which has the intermediate worUnits - const monitoringsWithIntermediateStateWus = []; - - monitoringLogs.forEach((log) => { - if (!log.metaData) return; - const { - metaData: { wuInIntermediateState }, - } = log; - if (!wuInIntermediateState) return; - monitoringsWithIntermediateStateWus.push(log); - }); + // Cluster info as object with cluster ID as key + const clustersInfoObj = clustersInfo.reduce((acc, cluster) => { + acc[cluster.id] = cluster; + return acc; + }, {}); - // copy the array to avoid mutation - const copyMonitoringsWithIntermediateStateWus = [ - ...monitoringsWithIntermediateStateWus, - ]; + // Combine all the intermediate wus in an array + const allIntermediateWus = monitoringsWithIntermediateStateWus.reduce( + (acc, log) => { + const { metaData } = log; + const { wuInIntermediateState = [] } = metaData; + acc.push(...wuInIntermediateState); + return acc; + },[]); const notificationsToBeQueued = []; - const wuNoLongerInIntermediateState = []; + const wuToStopMonitoring = []; const wuWithNewIntermediateState = {}; - - // Find severity level (For ASR ) - based on that determine when to send out notifications - let severityThreshHold = 0; - let severeEmailRecipients = null; - try { - const { id: integrationId } = await Integrations.findOne({ - where: { name: "ASR" }, - raw: true, - }); - - if (integrationId) { - // Get integration mapping with integration details - const integrationMapping = await IntegrationMapping.findOne({ - where: { integration_id: integrationId }, - raw: true, - }); - - if(integrationMapping){ - const { - metaData: { - nocAlerts: { severityLevelForNocAlerts, emailContacts }, - }, - } = 
integrationMapping; - severityThreshHold = severityLevelForNocAlerts; - severeEmailRecipients = emailContacts; - } - } - } catch (error) { - logger.error( - `Intermediate State Job Monitoring : Error while getting integration level severity threshold: ${error.message}` - ); - } - - for (let [ - monitoringIndex, - monitoring, - ] of monitoringsWithIntermediateStateWus.entries()) { - const { cluster_id } = monitoring; - const clusterDetail = clustersInfo.find( - (cluster) => cluster.id === cluster_id - ); - - try { - // create a new instance of WorkunitsService - const wuService = new WorkunitsService({ - baseUrl: `${clusterDetail.thor_host}:${clusterDetail.thor_port}/`, - userID: clusterDetail.username || "", - password: clusterDetail.password || "", - }); - - const { - metaData: { wuInIntermediateState }, - } = monitoring; - - // Iterate through all the intermediate state WUs - for (let wu of wuInIntermediateState) { - const { - jobMonitoringData: { + const wuToKeepMonitoring = []; + + // Iterate through all the monitoring logs with intermediate state WUs + for (wuData of allIntermediateWus) { + try { + const { clusterId } = wuData; + const clusterDetail = clustersInfoObj[clusterId]; + const { applicationId, - monitoringName, - clusterId, - jobName: jobNamePattern, - metaData: { - notificationMetaData: { - notificationCondition, - primaryContacts = [], - secondaryContacts = [], - notifyContacts = [], + jobName, + jobMonitoringData: { + monitoringName, + metaData: { + notificationMetaData, + asrSpecificMetaData, + requireComplete, + expectedStartTime, + expectedCompletionTime, }, - asrSpecificMetaData, - requireComplete = [], - expectedStartTime, - expectedCompletionTime, }, - }, - } = wu; - - // Cluster details - const clusterDetail = clustersInfo.find( - (cluster) => cluster.id === clusterId - ); - - // Notification ID prefix - let notificationPrefix = "JM"; - let product; - let domain; - let severity; - - if (asrSpecificMetaData && asrSpecificMetaData.productCategory) { - // Product - const { name: productName, shortCode } = await getProductCategory( - asrSpecificMetaData.productCategory - ); - notificationPrefix = shortCode; - product = productName; - - //Domain - const { name: domainName } = await getDomain( - asrSpecificMetaData.domain - ); - domain = domainName; - - //Severity - severity = asrSpecificMetaData.severity; - } - - // Notification condition in lower case. 
ex - failed, aborted, completed - const notificationConditionLowerCase = notificationCondition.map( - (condition) => condition.toLowerCase() - ); - - try { - const info = await wuService.WUInfo({ Wuid: wu.Wuid }); - const {Workunit: { State }} = info; - - // Check if current time is before, after, within the window - const currentTimeToWindowRelation = - checkIfCurrentTimeIsWithinRunWindow({ - start: expectedStartTime, - end: expectedCompletionTime, - currentTime: clusterDetail.localTime, - }); + } = wuData; + + // create a new instance of WorkunitsService + const wuService = new WorkunitsService({ + baseUrl: `${clusterDetail.thor_host}:${clusterDetail.thor_port}/`, + userID: clusterDetail.username || "", + password: clusterDetail.password || "", + }); + + // Make call to HPCC to get the state of the WU + let newWuDetails = null; + try{ + newWuDetails = (await wuService.WUInfo({ Wuid: wuData.Wuid })).Workunit; + }catch(err){ + logger.error(`Intermediate JM : Error getting WU details for ${wuData.Wuid} on cluster ${clusterDetail.id}: ${err.message}`); + continue; + } - - const sendAlertToNoc = severity >= severityThreshHold && severeEmailRecipients; + const currentWuState = newWuDetails.State; + const currentStateLowerCase = currentWuState.toLowerCase(); + + // Notification condition in lower case + const notificationConditionLowerCase = + notificationMetaData.notificationCondition.map((condition) => + condition.toLowerCase() + ); + + let sendAlert = false; + let keepWu = true; + let notificationDescription; - // WU now in state such as failed, aborted etc - if (notificationConditionLowerCase.includes(State)) { - console.log('------------------------------------------'); - console.dir("Intermediate job failed") - console.log('------------------------------------------'); - // Add new State to the WU - wu.State = _.capitalize(State); - - // notification object - const notificationPayload = createNotificationPayload({ - type: "email", - notificationDescription: `Analysis detected a monitored job in ${State} state`, - templateName: "jobMonitoring", - originationId: monitoringTypeId, - applicationId: applicationId, - subject: `Job Monitoring Alert: Job in ${State} state`, - recipients: { - primaryContacts, - secondaryContacts, - notifyContacts, - }, - jobName: jobNamePattern, - wuState: wu.State, - monitoringName, - issue: { - Issue: `Job in ${wu.State} state`, - Cluster: clusterDetail.name, - "Job Name/Filter": jobNamePattern, - "Returned Job": wu.Jobname, - State: wu.State, - "Discovered at": findLocalDateTimeAtCluster( - clusterDetail.timezone_offset - ), - }, - notificationId: generateNotificationId({ - notificationPrefix, - timezoneOffset: clusterDetail.timezone_offset || 0, - }), - asrSpecificMetaData: { - region: "USA", - product, - domain, - severity, - }, // region: "USA", product: "Telematics", domain: "Insurance", severity: 3, - firstLogged: findLocalDateTimeAtCluster( - clusterDetail.timezone_offset - ), - lastLogged: findLocalDateTimeAtCluster( - clusterDetail.timezone_offset - ), - }); + const currentTimeToWindowRelation = checkIfCurrentTimeIsWithinRunWindow({ + start: expectedStartTime, + end: expectedCompletionTime, + currentTime: findLocalDateTimeAtCluster(clusterDetail.timezone_offset || 0), + }); + // Check if the job is completed + if (currentStateLowerCase === 'completed') { + //TODO - There is a gap, if the job is completed but immediately after the expected time, notification won't be sent, although users might want to know that the job was completed after the expected 
time + keepWu = false; + } else if (notificationConditionLowerCase.includes(currentStateLowerCase)) { + // Check if the job state is included in the notification condition + sendAlert = true; + notificationDescription = `job is in ${wuData.State} state.`; + keepWu = false; + } else if (intermediateStates.includes(currentStateLowerCase)) { + // Check if the job state is in intermediate states + if (!requireComplete) { + keepWu = false; + } else if (requireComplete && currentTimeToWindowRelation === 'within') { + keepWu = true; + } else if (requireComplete && currentTimeToWindowRelation === "after") { + notificationDescription = `job in ${wuData.State} state and has not been completed by the expected time.`; + sendAlert = true; + keepWu = false; + } else { + logger.verbose(`Intermediate JM : ${wuData.Wuid} on cluster ${clusterDetail.id} is in intermediate state ${currentWuState} not covered by any condition`); + } + } - notificationPayload.wuId = wu.Wuid; - notificationsToBeQueued.push(notificationPayload); + // If monitoring to be kept, add to wuToKeepMonitoring + if (keepWu) { + wuToKeepMonitoring.push(wuData); + }else { + wuToStopMonitoring.push(wuData.Wuid); + } - // NOC email notification if severity is high - if (sendAlertToNoc) { - const notificationPayloadForNoc = { ...notificationPayload} - notificationPayloadForNoc.metaData.notificationDescription = nocAlertDescription; - notificationPayloadForNoc.metaData.mainRecipients = severeEmailRecipients; - notificationPayloadForNoc.metaData.notificationId = generateNotificationId({ - notificationPrefix, - timezoneOffset: clusterDetail.timezone_offset || 0, - }), - delete notificationPayloadForNoc.metaData.cc; - notificationsToBeQueued.push(notificationPayloadForNoc); + // If alert to be sent, create notification payload + if (sendAlert) { + // Notification ID prefix + let notificationPrefix = "JM"; + let product; + let domain; + let region; + let domainLevelSeverity; + let jobLevelSeverity; + let severeEmailRecipients; + + if (asrSpecificMetaData && asrSpecificMetaData.productCategory) { + const { name: productName, shortCode } = + await getProductCategory(asrSpecificMetaData.productCategory); + + notificationPrefix = shortCode; + product = productName; + + const { + name: domainName, + region: domainRegion, + severityThreshold, + severityAlertRecipients, + } = await getDomain(asrSpecificMetaData.domain); + domain = domainName; + region = domainRegion; + domainLevelSeverity = severityThreshold; + jobLevelSeverity = asrSpecificMetaData.severity; + severeEmailRecipients = severityAlertRecipients; } - wuNoLongerInIntermediateState.push(wu.Wuid); - } - // Still in intermediate state but window is passed an the job is required to be completed - else if ( - intermediateStates.includes(State) && - currentTimeToWindowRelation === "after" && - requireComplete === true - ) { - console.log('------------------------------------------'); - console.dir("Intermediate job passed time") - console.log('------------------------------------------'); - - // Add new State to the WU - wu.State = _.capitalize(State); - - // notification object + // Generate notification payload const notificationPayload = createNotificationPayload({ type: "email", - notificationDescription: `Analysis detected that a monitored job has not completed by the expected time. 
The job is currently in the ${State} state.`, + notificationDescription: `Analysis (${monitoringName}) detected that a monitored ${notificationDescription}`, templateName: "jobMonitoring", originationId: monitoringTypeId, applicationId: applicationId, - subject: `Job Monitoring Alert: Job not completed by expected time`, + subject: `Job Monitoring Alert from ${process.env.INSTANCE_NAME} : Job not completed by expected time`, recipients: { - primaryContacts, - secondaryContacts, - notifyContacts, + primaryContacts: notificationMetaData.primaryContacts || [], + secondaryContacts: + notificationMetaData.secondaryContacts || [], + notifyContacts: notificationMetaData.notifyContacts || [], }, - jobName: jobNamePattern, - wuState: wu.State, + jobName: jobName, + wuState: wuData.State, monitoringName, issue: { - Issue: `Job in ${wu.State} state`, + Issue: _.startCase(notificationDescription), Cluster: clusterDetail.name, - "Job Name/Filter": jobNamePattern, - "Returned Job": wu.Jobname, - State: wu.State, + "Job Name/Filter": wuData.jobNamePattern, + "Returned Job": wuData.Jobname, + State: wuData.State, "Discovered at": findLocalDateTimeAtCluster( clusterDetail.timezone_offset ), @@ -344,10 +260,10 @@ const Integrations = models.integrations; timezoneOffset: clusterDetail.timezone_offset || 0, }), asrSpecificMetaData: { - region: "USA", + region, product, domain, - severity, + severity: jobLevelSeverity, }, // region: "USA", product: "Telematics", domain: "Insurance", severity: 3, firstLogged: findLocalDateTimeAtCluster( clusterDetail.timezone_offset @@ -357,78 +273,32 @@ const Integrations = models.integrations; ), }); - // console.log('-----------Payload 1----------------------'); - // console.dir(notificationPayload) - // console.log('------------------------------------------'); + // Add notification payload to notificationsToBeQueued notificationsToBeQueued.push(notificationPayload); - // NOC email notification if severity is high - if (sendAlertToNoc) { - const notificationPayloadForNoc = { ...notificationPayload} - notificationPayloadForNoc.metaData.notificationDescription = nocAlertDescription; - notificationPayloadForNoc.metaData.mainRecipients = severeEmailRecipients; - notificationPayloadForNoc.metaData.notificationId = generateNotificationId({ - notificationPrefix, - timezoneOffset: clusterDetail.timezone_offset || 0, - }), - delete notificationPayloadForNoc.metaData.cc; - - - // console.log("-----------Payload 2----------------------"); - // console.dir(notificationPayload); - // console.log("------------------------------------------"); - - notificationsToBeQueued.push(notificationPayloadForNoc); - } - - console.log('----------To be queued -------------------'); - console.dir(notificationsToBeQueued) - console.log('------------------------------------------'); - - wuNoLongerInIntermediateState.push(wu.Wuid); - } - // IF the job is still in intermediate state and the current time is within the run window - else if ( - intermediateStates.includes(State) && - currentTimeToWindowRelation === "within" - ) { - console.log('------------------------------------------'); - console.dir("STILL IN INTERMEDIATE STATE") - console.log('------------------------------------------'); - - // If the State has changed from last time it was checked, update monitoring needs to be updated with new state - if (wu.State !== State) { - wuWithNewIntermediateState[wu.Wuid] = State; - } - } else { - console.log('------------------------------------------'); - console.dir("COMPLETED") - 
console.log('------------------------------------------'); - // WU in completed state - Remove the WU from the intermediate state - wuNoLongerInIntermediateState.push(wu.Wuid); + // If job level severity is greater than or equal to domain level severity, send alert to NOC + if (jobLevelSeverity >= domainLevelSeverity && severeEmailRecipients) { + const notificationPayloadForNoc = _.cloneDeep(notificationPayload); + notificationPayloadForNoc.metaData.notificationDescription = nocAlertDescription; + notificationPayloadForNoc.metaData.mainRecipients = severeEmailRecipients; + notificationPayloadForNoc.metaData.notificationId = generateNotificationId({ notificationPrefix, timezoneOffset: clusterDetail.timezone_offset || 0}), + delete notificationPayloadForNoc.metaData.cc; + notificationsToBeQueued.push(notificationPayloadForNoc); + } } } catch (err) { - logger.error( - `Monitor Intermediate Jobs. WUId - ${wu.Wuid} - Cluster ${cluster_id}: ${err.message}` - ); + logger.error(`Monitoring Intermediate state jobs : ${err.message}`); } - } - } catch (err) { - logger.error(err); - } } // Insert notification in queue for (let notification of notificationsToBeQueued) { - // console.log('-NOTIFICATION LOOP ------------------------'); - // console.dir(notification); - // console.log('------------------------------------------'); await notification_queue.create(notification); } - // if wuNoLongerInIntermediateState is empty, or state of intermediate wu has not changed return + // if wuToStopMonitoring is empty, or state of intermediate wu has not changed return if ( - wuNoLongerInIntermediateState.length === 0 && + wuToStopMonitoring.length === 0 && Object.keys(wuWithNewIntermediateState).length === 0 ) { logger.debug( @@ -445,7 +315,7 @@ const Integrations = models.integrations; // Remove completed jobs let wuStillInIntermediateState = wuInIntermediateState.filter( - (wu) => !wuNoLongerInIntermediateState.includes(wu.Wuid) + (wu) => !wuToStopMonitoring.includes(wu.Wuid) ); // If state of intermediate WU has changed, update @@ -463,13 +333,16 @@ const Integrations = models.integrations; { where: { id } } ); } catch (error) { - logger.error(`Intermediate State Jobs - Error updating log with id ${log.id}:`, error); + logger.error( + `Intermediate State Jobs - Error updating log with id ${log.id}:`, + error + ); } } } catch (err) { - logger.error(err); + logger.error(`Monitoring Intermediate state jobs : ${err.message}`); } finally { if (parentPort) parentPort.postMessage("done"); else process.exit(0); } -})(); +})(); \ No newline at end of file diff --git a/Tombolo/server/jobs/jobMonitoring/monitorJobPunctuality.js b/Tombolo/server/jobs/jobMonitoring/monitorJobPunctuality.js index 7f59a29c7..676107ba7 100644 --- a/Tombolo/server/jobs/jobMonitoring/monitorJobPunctuality.js +++ b/Tombolo/server/jobs/jobMonitoring/monitorJobPunctuality.js @@ -1,7 +1,11 @@ +// Import from libraries const { WorkunitsService } = require("@hpcc-js/comms"); +const { parentPort } = require("worker_threads"); +const _ = require("lodash"); + +// Local imports const logger = require("../../config/logger"); const { decryptString } = require("../../utils/cipher"); -const { parentPort } = require("worker_threads"); const { calculateRunOrCompleteByTimes, generateJobName, @@ -14,6 +18,7 @@ const { } = require("./monitorJobsUtil"); const models = require("../../models"); +// Models const JobMonitoring = models.jobMonitoring; const Cluster = models.cluster; const NotificationQueue = models.notification_queue; @@ -107,41 +112,18 @@ const 
Integrations = models.integrations; const clusterInfo = clustersObj[clusterId]; // Find severity level (For ASR ) - based on that determine when to send out notifications - let severityThreshHold = 0; + let severityThreshHold = 0; // Domain specific severity threshold for ASR let severeEmailRecipients = null; - if (metaData.asrSpecificMetaData) { + if (asrSpecificMetaData) { try { - const { id: integrationId } = await Integrations.findOne({ - where: { name: "ASR" }, - raw: true, - }); - - if (integrationId) { - // Get integration mapping with integration details - const integrationMapping = await IntegrationMapping.findOne({ - where: { - integration_id: integrationId, - application_id: applicationId, - }, - raw: true, - }); - - if (integrationMapping) { - const { - metaData: { - nocAlerts: { severityLevelForNocAlerts, emailContacts }, - }, - } = integrationMapping; - severityThreshHold = severityLevelForNocAlerts; - severeEmailRecipients = emailContacts; - } + const {domain: domainId} = asrSpecificMetaData; + const domain = await getDomain(domainId); + if(domain) { + severityThreshHold = domain.severityThreshold; + severeEmailRecipients = domain.severityAlertRecipients; } - } catch (error) { - logger.error( - `Job Punctuality Monitoring : Error while getting integration level severity threshold: ${error.message}` - ); - } + } catch (error) {logger.error(`Job Punctuality Monitoring : Error while getting Domain level severity : ${error.message}`);} } // Job level severity threshold @@ -191,16 +173,17 @@ const Integrations = models.integrations; if (jobLevelSeverity < severityThreshHold || !severityThreshHold) { alertTimePassed = window.end < window.currentTime; + lateByInMinutes = Math.floor( (window.currentTime - window.end) / 60000 ); - } else { - alertTimePassed = window.start < window.currentTime; + } else { lateByInMinutes = Math.floor( (window.currentTime - window.start) / 60000 ); } + // If the time has not passed, or with in grace period of 10 minutes, continue if (!alertTimePassed || lateByInMinutes < 10) { continue; @@ -315,12 +298,11 @@ const Integrations = models.integrations; // Notification payload const notificationPayload = createNotificationPayload({ type: "email", - notificationDescription: - "Monitoring detected that a monitored job did not started on time", + notificationDescription: `Monitoring ( ${monitoringName} ) detected that a monitored job did not started on time`, templateName: "jobMonitoring", originationId: monitoringTypeDetails.id, applicationId: applicationId, - subject: `Job Monitoring Alert: Job not started on expected time`, + subject: `Job Monitoring Alert from ${process.env.INSTANCE_NAME} : Job not started on expected time`, recipients: { primaryContacts, secondaryContacts, @@ -359,7 +341,7 @@ const Integrations = models.integrations; // NOC email notification if (jobLevelSeverity >= severityThreshHold && severeEmailRecipients) { - const notificationPayloadForNoc = { ...notificationPayload }; + const notificationPayloadForNoc = _.cloneDeep(notificationPayload); notificationPayloadForNoc.metaData.notificationDescription = nocAlertDescription; notificationPayloadForNoc.metaData.mainRecipients = diff --git a/Tombolo/server/jobs/jobMonitoring/monitorJobs.js b/Tombolo/server/jobs/jobMonitoring/monitorJobs.js index a96b9e79b..11662a5d1 100644 --- a/Tombolo/server/jobs/jobMonitoring/monitorJobs.js +++ b/Tombolo/server/jobs/jobMonitoring/monitorJobs.js @@ -1,7 +1,10 @@ -const monitoring_name = "Job Monitoring"; +// Imports from libraries const { parentPort } = 
require("worker_threads"); const { WorkunitsService } = require("@hpcc-js/comms"); +const _ = require("lodash"); + +// Local imports const logger = require("../../config/logger"); const models = require("../../models"); const { decryptString } = require("../../utils/cipher"); @@ -17,15 +20,16 @@ const { findLocalDateTimeAtCluster, nocAlertDescription, } = require("./monitorJobsUtil"); -const e = require("express"); +// Models const JobMonitoring = models.jobMonitoring; const cluster = models.cluster; const MonitoringTypes = models.monitoring_types; const MonitoringLogs = models.monitoring_logs; const NotificationQueue = models.notification_queue; -const IntegrationMapping = models.integration_mapping; -const Integrations = models.integrations; + +// Variables +const monitoring_name = "Job Monitoring"; (async () => { const now = new Date(); // UTC time @@ -53,40 +57,6 @@ const Integrations = models.integrations; return; } - // Find severity level (For ASR ) - based on that determine when to send out notifications - let severityThreshHold = 0; - let severeEmailRecipients = null; - - try { - const { id: integrationId } = await Integrations.findOne({ - where: { name: "ASR" }, - raw: true, - }); - - if (integrationId) { - // Get integration mapping with integration details - const integrationMapping = await IntegrationMapping.findOne({ - where: { integration_id: integrationId }, - raw: true, - }); - - - if(integrationMapping){ - const { - metaData: { - nocAlerts: { severityLevelForNocAlerts, emailContacts }, - }, - } = integrationMapping; - severityThreshHold = severityLevelForNocAlerts; - severeEmailRecipients = emailContacts; - } - } - } catch (error) { - logger.error( - `Job Monitoring : Error while getting integration level severity threshold: ${error.message}` - ); - } - /* Organize job monitoring based on cluster ID. This approach simplifies interaction with the HPCC cluster and minimizes the number of necessary API calls. 
*/ const jobMonitoringsByCluster = {}; @@ -137,6 +107,7 @@ const Integrations = models.integrations; raw: true, }); + // Cluster last scan info (Scan logs) clustersInfo.forEach((clusterInfo) => { const lastScanDetails = lastClusterScanDetails.find( @@ -220,7 +191,7 @@ const Integrations = models.integrations; } try { - const x = await MonitoringLogs.upsert( + await MonitoringLogs.upsert( { cluster_id: id, monitoring_type_id: monitoringTypeId, @@ -246,6 +217,7 @@ const Integrations = models.integrations; ); return; } + // Clusters with new work units - Done to minimize the number of calls to db later const clusterIdsWithNewWUs = Object.keys(wuBasicInfoByCluster); @@ -364,14 +336,31 @@ const Integrations = models.integrations; severity = asrSpecificMetaData.severity; } + // Find severity level (For ASR ) - based on that determine weather to send NOC notification + let severityThreshHold = 0; + let severeEmailRecipients = null; + + if (asrSpecificMetaData) { + try { + const { domain: domainId } = asrSpecificMetaData; + const domain = await getDomain(domainId); + if (domain) { + severityThreshHold = domain.severityThreshold; + severeEmailRecipients = domain.severityAlertRecipients; + } + } catch (error) { + logger.error(`Job Monitoring : Error while getting Domain level severity : ${error.message}` ); + } + } + //Notification payload const notificationPayload = createNotificationPayload({ type: "email", - notificationDescription: `Monitoring detected that a monitored job is in ${wu.State} state`, + notificationDescription: `Monitoring (${jobMonitoring.monitoringName}) detected that a monitored job is in ${wu.State} state`, templateName: "jobMonitoring", originationId: monitoringTypeId, applicationId: jobMonitoring.applicationId, - subject: `Job Monitoring Alert: Job in ${wu.State} state`, + subject: `Job Monitoring Alert from ${process.env.INSTANCE_NAME} : Job in ${wu.State} state`, recipients: { primaryContacts, secondaryContacts, notifyContacts }, jobName: jobName, wuState: wu.State, @@ -409,13 +398,16 @@ const Integrations = models.integrations; // If severity is above threshold, send out NOC notification if (severity >= severityThreshHold && severeEmailRecipients) { - const notificationPayloadForNoc = { ...notificationPayload }; - notificationPayloadForNoc.metaData.notificationDescription = nocAlertDescription; - notificationPayloadForNoc.metaData.mainRecipients = severeEmailRecipients; - notificationPayload.metaData.notificationId = generateNotificationId({ - notificationPrefix, - timezoneOffset: clusterInfoObj[clusterId].timezone_offset || 0, - }), + const notificationPayloadForNoc = _.cloneDeep(notificationPayload); + notificationPayloadForNoc.metaData.notificationDescription = + nocAlertDescription; + notificationPayloadForNoc.metaData.mainRecipients = + severeEmailRecipients; + notificationPayloadForNoc.metaData.notificationId = + generateNotificationId({ + notificationPrefix, + timezoneOffset: clusterInfoObj[clusterId].timezone_offset || 0, + }), delete notificationPayloadForNoc.metaData.cc; await NotificationQueue.create(notificationPayloadForNoc); } @@ -435,10 +427,6 @@ const Integrations = models.integrations; raw: true, }); - if (!log) { - continue; - } - // Filter intermediate state jobs for the cluster const intermediateStateJobsForCluster = intermediateStateJobs.filter( (job) => job.clusterId === id @@ -447,7 +435,7 @@ const Integrations = models.integrations; let existingIntermediateStateJobs = []; let existingMetaData = {}; - existingMetaData = log.metaData || {}; + 
existingMetaData = log?.metaData || {}; existingIntermediateStateJobs = existingMetaData?.wuInIntermediateState || []; @@ -475,7 +463,7 @@ const Integrations = models.integrations; } } catch (err) { - logger.error(err); + logger.error(`Job Monitoring - Error while monitoring jobs: ${err.message}`); } finally { if (parentPort) parentPort.postMessage("done"); else process.exit(0); diff --git a/Tombolo/server/jobs/jobMonitoring/monitorJobsUtil.js b/Tombolo/server/jobs/jobMonitoring/monitorJobsUtil.js index 8976fb990..0a715bd7b 100644 --- a/Tombolo/server/jobs/jobMonitoring/monitorJobsUtil.js +++ b/Tombolo/server/jobs/jobMonitoring/monitorJobsUtil.js @@ -452,9 +452,34 @@ function calculateRunOrCompleteByTimeForCronJobs({ } -// check if current time is before, within or after the run window -function checkIfCurrentTimeIsWithinRunWindow({start, end, currentTime}) { - if (currentTime > start && currentTime < end) { + +function checkIfCurrentTimeIsWithinRunWindow({ start, end, currentTime }) { + // Ensure currentTime is a Date object + if (!(currentTime instanceof Date)) { + throw new Error("Invalid input: currentTime must be a Date object"); + } + + // Extract the date part from currentTime + const currentDate = currentTime.toISOString().split("T")[0]; // Get the date part of currentTime + + // Combine the date part with the start and end times to create Date objects + start = new Date(`${currentDate}T${start}:00.000Z`); + end = new Date(`${currentDate}T${end}:00.000Z`); + + + // Validate input parameters + if ( + !(start instanceof Date) || + !(end instanceof Date) || + !(currentTime instanceof Date) + ) { + throw new Error( + "Invalid input: start, end, and currentTime must be Date objects" + ); + } + + // Adjust conditions to include start and end in the "within" range + if (currentTime >= start && currentTime <= end) { return "within"; } else if (currentTime < start) { return "before"; diff --git a/Tombolo/server/routes/app/read.js b/Tombolo/server/routes/app/read.js index 8a9e11780..0c4d23442 100644 --- a/Tombolo/server/routes/app/read.js +++ b/Tombolo/server/routes/app/read.js @@ -20,7 +20,6 @@ let Query = models.query; let QueryField = models.query_field; let Dataflow = models.dataflow; - const { io } = require("../../server"); const validatorUtil = require("../../utils/validator"); const { body, query, validationResult } = require("express-validator"); @@ -134,16 +133,14 @@ router.post( [ body("user_id") .optional({ checkFalsy: true }) - .matches(/^[a-zA-Z]{1}[a-zA-Z0-9_:.\-]*$/) + .isUUID(4) .withMessage("Invalid user_id"), body("title") .matches(/^[a-zA-Z]{1}[a-zA-Z0-9_: .\-]*$/) .withMessage("Invalid title"), body("description").optional({ checkFalsy: true }), - body("creator") - .matches(/^[a-zA-Z]{1}[a-zA-Z0-9_:.\-]*$/) - .withMessage("Invalid creator"), - body("creator") + body("creator").isUUID(4).withMessage("Invalid creator"), + body("visibility") .matches(/^[a-zA-Z]/) .withMessage("Invalid visibility"), ], @@ -156,12 +153,14 @@ router.post( } try { if (req.body.id == "") { + req.body.createdBy = req.body.creator; models.application .create({ title: req.body.title, description: req.body.description, creator: req.body.creator, visibility: req.body.visibility, + createdBy: req.body.createdBy, }) .then(function (application) { if (req.body.user_id) { @@ -169,12 +168,16 @@ router.post( .create({ user_id: req.body.user_id, application_id: application.id, + createdBy: req.body.createdBy, + user_app_relation: "created", }) .then(function (userapp) { res.json({ result: "success", id: 
application.id, title: application.title, + description: application.description, + user_app_id: userapp.id, }); }); } else {
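
Note on the two recurring changes in this patch, with short illustrative sketches (these are not part of the diff). The payload shape below is simplified; only the nested `metaData` property and the lodash import mirror the patched files.

Why the NOC payloads switched from `{ ...notificationPayload }` to `_.cloneDeep(notificationPayload)`: a spread copy shares the nested `metaData` object, so mutating the NOC variant (description, mainRecipients, deleting `cc`) also rewrites the payload already queued for the primary recipients.

    const _ = require("lodash");

    // Simplified payload; only `metaData` mirrors createNotificationPayload output.
    const notificationPayload = {
      type: "email",
      metaData: {
        notificationDescription: "Job in Failed state",
        cc: ["team@example.com"],
      },
    };

    // Old approach: shallow copy shares `metaData`.
    const shallowNoc = { ...notificationPayload };
    shallowNoc.metaData.notificationDescription = "NOC alert";
    console.log(notificationPayload.metaData.notificationDescription); // "NOC alert" — original clobbered

    // Patched approach: deep copy isolates the NOC-specific mutations.
    const nocPayload = _.cloneDeep(notificationPayload);
    delete nocPayload.metaData.cc;
    console.log(notificationPayload.metaData.cc); // still ["team@example.com"]

And a usage sketch for the rewritten checkIfCurrentTimeIsWithinRunWindow in monitorJobsUtil.js, assuming the helper is exported from that module: as rewritten it expects start/end as "HH:MM" strings, anchors them to the UTC date of currentTime, and now treats the window boundaries as "within".

    // Assumes the helper is exported from monitorJobsUtil.js.
    const { checkIfCurrentTimeIsWithinRunWindow } = require("./monitorJobsUtil");

    checkIfCurrentTimeIsWithinRunWindow({
      start: "09:00",
      end: "17:30",
      currentTime: new Date("2024-10-30T17:30:00.000Z"),
    }); // "within" — the end boundary is now inclusive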