Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Kubernetes Job Agent #198

Merged
merged 16 commits into from
Nov 4, 2024
18 changes: 18 additions & 0 deletions apps/webservice/src/app/api/v1/jobs/[jobId]/openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ export const openapi: Swagger.SwaggerV3 = {
"external_run_not_found",
],
},
externalId: {
type: "string",
nullable: true,
description:
"External job identifier (e.g. GitHub workflow run ID)",
},
release: {
type: "object",
properties: {
Expand Down Expand Up @@ -205,13 +211,25 @@ export const openapi: Swagger.SwaggerV3 = {
type: "string",
format: "date-time",
},
jobAgentConfig: {
type: "object",
properties: {
manifest: {
type: "string",
description: "The manifest template for the job",
},
},
required: ["manifest"],
description: "Configuration for the Job Agent",
},
zacharyblasczyk marked this conversation as resolved.
Show resolved Hide resolved
},
required: [
"id",
"status",
"createdAt",
"updatedAt",
"variables",
"jobAgentConfig",
zacharyblasczyk marked this conversation as resolved.
Show resolved Hide resolved
],
},
},
Expand Down
13 changes: 13 additions & 0 deletions integrations/kubernetes-job-agent/src/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { JobAgent } from "@ctrlplane/node-sdk";

import { env } from "./config.js";
import { api } from "./sdk.js";

/**
 * Shared `JobAgent` instance for the Kubernetes job agent integration.
 *
 * Built from the agent name and workspace ID taken from the environment
 * config, a fixed agent type of "kubernetes-job", and the shared `api`
 * client exported by `./sdk.js`.
 *
 * NOTE(review): presumably the SDK registers or upserts the agent with the
 * control plane when it is first used — confirm against the
 * `@ctrlplane/node-sdk` `JobAgent` implementation.
 */
export const agent = new JobAgent(
{
name: env.CTRLPLANE_AGENT_NAME,
workspaceId: env.CTRLPLANE_WORKSPACE_ID,
type: "kubernetes-job",
},
api,
);
zacharyblasczyk marked this conversation as resolved.
Show resolved Hide resolved
104 changes: 46 additions & 58 deletions integrations/kubernetes-job-agent/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { Job } from "@ctrlplane/node-sdk";
import { CronJob } from "cron";
import handlebars from "handlebars";
import yaml from "js-yaml";

import { logger } from "@ctrlplane/logger";

import { agent } from "./agent.js";
import { env } from "./config.js";
import { getBatchClient, getJobStatus } from "./k8s.js";
import { api } from "./sdk.js";

