Skip to content

Commit

Permalink
Overwriting of cached task outputs for a single execution #minor (#616)
Browse files Browse the repository at this point in the history
* Added direnv .envrc to gitignore

Signed-off-by: Nick Müller <[email protected]>

* Added cache skip override to launch forms

Signed-off-by: Nick Müller <[email protected]>

* Adapted protobuf timestamp helpers to support number and Long values

Signed-off-by: Nick Müller <[email protected]>

* Added SIGINT handling for server shutdown
Allows for stopping docker run via CTRL+C

Signed-off-by: Nick Müller <[email protected]>

* Renamed skipCache flag to overwriteCache

Signed-off-by: Nick Müller <[email protected]>

Signed-off-by: Nick Müller <[email protected]>
  • Loading branch information
Nick Müller authored Jan 6, 2023
1 parent 385eb69 commit 88bcfba
Show file tree
Hide file tree
Showing 21 changed files with 484 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
.env
*.swp
# direnv
.envrc

# C extensions
*.so
Expand Down
9 changes: 6 additions & 3 deletions packages/zapp/console/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ if (env.ADMIN_API_USE_SSL === 'https') {
});
}

process.on('SIGTERM', () => {
console.info('SIGTERM signal received. Shutting down.');
function shutdown(signal) {
console.info(`${signal} signal received. Shutting down.`);
server.close((error) => {
if (error) {
console.error('Failed to close server:', error);
Expand All @@ -74,4 +74,7 @@ process.on('SIGTERM', () => {
console.log('Server closed');
process.exit(0);
});
});
}

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
16 changes: 12 additions & 4 deletions packages/zapp/console/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ export function isValidDate(input: string | Date): boolean {
/** Converts a Protobuf Timestamp object to a JS Date */
export function timestampToDate(timestamp: Protobuf.ITimestamp): Date {
const nanos = timestamp.nanos || 0;
const milliseconds = (timestamp.seconds as Long).toNumber() * 1000 + nanos / 1e6;
const seconds =
typeof timestamp.seconds === 'number'
? timestamp.seconds
: (timestamp.seconds as Long).toNumber();
const milliseconds = seconds * 1000 + nanos / 1e6;
return new Date(milliseconds);
}

/** A sort comparison function for ordering timestamps in ascending progression */
export function compareTimestampsAscending(a: Protobuf.ITimestamp, b: Protobuf.ITimestamp) {
const leftSeconds: Long = a.seconds || Long.fromNumber(0);
const leftSeconds: Long =
(typeof a.seconds === 'number' ? Long.fromNumber(a.seconds) : a.seconds) || Long.fromNumber(0);
const leftNanos: number = a.nanos || 0;
const rightSeconds: Long = b.seconds || Long.fromNumber(0);
const rightSeconds: Long =
(typeof b.seconds === 'number' ? Long.fromNumber(b.seconds) : b.seconds) || Long.fromNumber(0);
const rightNanos: number = b.nanos || 0;
if (leftSeconds.eq(rightSeconds)) {
return leftNanos - rightNanos;
Expand All @@ -44,7 +50,9 @@ export function dateToTimestamp(date: Date): Protobuf.Timestamp {
/** Converts a Protobuf Duration object to its equivalent value in milliseconds */
export function durationToMilliseconds(duration: Protobuf.IDuration): number {
const nanos = duration.nanos || 0;
return (duration.seconds as Long).toNumber() * 1000 + nanos / 1e6;
const seconds =
typeof duration.seconds === 'number' ? duration.seconds : (duration.seconds as Long).toNumber();
return seconds * 1000 + nanos / 1e6;
}

/** Converts a (possibly fractional) value in milliseconds to a Protobuf Duration object */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ const NodeExecutionCacheStatusIcon: React.FC<
> = React.forwardRef(({ status, ...props }, ref) => {
switch (status) {
case CatalogCacheStatus.CACHE_DISABLED:
case CatalogCacheStatus.CACHE_MISS: {
case CatalogCacheStatus.CACHE_MISS:
case CatalogCacheStatus.CACHE_SKIPPED: {
return <InfoOutlined {...props} ref={ref} data-testid="cache-icon" />;
}
case CatalogCacheStatus.CACHE_HIT: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const ExecutionMetadataExtra: React.FC<{
rawOutputDataConfig,
securityContext,
interruptible,
overwriteCache,
} = execution.spec;

const [launchPlanSpec, setLaunchPlanSpec] = React.useState<Partial<LaunchPlanSpec>>({});
Expand Down Expand Up @@ -71,6 +72,10 @@ export const ExecutionMetadataExtra: React.FC<{
label: ExecutionMetadataLabels.interruptible,
value: interruptible ? (interruptible.value ? 'true' : 'false') : dashedValueString,
},
{
label: ExecutionMetadataLabels.overwriteCache,
value: overwriteCache ? 'true' : 'false',
},
];

return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function useRelaunchWorkflowFormState({ execution }: RelaunchExecutionFormProps)
authRole,
securityContext,
interruptible,
overwriteCache,
},
} = execution;

Expand All @@ -61,6 +62,7 @@ function useRelaunchWorkflowFormState({ execution }: RelaunchExecutionFormProps)
authRole,
securityContext,
interruptible,
overwriteCache,
};
},
},
Expand All @@ -76,7 +78,7 @@ function useRelaunchTaskFormState({ execution }: RelaunchExecutionFormProps) {
defaultValue: {} as TaskInitialLaunchParameters,
doFetch: async (execution) => {
const {
spec: { authRole, launchPlan: taskId, interruptible },
spec: { authRole, launchPlan: taskId, interruptible, overwriteCache },
} = execution;
const task = await apiContext.getTask(taskId);
const inputDefinitions = getTaskInputs(task);
Expand All @@ -87,7 +89,7 @@ function useRelaunchTaskFormState({ execution }: RelaunchExecutionFormProps) {
},
apiContext,
);
return { authRole, values, taskId, interruptible };
return { authRole, values, taskId, interruptible, overwriteCache };
},
},
execution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export enum ExecutionMetadataLabels {
parallelism = 'Parallelism',
securityContextDefault = 'default',
interruptible = 'Interruptible override',
overwriteCache = 'Overwrite cached outputs',
}

export const tabs = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const clusterTestId = `metadata-${ExecutionMetadataLabels.cluster}`;
const startTimeTestId = `metadata-${ExecutionMetadataLabels.time}`;
const durationTestId = `metadata-${ExecutionMetadataLabels.duration}`;
const interruptibleTestId = `metadata-${ExecutionMetadataLabels.interruptible}`;
const overwriteCacheTestId = `metadata-${ExecutionMetadataLabels.overwriteCache}`;

jest.mock('models/Launch/api', () => ({
getLaunchPlan: jest.fn(() => Promise.resolve({ spec: {} })),
Expand Down Expand Up @@ -95,4 +96,22 @@ describe('ExecutionMetadata', () => {
const { getByTestId } = renderMetadata();
expect(getByTestId(interruptibleTestId)).toHaveTextContent(dashedValueString);
});

it('shows true if cache was overwritten for execution', () => {
execution.spec.overwriteCache = true;
const { getByTestId } = renderMetadata();
expect(getByTestId(overwriteCacheTestId)).toHaveTextContent('true');
});

it('shows false if cache was not overwritten for execution', () => {
execution.spec.overwriteCache = false;
const { getByTestId } = renderMetadata();
expect(getByTestId(overwriteCacheTestId)).toHaveTextContent('false');
});

it('shows false if no cache overwrite value is found in execution spec', () => {
delete execution.spec.overwriteCache;
const { getByTestId } = renderMetadata();
expect(getByTestId(overwriteCacheTestId)).toHaveTextContent('false');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,39 @@ describe('RelaunchExecutionForm', () => {
}),
});
});

it('should not set cache overwrite value if not provided', async () => {
delete execution.spec.overwriteCache;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: undefined,
}),
});
});

it('should have correct cache overwrite value if override is enabled', async () => {
execution.spec.overwriteCache = true;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: true,
}),
});
});

