Skip to content

Commit

Permalink
refactor(spawn): remove onStdout and onStderr handlers in favor o…
Browse files Browse the repository at this point in the history
…r `on('stdout', () => {...})`
  • Loading branch information
antongolub committed Mar 15, 2024
1 parent 76441c5 commit 7950a6c
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 37 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,7 @@ const { error } = await p
error.message // 'The operation was aborted'
```

- [ ] Stdout limit

## License
[MIT](./LICENSE)
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"build:stamp": "npx buildstamp",
"test": "concurrently 'npm:test:*'",
"test:lint": "eslint -c src/test/lint/.eslintrc.json src",
"test:unit": "c8 -r lcov -r text -o target/coverage -x src/scripts -x src/test -x target node --loader ts-node/esm --experimental-specifier-resolution=node src/scripts/test.mjs"
"test:unit": "c8 -r lcov -r text -o target/coverage -x src/scripts -x src/test -x target node --loader ts-node/esm --experimental-specifier-resolution=node src/scripts/test.mjs",
"publish:draft": "yarn build && npm publish --no-git-tag-version"
},
"repository": {
"type": "git",
Expand Down
1 change: 0 additions & 1 deletion src/main/ts/foo.ts

This file was deleted.

73 changes: 49 additions & 24 deletions src/main/ts/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as cp from 'node:child_process'
import process from 'node:process'
import { Readable, Writable, Stream, Transform } from 'node:stream'
import { assign, noop } from './util.js'
import EventEmitter from 'node:events'

export type TSpawnError = any

Expand All @@ -25,14 +26,16 @@ export type TChild = ReturnType<typeof cp.spawn>
export type TInput = string | Buffer | Stream

export interface TSpawnCtxNormalized {
id: string,
cwd: string
cmd: string
sync: boolean
args: ReadonlyArray<string>
input: TInput | null
env: Record<string, string | undefined>
stdio: ['pipe', 'pipe', 'pipe']
detached: boolean
env: Record<string, string | undefined>
ee: EventEmitter
ac: AbortController
shell: string | true | undefined
spawn: typeof cp.spawn
Expand All @@ -48,16 +51,17 @@ export interface TSpawnCtxNormalized {
fulfilled?: TSpawnResult
error?: any
run: (cb: () => void, ctx: TSpawnCtxNormalized) => void
// kill: (signal: number) => void
}

export const normalizeCtx = (...ctxs: TSpawnCtx[]): TSpawnCtxNormalized => assign({
id: Math.random().toString(36).slice(2),
cmd: '',
cwd: process.cwd(),
sync: false,
args: [],
input: null,
env: process.env,
ee: new EventEmitter(),
ac: new AbortController(),
detached: true,
shell: true,
Expand Down Expand Up @@ -112,11 +116,14 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => {
if (c.sync) {
const opts = buildSpawnOpts(c)
const result = c.spawnSync(c.cmd, c.args, opts)

c.stdout.write(result.stdout)
c.stderr.write(result.stderr)
c.onStdout(result.stdout)
c.onStderr(result.stderr)
if (result.stdout.length) {
c.stdout.write(result.stdout)
c.ee.emit('stdout', result.stdout, c)
}
if (result.stderr.length) {
c.stderr.write(result.stderr)
c.ee.emit('stderr', result.stderr, c)
}
c.callback(null, c.fulfilled = {
...result,
stdout: result.stdout.toString(),
Expand All @@ -126,6 +133,7 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => {
duration: Date.now() - now,
_ctx: c
})
c.ee.emit('end', c.fulfilled, c)

} else {
c.run(() => {
Expand All @@ -138,7 +146,9 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => {
const child = c.spawn(c.cmd, c.args, opts)
c.child = child

opts.signal.addEventListener('abort', () => {
opts.signal.addEventListener('abort', event => {
c.ee.emit('abort', event, c)

if (opts.detached && child.pid) {
try {
// https://github.com/nodejs/node/issues/51766
Expand All @@ -150,23 +160,36 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => {
})
processInput(child, c.input || c.stdin)

child.stdout.pipe(c.stdout).on('data', (d) => { stdout.push(d); stdall.push(d); c.onStdout(d) })
child.stderr.pipe(c.stderr).on('data', (d) => { stderr.push(d); stdall.push(d); c.onStderr(d) })
child.on('error', (e) => error = e)
// child.on('exit', (_status) => status = _status)
child.on('close', (status, signal) => {
c.callback(error, c.fulfilled = {
error,
status,
signal,
stdout: stdout.join(''),
stderr: stderr.join(''),
stdall: stdall.join(''),
stdio: [c.stdin, c.stdout, c.stderr],
duration: Date.now() - now,
_ctx: c
})
child.stdout.pipe(c.stdout).on('data', d => {
stdout.push(d)
stdall.push(d)
c.ee.emit('stdout', d, c)
})
child.stderr.pipe(c.stderr).on('data', d => {
stderr.push(d)
stdall.push(d)
c.ee.emit('stderr', d, c)
})
child
.on('error', (e: any) => {
error = e
c.ee.emit('err', error, c)
})
// .on('exit', (_status) => status = _status)
.on('close', (status, signal) => {
c.callback(error, c.fulfilled = {
error,
status,
signal,
stdout: stdout.join(''),
stderr: stderr.join(''),
stdall: stdall.join(''),
stdio: [c.stdin, c.stdout, c.stderr],
duration: Date.now() - now,
_ctx: c
})
c.ee.emit('end', c.fulfilled, c)
})
}, c)
}
} catch (error: unknown) {
Expand All @@ -184,6 +207,8 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => {
_ctx: c
}
)
c.ee.emit('err', error, c)
c.ee.emit('end', c.fulfilled, c)
}

return c
Expand Down
3 changes: 2 additions & 1 deletion src/main/ts/x.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
TZurk,
TZurkPromise,
TZurkOptions,
TZurkCtx
TZurkCtx, TZurkListener
} from './zurk.js'
import { type Promisified, type TVoidCallback, isPromiseLike, isStringLiteral, assign, quote } from './util.js'
import { pipeMixin } from './mixin/pipe.js'
Expand Down Expand Up @@ -51,6 +51,7 @@ export type TShellOptions = Omit<TZurkOptions, 'input'> & {
export interface TShellResponse extends Omit<Promisified<TZurk>, 'stdio' | '_ctx'>, Promise<TZurk & TShellResponseExtra<TShellResponse>>, TShellResponseExtra<TShellResponse> {
stdio: [Readable | Writable, Writable, Writable]
_ctx: TShellCtx
on: (event: string | symbol, listener: TZurkListener) => TShellResponse
}

export interface TShellResponseSync extends TZurk, TShellResponseExtra<TShellResponseSync> {
Expand Down
26 changes: 20 additions & 6 deletions src/main/ts/zurk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,26 @@ import {
TSpawnCtxNormalized,
TSpawnResult,
} from './spawn.js'
import { isPromiseLike, makeDeferred, type Promisified } from './util.js'
import { isPromiseLike, makeDeferred, type Promisified, type TVoidCallback } from './util.js'

export const ZURK = Symbol('Zurk')

export type TZurkListener = (value: any, ctx: TZurkCtx) => void

export interface TZurk extends TSpawnResult {
_ctx: TZurkCtx
on(event: string | symbol, listener: TZurkListener): TZurk
}

export type TZurkCtx = TSpawnCtxNormalized & { nothrow?: boolean, nohandle?: boolean }

export type TZurkOptions = Partial<Omit<TZurkCtx, 'callback'>>

export type TZurkPromise = Promise<TZurk> & Promisified<TZurk> & { _ctx: TZurkCtx, stdio: TZurkCtx['stdio'] }
export type TZurkPromise = Promise<TZurk> & Promisified<TZurk> & {
_ctx: TZurkCtx
stdio: TZurkCtx['stdio']
on(event: string | symbol, listener: TZurkListener): TZurkPromise
}

export const zurk = <T extends TZurkOptions = TZurkOptions, R = T extends {sync: true} ? TZurk : TZurkPromise>(opts: T): R =>
(opts.sync ? zurkSync(opts) : zurkAsync(opts)) as R
Expand Down Expand Up @@ -56,22 +63,28 @@ export const zurkSync = (opts: TZurkOptions): TZurk => {
}

// eslint-disable-next-line sonarjs/cognitive-complexity
export const zurkifyPromise = (target: Promise<TZurk> | TZurkPromise, ctx: TSpawnCtxNormalized) => isPromiseLike(target) && !util.types.isProxy(target)
? new Proxy(target, {
export const zurkifyPromise = (target: Promise<TZurk> | TZurkPromise, ctx: TSpawnCtxNormalized) => {
if (!isPromiseLike(target) || util.types.isProxy(target)) {
return target as TZurkPromise
}
const proxy = new Proxy(target, {
get(target: Promise<TZurk>, p: string | symbol, receiver: any): any {
if (p === ZURK) return ZURK
if (p === 'then') return target.then.bind(target)
if (p === 'catch') return target.catch.bind(target)
if (p === 'finally') return target.finally.bind(target)
if (p === 'stdio') return ctx.stdio
if (p === '_ctx') return ctx
if (p === ZURK) return ZURK
if (p === 'on') return function (name: string, cb: VoidFunction){ ctx.ee.on(name, cb); return proxy }

if (p in target) return Reflect.get(target, p, receiver)

return target.then(v => Reflect.get(v, p, receiver))
}
}) as TZurkPromise
: target as TZurkPromise

return proxy
}

export const getError = (data: TSpawnResult) => {
if (data.error) return data.error
Expand All @@ -93,6 +106,7 @@ class Zurk implements TZurk {
constructor(ctx: TZurkCtx) {
this._ctx = ctx
}
on(name: string, cb: TVoidCallback): this { this._ctx.ee.on(name, cb); return this }
get status() { return this._ctx.fulfilled?.status ?? null }
get signal() { return this._ctx.fulfilled?.signal ?? null }
get error() { return this._ctx.error }
Expand Down
9 changes: 6 additions & 3 deletions src/test/ts/spawn.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import * as assert from 'node:assert'
import { describe, it } from 'node:test'
import EventEmitter from 'node:events'
import { invoke, normalizeCtx, TSpawnCtx, TSpawnResult } from '../../main/ts/spawn.js'
import { makeDeferred } from '../../main/ts/util.js'

describe('invoke()', () => {
it('calls a given cmd', async () => {
const results = []
const results: string[] = []
const callback: TSpawnCtx['callback'] = (_err, result) => results.push(result.stdout)
const { promise, resolve, reject } = makeDeferred<TSpawnResult>()

invoke(normalizeCtx({
sync: true,
cmd: 'echo',
args: ['hello'],
callback
callback,
}))

invoke(normalizeCtx({
Expand All @@ -22,7 +23,7 @@ describe('invoke()', () => {
args: ['world'],
callback(err, result) {
err ? reject(err) : resolve(result)
}
},
}))

await promise.then((result) => callback(null, result))
Expand Down Expand Up @@ -61,5 +62,7 @@ describe('normalizeCtx()', () => {
assert.equal(normalized.cwd, 'a')
assert.equal(normalized.cwd, 'b')
assert.equal(normalized.cwd, 'c')
assert.ok(normalized.ee instanceof EventEmitter)
assert.ok(normalized.ac instanceof AbortController)
})
})
7 changes: 6 additions & 1 deletion src/test/ts/x.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,16 @@ describe('mixins', () => {

it('handles `abort`', async () => {
const p = $({nothrow: true})`sleep 10`
const events: any[] = []

setTimeout(() => p.abort(), 25)
p
.on('abort', () => events.push('abort'))
.on('end', () => events.push('end'))

const { error } = await p

assert.equal(error.message, 'The operation was aborted')
assert.deepEqual(events, ['abort', 'end'])
})
})

Expand Down

0 comments on commit 7950a6c

Please sign in to comment.