const renderManifest = (manifestTemplate: string, variables: object) => {
try {
Expand All @@ -20,37 +21,39 @@ const renderManifest = (manifestTemplate: string, variables: object) => {
};

const deployManifest = async (
job: Job,
jobId: string,
namespace: string,
manifest: any,
) => {
try {
const name = manifest?.metadata?.name;
logger.info(`Deploying manifest: ${namespace}/${name}`);

if (name == null) {
logger.error("Job name not found in manifest", {
jobId,
namespace,
manifest,
});
await api.updateJob({
jobId,
updateJobRequest: {
status: "invalid_job_agent",
message: "Job name not found in manifest.",
await job.update({
externalId: "",
status: "invalid_job_agent",
message: "Job name not found in manifest.",
});
return;
}

logger.info(`Creating job - ${namespace}/${name}`);

await getBatchClient().createNamespacedJob(namespace, manifest);
await api.updateJob({
jobId,
updateJobRequest: {
status: "in_progress",
externalId: `${namespace}/${name}`,
message: "Job created successfully.",

await job.update({
status: "in_progress",
externalId: `${namespace}/${name}`,
message: "Job created successfully.",
});

logger.info(`Job created successfully`, {
jobId,
namespace,
Expand All @@ -62,71 +65,63 @@ const deployManifest = async (
namespace,
error,
});
await api.updateJob({
jobId,
updateJobRequest: {
status: "invalid_job_agent",
message: error.body?.message || error.message,
},

await job.update({
status: "invalid_job_agent" as const,
message: error.body?.message || error.message,
});
}
};

const spinUpNewJobs = async (agentId: string) => {
const spinUpNewJobs = async () => {
try {
const { jobs = [] } = await api.getNextJobs({ agentId });
const jobs = await agent.next();
logger.info(`Found ${jobs.length} job(s) to run.`);

await Promise.allSettled(
jobs.map(async (job) => {
logger.info(`Running job ${job.id}`);
logger.debug(`Job details:`, { job });
try {
const je = await api.getJob({ jobId: job.id });
const manifest = renderManifest(job.jobAgentConfig.manifest, je);
jobs.map(async (job: Job) => {
const jobDetails = await job.get();
logger.info(`Running job ${jobDetails.id}`);
logger.debug(`Job details:`, { job: jobDetails });

const namespace = manifest?.metadata?.namespace ?? env.KUBE_NAMESPACE;
await api.acknowledgeJob({ jobId: job.id });
await deployManifest(job.id, namespace, manifest);
} catch (error: any) {
logger.error(`Error processing job ${job.id}`, {
error: error.message,
stack: error.stack,
});
throw error;
}
const manifest = renderManifest(
jobDetails.jobAgentConfig.manifest,
jobDetails,
);
zacharyblasczyk marked this conversation as resolved.
Show resolved Hide resolved
const namespace = manifest?.metadata?.namespace ?? env.KUBE_NAMESPACE;

await job.acknowledge();
await deployManifest(job, jobDetails.id, namespace, manifest);
}),
);
} catch (error: any) {
logger.error("Error spinning up new jobs", {
agentId,
error: error.message,
});
throw error;
}
};

const updateExecutionStatus = async (agentId: string) => {
const updateExecutionStatus = async () => {
try {
const jobs = await api.getAgentRunningJob({ agentId });
logger.info(`Found ${jobs.length} running execution(s)`);
const jobs = await agent.running();
logger.info(`Found ${jobs.length} running job(s)`);
await Promise.allSettled(
jobs.map(async (job) => {
const [namespace, name] = job.externalId?.split("/") ?? [];
jobs.map(async (job: Job) => {
const jobDetails = await job.get();
const [namespace, name] = jobDetails.externalId?.split("/") ?? [];
if (namespace == null || name == null) {
logger.error("Invalid external run ID", {
jobId: job.id,
externalId: job.externalId,
jobId: jobDetails.id,
externalId: jobDetails.externalId,
});
zacharyblasczyk marked this conversation as resolved.
Show resolved Hide resolved
return;
}

logger.debug(`Checking status of ${namespace}/${name}`);
try {
const { status, message } = await getJobStatus(namespace, name);
await api.updateJob({
jobId: job.id,
updateJobRequest: { status, message },
});
await job.update({ status, message });
logger.info(`Updated status for ${namespace}/${name}`, {
status,
message,
Expand All @@ -140,25 +135,18 @@ const updateExecutionStatus = async (agentId: string) => {
);
} catch (error: any) {
logger.error("Error updating execution statuses", {
agentId,
error: error.message,
});
}
};

const scan = async () => {
try {
const { id } = await api.updateJobAgent({
updateJobAgentRequest: {
name: env.CTRLPLANE_AGENT_NAME,
workspaceId: env.CTRLPLANE_WORKSPACE_ID,
type: "kubernetes-job",
},
});
const { id } = await agent.get();

logger.info(`Agent ID: ${id}`);
await spinUpNewJobs(id);
await updateExecutionStatus(id);
await spinUpNewJobs();
await updateExecutionStatus();
} catch (error: any) {
logger.error("Error during scan operation", { error: error.message });
throw error;
Expand Down
8 changes: 3 additions & 5 deletions integrations/kubernetes-job-agent/src/sdk.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Configuration, DefaultApi } from "@ctrlplane/node-sdk";
import { createClient } from "@ctrlplane/node-sdk";

import { env } from "./config.js";

const config = new Configuration({
basePath: `${env.CTRLPLANE_API_URL}/api`,
export const api = createClient({
baseUrl: env.CTRLPLANE_API_URL,
apiKey: env.CTRLPLANE_API_KEY,
});

export const api = new DefaultApi(config);
4 changes: 2 additions & 2 deletions integrations/kubernetes-job-agent/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { SetTargetProvidersTargetsRequest } from "@ctrlplane/node-sdk";
import type { Operations } from "@ctrlplane/node-sdk";

export function omitNullUndefined(obj: object) {
return Object.entries(obj).reduce<Record<string, string>>(
Expand All @@ -11,5 +11,5 @@ export function omitNullUndefined(obj: object) {
}

export type ScannerFunc = () => Promise<
SetTargetProvidersTargetsRequest["targets"]
Operations["setTargetProvidersTargets"]["requestBody"]["content"]["application/json"]["targets"]
>;
Loading
Loading