-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.ts
197 lines (181 loc) · 6.07 KB
/
core.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/**
* Async function that waits for a specified number of milliseconds to resolve.
*
* @param ms Number of milliseconds to wait.
* @param signal Optional signal to abort the wait.
* @returns A promise that resolves after the specified number of milliseconds.
* @throws Only if the signal is aborted before the timeout.
*/
const wait = async (ms: number, signal?: AbortSignal): Promise<void> =>
new Promise((resolve, reject) => {
if (signal?.aborted) {
reject(signal.reason as Error)
return
}
const timer = setTimeout(resolve, ms)
signal?.addEventListener('abort', () => {
clearTimeout(timer)
reject(signal.reason as Error)
})
})
/**
* A state object that is passed to each retry policy function. A new RetryState
* is initialized on each async operation call. An operation is considered to be
* the entire sequence of attempts, including the first one.
*/
export type RetryState = {
/**
* The number of attempts that have been made. Starts at 1, after invoking the
* function for the first time.
*/
attempt: number
/**
* The delay applied to the last attempt. Starts at 0, before invoking the
* function for the first time.
*/
delay: number
/**
* The time elapsed since the first attempt.
*/
elapsed: number
/**
* The error that was thrown on the last attempt.
*/
error: unknown
/**
* An array of all errors that have been thrown so far.
*/
errors: unknown[]
/**
* The time when the first attempt was made.
*/
start: number
}
/**
* A function to be called after each failed attempt to determine the delay
* before the next attempt. The function should return a number of milliseconds
* to wait before the next attempt, or throw an error to give up. You can also
* use this function to implement custom retry logic, run side effects
* (e.g. logging), add special handling for specific errors, wrap errors in
* custom types, etc.
*
* The function will receive the current state of the retry operation, and the
* next policy in the chain (if any). If provided, the policy can invoke the
* next policy in the chain or not, if called, it can respect the delay value it
* returned or return its own. In either case, when a policy function throws an
* error, the async operation will be aborted and the error will be propagated
* to the caller.
*/
export type RetryPolicy = (
state: Readonly<RetryState>,
next?: RetryPolicy,
) => number
export type PolicyList =
| [RetryPolicy, RetryPolicy, ...(RetryPolicy | RetryPolicy[])[]]
| [[RetryPolicy], RetryPolicy, ...(RetryPolicy | RetryPolicy[])[]]
| [
[RetryPolicy],
[RetryPolicy, ...RetryPolicy[]],
...(RetryPolicy | RetryPolicy[])[],
]
| [
RetryPolicy,
[RetryPolicy, ...RetryPolicy[]],
...(RetryPolicy | RetryPolicy[])[],
]
| [
[RetryPolicy, RetryPolicy, ...RetryPolicy[]],
...(RetryPolicy | RetryPolicy[])[],
]
/**
* Joins multiple retry policies into a single policy. The policies will be
* called from left to right, each with the next policy as the second argument.
*
* @param policies The policies to join.
* @returns A new policy that is the result of joining the provided policies.
*/
export const join = (...policies: PolicyList): RetryPolicy =>
policies.flat().reduceRight((next, policy) => (state) => policy(state, next))
export type AbortWrapper<
Args extends unknown[],
Input extends unknown[],
Output,
> = Args extends [AbortSignal]
? (...args: Input) => Promise<Output>
: Promise<Output>
export type WrappedFunction<Input extends unknown[], Output> = <
Args extends [AbortSignal] | Input,
>(
...args: Args
) => AbortWrapper<Args, Input, Output>
// TypeScript is not smart enough to narrow down the type of args when doing
// this check inline, so it has to be done in a separate guard function.
const isAbortArgs = (args: unknown[]): args is [AbortSignal] =>
args.length === 1 && args[0] instanceof AbortSignal
/**
* Wrap an async function with a retry policy. The returned function has the
* same signature as the input function, but will retry the operation according
* to the provided policy if an error is thrown.
*
* The returned function is overloaded to accept an AbortSignal as its only
* argument, in such case, a function is returned with the original signature.
*
* @example
* import { core as wrap } from 'retryyy/core'
* import type RetryPolicy from 'retryyy/core'
*
* const simpleExamplePolicy: RetryPolicy = ({ attempt, error }) => {
* // Give up after 3 tries.
* if (attempt > 3) {
* throw error
* }
*
* // Linear backoff, waits 1s, 2s, 3s, 4s, etc.
* return attempt * 1000
* }
*
* type UserShape = { id: number; name: string }
*
* export const fetchUser = wrap(async (id: number) => {
* const res = await fetch(`https://jsonplaceholder.typicode.com/users/${id}`)
* return await res.json() as UserShape
* }, simpleExamplePolicy)
*/
export const core =
<Input extends unknown[], Output>(
fn: (...input: Input) => Promise<Output>,
policy: RetryPolicy,
): WrappedFunction<Input, Output> =>
<Args extends Input | [AbortSignal]>(...args: Args) => {
let signal: AbortSignal | undefined
if (isAbortArgs(args)) {
signal = args[0]
return exec as AbortWrapper<Args, Input, Output>
}
async function exec(...input: Input): Promise<Output> {
const state: RetryState = {
attempt: 0,
delay: 0,
elapsed: 0,
error: null,
errors: [],
start: Date.now(),
}
for (;;) {
try {
return await fn(...input)
} catch (error: unknown) {
state.attempt += 1
state.elapsed = Date.now() - state.start
state.error = error
state.errors.push(error)
// This sets the "previous delay" for the next time the policy is
// evaluated. It is important to set the other state fields before
// invoking the policy.
state.delay = policy(state)
await wait(state.delay, signal)
}
}
}
return exec(...(args as Input)) as AbortWrapper<Args, Input, Output>
}