Skip to content

Commit

Permalink
use isKey instead of getKeys
Browse files Browse the repository at this point in the history
  • Loading branch information
busma13 committed Oct 16, 2024
1 parent 72525e8 commit 1c8b870
Show file tree
Hide file tree
Showing 20 changed files with 124 additions and 97 deletions.
4 changes: 2 additions & 2 deletions packages/job-components/src/job-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { validateJobConfig } from './config-validators.js';
import { jobSchema } from './job-schemas.js';
import { OperationLoader } from './operation-loader/index.js';
import { registerApis } from './register-apis.js';
import { OperationModule } from './operations/index.js';
import { OperationAPIConstructor, OperationModule } from './operations/index.js';

export class JobValidator {
public schema: convict.Schema<any>;
Expand All @@ -31,7 +31,7 @@ export class JobValidator {
// top level job validation occurs, but not operations
const jobConfig = validateJobConfig(this.schema, cloneDeep(jobSpec));
const assetIds = jobConfig.assets || [];
const apis: Record<string, any> = {};
const apis: Record<string, OperationAPIConstructor> = {};

type ValidateJobFn = (job: ValidatedJobConfig) => void;
let validateJobFns: ValidateJobFn[] = [];
Expand Down
20 changes: 11 additions & 9 deletions packages/job-components/test/execution-context/worker-spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import 'jest-extended';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { pDelay, DataEntity } from '@terascope/utils';
import { pDelay, DataEntity, isKey } from '@terascope/utils';
import { terasliceOpPath } from '../helpers/index.js';
import {
WorkerExecutionContext, TestContext, newTestExecutionConfig,
FetcherCore, ProcessorCore, newTestSlice, SliceAnalyticsData
FetcherCore, ProcessorCore, newTestSlice
} from '../../src/index.js';

const dirname = path.dirname(fileURLToPath(import.meta.url));
Expand Down Expand Up @@ -210,14 +210,16 @@ describe('WorkerExecutionContext', () => {

for (const metric of ['size', 'time']) {
for (let i = 0; i < ops; i++) {
const previous = previousAnalytics[metric as keyof SliceAnalyticsData][i];
const current = analytics![metric as keyof SliceAnalyticsData][i];
if (i === 0) {
if (current !== previous) {
console.warn(`Metric "${metric}" should not have changed for the fetcher. Expected ${current} === ${previous}`);
if (isKey(previousAnalytics, metric)) {
const previous = previousAnalytics[metric][i];
const current = analytics![metric][i];
if (i === 0) {
if (current !== previous) {
console.warn(`Metric "${metric}" should not have changed for the fetcher. Expected ${current} === ${previous}`);
}
} else if (current < previous) {
console.warn(`Metric "${metric}" should be greater than the last run. Expected ${current} >= ${previous}.`);
}
} else if (current < previous) {
console.warn(`Metric "${metric}" should be greater than the last run. Expected ${current} >= ${previous}.`);
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions packages/scripts/src/helpers/test-runner/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Kafka } from 'kafkajs';
import { execa } from 'execa';
import {
pWhile, TSError, debugLogger,
toHumanTime, getErrorStatusCode
toHumanTime, getErrorStatusCode, isKey
} from '@terascope/utils';
import { getServicesForSuite, getRootDir } from '../misc.js';
import {
Expand Down Expand Up @@ -816,10 +816,15 @@ async function startService(options: TestOptions, service: Service): Promise<()
}
let version: string;
if (serviceName === 'kafka') {
version = config[`${serviceName.toUpperCase()}_IMAGE_VERSION` as keyof typeof config] as string;
const key = 'KAFKA_IMAGE_VERSION';
version = config[key];
signale.pending(`starting ${service}@${config.KAFKA_VERSION} service...`);
} else {
version = config[`${serviceName.toUpperCase()}_VERSION` as keyof typeof config] as string;
const key = `${serviceName.toUpperCase()}_VERSION`;
if (!isKey(config, key)) {
throw new Error(`No version configuration variable found for ${serviceName}`);
}
version = config[key] as string;
signale.pending(`starting ${service}@${version} service...`);
}
if (options.useExistingServices) {
Expand Down
6 changes: 3 additions & 3 deletions packages/terafoundation/src/cluster-context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _cluster from 'node:cluster';
import { get, isFunction, getFullErrorStack } from '@terascope/utils';
import { get, isFunction, getFullErrorStack, isKey } from '@terascope/utils';
import type { Terafoundation } from '@terascope/types';
import { getArgs } from './sysconfig.js';
import validateConfigs from './validate-configs.js';
Expand Down Expand Up @@ -49,9 +49,9 @@ export class ClusterContext<
let keyFound = false;
if (config.descriptors) {
Object.keys(config.descriptors).forEach((key) => {
if (this.assignment === key) {
if (this.assignment === key && isKey(config, key)) {
keyFound = true;
config[key as keyof typeof config](this);
config[key](this);
}
});
// if no key was explicitly set then default to worker
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-cli/src/helpers/display.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import CliTable from 'cli-table3';
import easyTable from 'easy-table';
import prompts from 'prompts';
import { toTitleCase } from '@terascope/utils';
import { Action, Tense, UpdateActions } from '../interfaces';
import { Action, Tense, UpdateActions } from '../interfaces.js';

function pretty(headerValues: string[], rows: string[]) {
const header = headerValues.map((item): ttyTable.Header => ({
Expand Down
31 changes: 20 additions & 11 deletions packages/teraslice-cli/src/helpers/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import fs from 'fs-extra';
import {
has, toString, pDelay, pMap, pRetry, getKeys,
has, toString, pDelay, pMap,
pRetry, isKey
} from '@terascope/utils';
import { Teraslice } from '@terascope/types';
import chalk from 'chalk';
Expand Down Expand Up @@ -728,24 +729,28 @@ export default class Jobs {

formatJobConfig(jobConfig: JobConfigFile) {
const finalJobConfig: Record<string, any> = {};
getKeys(jobConfig).forEach((key) => {
if (key === '__metadata') {
finalJobConfig.job_id = jobConfig[key].cli.job_id;
finalJobConfig._updated = jobConfig[key].cli.updated;
} else {
finalJobConfig[key] = jobConfig[key];
Object.keys(jobConfig).forEach((key) => {
if (isKey(jobConfig, key)) {
if (key === '__metadata') {
finalJobConfig.job_id = jobConfig[key].cli.job_id;
finalJobConfig._updated = jobConfig[key].cli.updated;
} else {
finalJobConfig[key] = jobConfig[key];
}
}
});
return finalJobConfig;
return finalJobConfig as Partial<Teraslice.JobConfig>;
}

getLocalJSONConfigs(srcDir: string, files: string[]) {
const localJobConfigs: Record<string, any> = {};
const localJobConfigs: Record<string, Partial<Teraslice.JobConfig>> = {};
for (const file of files) {
const filePath = path.join(srcDir, file);
const jobConfig: JobConfigFile = JSON.parse(fs.readFileSync(filePath, { encoding: 'utf-8' }));
const formattedJobConfig = this.formatJobConfig(jobConfig);
localJobConfigs[formattedJobConfig.job_id] = formattedJobConfig;
if (formattedJobConfig.job_id) {
localJobConfigs[formattedJobConfig.job_id] = formattedJobConfig;
}
}
return localJobConfigs;
}
Expand Down Expand Up @@ -799,7 +804,11 @@ export default class Jobs {
/// We only want to display a diff of this field if it's greater than a minute
let showUpdateField = false;
const jobConfigUpdateTime = new Date(job.config._updated).getTime();
const localConfigUpdateTime = new Date(localJobConfigs[job.id]._updated).getTime();
const updated = localJobConfigs[job.id]._updated;
if (updated === undefined) {
throw new Error(`Could not retrieve last update time of job ${job.id}`);
}
const localConfigUpdateTime = new Date(updated).getTime();
const timeDiff = Math.abs(localConfigUpdateTime - jobConfigUpdateTime);
if (timeDiff > (1000 * 60)) {
showUpdateField = true;
Expand Down
6 changes: 3 additions & 3 deletions packages/teraslice-cli/src/helpers/tjm-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
unset,
get,
cloneDeep,
getKeys
isKey
} from '@terascope/utils';
import { Teraslice } from '@terascope/types';
import Config from './config.js';
Expand Down Expand Up @@ -220,8 +220,8 @@ export async function saveJobConfigToFile(
const jobConfigCopy: Record<string, any> = {};
const keysToSkip = ['job_id', '_created', '_context', '_updated', '_deleted', '_deleted_on'];

for (const key of getKeys(jobConfig)) {
if (!keysToSkip.includes(key)) {
for (const key of Object.keys(jobConfig)) {
if (!keysToSkip.includes(key) && isKey(jobConfig, key)) {
jobConfigCopy[key] = cloneDeep(jobConfig[key]);
}
}
Expand Down
7 changes: 6 additions & 1 deletion packages/teraslice-client-js/test/ex-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ describe('Teraslice Ex', () => {
});
});

describe.each(['stop', 'pause', 'resume'] as const)('->%s', (method) => {
const methodTestCases = [
'stop',
'pause',
'resume'
] as const;
describe.each(methodTestCases)('->%s', (method) => {
describe('when called with nothing', () => {
beforeEach(() => {
scope.post(`/ex/some-ex-id/_${method}`)
Expand Down
8 changes: 7 additions & 1 deletion packages/teraslice-client-js/test/job-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,13 @@ describe('Teraslice Job', () => {
});
});

describe.each(['start', 'stop', 'pause', 'resume'] as const)('->%s', (method) => {
const methodTestCases = [
'start',
'stop',
'pause',
'resume'
] as const;
describe.each(methodTestCases)('->%s', (method) => {
describe('when called with nothing', () => {
beforeEach(() => {
scope.post(`/jobs/some-job-id/_${method}`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface ClusterAnalytics {
}

export interface ExecutionAnalyticsMessage extends Message {
kind: string;
kind: keyof ClusterAnalytics;
stats: ExecutionAnalytics;
}

Expand Down
35 changes: 17 additions & 18 deletions packages/teraslice-messaging/src/cluster-master/server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { isNumber, cloneDeep } from '@terascope/utils';
import * as i from './interfaces.js';
import * as core from '../messenger/index.js';
import { AggregatedExecutionAnalytics } from '@terascope/types';
import { cloneDeep, isKey, isNumber } from '@terascope/utils';
import { Message, ResponseError, Server as _Server } from '../messenger/index.js';
import { ClusterAnalytics, ExecutionAnalyticsMessage, ServerOptions } from './interfaces.js';

export class Server extends core.Server {
private clusterAnalytics: i.ClusterAnalytics;
export class Server extends _Server {
private clusterAnalytics: ClusterAnalytics;

constructor(opts: i.ServerOptions) {
constructor(opts: ServerOptions) {
const {
port,
actionTimeout,
Expand Down Expand Up @@ -53,47 +52,47 @@ export class Server extends core.Server {
await this.listen();
}

sendExecutionPause(exId: string): Promise<core.Message | null> {
sendExecutionPause(exId: string): Promise<Message | null> {
return this.send(exId, 'execution:pause');
}

sendExecutionResume(exId: string): Promise<core.Message | null> {
sendExecutionResume(exId: string): Promise<Message | null> {
return this.send(exId, 'execution:resume');
}

sendExecutionAnalyticsRequest(exId: string): Promise<core.Message | null> {
sendExecutionAnalyticsRequest(exId: string): Promise<Message | null> {
return this.send(exId, 'execution:analytics');
}

getClusterAnalytics(): i.ClusterAnalytics {
getClusterAnalytics(): ClusterAnalytics {
return cloneDeep(this.clusterAnalytics);
}

onExecutionFinished(fn: (clientId: string, error?: core.ResponseError) => void): void {
onExecutionFinished(fn: (clientId: string, error?: ResponseError) => void): void {
this.on('execution:finished', (msg) => {
fn(msg.scope, msg.error);
});
}

private onConnection(exId: string, socket: SocketIO.Socket) {
this.handleResponse(socket, 'execution:finished', (msg: core.Message) => {
this.handleResponse(socket, 'execution:finished', (msg: Message) => {
this.emit('execution:finished', {
scope: exId,
payload: {},
error: msg.payload.error,
});
});

this.handleResponse(socket, 'cluster:analytics', (msg: core.Message) => {
const data = msg.payload as i.ExecutionAnalyticsMessage;
this.handleResponse(socket, 'cluster:analytics', (msg: Message) => {
const data = msg.payload as ExecutionAnalyticsMessage;
if (!(data.kind in this.clusterAnalytics)) {
return;
}
const current = this.clusterAnalytics[data.kind as keyof i.ClusterAnalytics];
const current = this.clusterAnalytics[data.kind];

for (const [field, value] of Object.entries(data.stats)) {
if (field in current) {
current[field as keyof AggregatedExecutionAnalytics] += value;
if (isKey(current, field)) {
current[field] += value;
}
}

Expand Down
10 changes: 4 additions & 6 deletions packages/utils/src/arrays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { TypedArray } from '@terascope/types';
import { Many, ListOfRecursiveArraysOrValues } from './interfaces.js';
import { get } from './deps.js';
import { isBuffer } from './buffers.js';
import { isKey } from './objects.js';

/** A native implementation of lodash flatten */
export function flatten<T>(val: Many<T[]>): T[] {
Expand Down Expand Up @@ -66,12 +67,9 @@ export function sortBy<T, V = any>(
fnOrPath: ((value: T) => V) | string,
): T[] {
return sort(arr, (a, b) => {
const aVal = _getValFnOrPath(a, fnOrPath);
const bVal = _getValFnOrPath(b, fnOrPath);
if (
numLike[typeof aVal as keyof typeof numLike]
&& numLike[typeof bVal as keyof typeof numLike]
) {
const aVal = _getValFnOrPath<T, V>(a, fnOrPath);
const bVal = _getValFnOrPath<T, V>(b, fnOrPath);
if (isKey(numLike, typeof aVal) && isKey(numLike, typeof bVal)) {
return (aVal as any) - (bVal as any);
}
if (aVal < bVal) {
Expand Down
5 changes: 3 additions & 2 deletions packages/utils/src/booleans.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { getTypeOf } from './deps.js';
import { isKey } from './objects.js';
/**
* Convert any input into a boolean, this will work with stringified boolean
*
Expand Down Expand Up @@ -38,7 +39,7 @@ const _truthy = Object.freeze({
export function isTruthy(input: unknown): boolean {
if (input === true) return true;
const val = typeof input === 'string' ? input.trim().toLowerCase() : String(input);
return _truthy[val as keyof typeof _truthy] === true;
return isKey(_truthy, val);
}

/**
Expand All @@ -47,7 +48,7 @@ export function isTruthy(input: unknown): boolean {
export function isFalsy(input: unknown): boolean {
if (input === false || input == null || input === '') return true;
const val = typeof input === 'string' ? input.trim().toLowerCase() : String(input);
return _falsy[val as keyof typeof _falsy] === true;
return isKey(_falsy, val);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/utils/src/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import geoHash from 'latlon-geohash';
import pMap from 'p-map';
import { AnyObject } from './interfaces.js';
import { DataEntity } from './entities/index.js';
import { isKey } from './objects.js';

/**
* Detect if an object created by Object.create(null)
Expand Down Expand Up @@ -67,7 +68,8 @@ const _cloneTypeHandlers = Object.freeze({
});

export function cloneDeep<T = any>(input: T): T {
const handler = _cloneTypeHandlers[kindOf(input) as keyof typeof _cloneTypeHandlers] || clone;
const kind = kindOf(input);
const handler = isKey(_cloneTypeHandlers, kind) ? _cloneTypeHandlers[kind] : clone;
return handler(input);
}

Expand Down
Loading

0 comments on commit 1c8b870

Please sign in to comment.