Skip to content

Commit

Permalink
feat(core): Add metric for active workflow count (#13420)
Browse files Browse the repository at this point in the history
  • Loading branch information
juusotn8n authored Feb 26, 2025
1 parent f7f5f5e commit 3aa679e
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 3 deletions.
4 changes: 4 additions & 0 deletions packages/@n8n/config/src/configs/endpoints.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class PrometheusMetricsConfig {
/** How often (in seconds) to update queue metrics. */
@Env('N8N_METRICS_QUEUE_METRICS_INTERVAL')
queueMetricsInterval: number = 20;

/** How often (in seconds) to update active workflow metric */
@Env('N8N_METRICS_ACTIVE_WORKFLOW_METRIC_INTERVAL')
activeWorkflowCountInterval: number = 60;
}

@Config
Expand Down
1 change: 1 addition & 0 deletions packages/@n8n/config/test/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ describe('GlobalConfig', () => {
includeApiStatusCodeLabel: false,
includeQueueMetrics: false,
queueMetricsInterval: 20,
activeWorkflowCountInterval: 60,
},
additionalNonUIRoutes: '',
disableProductionWebhooksOnMainProcess: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
return activeWorkflows.map((workflow) => workflow.id);
}

async getActiveCount() {
return await this.count({
where: { active: true },
});
}

async findById(workflowId: string) {
return await this.findOne({
where: { id: workflowId },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { InstanceSettings } from 'n8n-core';
import promClient from 'prom-client';

import config from '@/config';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import type { EventService } from '@/events/event.service';
import { mockInstance } from '@test/mocking';
Expand Down Expand Up @@ -52,12 +53,14 @@ describe('PrometheusMetricsService', () => {
const eventBus = mock<MessageEventBus>();
const eventService = mock<EventService>();
const instanceSettings = mock<InstanceSettings>({ instanceType: 'main' });
const workflowRepository = mock<WorkflowRepository>();
const prometheusMetricsService = new PrometheusMetricsService(
mock(),
eventBus,
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

afterEach(() => {
Expand All @@ -75,6 +78,7 @@ describe('PrometheusMetricsService', () => {
customGlobalConfig,
mock(),
instanceSettings,
mock(),
);

await customPrometheusMetricsService.init(app);
Expand Down Expand Up @@ -217,7 +221,7 @@ describe('PrometheusMetricsService', () => {

await prometheusMetricsService.init(app);

expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric
expect(promClient.Gauge).toHaveBeenCalledTimes(2); // version metric + active workflow count metric
expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics
expect(eventService.on).not.toHaveBeenCalled();
});
Expand All @@ -230,9 +234,22 @@ describe('PrometheusMetricsService', () => {

await prometheusMetricsService.init(app);

expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric
expect(promClient.Gauge).toHaveBeenCalledTimes(2); // version metric + active workflow count metric
expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics
expect(eventService.on).not.toHaveBeenCalled();
});

it('should setup active workflow count metric', async () => {
await prometheusMetricsService.init(app);

// First call is n8n version metric
expect(promClient.Gauge).toHaveBeenCalledTimes(2);

expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
name: 'n8n_active_workflow_count',
help: 'Total number of active workflows.',
collect: expect.any(Function),
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import promClient from 'prom-client';

import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventMessageWorkflow } from '@/eventbus/event-message-classes/event-message-workflow';
import type { EventService } from '@/events/event.service';
import type { CacheService } from '@/services/cache/cache.service';
import { mockInstance } from '@test/mocking';

import { MessageEventBus } from '../../eventbus/message-event-bus/message-event-bus';
Expand All @@ -15,8 +17,10 @@ jest.unmock('@/eventbus/message-event-bus/message-event-bus');

const customPrefix = 'custom_';

const cacheService = mock<CacheService>();
const eventService = mock<EventService>();
const instanceSettings = mock<InstanceSettings>({ instanceType: 'main' });
const workflowRepository = mock<WorkflowRepository>();
const app = mock<express.Application>();
const eventBus = new MessageEventBus(
mock(),
Expand Down Expand Up @@ -48,6 +52,7 @@ describe('workflow_success_total', () => {
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

await prometheusMetricsService.init(app);
Expand Down Expand Up @@ -87,6 +92,7 @@ workflow_success_total{workflow_id="1234"} 1"
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

await prometheusMetricsService.init(app);
Expand All @@ -107,3 +113,67 @@ workflow_success_total{workflow_id="1234"} 1"
}
});
});

describe('Active workflow count', () => {
const globalConfig = mockInstance(GlobalConfig, {
endpoints: {
metrics: {
prefix: '',
activeWorkflowCountInterval: 30,
},
},
});

const prometheusMetricsService = new PrometheusMetricsService(
cacheService,
eventBus,
globalConfig,
eventService,
instanceSettings,
workflowRepository,
);

afterEach(() => {
jest.clearAllMocks();
prometheusMetricsService.disableAllMetrics();
});

it('should prioritize cached value', async () => {
await prometheusMetricsService.init(app);

cacheService.get.mockReturnValueOnce(Promise.resolve('1'));
workflowRepository.getActiveCount.mockReturnValueOnce(Promise.resolve(2));

const activeWorkflowCount =
await promClient.register.getSingleMetricAsString('active_workflow_count');

expect(cacheService.get).toHaveBeenCalledWith('metrics:active-workflow-count');
expect(workflowRepository.getActiveCount).not.toHaveBeenCalled();

expect(activeWorkflowCount).toMatchInlineSnapshot(`
"# HELP active_workflow_count Total number of active workflows.
# TYPE active_workflow_count gauge
active_workflow_count 1"
`);
});

it('should query value from database if cache misses', async () => {
await prometheusMetricsService.init(app);

cacheService.get.mockReturnValueOnce(Promise.resolve(undefined));
workflowRepository.getActiveCount.mockReturnValueOnce(Promise.resolve(2));

const activeWorkflowCount =
await promClient.register.getSingleMetricAsString('active_workflow_count');

expect(cacheService.get).toHaveBeenCalledWith('metrics:active-workflow-count');
expect(workflowRepository.getActiveCount).toHaveBeenCalled();
expect(cacheService.set).toHaveBeenCalledWith('metrics:active-workflow-count', '2', 30_000);

expect(activeWorkflowCount).toMatchInlineSnapshot(`
"# HELP active_workflow_count Total number of active workflows.
# TYPE active_workflow_count gauge
active_workflow_count 2"
`);
});
});
40 changes: 39 additions & 1 deletion packages/cli/src/metrics/prometheus-metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import promClient, { type Counter, type Gauge } from 'prom-client';
import semverParse from 'semver/functions/parse';

import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { N8N_VERSION, Time } from '@/constants';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { EventMessageTypes } from '@/eventbus';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
Expand All @@ -24,6 +25,7 @@ export class PrometheusMetricsService {
private readonly globalConfig: GlobalConfig,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
private readonly workflowRepository: WorkflowRepository,
) {}

private readonly counters: { [key: string]: Counter<string> | null } = {};
Expand Down Expand Up @@ -58,6 +60,7 @@ export class PrometheusMetricsService {
this.initEventBusMetrics();
this.initRouteMetrics(app);
this.initQueueMetrics();
this.initActiveWorkflowCountMetric();
this.mountMetricsEndpoint(app);
}

Expand Down Expand Up @@ -285,6 +288,41 @@ export class PrometheusMetricsService {
});
}

/**
* Setup active workflow count metric
*
* This metric is updated every time metrics are collected.
* We also cache the value of active workflow counts so we
* don't hit the database on every metrics query. Both the
* metric being enabled and the TTL of the cached value is
* configurable.
*/
private initActiveWorkflowCountMetric() {
const workflowRepository = this.workflowRepository;
const cacheService = this.cacheService;
const cacheKey = 'metrics:active-workflow-count';
const cacheTtl =
this.globalConfig.endpoints.metrics.activeWorkflowCountInterval * Time.seconds.toMilliseconds;

new promClient.Gauge({
name: this.prefix + 'active_workflow_count',
help: 'Total number of active workflows.',
async collect() {
const value = await cacheService.get<string>(cacheKey);
const numericValue = value !== undefined ? parseInt(value, 10) : undefined;

if (numericValue !== undefined && Number.isFinite(numericValue)) {
this.set(numericValue);
} else {
const activeWorkflowCount = await workflowRepository.getActiveCount();
await cacheService.set(cacheKey, activeWorkflowCount.toString(), cacheTtl);

this.set(activeWorkflowCount);
}
},
});
}

private toLabels(event: EventMessageTypes): Record<string, string> {
const { __type, eventName, payload } = event;

Expand Down
87 changes: 87 additions & 0 deletions packages/cli/test/integration/prometheus-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import request, { type Response } from 'supertest';

import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventService } from '@/events/event.service';
import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
import { CacheService } from '@/services/cache/cache.service';
import { createWorkflow, newWorkflow } from '@test-integration/db/workflows';

import { setupTestServer } from './shared/utils';

Expand All @@ -16,6 +19,7 @@ const toLines = (response: Response) => response.text.trim().split('\n');

const eventService = Container.get(EventService);
const globalConfig = Container.get(GlobalConfig);
globalConfig.cache.backend = 'memory';
globalConfig.endpoints.metrics = {
enable: true,
prefix: 'n8n_test_',
Expand All @@ -31,6 +35,7 @@ globalConfig.endpoints.metrics = {
includeApiStatusCodeLabel: true,
includeQueueMetrics: true,
queueMetricsInterval: 20,
activeWorkflowCountInterval: 60,
};

const server = setupTestServer({ endpointGroups: ['metrics'] });
Expand Down Expand Up @@ -202,6 +207,51 @@ describe('PrometheusMetricsService', () => {
);
});

it('should include last activity metric with route metrics', async () => {
/**
* Arrange
*/
prometheusService.enableMetric('routes');
await prometheusService.init(server.app);
await agent.get('/api/v1/workflows');

/**
* Act
*/
let response = await agent.get('/metrics');

/**
* Assert
*/
expect(response.status).toEqual(200);
expect(response.type).toEqual('text/plain');

const lines = toLines(response);

expect(lines).toContainEqual(expect.stringContaining('n8n_test_last_activity'));

const lastActivityLine = lines.find((line) =>
line.startsWith('n8n_test_last_activity{timestamp='),
);

expect(lastActivityLine).toBeDefined();
expect(lastActivityLine?.endsWith('1')).toBe(true);

// Update last activity
await agent.get('/api/v1/workflows');

response = await agent.get('/metrics');
const updatedLines = toLines(response);

const newLastActivityLine = updatedLines.find((line) =>
line.startsWith('n8n_test_last_activity{timestamp='),
);

expect(newLastActivityLine).toBeDefined();
// Timestamp label should be different
expect(newLastActivityLine).not.toBe(lastActivityLine);
});

it('should return labels in route metrics if enabled', async () => {
/**
* ARrange
Expand Down Expand Up @@ -284,4 +334,41 @@ describe('PrometheusMetricsService', () => {
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_completed 0');
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_failed 0');
});

it('should return active workflow count', async () => {
await prometheusService.init(server.app);

let response = await agent.get('/metrics');

expect(response.status).toEqual(200);
expect(response.type).toEqual('text/plain');

let lines = toLines(response);

expect(lines).toContain('n8n_test_active_workflow_count 0');

const workflow = newWorkflow({ active: true });
await createWorkflow(workflow);

const workflowRepository = Container.get(WorkflowRepository);
const activeWorkflowCount = await workflowRepository.getActiveCount();

expect(activeWorkflowCount).toBe(1);

response = await agent.get('/metrics');

lines = toLines(response);

// Should return cached value
expect(lines).toContain('n8n_test_active_workflow_count 0');

const cacheService = Container.get(CacheService);
await cacheService.delete('metrics:active-workflow-count');

response = await agent.get('/metrics');

lines = toLines(response);

expect(lines).toContain('n8n_test_active_workflow_count 1');
});
});

0 comments on commit 3aa679e

Please sign in to comment.