Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Kubernetes Job Agent #198

Merged
merged 16 commits into from
Nov 4, 2024
83 changes: 63 additions & 20 deletions integrations/kubernetes-job-agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import handlebars from "handlebars";
import yaml from "js-yaml";

import { logger } from "@ctrlplane/logger";
import { JobAgent } from "@ctrlplane/node-sdk";

import { env } from "./config.js";
import { getBatchClient, getJobStatus } from "./k8s.js";
Expand Down Expand Up @@ -33,23 +34,29 @@ const deployManifest = async (
namespace,
manifest,
});
await api.updateJob({
jobId,
updateJobRequest: {
await api.PATCH("/v1/jobs/{jobId}", {
params: {
path: { jobId },
},
body: {
status: "invalid_job_agent",
message: "Job name not found in manifest.",
},
});
return;
}

logger.info(`Creating job - ${namespace}/${name}`);
await getBatchClient().createNamespacedJob(namespace, manifest);
await api.updateJob({
jobId,
updateJobRequest: {
await api.PATCH("/v1/jobs/{jobId}", {
params: {
path: { jobId },
},
body: {
status: "in_progress",
externalId: `${namespace}/${name}`,
message: "Job created successfully.",
},
});
logger.info(`Job created successfully`, {
jobId,
Expand All @@ -62,9 +69,11 @@ const deployManifest = async (
namespace,
error,
});
await api.updateJob({
jobId,
updateJobRequest: {
await api.PATCH("/v1/jobs/{jobId}", {
params: {
path: { jobId },
},
body: {
status: "invalid_job_agent",
message: error.body?.message || error.message,
},
Expand All @@ -74,18 +83,42 @@ const deployManifest = async (

const spinUpNewJobs = async (agentId: string) => {
try {
const { jobs = [] } = await api.getNextJobs({ agentId });
const response = await api.GET("/v1/job-agents/{agentId}/queue/next", {
params: {
path: { agentId },
},
});
if (response.data == undefined) return;
const { jobs = [] } = response.data;

logger.info(`Found ${jobs.length} job(s) to run.`);
await Promise.allSettled(
jobs.map(async (job) => {
logger.info(`Running job ${job.id}`);
logger.debug(`Job details:`, { job });
try {
const je = await api.getJob({ jobId: job.id });
const manifest = renderManifest(job.jobAgentConfig.manifest, je);
const je = await api.GET("/v1/jobs/{jobId}", {
params: {
path: { jobId: job.id },
},
});
if (je.data == null)
throw new Error(`Failed to fetch job details for job ${job.id}`);

if (
typeof job.jobAgentConfig !== "object" ||
!("manifest" in job.jobAgentConfig)
)
throw new Error("Job manifest is required");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Enhance null checking for 'job.jobAgentConfig'

The condition typeof job.jobAgentConfig !== "object" may not effectively handle cases where job.jobAgentConfig is null, since typeof null === "object". To prevent potential runtime errors, consider adding an explicit null check.

Apply this diff to fix the issue:

 if (
-  typeof job.jobAgentConfig !== "object" ||
+  job.jobAgentConfig == null ||
   !("manifest" in job.jobAgentConfig)
 )
   throw new Error("Job manifest is required");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (
typeof job.jobAgentConfig !== "object" ||
!("manifest" in job.jobAgentConfig)
)
throw new Error("Job manifest is required");
if (
job.jobAgentConfig == null ||
!("manifest" in job.jobAgentConfig)
)
throw new Error("Job manifest is required");

const manifest = renderManifest(job.jobAgentConfig.manifest, je.data);

const namespace = manifest?.metadata?.namespace ?? env.KUBE_NAMESPACE;
await api.acknowledgeJob({ jobId: job.id });
await api.POST("/v1/jobs/{jobId}/acknowledge", {
params: {
path: { jobId: job.id },
},
});
await deployManifest(job.id, namespace, manifest);
} catch (error: any) {
logger.error(`Error processing job ${job.id}`, {
Expand All @@ -107,7 +140,13 @@ const spinUpNewJobs = async (agentId: string) => {

const updateExecutionStatus = async (agentId: string) => {
try {
const jobs = await api.getAgentRunningJob({ agentId });
const response = await api.GET("/v1/job-agents/{agentId}/jobs/running", {
params: {
path: { agentId },
},
});
if (response.data == undefined) return;
const jobs = response.data;
logger.info(`Found ${jobs.length} running execution(s)`);
await Promise.allSettled(
jobs.map(async (job) => {
Expand All @@ -123,9 +162,11 @@ const updateExecutionStatus = async (agentId: string) => {
logger.debug(`Checking status of ${namespace}/${name}`);
try {
const { status, message } = await getJobStatus(namespace, name);
await api.updateJob({
jobId: job.id,
updateJobRequest: { status, message },
await api.PATCH(`/v1/jobs/{jobId}`, {
params: {
path: { jobId: job.id },
},
body: { status, message },
});
logger.info(`Updated status for ${namespace}/${name}`, {
status,
Expand All @@ -148,13 +189,15 @@ const updateExecutionStatus = async (agentId: string) => {

const scan = async () => {
try {
const { id } = await api.updateJobAgent({
updateJobAgentRequest: {
const agent = new JobAgent(
{
name: env.CTRLPLANE_AGENT_NAME,
workspaceId: env.CTRLPLANE_WORKSPACE_ID,
type: "kubernetes-job",
},
});
api,
);
const { id } = await agent.get();

logger.info(`Agent ID: ${id}`);
await spinUpNewJobs(id);
Expand Down
8 changes: 3 additions & 5 deletions integrations/kubernetes-job-agent/src/sdk.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Configuration, DefaultApi } from "@ctrlplane/node-sdk";
import { createClient } from "@ctrlplane/node-sdk";

import { env } from "./config.js";

const config = new Configuration({
basePath: `${env.CTRLPLANE_API_URL}/api`,
export const api = createClient({
baseUrl: env.CTRLPLANE_API_URL,
apiKey: env.CTRLPLANE_API_KEY,
});

export const api = new DefaultApi(config);
4 changes: 2 additions & 2 deletions integrations/kubernetes-job-agent/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { SetTargetProvidersTargetsRequest } from "@ctrlplane/node-sdk";
import type { Operations } from "@ctrlplane/node-sdk";

export function omitNullUndefined(obj: object) {
return Object.entries(obj).reduce<Record<string, string>>(
Expand All @@ -11,5 +11,5 @@ export function omitNullUndefined(obj: object) {
}

export type ScannerFunc = () => Promise<
SetTargetProvidersTargetsRequest["targets"]
Operations["setTargetProvidersTargets"]["requestBody"]["content"]["application/json"]["targets"]
>;
Loading