it('should have correct cache overwrite value if override is disabled', async () => {
execution.spec.overwriteCache = false;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: undefined,
}),
});
});
});

describe('Launch form with full inputs', () => {
Expand Down Expand Up @@ -322,5 +355,38 @@ describe('RelaunchExecutionForm', () => {
}),
});
});

it('should not set cache overwrite value if not provided', async () => {
delete execution.spec.overwriteCache;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: undefined,
}),
});
});

it('should have correct cache overwrite value if override is enabled', async () => {
execution.spec.overwriteCache = true;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: true,
}),
});
});

it('should have correct cache overwrite value if override is disabled', async () => {
execution.spec.overwriteCache = false;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: false,
}),
});
});
});
});
2 changes: 1 addition & 1 deletion packages/zapp/console/src/components/Executions/strings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const str = {
cachePopulatedMessage: 'The result of this execution was written to cache.',
cachePutFailure: 'Failed to write output for this execution to cache.',
mapCacheMessage: "Check the detail panel for each task's cache status.",
cacheSkippedMessage: 'Cache skipped.',
cacheSkippedMessage: 'Cache was skipped for this execution.',
unknownCacheStatusString: 'Cache status is unknown',
viewSourceExecutionString: 'View source execution',
fromCache: 'From cache',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Typography } from '@material-ui/core';
import FormControlLabel from '@material-ui/core/FormControlLabel';
import Checkbox from '@material-ui/core/Checkbox';
import * as React from 'react';
import { useStyles } from './styles';
import { LaunchOverwriteCacheInputRef } from './types';
import t from './strings';

const isValueValid = (value: any) => {
return value !== undefined && value !== null;
};

interface LaunchOverwriteCacheInputProps {
initialValue?: boolean | null;
}

