Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 过期任务数据迁移 --story=121721246 #7684

Open
wants to merge 3 commits into
base: feature_data_cleaner_enhance_master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ def monitor_report_config():

# V2引擎任务清理配置
# V2 engine task cleaning switches, mirrored from env.py (BKAPP_* environment variables)
ENABLE_CLEAN_EXPIRED_V2_TASK = env.ENABLE_CLEAN_EXPIRED_V2_TASK
# NOTE(review): "ENBLE" is a typo for "ENABLE"; it matches the name defined in env.py,
# so renaming has to happen in both files at once.
ENBLE_ARCHIVE_EXPIRED_V2_TASK = env.ENBLE_ARCHIVE_EXPIRED_V2_TASK
# Crontab field tuple; also used by the archive periodic task, so both jobs share a schedule
CLEAN_EXPIRED_V2_TASK_CRON = env.CLEAN_EXPIRED_V2_TASK_CRON
# Number of days before a v2 task counts as expired
V2_TASK_VALIDITY_DAY = env.V2_TASK_VALIDITY_DAY
# Maximum number of tasks handled per cleaning/archiving run
CLEAN_EXPIRED_V2_TASK_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_BATCH_NUM
Expand Down
1 change: 1 addition & 0 deletions env.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@

# V2引擎任务清理配置
def _env_bool(name, default=False):
    """Parse an environment variable as a boolean flag.

    os.getenv always returns a string when the variable is set, and
    bool("false") / bool("0") are both True — so the previous
    bool(os.getenv(...)) pattern enabled the flag for ANY non-empty value.
    Only the conventional truthy spellings enable the flag now.
    """
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")


