Skip to content

Commit

Permalink
[Improve] flink job restore mode option improvement (#2925)
Browse files Browse the repository at this point in the history
* [Improve] flink job restore mode option improvement

* check style minor improvement
  • Loading branch information
wolfboys authored Aug 6, 2023
1 parent 2565cdd commit ca4e055
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.streampark.console.base.exception.ApiDetailException;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
Expand Down Expand Up @@ -61,6 +62,12 @@ public class FlinkEnv implements Serializable {

private transient FlinkVersion flinkVersion;

private transient String versionOfLarge;

private transient String versionOfMiddle;

private transient String versionOfLast;

private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();

public void doSetFlinkConf() throws ApiDetailException {
Expand Down Expand Up @@ -101,23 +108,31 @@ public void unzipFlinkConf() {
this.flinkConf = DeflaterUtils.unzipString(this.flinkConf);
}

@JsonIgnore
public String getLargeVersion() {
return this.version.substring(0, this.version.lastIndexOf("."));
if (StringUtils.isNotEmpty(this.version)) {
return this.version.substring(0, this.version.lastIndexOf("."));
}
return null;
}

@JsonIgnore
public String getVersionOfFirst() {
return this.version.split("\\.")[0];
if (StringUtils.isNotEmpty(this.version)) {
return this.version.split("\\.")[0];
}
return null;
}

@JsonIgnore
public String getVersionOfMiddle() {
return this.version.split("\\.")[1];
if (StringUtils.isNotEmpty(this.version)) {
return this.version.split("\\.")[1];
}
return null;
}

@JsonIgnore
public String getVersionOfLast() {
return this.version.split("\\.")[2];
if (StringUtils.isNotEmpty(this.version)) {
return this.version.split("\\.")[2];
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ enum FLINK_API {
* flink environment data
* @returns Promise<FlinkEnv[]>
*/
export function fetchFlinkEnv() {
export function fetchListFlinkEnv() {
return defHttp.post<FlinkEnv[]>({
url: FLINK_API.LIST,
});
}

export function fetchFlinkEnv(id: string) {
return defHttp.post<FlinkEnv>({
url: FLINK_API.GET,
data: { id: id },
});
}

/**
* Set the default
* @param {String} id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
}
}

.extra .conf-switch {
.tip-info {
margin-top: 10px;
color: darkgrey;
margin-left: 5px;
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ export default {
status: 'Run Status',
startTime: 'Start Time',
endTime: 'End Time',
restoreModeTip:
'restore mode is supported since flink 1.15, usually, you do not have to set this parameter',
release: {
releaseTitle: 'The current release of the application is in progress.',
releaseDesc: 'are you sure you want to force another build',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export default {
status: '运行状态',
startTime: '启动时间',
endTime: '结束时间',
restoreModeTip: 'flink 1.15开始支持restore模式,一般情况下不用设置该参数',
release: {
releaseTitle: '该应用程序的当前启动正在进行中.',
releaseDesc: '您确定要强制进行另一次构建吗',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
});
</script>
<script setup lang="ts" name="StartApplicationModal">
import { h, onMounted, ref, unref } from 'vue';
import { h } from 'vue';
import { Select, Input, Tag } from 'ant-design-vue';
import { BasicForm, useForm } from '/@/components/Form';
import { SvgIcon, Icon } from '/@/components/Icon';
Expand All @@ -34,7 +34,7 @@
import { fetchStart } from '/@/api/flink/app/app';
import { RestoreModeEnum } from '/@/enums/flinkEnum';
import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type';
import { renderFlinkAppRestoreMode } from '/@/views/flink/app/hooks/useFlinkRender';
const SelectOption = Select.Option;
Expand All @@ -45,8 +45,6 @@
const emits = defineEmits(['register', 'updateOption']);
const receiveData = reactive<Recordable>({});
const flinkEnvs = ref<FlinkEnv[]>([]);
const [registerModal, { closeModal }] = useModalInner((data) => {
if (data) {
Object.assign(receiveData, data);
Expand All @@ -73,7 +71,7 @@
afterItem: () =>
h(
'span',
{ class: 'conf-switch' },
{ class: 'tip-info' },
'restore the application from savepoint or latest checkpoint',
),
},
Expand All @@ -87,7 +85,7 @@
afterItem: () =>
h(
'span',
{ class: 'conf-switch' },
{ class: 'tip-info' },
'restore the application from savepoint or latest checkpoint',
),
slot: 'savepoint',
Expand All @@ -99,21 +97,8 @@
label: 'restore mode',
component: 'Select',
defaultValue: RestoreModeEnum.NO_CLAIM,
componentProps: {
options: [
{ label: 'CLAIM', value: RestoreModeEnum.CLAIM },
{ label: 'NO_CLAIM', value: RestoreModeEnum.NO_CLAIM },
{ label: 'LEGACY', value: RestoreModeEnum.LEGACY },
],
},
afterItem: () =>
h(
'span',
{ class: 'conf-switch' },
'restore mode is supported since flink 1.15, usually, you do not have to set this parameter',
),
ifShow: ({ values }) =>
values.startSavePointed && checkFlinkVersion(receiveData.application.versionId),
render: (renderCallbackParams) => renderFlinkAppRestoreMode(renderCallbackParams),
ifShow: ({ values }) => values.startSavePointed && checkFlinkVersion(),
},
{
field: 'allowNonRestoredState',
Expand All @@ -124,7 +109,7 @@
unCheckedChildren: 'OFF',
},
afterItem: () =>
h('span', { class: 'conf-switch' }, 'ignore savepoint then cannot be restored'),
h('span', { class: 'tip-info' }, 'ignore savepoint then cannot be restored'),
defaultValue: false,
ifShow: ({ values }) => values.startSavePointed,
},
Expand Down Expand Up @@ -191,16 +176,11 @@
}
}
function checkFlinkVersion(versionId: string) {
let env = unref(flinkEnvs).filter((env) => env.id == versionId)[0];
return parseInt(env.version.split('.')[1]) >= 15;
async function checkFlinkVersion() {
const versionId = receiveData.application.versionId;
const flinkVersion = await fetchFlinkEnv(versionId);
return parseInt(flinkVersion.versionOfMiddle) >= 15;
}
onMounted(() => {
fetchFlinkEnv().then((res) => {
flinkEnvs.value = res;
});
});
</script>
<template>
<BasicModal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@
unCheckedChildren: 'OFF',
},
defaultValue: true,
afterItem: () =>
h('span', { class: 'conf-switch' }, 'trigger savePoint before taking cancel'),
afterItem: () => h('span', { class: 'tip-info' }, 'trigger savePoint before taking cancel'),
},
{
field: 'customSavepoint',
Expand All @@ -65,7 +64,7 @@
placeholder: 'Entry the custom savepoint path',
allowClear: true,
},
afterItem: () => h('span', { class: 'conf-switch' }, 'cancel job with savepoint path'),
afterItem: () => h('span', { class: 'tip-info' }, 'cancel job with savepoint path'),
ifShow: ({ values }) => !!values.stopSavePointed,
},
{
Expand All @@ -77,7 +76,7 @@
unCheckedChildren: 'OFF',
},
defaultValue: false,
afterItem: () => h('span', { class: 'conf-switch' }, 'Send max watermark before stopped'),
afterItem: () => h('span', { class: 'tip-info' }, 'Send max watermark before stopped'),
},
],
colon: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
import { fetchSelect } from '/@/api/flink/project';
import { fetchAlertSetting } from '/@/api/flink/setting/alert';
import { fetchFlinkCluster } from '/@/api/flink/setting/flinkCluster';
import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
import { fetchFlinkEnv, fetchListFlinkEnv } from '/@/api/flink/setting/flinkEnv';
import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type';
import { AlertSetting } from '/@/api/flink/setting/types/alert.type';
import { FlinkCluster } from '/@/api/flink/setting/types/flinkCluster.type';
Expand Down Expand Up @@ -167,9 +167,9 @@ export const useCreateAndEditSchema = (
];
});

function handleFlinkVersion(id: number | string) {
async function handleFlinkVersion(id: number | string) {
if (!dependencyRef) return;
scalaVersion = unref(flinkEnvs)?.find((v) => v.id === id)?.scalaVersion || '';
scalaVersion = await fetchFlinkEnv(id)?.scalaVersion;
checkPomScalaVersion();
}

Expand Down Expand Up @@ -617,7 +617,7 @@ export const useCreateAndEditSchema = (
});

//get flinkEnv
fetchFlinkEnv().then((res) => {
fetchListFlinkEnv().then((res) => {
flinkEnvs.value = res;
});
//get flinkCluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { handleConfTemplate } from '/@/api/flink/config';
import { decodeByBase64 } from '/@/utils/cipher';
import { useMessage } from '/@/hooks/web/useMessage';
import { SelectValue } from 'ant-design-vue/lib/select';
import { CandidateTypeEnum, FailoverStrategyEnum } from '/@/enums/flinkEnum';
import { CandidateTypeEnum, FailoverStrategyEnum, RestoreModeEnum } from '/@/enums/flinkEnum';
import { useI18n } from '/@/hooks/web/useI18n';
import { fetchYarnQueueList } from '/@/api/flink/setting/yarnQueue';
import { ApiSelect } from '/@/components/Form';
Expand Down Expand Up @@ -227,7 +227,7 @@ export const renderOptionsItems = (
rules={[{ validator: conf.validator }]}
/>
)}
{conf.type === 'switch' && <span class="conf-switch">({conf.placeholder})</span>}
{conf.type === 'switch' && <span class="tip-info">({conf.placeholder})</span>}
<p class="conf-desc"> {descriptionFilter(conf)} </p>
</Form.Item>
);
Expand Down Expand Up @@ -610,3 +610,39 @@ export const renderStreamParkJarApp = ({ model, resources }) => {
</div>
);
};

export const renderFlinkAppRestoreMode = ({ model, field }: RenderCallbackParams) => {
return (
<div>
<Select
value={model[field]}
onChange={(value) => (model[field] = value)}
placeholder="Please select restore mode"
>
<Select.Option key="claim" value={RestoreModeEnum.CLAIM}>
<Tag color="#13c2c2" style=";margin-left: 5px;" size="small">
CLAIM
</Tag>
</Select.Option>
<Select.Option key="no_claim" value={RestoreModeEnum.NO_CLAIM}>
<Tag color="#2db7f5" style=";margin-left: 5px;" size="small">
NO_CLAIM
</Tag>
</Select.Option>
<Select.Option key="legacy" value={RestoreModeEnum.LEGACY}>
<Tag color="#8E50FF" style=";margin-left: 5px;" size="small">
LEGACY
</Tag>
</Select.Option>
</Select>
<p class="mt-10px">
<span class="note-info">
<Tag color="#2db7f5" class="tag-note" size="small">
{t('flink.app.noteInfo.note')}
</Tag>
<span class="tip-info">{t('flink.app.restoreModeTip')}</span>
</span>
</p>
</div>
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,3 @@
}
}
</script>

<style lang="less">
.conf-switch {
display: inline-block;
margin-top: 10px;
color: darkgrey;
}
</style>
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
},
defaultValue: false,
afterItem: () =>
h('span', { class: 'conf-switch' }, t('flink.variable.form.desensitizationDesc')),
h('span', { class: 'tip-info' }, t('flink.variable.form.desensitizationDesc')),
},
];
});
Expand Down Expand Up @@ -194,11 +194,3 @@
}
}
</script>

<style lang="less">
.conf-switch {
display: inline-block;
margin-top: 10px;
color: darkgrey;
}
</style>
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
allowClear: true,
placeholder: t('setting.alarm.alertNamePlaceHolder'),
},
afterItem: () => h('span', { class: 'conf-switch' }, t('setting.alarm.alertNameTips')),
afterItem: () => h('span', { class: 'tip-info' }, t('setting.alarm.alertNameTips')),
dynamicRules: () => {
return [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
afterItem: () =>
h(
'span',
{ class: 'conf-switch' },
{ class: 'tip-info' },
'Supported variables: {job_id}, {yarn_id}, {job_name},Example: https://grafana/flink-monitoring?var-JobId=var-JobId={job_id}',
),
rules: [
Expand Down
Loading

0 comments on commit ca4e055

Please sign in to comment.