export const LaunchOverwriteCacheInputImpl: React.ForwardRefRenderFunction<
LaunchOverwriteCacheInputRef,
LaunchOverwriteCacheInputProps
> = (props, ref) => {
// overwriteCache stores the override to enable/disable the setting for an execution
const [overwriteCache, setOverwriteCache] = React.useState(false);

React.useEffect(() => {
if (isValueValid(props.initialValue)) {
setOverwriteCache(() => props.initialValue!);
}
}, [props.initialValue]);

const handleInputChange = React.useCallback(() => {
setOverwriteCache((prevState) => !prevState);
}, [overwriteCache]);

React.useImperativeHandle(
ref,
() => ({
getValue: () => {
return overwriteCache;
},
validate: () => true,
}),
[overwriteCache],
);

const styles = useStyles();

return (
<section>
<header className={styles.sectionHeader}>
<Typography variant="h6">Caching</Typography>
<Typography variant="body2">
Enabling the cache overwrite causes Flyte to ignore all previously computed and stored
outputs for a single execution and run all calculations again, overwriting any cached data
after a successful execution.
</Typography>
</header>
<section title={t('overwriteCache')}>
<FormControlLabel
control={<Checkbox checked={overwriteCache} onChange={handleInputChange} />}
label={t('overwriteCache')}
/>
</section>
</section>
);
};

export const LaunchOverwriteCacheInput = React.forwardRef(LaunchOverwriteCacheInputImpl);
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { LaunchFormInputs } from './LaunchFormInputs';
import { LaunchState } from './launchMachine';
import { LaunchRoleInput } from './LaunchRoleInput';
import { LaunchInterruptibleInput } from './LaunchInterruptibleInput';
import { LaunchOverwriteCacheInput } from './LaunchOverwriteCacheInput';
import { SearchableSelector } from './SearchableSelector';
import { useStyles } from './styles';
import { BaseInterpretedLaunchState, BaseLaunchService, LaunchTaskFormProps } from './types';
Expand All @@ -20,6 +21,7 @@ export const LaunchTaskForm: React.FC<LaunchTaskFormProps> = (props) => {
formInputsRef,
roleInputRef,
interruptibleInputRef,
overwriteCacheInputRef,
state,
service,
taskSourceSelectorState,
Expand Down Expand Up @@ -81,6 +83,10 @@ export const LaunchTaskForm: React.FC<LaunchTaskFormProps> = (props) => {
initialValue={state.context.interruptible}
ref={interruptibleInputRef}
/>
<LaunchOverwriteCacheInput
initialValue={state.context.overwriteCache}
ref={overwriteCacheInputRef}
/>
</DialogContent>
<LaunchFormActions
state={baseState}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { isEnterInputsState } from './utils';
import { LaunchRoleInput } from './LaunchRoleInput';
import { LaunchFormAdvancedInputs } from './LaunchFormAdvancedInputs';
import { LaunchInterruptibleInput } from './LaunchInterruptibleInput';
import { LaunchOverwriteCacheInput } from './LaunchOverwriteCacheInput';

/** Renders the form for initiating a Launch request based on a Workflow */
export const LaunchWorkflowForm: React.FC<LaunchWorkflowFormProps> = (props) => {
Expand All @@ -22,6 +23,7 @@ export const LaunchWorkflowForm: React.FC<LaunchWorkflowFormProps> = (props) =>
roleInputRef,
advancedOptionsRef,
interruptibleInputRef,
overwriteCacheInputRef,
state,
service,
workflowSourceSelectorState,
Expand Down Expand Up @@ -124,6 +126,10 @@ export const LaunchWorkflowForm: React.FC<LaunchWorkflowFormProps> = (props) =>
initialValue={state.context.interruptible}
ref={interruptibleInputRef}
/>
<LaunchOverwriteCacheInput
initialValue={state.context.overwriteCache}
ref={overwriteCacheInputRef}
/>
</AccordionDetails>
</Accordion>
</DialogContent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export interface WorkflowLaunchContext extends BaseLaunchContext {
annotations?: Admin.IAnnotations | null;
securityContext?: Core.ISecurityContext | null;
interruptible?: Protobuf.IBoolValue | null;
overwriteCache?: boolean | null;
}

export interface TaskLaunchContext extends BaseLaunchContext {
Expand All @@ -92,6 +93,7 @@ export interface TaskLaunchContext extends BaseLaunchContext {
taskVersion?: Identifier;
taskVersionOptions?: Task[];
interruptible?: Protobuf.IBoolValue | null;
overwriteCache?: boolean | null;
}

export interface TaskResumeContext extends BaseLaunchContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const str = {
launchPlan: 'Launch Plan',
interruptible: 'Interruptible',
viewNodeInputs: 'View node inputs',
overwriteCache: 'Overwrite cached outputs',
};

export { patternKey } from '@flyteconsole/locale';
Expand Down
Loading

0 comments on commit 88bcfba

Please sign in to comment.