# V2 engine task cleaning configuration
ENABLE_CLEAN_EXPIRED_V2_TASK = _env_bool("BKAPP_ENABLE_CLEAN_EXPIRED_V2_TASK", False)
# TODO: "ENBLE" is a typo for "ENABLE"; kept because config/default.py reads this exact attribute name.
ENBLE_ARCHIVE_EXPIRED_V2_TASK = _env_bool("BKAPP_ENABLE_ARCHIVE_EXPIRED_V2_TASK", False)
# Celery crontab fields: minute hour day-of-month month day-of-week (default: daily at 00:30)
CLEAN_EXPIRED_V2_TASK_CRON = tuple(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CRON", "30 0 * * *").split())
# Tasks older than this many days are considered expired (default: two years)
V2_TASK_VALIDITY_DAY = int(os.getenv("BKAPP_V2_TASK_VALIDITY_DAY", 730))
# Maximum number of expired tasks handled per periodic run
CLEAN_EXPIRED_V2_TASK_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_BATCH_NUM", 100))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Generated by Django 3.2.15 on 2025-01-21 02:56

from django.db import migrations, models


class Migration(migrations.Migration):
    # Auto-generated migration: creates the TaskArchivedStatistics table that the
    # expired-task archiving job writes to before the original task rows are deleted.

    dependencies = [
        ("analysis_statistics", "0008_taskflowexecutednodestatistics_trigger_template_id"),
    ]

    operations = [
        migrations.CreateModel(
            name="TaskArchivedStatistics",
            fields=[
                ("id", models.BigAutoField(primary_key=True, serialize=False, verbose_name="id")),
                ("task_id", models.CharField(db_index=True, max_length=255, verbose_name="任务 ID")),
                # NOTE(review): int default on a CharField — mirrors the model definition; confirm "-1" is intended
                ("project_id", models.CharField(default=-1, help_text="模板所属项目ID", max_length=32, verbose_name="项目 ID")),
                ("name", models.CharField(default="default_instance", max_length=128, verbose_name="实例名称")),
                ("template_id", models.CharField(max_length=32, verbose_name="Pipeline模板ID")),
                ("task_template_id", models.CharField(max_length=32, verbose_name="Task模板ID")),
                (
                    "template_source",
                    models.CharField(
                        choices=[("project", "项目流程"), ("common", "公共流程"), ("onetime", "一次性任务")],
                        default="project",
                        max_length=32,
                        verbose_name="流程模板来源",
                    ),
                ),
                (
                    "create_method",
                    models.CharField(
                        choices=[
                            ("app", "手动"),
                            ("api", "API网关"),
                            ("app_maker", "轻应用"),
                            ("periodic", "周期任务"),
                            ("clocked", "计划任务"),
                            ("mobile", "移动端"),
                        ],
                        default="app",
                        max_length=30,
                        verbose_name="创建方式",
                    ),
                ),
                (
                    "create_info",
                    models.CharField(blank=True, max_length=255, verbose_name="创建任务额外信息(App maker ID或APP CODE或周期任务ID)"),
                ),
                ("creator", models.CharField(blank=True, max_length=32, verbose_name="创建者")),
                ("create_time", models.DateTimeField(db_index=True, verbose_name="任务创建时间")),
                # auto_now_add: stamped when the archive row is inserted
                ("archived_time", models.DateTimeField(auto_now_add=True, db_index=True, verbose_name="任务归档时间")),
                ("executor", models.CharField(blank=True, max_length=32, verbose_name="执行者")),
                (
                    "recorded_executor_proxy",
                    models.CharField(blank=True, default=None, max_length=255, null=True, verbose_name="任务执行人代理"),
                ),
                ("start_time", models.DateTimeField(blank=True, null=True, verbose_name="启动时间")),
                ("finish_time", models.DateTimeField(blank=True, null=True, verbose_name="结束时间")),
                ("is_started", models.BooleanField(default=False, verbose_name="是否已经启动")),
                ("is_finished", models.BooleanField(default=False, verbose_name="是否已经完成")),
                ("is_revoked", models.BooleanField(default=False, verbose_name="是否已经撤销")),
                ("engine_ver", models.IntegerField(choices=[(1, "v1"), (2, "v2")], default=2, verbose_name="引擎版本")),
                ("is_child_taskflow", models.BooleanField(default=False, verbose_name="是否为子任务")),
                (
                    "snapshot_id",
                    models.CharField(blank=True, max_length=32, null=True, verbose_name="实例结构数据,指向实例对应的模板的结构数据"),
                ),
                ("extra_info", models.TextField(blank=True, null=True, verbose_name="额外信息")),
            ],
            options={
                "verbose_name": "归档任务实例",
                "verbose_name_plural": "归档任务实例",
                "index_together": {("project_id", "task_template_id")},
            },
        ),
    ]
36 changes: 35 additions & 1 deletion gcloud/analysis_statistics/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
from django.db import models
from django.utils.translation import ugettext_lazy as _

from gcloud.constants import TASK_CATEGORY, TASK_CREATE_METHOD, TaskCreateMethod
from gcloud.constants import TASK_CATEGORY, TASK_CREATE_METHOD, TaskCreateMethod, TEMPLATE_SOURCE, PROJECT
from gcloud.core.models import EngineConfig

MAX_LEN_OF_NAME = 128
guohelu marked this conversation as resolved.
Show resolved Hide resolved


class TemplateNodeStatistics(models.Model):
Expand Down Expand Up @@ -164,3 +167,34 @@ class TemplateCustomVariableSummary(models.Model):
class Meta:
verbose_name = _("流程模板变量统计数据总览")
verbose_name_plural = _("流程模板变量统计数据总览")


class TaskArchivedStatistics(models.Model):
    """Archived snapshot of an expired task instance.

    Rows are written by the expired-task archiving job immediately before the
    original TaskFlowInstance / PipelineInstance records are deleted, so the
    statistics module keeps access to the task's key attributes.
    """

    id = models.BigAutoField(_("id"), primary_key=True)
    # id of the archived TaskFlowInstance, stored as text
    task_id = models.CharField(_("任务 ID"), max_length=255, db_index=True)
    # NOTE(review): int default on a CharField mirrors the generated migration; confirm "-1" is intended
    project_id = models.CharField(_("项目 ID"), default=-1, help_text="模板所属项目ID", max_length=32)
    name = models.CharField(_("实例名称"), max_length=MAX_LEN_OF_NAME, default="default_instance")
    template_id = models.CharField(_("Pipeline模板ID"), max_length=32)
    task_template_id = models.CharField(_("Task模板ID"), max_length=32)
    template_source = models.CharField(_("流程模板来源"), max_length=32, choices=TEMPLATE_SOURCE, default=PROJECT)
    create_method = models.CharField(_("创建方式"), max_length=30, choices=TASK_CREATE_METHOD, default="app")
    create_info = models.CharField(_("创建任务额外信息(App maker ID或APP CODE或周期任务ID)"), max_length=255, blank=True)
    creator = models.CharField(_("创建者"), max_length=32, blank=True)
    create_time = models.DateTimeField(_("任务创建时间"), db_index=True)
    # stamped automatically when the archive row is inserted
    archived_time = models.DateTimeField(_("任务归档时间"), db_index=True, auto_now_add=True)
    executor = models.CharField(_("执行者"), max_length=32, blank=True)
    recorded_executor_proxy = models.CharField(_("任务执行人代理"), max_length=255, default=None, blank=True, null=True)
    start_time = models.DateTimeField(_("启动时间"), null=True, blank=True)
    finish_time = models.DateTimeField(_("结束时间"), null=True, blank=True)
    is_started = models.BooleanField(_("是否已经启动"), default=False)
    is_finished = models.BooleanField(_("是否已经完成"), default=False)
    is_revoked = models.BooleanField(_("是否已经撤销"), default=False)
    engine_ver = models.IntegerField(_("引擎版本"), choices=EngineConfig.ENGINE_VER, default=2)
    is_child_taskflow = models.BooleanField(_("是否为子任务"), default=False)
    snapshot_id = models.CharField(_("实例结构数据,指向实例对应的模板的结构数据"), blank=True, null=True, max_length=32)
    # JSON-serialized extras of the original task (flow_type / current_flow / extra_info)
    extra_info = models.TextField(_("额外信息"), blank=True, null=True)

    class Meta:
        verbose_name = _("归档任务实例")
        verbose_name_plural = _("归档任务实例")
        # NOTE(review): index_together is deprecated in Django 4.x in favor of Meta.indexes;
        # fine on the Django 3.2 this migration was generated with
        index_together = ("project_id", "task_template_id")
51 changes: 51 additions & 0 deletions gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import json
from typing import List, Dict

from django.db.models import QuerySet
from gcloud.analysis_statistics.models import TaskArchivedStatistics
from gcloud.taskflow3.models import TaskFlowInstance

from pipeline.contrib.periodic_task.models import PeriodicTaskHistory
from pipeline.eri.models import (
Expand Down Expand Up @@ -79,3 +82,51 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, Query
"periodic_task_history": periodic_task_history,
"pipeline_instances": pipeline_instances,
}


def archived_expired_task(expire_pipeline_instance_ids):
    """
    Archive expired tasks into TaskArchivedStatistics before they are deleted.

    :param expire_pipeline_instance_ids: pipeline_instance.instance_id values of
        the expired tasks to archive
    :raises Exception: wraps any archiving failure, chaining the original error
        so the traceback is preserved
    """
    try:
        expired_tasks = (
            TaskFlowInstance.objects.select_related("pipeline_instance")
            .filter(pipeline_instance__instance_id__in=expire_pipeline_instance_ids)
            .order_by("id")
        )
        archived_records = []
        for task in expired_tasks:
            pipeline = task.pipeline_instance
            archived_records.append(
                TaskArchivedStatistics(
                    task_id=task.id,
                    project_id=task.project_id,
                    name=pipeline.name,
                    template_id=pipeline.template_id,
                    task_template_id=task.template_id,
                    template_source=task.template_source,
                    create_method=task.create_method,
                    create_info=task.create_info,
                    creator=pipeline.creator,
                    create_time=pipeline.create_time,
                    executor=pipeline.executor,
                    recorded_executor_proxy=task.recorded_executor_proxy,
                    start_time=pipeline.start_time,
                    finish_time=pipeline.finish_time,
                    is_started=pipeline.is_started,
                    is_finished=pipeline.is_finished,
                    is_revoked=pipeline.is_revoked,
                    engine_ver=task.engine_ver,
                    is_child_taskflow=task.is_child_taskflow,
                    snapshot_id=pipeline.snapshot_id,
                    # keep task-level extras as a JSON blob in the archive row
                    extra_info=json.dumps(
                        {
                            "flow_type": task.flow_type,
                            "current_flow": task.current_flow,
                            "extra_info": task.extra_info,
                        }
                    ),
                )
            )
        TaskArchivedStatistics.objects.bulk_create(archived_records)
    except Exception as e:
        # Chain the original exception instead of discarding its traceback.
        raise Exception(f"Archived expired task error: {e}") from e
53 changes: 52 additions & 1 deletion gcloud/contrib/cleaner/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from django.db import transaction
from django.utils import timezone

from gcloud.contrib.cleaner.pipeline.bamboo_engine_tasks import get_clean_pipeline_instance_data
from pipeline.models import PipelineInstance
from gcloud.contrib.cleaner.pipeline.bamboo_engine_tasks import get_clean_pipeline_instance_data, archived_expired_task
from gcloud.contrib.cleaner.signals import pre_delete_pipeline_instance_data
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.utils.decorators import time_record
Expand Down Expand Up @@ -74,3 +75,53 @@ def clean_expired_v2_task_data():
logger.info(f"[clean_expired_v2_task_data] success clean tasks: {task_ids}")
except Exception as e:
logger.exception(f"[clean_expired_v2_task_data] error: {e}")


@periodic_task(run_every=(crontab(*settings.CLEAN_EXPIRED_V2_TASK_CRON)), ignore_result=True, queue="task_data_clean")
@time_record(logger)
def archived_expired_v2_task_data():
    """
    Archive expired v2 engine task data, then delete the originals.

    Takes one batch (CLEAN_EXPIRED_V2_TASK_BATCH_NUM) of the oldest expired v2
    tasks; rows that are not soft-deleted are copied to TaskArchivedStatistics,
    and every row in the batch (soft-deleted included) is removed from
    TaskFlowInstance and PipelineInstance inside a single transaction.

    NOTE(review): this task shares CLEAN_EXPIRED_V2_TASK_CRON with
    clean_expired_v2_task_data, so both jobs fire on the same schedule —
    confirm that is intended.
    """
    # TODO: "ENBLE" is a typo for "ENABLE"; renaming requires touching env.py and config/default.py too.
    if not settings.ENBLE_ARCHIVE_EXPIRED_V2_TASK:
        logger.info("Skip archived expired v2 task data")
        return

    logger.info("Start archived expired task data...")
    try:
        validity_day = settings.V2_TASK_VALIDITY_DAY
        expire_time = timezone.now() - timezone.timedelta(days=validity_day)

        batch_num = settings.CLEAN_EXPIRED_V2_TASK_BATCH_NUM

        all_expired_task_ids = []
        all_expired_pipeline_instance_ids = []
        archived_task_ids = []
        archived_pipeline_instance_ids = []

        tasks_to_process = (
            TaskFlowInstance.objects.filter(
                pipeline_instance__create_time__lt=expire_time, engine_ver=2, pipeline_instance__is_expired=True
            )
            .order_by("id")
            .values("id", "pipeline_instance__instance_id", "is_deleted")[:batch_num]
        )
        for task in tasks_to_process:
            all_expired_task_ids.append(task["id"])
            all_expired_pipeline_instance_ids.append(task["pipeline_instance__instance_id"])
            # soft-deleted tasks are purged without being archived
            if not task["is_deleted"]:
                archived_task_ids.append(task["id"])
                archived_pipeline_instance_ids.append(task["pipeline_instance__instance_id"])

        # Nothing to do: avoid opening an empty transaction and logging a bogus deletion.
        if not all_expired_task_ids:
            logger.info("No expired v2 task data to archive")
            return

        with transaction.atomic():
            # Archive the expired tasks that have not been soft-deleted
            if archived_pipeline_instance_ids:
                archived_expired_task(archived_pipeline_instance_ids)
                logger.info(f"Archived expired tasks, task_ids: {archived_task_ids}")

            # Delete every expired task in this batch
            TaskFlowInstance.objects.filter(id__in=all_expired_task_ids).delete()
            PipelineInstance.objects.filter(instance_id__in=all_expired_pipeline_instance_ids).delete()
            logger.info(f"Deleted expired task data, task_ids: {all_expired_task_ids}")
    except Exception as e:
        logger.exception(f"[archived_expired_v2_task_data] error: {e}")
Loading