Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[teraslice] fix multi-job deletion bug in force job stop #3751

Merged
merged 12 commits into from
Sep 19, 2024
1 change: 1 addition & 0 deletions e2e/k8s/kindConfigDefaultPorts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: k8s-env
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea
extraPortMappings:
- containerPort: 30200 # Map internal elasticsearch service to host port
hostPort: 9200
Expand Down
1 change: 1 addition & 0 deletions e2e/k8s/kindConfigDefaultPortsDev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ name: k8s-env
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea
extraPortMappings:
- containerPort: 30200 # Map internal elasticsearch service to host port
hostPort: 9200
Expand Down
1 change: 1 addition & 0 deletions e2e/k8s/kindConfigTestPorts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: k8s-e2e
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea
extraPortMappings:
- containerPort: 30200 # Map internal elasticsearch service to host port
hostPort: 49200
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "2.3.1",
"version": "2.3.2",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
2 changes: 1 addition & 1 deletion packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/job-components",
"displayName": "Job Components",
"version": "1.3.0",
"version": "1.3.1",
"description": "A teraslice library for validating jobs schemas, registering apis, and defining and running new Job APIs",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/job-components#readme",
"bugs": {
Expand Down
2 changes: 1 addition & 1 deletion packages/scripts/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/scripts",
"displayName": "Scripts",
"version": "1.1.1",
"version": "1.1.2",
"description": "A collection of terascope monorepo scripts",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/scripts#readme",
"bugs": {
Expand Down
4 changes: 2 additions & 2 deletions packages/teraslice-test-harness/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
"fs-extra": "^11.2.0"
},
"devDependencies": {
"@terascope/job-components": "^1.3.0"
"@terascope/job-components": "^1.3.1"
},
"peerDependencies": {
"@terascope/job-components": ">=1.3.0"
"@terascope/job-components": ">=1.3.1"
},
"engines": {
"node": ">=18.18.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "2.3.1",
"version": "2.3.2",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down Expand Up @@ -40,7 +40,7 @@
"dependencies": {
"@kubernetes/client-node": "^0.21.0",
"@terascope/elasticsearch-api": "^4.1.0",
"@terascope/job-components": "^1.3.0",
"@terascope/job-components": "^1.3.1",
"@terascope/teraslice-messaging": "^1.4.0",
"@terascope/types": "^1.1.0",
"@terascope/utils": "^1.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ export class KubernetesClusterBackend {
*/
async listResourcesForJobId(jobId: string) {
const resources = [];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs'];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs', 'replicasets'];
for (const type of resourceTypes) {
const list = await this.k8s.list(`teraslice.terascope.io/jobId=${jobId}`, type);
if (list.items.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import KubeClient from 'kubernetes-client';
// @ts-expect-error
import Request from 'kubernetes-client/backends/request/index.js';
import { getRetryConfig } from './utils.js';
import { IncomingMessage } from 'node:http';

// @ts-expect-error
const { Client, KubeConfig } = KubeClient;
Expand Down Expand Up @@ -98,7 +99,7 @@ export class K8s {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;

while (true) {
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
Expand Down Expand Up @@ -136,7 +137,7 @@ export class K8s {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;

while (true) {
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
Expand All @@ -163,7 +164,7 @@ export class K8s {
* returns list of k8s objects matching provided selector
* @param {String} selector kubernetes selector, like 'app=teraslice'
* @param {String} objType Type of k8s object to get, valid options:
* 'pods', 'deployment', 'services', 'jobs'
* 'pods', 'deployments', 'services', 'jobs', 'replicasets'
* @param {String} ns namespace to search, this will override the default
* @return {Object} body of k8s get response.
*/
Expand All @@ -188,6 +189,10 @@ export class K8s {
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(namespace).jobs()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'replicasets') {
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(namespace).replicasets()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else {
const error = new Error(`Wrong objType provided to get: ${objType}`);
this.logger.error(error);
Expand Down Expand Up @@ -298,12 +303,15 @@ export class K8s {
* Deletes k8s object of specified objType
* @param {String} name Name of the resource to delete
* @param {String} objType Type of k8s object to delete, valid options:
* 'deployments', 'services', 'jobs'
* 'deployments', 'services', 'jobs', 'pods', 'replicasets'
* @param {Boolean} force Forcefully delete the resource by setting
* gracePeriodSeconds to 1.
* @return {Object} k8s delete response body.
*/
async delete(name: string, objType: string, force?: boolean) {
if (name === undefined || name.trim() === '') {
throw new Error(`Name of resource to delete must be specified. Received: "${name}".`);
}

let response;

// To get a Job to remove the associated pods you have to
Expand All @@ -321,23 +329,53 @@ export class K8s {
deleteOptions.body.gracePeriodSeconds = 1;
}

const deleteWithErrorHandling = async (deleteFn: () => Promise<{
response: IncomingMessage,
body: Record<string, any>
}>) => {
try {
const res = await deleteFn();
return res;
} catch (e) {
if (e.statusCode) {
// 404 should be an acceptable response to a delete request, not an error
if (e.statusCode === 404) {
this.logger.info(`No ${objType} with name ${name} found while attempting to delete.`);
return e;
}

if (e.statusCode >= 400) {
const err = new TSError(`Unexpected response code (${e.statusCode}), when deleting name: ${name}`);
this.logger.error(err);
err.code = e.statusCode.toString();
return Promise.reject(err);
}
}
throw e;
}
}

try {
if (objType === 'services') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.api.v1.namespaces(this.defaultNamespace).services(name)
.delete(), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'deployments') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.delete(), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'jobs') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
.delete(deleteOptions), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'pods') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.api.v1.namespaces(this.defaultNamespace).pods(name)
.delete(deleteOptions), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'replicasets') {
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).replicasets(name)
.delete(deleteOptions)), getRetryConfig());
} else {
throw new Error(`Invalid objType: ${objType}`);
}
Expand All @@ -347,44 +385,47 @@ export class K8s {
return Promise.reject(err);
}

if (response.statusCode >= 400) {
const err = new TSError(`Unexpected response code (${response.statusCode}), when deleting name: ${name}`);
this.logger.error(err);
err.code = response.statusCode;
return Promise.reject(err);
}

return response.body;
}

/**
* Delete all of Kubernetes resources related to the specified exId
* @param {String} exId ID of the execution
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @param {Boolean} force Forcefully stop all pod, deployment,
* service, replicaset and job resources
* @return {Promise}
*/
async deleteExecution(exId: string, force = false) {
    if (!exId) {
        throw new Error('deleteExecution requires an executionId');
    }

    // Resources that are only removed explicitly on a forced stop, listed as
    // [nodeType, objType] pairs. Order matters: children come before their
    // parents, because deleting a parent first marks the children for
    // background deletion and they can then no longer be force deleted.
    const forcedTargets: Array<[string, string]> = force
        ? [
            ['worker', 'pods'],
            ['worker', 'replicasets'],
            ['worker', 'deployments'],
            ['execution_controller', 'pods'],
            ['execution_controller', 'services'],
        ]
        : [];

    // Deletions run strictly one after another to preserve the ordering above.
    for (const [nodeType, objType] of forcedTargets) {
        await this._deleteObjByExId(exId, nodeType, objType, force);
    }

    // The execution controller job is always deleted, forced or not.
    await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force);
}

/**
* Finds the k8s object by nodeType and exId and then deletes it
* Finds the k8s objects by nodeType and exId and then deletes them
* @param {String} exId Execution ID
* @param {String} nodeType valid Teraslice k8s node type:
* 'worker', 'execution_controller'
* @param {String} objType valid object type: `services`, `deployments`,
* 'jobs'
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* `jobs`, `pods`, `replicasets`
* @param {Boolean} force Forcefully stop all resources
* @return {Promise}
*/
async _deleteObjByExId(exId: string, nodeType: string, objType: string, force?: boolean) {
let objList;
let forcePodsList;
let deleteResponse;
const deleteResponses = [];

try {
objList = await this.list(`app.kubernetes.io/component=${nodeType},teraslice.terascope.io/exId=${exId}`, objType);
Expand All @@ -394,52 +435,38 @@ export class K8s {
return Promise.reject(err);
}

if (force) {
try {
forcePodsList = await this.list(`teraslice.terascope.io/exId=${exId}`, 'pods');
} catch (e) {
const err = new Error(`Request pods list in _deleteObjByExId with exId: ${exId} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}

if (isEmpty(objList.items) && isEmpty(forcePodsList?.items)) {
if (isEmpty(objList.items)) {
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`);
return Promise.resolve();
}

const deletePodResponses = [];
if (forcePodsList?.items) {
this.logger.info(`k8s._deleteObjByExId: ${exId} force deleting all pods`);
for (const pod of forcePodsList.items) {
const podName = pod.metadata.name;

try {
deletePodResponses.push(await this.delete(podName, 'pods', force));
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
for (const obj of objList.items) {
const { name, deletionTimestamp } = obj.metadata;

if (!name) {
const err = new Error(`Cannot delete ${objType} for ExId: ${exId} by name because it has no name`);
this.logger.error(err);
return Promise.reject(err);
}
}

const name = get(objList, 'items[0].metadata.name');
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`);
// If deletionTimestamp is present then the resource is already terminating.
// K8s will not change the grace period in this case, so force deletion is not possible
if (force && deletionTimestamp) {
this.logger.warn(`Cannot force delete ${name} for ExId: ${exId}. It will finish deleting gracefully by ${deletionTimestamp}`);
return Promise.resolve();
}

try {
deleteResponse = await this.delete(name, objType, force);
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} ${force ? 'force' : ''} deleting: ${name}`);
try {
deleteResponses.push(await this.delete(name, objType, force));
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}

if (deletePodResponses.length > 0) {
deleteResponse.deletePodResponses = deletePodResponses;
}
return deleteResponse;
return deleteResponses;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,12 @@ export class KubernetesClusterBackendV2 {
/**
* Returns a list of all k8s resources associated with a job ID
* @param {string} jobId The job ID of the job to list associated resources
* @returns {Array<any>}
* @returns {Array<K8sClient.V1PodList | K8sClient.V1DeploymentList | K8sClient.V1ServiceList
* | K8sClient.V1JobList | K8sClient.V1ReplicaSetList>}
*/
async listResourcesForJobId(jobId: string) {
const resources = [];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs'];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs', 'replicasets'];
for (const type of resourceTypes) {
const list = await this.k8s.list(`teraslice.terascope.io/jobId=${jobId}`, type);
if (list.items.length > 0) {
Expand Down
Loading
Loading