Async
之前的内容还挺简单,这一节内容的 async 对于异步的封装明显实用性和复杂度上了个台阶。虽然难啃,但确实很有用。我们继续开始吧
all
all 函数类似于内置的 Promise.all 或 Promise.allSettled 函数。给定一个 Promise 列表(或对象),如果抛出任何错误,所有错误都会被收集并抛出到 AggregateError 中。
将数组作为参数传递将按照相同的顺序将已解析的 Promise 值作为数组返回。
1 2 3 4 5 6 7
| import { all } from 'radash'
const [user] = await all([ api.users.create(...), s3.buckets.create(...), slack.customerSuccessChannel.sendMessage(...) ])
|
将对象作为参数传递将返回一个与已解析的 Promise 值具有相同键和值的对象。
1 2 3 4 5 6 7
| import { all } from 'radash'
const { user } = await all({ user: api.users.create(...), bucket: s3.buckets.create(...), message: slack.customerSuccessChannel.sendMessage(...) })
|
该函数接受一个Promise数组或一个包含Promise的记录器,并返回一个包含所有Promise结果的Promise。如果任何错误发生,它们将被收集并抛出一个AggregateError。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
export async function all<T extends [Promise<any>, ...Promise<any>[]]>( promises: T ): Promise<PromiseValues<T>>
export async function all<T extends Promise<any>[]>( promises: T ): Promise<PromiseValues<T>>
export async function all<T extends Record<string, Promise<any>>>( promises: T ): Promise<{ [K in keyof T]: Awaited<T[K]> }> export async function all< T extends Record<string, Promise<any>> | Promise<any>[] >(promises: T) { const entries = isArray(promises) ? promises.map(p => [null, p] as [null, Promise<any>]) : Object.entries(promises) const results = await Promise.all( entries.map(([key, value]) => value .then(result => ({ result, exc: null, key })) .catch(exc => ({ result: null, exc, key })) ) )
const exceptions = results.filter(r => r.exc)
if (exceptions.length > 0) { throw new AggregateError(exceptions.map(e => e.exc)) }
if (isArray(promises)) { return results.map(r => r.result) as T extends Promise<any>[] ? PromiseValues<T> : unknown }
return results.reduce( (acc, item) => ({ ...acc, [item.key!]: item.result }), {} as { [K in keyof T]: Awaited<T[K]> } ) }
|
这段代码实现了一个功能强大的异步操作函数,能同时处理多个 Promise 对象并返回它们的结果,设计了多种输入类型的情况下的处理逻辑,确保了操作的稳定性和灵活性
defer
运行带有延迟函数的异步函数
_.defer 函数允许您运行一个异步函数,同时注册应该推迟执行的函数直到异步函数完成,然后执行这些函数。在脚本中,这非常有用,因为在或之后的特定点失败将需要一些清理工作。这有点类似于 finally 块。
传递给 _.defer
的函数带有一个注册函数参数,您可以使用它来注册希望在函数完成时调用的工作。如果您的函数抛出错误,然后注册的清理函数也抛出错误,默认情况下会被忽略。注册函数支持一个可选的第二个 options 参数,让您配置一个重新抛出策略,以便清理函数中的错误被重新抛出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import { defer } from 'radash'
await defer(async (cleanup) => { const buildDir = await createBuildDir()
cleanup(() => fs.unlink(buildDir))
await build() })
await defer(async (register) => { const org = await api.org.create() register(async () => api.org.delete(org.id), { rethrow: true })
const user = await api.user.create() register(async () => api.users.delete(user.id), { rethrow: true })
await executeTest(org, user) })
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
export const defer = async <TResponse>( func: ( register: ( fn: (error?: any) => any, options?: { rethrow?: boolean } ) => void ) => Promise<TResponse> ): Promise<TResponse> => { const callbacks: { fn: (error?: any) => any rethrow: boolean }[] = []
const register = ( fn: (error?: any) => any, options?: { rethrow?: boolean } ) => callbacks.push({ fn, rethrow: options?.rethrow ?? false })
const [err, response] = await tryit(func)(register)
for (const { fn, rethrow } of callbacks) { const [rethrown] = await tryit(fn)(err) if (rethrown && rethrow) throw rethrown } if (err) throw err return response }
|
主要原理是通过 register 函数来注册回调函数,在异步函数执行完成后依次执行这些回调函数,同时处理可能的异常情况。这种方式可以方便地延迟执行一系列函数,并在特定时机触发它们的执行,同时处理潜在的错误情况
guard
如果函数出错则返回未定义
如果异步函数出错,您可以设置默认值。
1 2 3 4 5
| const users = (await guard(fetchUsers)) ?? []
const isInvalidUserError = (err: any) => err.code === 'INVALID_ID' const user = (await guard(fetchUser, isInvalidUserError)) ?? DEFAULT_USER
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| export const guard = <TFunction extends () => any>( func: TFunction, shouldGuard?: (err: any) => boolean ): ReturnType<TFunction> extends Promise<any> ? Promise<Awaited<ReturnType<TFunction>> | undefined> : ReturnType<TFunction> | undefined => {
const _guard = (err: any) => { if (shouldGuard && !shouldGuard(err)) throw err return undefined as any }
const isPromise = (result: any): result is Promise<any> => result instanceof Promise
try { const result = func() return isPromise(result) ? result.catch(_guard) : result } catch (err) { return _guard(err) } }
|
这段代码实现了一个异常捕获和处理的功能,根据传入的条件和函数返回值的类型来决定是否对异常进行处理,以及如何返回结果。这种机制可以帮助在执行函数时对可能抛出的异常进行判断和处理,保证程序的稳定性和健壮性
map
使用异步函数映射数组
处理返回 Promise 的回调函数的映射。
1 2 3 4 5 6 7
| import { map } from 'radash'
const userIds = [1, 2, 3, 4]
const users = await map(userIds, async (userId) => { return await api.users.find(userId) })
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| export const map = async <T, K>( array: readonly T[], asyncMapFunc: (item: T, index: number) => Promise<K> ): Promise<K[]> => { if (!array) return [] let result = [] let index = 0
for (const value of array) { const newValue = await asyncMapFunc(value, index++) result.push(newValue) } return result }
|
这段代码实现了一个异步的数组映射功能,遍历输入的数组,对每个元素应用异步映射函数,并将结果存储在一个新的数组中返回。这有助于在处理需要异步转换的数据集时,保持代码的简洁性和可读性。
parallel
并行运行多个异步函数
与 _.map 类似,但专门为并行运行异步回调函数而构建。第一个参数是允许同时运行的函数数量的限制。返回结果数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import { parallel, try as tryit } from 'radash'
const userIds1 = [1, 2, 3, 4, 5, 6, 7, 8, 9]
const users1 = await parallel(3, userIds1, async (userId) => { return await api.users1.find(userId) })
const userIds = [1, 2, 3]
const [err, users] = await tryit(parallel)(3, userIds, async (userId) => { throw new Error(`No, I don\'t want to find user ${userId}`) })
console.log(err) console.log(err.errors) console.log(err.errors[1].message)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| export const parallel = async <T, K>( limit: number, array: readonly T[], func: (item: T) => Promise<K> ): Promise<K[]> => {
const work = array.map((item, index) => ({ index, item }))
const processor = async ( res: (value: WorkItemResult<K>[]) => void ) => {
const results: WorkItemResult<K>[] = []
while (true) { const next = work.pop() if (!next) return res(results)
const [error, result] = await tryit(func)(next.item)
results.push({ error, result: result as K, index: next.index }) } }
const queues = list(1, limit).map(() => new Promise(processor))
const itemResults = (await Promise.all(queues)) as WorkItemResult<K>[][]
const [errors, results] = fork( sort(itemResults.flat(), r => r.index), x => !!x.error )
if (errors.length > 0) { throw new AggregateError(errors.map(error => error.error)) }
return results.map(r => r.result) }
|
reduce
使用异步函数减少数组
处理返回承诺的回调函数的reduce。
1 2 3 4 5 6 7 8 9 10 11
| import { reduce } from 'radash'
const userIds = [1, 2, 3, 4]
const users = await reduce(userIds, async (acc, userId) => { const user = await api.users.find(userId) return { ...acc, [userId]: user } }, {})
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| export const reduce = async <T, K>( array: readonly T[], asyncReducer: (acc: K, item: T, index: number) => Promise<K>, initValue?: K ): Promise<K> => { const initProvided = initValue !== undefined
if (!initProvided && array?.length < 1) { throw new Error('Cannot reduce empty array with no init value') }
const iter = initProvided ? array : array.slice(1) let value: any = initProvided ? initValue : array[0]
for (const [i, item] of iter.entries()) { value = await asyncReducer(value, item, i) }
return value }
|
这段代码实现了一个异步reduce函数,可以对数组中的每个元素依次应用一个异步 reduce 函数,得到最终的结果值。适用于需要在处理过程中保持异步性质的情况,例如异步处理数组中的数据并将它们合并为一个结果
sleep
异步等待时间过去
_.sleep 函数允许您以毫秒为单位进行延迟。
1 2 3
| import { sleep } from 'radash'
await sleep(2000)
|
1 2 3 4 5 6 7
| export const sleep = (milliseconds: number) => { return new Promise(res => setTimeout(res, milliseconds)) }
|
简单却实用。sleep 函数可用于实现延迟执行,比如在编写异步代码时需要等待一段时间后再执行下一步操作,或者在测试代码中模拟延迟情况。通常在需要等待或延迟执行的情况下会用到类似的实现。
retry
运行异步函数,如果失败则重试
_.retry
函数允许您运行异步函数并在失败时自动重试。给定要运行的异步函数、可选的最大重试次数 (r) 以及重试之间的可选延迟毫秒数 (d),将调用给定的异步函数,重试 r 多次,并在重试之间等待 d 毫秒。
times 选项默认为 3。delay 选项(默认为 null)可以指定尝试之间休眠的毫秒数。
退避选项类似于延迟,但使用一个函数来休眠——可以轻松实现指数退避。
1 2 3 4 5 6 7 8
| import { retry } from 'radash'
await retry({}, api.users.list) await retry({ times: 10 }, api.users.list) await retry({ times: 2, delay: 1000 }, api.users.list)
await retry({ backoff: i => 10**i }, api.users.list)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| export const retry = async <TResponse>( options: { times?: number delay?: number | null backoff?: (count: number) => number }, func: (exit: (err: any) => void) => Promise<TResponse> ): Promise<TResponse> => {
const times = options?.times ?? 3
const delay = options?.delay
const backoff = options?.backoff ?? null
for (const i of range(1, times)) { const [err, result] = (await tryit(func)((err: any) => { throw { _exited: err } })) as [any, TResponse]
if (!err) return result
if (err._exited) throw err._exited
if (i === times) throw err
if (delay) await sleep(delay)
if (backoff) await sleep(backoff(i)) }
return undefined as unknown as TResponse }
|
这段代码实现了一个具有重试功能的异步操作函数。通过设置重试次数、延迟和退避策略,实现在出现错误时重复执行异步操作,直至成功或达到最大重试次数。这对于处理一些不稳定的操作或需要容错处理的异步任务非常有用
tryit
将函数转换为错误优先函数
错误优先回调真的太酷了(这梗时效期应该过了… = =) 在执行 try/catch 时使用可变变量来提升状态并不酷。
tryit 函数让您可以包装一个函数,将其转换为错误优先函数。适用于异步和同步功能。
1 2 3 4 5 6 7 8
| import { tryit } from 'radash'
const userId = 'xxx' const [err, user] = await tryit(api.users.find)(userId)
const findUser = tryit(api.users.find) const [err, user] = await findUser(userId)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| export const tryit = <Args extends any[], Return>( func: (...args: Args) => Return ) => { return ( ...args: Args ): Return extends Promise<any> ? Promise<[Error, undefined] | [undefined, Awaited<Return>]> : [Error, undefined] | [undefined, Return] => { try { const result = func(...args)
if (isPromise(result)) { return result .then(value => [undefined, value]) .catch(err => [err, undefined]) as Return extends Promise<any> ? Promise<[Error, undefined] | [undefined, Awaited<Return>]> : [Error, undefined] | [undefined, Return] }
return [undefined, result] as Return extends Promise<any> ? Promise<[Error, undefined] | [undefined, Awaited<Return>]> : [Error, undefined] | [undefined, Return] } catch (err) { return [err as any, undefined] as Return extends Promise<any> ? Promise<[Error, undefined] | [undefined, Awaited<Return>]> : [Error, undefined] | [undefined, Return] } } }
|
这个 tryit 函数会执行传入的函数,并根据函数的返回值类型是 Promise 还是普通值,返回不同格式的结果数组。它允许在函数执行过程中处理同步和异步操作,并将结果进行适当的封装以便进行进一步处理