interface AsyncStream<T>

A possibly infinite asynchronous sequence of elements of type T. See the Stream documentation and the AsyncStream API documentation

Companion namespace: AsyncStream

Type parameters

Name | Description
T | the element type
example
const s1 = AsyncStream.empty<number>()
const s2 = AsyncStream.of(1, 3, 2)
const s3 = AsyncStream.from(Stream.range({ start: 10, amount: 15 }))

Methods

append

Returns the current stream followed by the given value.

Definition

append(value: AsyncOptLazy<T>): AsyncStream.NonEmpty<T>;

Parameters

NameTypeDescription
valueAsyncOptLazy<T>the value to append
example
await AsyncStream.of(1, 2, 3).append(4).toArray()
// => [1, 2, 3, 4]
note

O(1)

assumeNonEmpty

Returns the stream as a non-empty instance.

Definition

assumeNonEmpty(): AsyncStream.NonEmpty<T>;

throws

RimbuError.EmptyCollectionAssumedNonEmptyError if the stream is known to be empty.

example
AsyncStream.from(Stream.range({ amount: 100 })).assumeNonEmpty()
// => type: AsyncStream.NonEmpty<number>
note

the function does not actually check whether the stream is empty, so use it with extra care

note

O(1)

asyncStream

Returns an async stream of elements of type T.

Definition

asyncStream(): this;

example
AsyncStream.of(1, 2, 3).asyncStream()
// => returns itself

collect

Returns an AsyncStream containing the resulting elements from applying the given collectFun to each element in this Stream.

Definition

collect<R>(collectFun: AsyncCollectFun<T, R>): AsyncStream<R>;

Type parameters

NameDescription
Rthe resulting element type

Parameters

NameTypeDescription
collectFunAsyncCollectFun<T, R>a potentially asynchronous function taking the parameters below and returning a new element or a skip token
- value: the next element
- index: the element index
- skip: an element that can be returned if the current element should be skipped
- halt: a function that, if called, ensures that no new elements are passed
example
await AsyncStream.of(1, 2, 3).collect(async (v, i, skip, halt) => {
  if (i === 0) return skip;
  if (i === 1) halt();
  return String(v)
}).toArray();
// => ['2']
note

O(1)
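
The interplay of the skip token and the halt callback can be sketched with plain async generators. This is not Rimbu's implementation: `collectSketch`, `SKIP`, and `fromValues` are hypothetical names, the callback is synchronous for brevity, and the sketch assumes that a value returned from the same call that invoked `halt()` is still emitted.

```typescript
// Hypothetical stand-ins for illustration only; none of these names come from Rimbu.
const SKIP = Symbol('skip');

type CollectFunSketch<T, R> = (
  value: T,
  index: number,
  skip: typeof SKIP,
  halt: () => void
) => R | typeof SKIP;

async function* collectSketch<T, R>(
  source: AsyncIterable<T>,
  collectFun: CollectFunSketch<T, R>
): AsyncGenerator<R> {
  let halted = false;
  const halt = (): void => {
    halted = true;
  };
  let index = 0;
  for await (const value of source) {
    const result = collectFun(value, index++, SKIP, halt);
    // A skipped element produces no output.
    // Assumption: a value returned alongside a halt() call is still emitted.
    if (result !== SKIP) yield result as R;
    if (halted) return;
  }
}

async function* fromValues<T>(...values: T[]): AsyncGenerator<T> {
  for (const v of values) yield v;
}

// Skip odd numbers, multiply even ones by 10, and stop once 4 has been seen.
async function collectDemo(): Promise<number[]> {
  const out: number[] = [];
  const collected = collectSketch<number, number>(
    fromValues(1, 2, 3, 4, 5, 6),
    (v, _i, skip, halt) => {
      if (v % 2 !== 0) return skip;
      if (v === 4) halt();
      return v * 10;
    }
  );
  for await (const x of collected) out.push(x);
  return out; // [20, 40]
}
```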

concat

Returns an AsyncStream containing the elements of this stream followed by all elements produced by the others array of AsyncStreamSources.

Definitions

concat<T2 = T>(...others: ArrayNonEmpty<AsyncStreamSource.NonEmpty<T2>>): AsyncStream.NonEmpty<T | T2>;

concat<T2 = T>(...others: ArrayNonEmpty<AsyncStreamSource<T2>>): AsyncStream<T | T2>;

Type parameters

NameDefaultDescription
T2Tthe element type of the stream to concatenate

Parameters

NameTypeDescription
othersArrayNonEmpty<AsyncStreamSource.NonEmpty<T2>>a series of AsyncStreamSources to concatenate.
example
await AsyncStream.of(1, 2, 3).concat([4, 5], () => [6, 7]).toArray()
// => [1, 2, 3, 4, 5, 6, 7]
note

O(1)

contains

Returns true if the AsyncStream contains at least the given amount of instances of the given value, using the given eq function.

Definition

contains(value: T, amount?: number, eq?: Eq<T>): Promise<boolean>;

Parameters

Name | Type | Description
value | T | the value to search for
amount | number | (default: 1) the number of occurrences of value the Stream should contain
eq | Eq<T> | (optional) the equality function to use
example
const source = AsyncStream.from('marmot')
await source.contains('m') // => true
await source.contains('m', 2) // => true
await source.contains('m', 3) // => false
await source.contains('q') // => false
note

O(N)
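
The counting behaviour with an amount can be sketched as follows. This is an illustration with plain async generators, not the library's code: `containsSketch` and `chars` are hypothetical helpers, and treating a non-positive amount as trivially satisfied is an assumption.

```typescript
// Hypothetical helper: emits the characters of a string asynchronously.
async function* chars(text: string): AsyncGenerator<string> {
  for (const ch of text.split('')) yield ch;
}

async function containsSketch<T>(
  source: AsyncIterable<T>,
  value: T,
  amount = 1,
  eq: (a: T, b: T) => boolean = Object.is
): Promise<boolean> {
  // Assumption: a non-positive amount is trivially satisfied.
  if (amount <= 0) return true;
  let remaining = amount;
  for await (const elem of source) {
    // Return as soon as enough matches were seen, without draining the source.
    if (eq(elem, value) && --remaining <= 0) return true;
  }
  return false;
}

async function containsDemo(): Promise<boolean[]> {
  return [
    await containsSketch(chars('marmot'), 'm'),
    await containsSketch(chars('marmot'), 'm', 2),
    await containsSketch(chars('marmot'), 'm', 3),
    await containsSketch(chars('marmot'), 'q'),
  ]; // [true, true, false, false]
}
```

Because the loop returns on the last required match, a sketch like this can answer true on an infinite source, but it never terminates on one when too few matches exist.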

containsSlice

Returns true if this stream contains the same sequence of elements as the given source, false otherwise.

Definition

containsSlice(source: AsyncStreamSource.NonEmpty<T>, eq?: Eq<T>): Promise<boolean>;

Parameters

NameTypeDescription
sourceAsyncStreamSource.NonEmpty<T>a non-empty async stream source containing the element sequence to find
eqEq<T>(default: Eq.objectIs) the function to use to test element equality
example
await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4])
// => true
await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2])
// => false

count

Returns the amount of elements in the AsyncStream.

Definition

count(): Promise<number>;

example
await AsyncStream.of(1, 2, 3).count() // => 3
note

O(N) for most types of Stream

note

be careful not to use on infinite streams

countElement

Returns the amount of elements that are equal according to the given eq to the given value in the AsyncStream.

Definition

countElement(value: T, eq?: Eq<T>): Promise<number>;

Parameters

NameTypeDescription
valueTthe value to compare to
eqEq<T>(optional) the Eq instance to use to test equality
example
await AsyncStream.of(1, 2, 3).countElement(2) // => 1
note

O(N)

note

be careful not to use on infinite streams

countNotElement

Returns the amount of elements that are not equal according to the given eq to the given value in the AsyncStream.

Definition

countNotElement(value: T, eq?: Eq<T>): Promise<number>;

Parameters

NameTypeDescription
valueTthe value to compare to
eqEq<T>(optional) the Eq instance to use to test equality
example
await AsyncStream.of(1, 2, 3).countNotElement(2) // => 2
note

O(N)

note

be careful not to use on infinite streams

distinctPrevious

Returns an AsyncStream containing non-repetitive elements of the source stream, where repetitive elements are compared using the optionally given eq equality function.

Definition

distinctPrevious(eq?: Eq<T>): AsyncStream<T>;

Parameters

NameTypeDescription
eqEq<T>(default: Eq.objectIs) the Eq instance to use to test equality of elements
example
await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
// => [1, 2, 3, 1]
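
A minimal sketch of the comparison against the direct predecessor, written with plain async generators. `distinctPreviousSketch` is a hypothetical name and `Object.is` stands in for the default `Eq.objectIs`.

```typescript
async function* distinctPreviousSketch<T>(
  source: AsyncIterable<T>,
  eq: (a: T, b: T) => boolean = Object.is
): AsyncGenerator<T> {
  let hasPrevious = false;
  let previous: T | undefined;
  for await (const value of source) {
    // Emit only when the value differs from its direct predecessor.
    if (!hasPrevious || !eq(previous as T, value)) yield value;
    previous = value;
    hasPrevious = true;
  }
}

async function distinctDemo(): Promise<number[]> {
  async function* source(): AsyncGenerator<number> {
    yield* [1, 1, 2, 2, 3, 1];
  }
  const out: number[] = [];
  for await (const v of distinctPreviousSketch(source())) out.push(v);
  return out; // [1, 2, 3, 1]
}
```

Only adjacent repeats are removed, which is why the trailing 1 survives: it differs from the 3 before it.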

drop

Returns an AsyncStream that skips the first amount elements of this stream and returns the rest.

Definition

drop(amount: number): AsyncStream<T>;

Parameters

NameTypeDescription
amountnumberthe amount of elements to skip
example
await AsyncStream.of(1, 2, 3).drop(1).toArray()   // => [2, 3]
note

O(N)

dropWhile

Returns an AsyncStream that contains the elements of this stream starting from the first element that does not satisfy given pred function.

Definition

dropWhile(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T>;

Parameters

NameTypeDescription
pred(value: T, index: number) => MaybePromise<boolean>a potentially asynchronous predicate function taking an element and its index
example
await AsyncStream.of(1, 2, 3).dropWhile(async v => v < 2).toArray()
// => [2, 3]
note

O(N)
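
The following sketch illustrates that only the leading run of matching elements is dropped. It uses plain async generators; `dropWhileSketch` is a hypothetical name, and it assumes the predicate is no longer consulted once it has returned false.

```typescript
async function* dropWhileSketch<T>(
  source: AsyncIterable<T>,
  pred: (value: T, index: number) => boolean | Promise<boolean>
): AsyncGenerator<T> {
  let dropping = true;
  let index = 0;
  for await (const value of source) {
    // Assumption: after the first failing element, pred is not called again.
    if (dropping && (await pred(value, index++))) continue;
    dropping = false;
    yield value;
  }
}

async function dropWhileDemo(): Promise<number[]> {
  async function* source(): AsyncGenerator<number> {
    yield* [1, 2, 3, 1];
  }
  const out: number[] = [];
  for await (const v of dropWhileSketch(source(), async (v) => v < 2)) {
    out.push(v);
  }
  return out; // [2, 3, 1]
}
```

The final 1 satisfies the predicate but is kept, because dropping stopped at the first element that failed it.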

elementAt

Returns the element in the AsyncStream at the given index, or a fallback value (default undefined) otherwise.

Definitions

elementAt(index: number): Promise<T | undefined>;

elementAt<O>(index: number, otherwise: AsyncOptLazy<O>): Promise<T | O>;

Parameters

NameTypeDescription
indexnumberthe index of the element to retrieve
example
await AsyncStream.of(1, 2, 3).elementAt(1)        // => 2
await AsyncStream.of(1, 2, 3).elementAt(5) // => undefined
await AsyncStream.of(1, 2, 3).elementAt(5, 'a') // => 'a'
await AsyncStream.of(1, 2, 3).elementAt(5, async () => 'a') // => 'a'
note

O(N) for most types of Stream

equals

Returns true if the sequence of elements in this stream is equal to the sequence in the other stream according to the provided eq function.

Definition

equals(other: AsyncStreamSource<T>, eq?: Eq<T>): Promise<boolean>;

Parameters

NameTypeDescription
otherAsyncStreamSource<T>the other stream to compare
eqEq<T>(optional) the equality function
example
await AsyncStream.of(1, 2, 3).equals([1, 2, 3])     // => true
await AsyncStream.of(1, 2, 3, 4).equals([1, 2, 3]) // => false
note

don't use on potentially infinite streams

note

O(N)

every

Returns true if every element of the AsyncStream satisfies the given pred function.

Definition

every(pred: (value: T, index: number) => MaybePromise<boolean>): Promise<boolean>;

Parameters

NameTypeDescription
pred(value: T, index: number) => MaybePromise<boolean>a potentially asynchronous predicate function taking an element and its index
example
await AsyncStream.of(1, 2, 3).every((v, i) => v + i > 10)  // => false
await AsyncStream.of(1, 2, 3).every(async (v, i) => v + i < 10) // => true
note

O(N)

filter

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns true.

Definition

filter(pred: (value: T, index: number, halt: () => void) => MaybePromise<boolean>): AsyncStream<T>;

Parameters

NameTypeDescription
pred(value: T, index: number, halt: () => void) => MaybePromise<boolean>a potentially asynchronous function taking an element and its index, and returning true if the element should be included in the resulting stream.
note

O(1)

example
await AsyncStream.of(1, 2, 3).filter(async (v, i) => v + i !== 3).toArray()
// => [1, 3]

filterNot

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns false.

Definition

filterNot(pred: (value: T, index: number, halt: () => void) => MaybePromise<boolean>): AsyncStream<T>;

Parameters

NameTypeDescription
pred(value: T, index: number, halt: () => void) => MaybePromise<boolean>a potentially asynchronous function taking an element and its index, and returning false if the element should be included in the resulting stream.
note

O(1)

example
await AsyncStream.of(1, 2, 3).filterNot(async (v, i) => v + i !== 3).toArray()
// => [2]

filterNotPure

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns false.

Definition

filterNotPure<A extends readonly unknown[]>(pred: (value: T, ...args: A) => MaybePromise<boolean>, ...args: A): AsyncStream<T>;

Type parameters

NameConstraintsDescription
Areadonly unknown[]the type of the arguments to be passed to the pred function after each element

Parameters

Name | Type | Description
pred | (value: T, ...args: A) => MaybePromise<boolean> | a potentially asynchronous function taking an element and the optionally given args, and returning false if the element should be included in the resulting stream.
args | A | (optional) the extra arguments to pass to the given pred
note

O(1)

example
await AsyncStream.of(1, 2, 3).filterNotPure(Object.is, 2).toArray()
// => [1, 3]

filterPure

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns true.

Definition

filterPure<A extends readonly unknown[]>(pred: (value: T, ...args: A) => MaybePromise<boolean>, ...args: A): AsyncStream<T>;

Type parameters

NameConstraintsDescription
Areadonly unknown[]the type of the arguments to be passed to the pred function after each element

Parameters

Name | Type | Description
pred | (value: T, ...args: A) => MaybePromise<boolean> | a potentially asynchronous function taking an element and the optionally given args, and returning true if the element should be included in the resulting stream.
args | A | (optional) the extra arguments to pass to the given pred
note

O(1)

example
await AsyncStream.of(1, 2, 3).filterPure(Object.is, 2).toArray()
// => [2]

find

Returns the first element for which the given pred function returns true, or a fallback value otherwise.

Definitions

find(pred: (value: T, index: number) => MaybePromise<boolean>, occurrance?: number): Promise<T | undefined>;

find<O>(pred: (value: T, index: number) => MaybePromise<boolean>, occurrance: number | undefined, otherwise: AsyncOptLazy<O>): Promise<T | O>;

Parameters

Name | Type | Description
pred | (value: T, index: number) => MaybePromise<boolean> | a potentially asynchronous predicate function taking an element and its index
occurrance | number | (default: 1) the occurrence number to look for
example
const isEven = async (v: number) => v % 2 === 0
await AsyncStream.of(1, 2, 3, 4).find(isEven) // => 2
await AsyncStream.of(1, 2, 3, 4).find(isEven, 2) // => 4
await AsyncStream.of(1, 2, 3, 4).find(isEven, 3) // => undefined
await AsyncStream.of(1, 2, 3, 4).find(isEven, 3, 'a') // => 'a'
note

O(N)
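
How the occurrance counter and the fallback interact can be sketched like this. `findSketch` is a hypothetical helper built on plain async generators. It takes the fallback as a plain value rather than an AsyncOptLazy, and returning the fallback for a non-positive occurrance is an assumption.

```typescript
async function findSketch<T, O = undefined>(
  source: AsyncIterable<T>,
  pred: (value: T, index: number) => boolean | Promise<boolean>,
  occurrance = 1,
  otherwise?: O
): Promise<T | O | undefined> {
  // Assumption: a non-positive occurrance can never match.
  if (occurrance <= 0) return otherwise;
  let remaining = occurrance;
  let index = 0;
  for await (const value of source) {
    // Count matches down and stop at the requested one.
    if ((await pred(value, index++)) && --remaining <= 0) return value;
  }
  return otherwise;
}

async function findDemo(): Promise<unknown[]> {
  const isEven = async (v: number): Promise<boolean> => v % 2 === 0;
  async function* source(): AsyncGenerator<number> {
    yield* [1, 2, 3, 4];
  }
  return [
    await findSketch(source(), isEven),         // 2
    await findSketch(source(), isEven, 2),      // 4
    await findSketch(source(), isEven, 3),      // undefined
    await findSketch(source(), isEven, 3, 'a'), // 'a'
  ];
}
```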

first

Returns the first element of the AsyncStream, or a fallback value (default undefined) if the stream is empty.

Definitions

first(): Promise<T | undefined>;

first<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;

example
await AsyncStream.of(1, 2, 3).first()      // => 1
await AsyncStream.empty<number>().first() // => undefined
await AsyncStream.empty<number>().first(0) // => 0
await AsyncStream.empty<number>().first(async () => 0) // => 0
note

O(1)

flatMap

Returns an AsyncStream consisting of the concatenation of flatMapFun applied to each element.

Definition

flatMap<T2>(flatMapFun: (value: T, index: number, halt: () => void) => AsyncStreamSource<T2>): AsyncStream<T2>;

Type parameters

NameDescription
T2the resulting element type

Parameters

NameTypeDescription
flatMapFun(value: T, index: number, halt: () => void) => AsyncStreamSource<T2>a potentially asynchronous function receiving the inputs described below and returning a StreamSource of new elements
- value: the next element
- index: the index of the element
- halt: a function that, if called, ensures that no new elements are passed
note

O(1)

example
await AsyncStream.of(1, 2, 3).flatMap(async (v, i, halt) => {
  if (i >= 1) halt();
  return [v, i, v + i]
}).toArray()
// => [1, 0, 1, 2, 1, 3]

flatZip

Returns an AsyncStream consisting of the concatenation of flatMapFun applied to each element, zipped with the element that was provided to the function.

Definition

flatZip<T2>(flatMapFun: (value: T, index: number, halt: () => void) => AsyncStreamSource<T2>): AsyncStream<[T, T2]>;

Type parameters

NameDescription
T2the result element type

Parameters

NameTypeDescription
flatMapFun(value: T, index: number, halt: () => void) => AsyncStreamSource<T2>a function receiving the inputs described below and returning a StreamSource of new elements
- value: the next element
- index: the index of the element
- halt: a function that, if called, ensures that no new elements are passed
note

O(1)

example
await AsyncStream.of(1, 2, 3).flatZip((v, i, halt) => {
  if (i >= 1) halt();
  return [v, i, v + i]
}).toArray()
// => [[1, 1], [1, 0], [1, 1], [2, 2], [2, 1], [2, 3]]

fold

Returns the value resulting from applying the given next function to a current state (initially the given init value) and the next stream value, and returning the new state. When all elements are processed, the resulting state is returned.

Definition

fold<R>(init: AsyncOptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => MaybePromise<R>): Promise<R>;

Type parameters

NameDescription
Rthe resulting type

Parameters

NameTypeDescription
initAsyncOptLazy<R>the initial result/state value
next(current: R, value: T, index: number, halt: () => void) => MaybePromise<R>a potentially asynchronous function taking the parameters below and returning the new result/state value
- current: the current result/state value, initially init.
- value: the next Stream value
- index: the index of the given value
- halt: a function that, if called, ensures that no new elements are passed
example
console.log(await AsyncStream.empty<number>().fold(5, async (current, value) => current + value))
// => 5
console.log(await AsyncStream.of(1, 2, 3).fold(() => 5, (current, value) => current + value))
// => 11 (= 5 + 1 + 2 + 3)
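
The role of halt in a fold can be sketched as below. This is an illustration, not the library's implementation: `foldSketch` is a hypothetical name, init is a plain value and next is synchronous for brevity, and the sketch assumes that the state returned by the call that invoked `halt()` becomes the final result.

```typescript
async function foldSketch<T, R>(
  source: AsyncIterable<T>,
  init: R,
  next: (current: R, value: T, index: number, halt: () => void) => R
): Promise<R> {
  let halted = false;
  const halt = (): void => {
    halted = true;
  };
  let current = init;
  let index = 0;
  for await (const value of source) {
    current = next(current, value, index++, halt);
    // Assumption: the state returned by the halting call is kept.
    if (halted) break;
  }
  return current;
}

async function foldDemo(): Promise<number[]> {
  async function* source(): AsyncGenerator<number> {
    yield* [1, 2, 3, 4];
  }
  const total = await foldSketch<number, number>(
    source(),
    5,
    (current, value) => current + value
  );
  const partial = await foldSketch<number, number>(
    source(),
    0,
    (current, value, _index, halt) => {
      if (value === 3) halt();
      return current + value;
    }
  );
  return [total, partial]; // [15, 6]
}
```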

foldStream

Returns an AsyncStream containing the values resulting from applying the given next function to a current state (initially the given init value) and the next stream value, and returning the new state.

Definition

foldStream<R>(init: AsyncOptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => MaybePromise<R>): AsyncStream<R>;

Type parameters

NameDescription
Rthe resulting element type

Parameters

NameTypeDescription
initAsyncOptLazy<R>the initial result/state value
next(current: R, value: T, index: number, halt: () => void) => MaybePromise<R>a function taking the parameters below and returning the new result/state value
- current: the current result/state value, initially init.
- value: the next Stream value
- index: the index of the given value
- halt: a function that, if called, ensures that no new elements are passed
example
console.log(
await AsyncStream.empty<number>()
.foldStream(5, async (current, value) => current + value)
.toArray()
)
// => []
console.log(
await AsyncStream.of(1, 2, 3)
.foldStream(() => 5, (current, value) => current + value)
.toArray()
)
// => [6, 8, 11]

forEach

Performs given function f for each element of the Stream, using given state as initial traversal state.

Definition

forEach(f: (value: T, index: number, halt: () => void) => MaybePromise<void>, state?: TraverseState): Promise<void>;

Parameters

NameTypeDescription
f(value: T, index: number, halt: () => void) => MaybePromise<void>the potentially asynchronous function to perform for each element, receiving:
- value: the next element
- index: the index of the element
- halt: a function that, if called, ensures that no new elements are passed
stateTraverseState(optional) the traverse state
note

if f is an async function, each call will be awaited consecutively

example
await AsyncStream.of(1, 2, 3).forEach(async (v, i, halt) => {
  console.log(v);
  if (i >= 1) halt();
})
// => 1, 2
note

O(N)

forEachPure

Performs given function f for each element of the Stream, with the optionally given args as extra arguments.

Definition

forEachPure<A extends readonly unknown[]>(f: (value: T, ...args: A) => MaybePromise<void>, ...args: A): Promise<void>;

Type parameters

NameConstraintsDescription
Areadonly unknown[]the type of the arguments to be passed to the f function after each element

Parameters

NameTypeDescription
f(value: T, ...args: A) => MaybePromise<void>the potentially asynchronous function to perform for each element, optionally receiving given extra args.
argsA(optional) a list of extra arguments to pass to given f for each element
note

if f is an async function, each call will be awaited consecutively

example
await AsyncStream.of(1, 2, 3).forEachPure(console.log, 'sheep')
// => logs:
// 1 sheep
// 2 sheep
// 3 sheep
note

O(N)

indexed

Returns an AsyncStream where each element in this stream is paired with its index

Definition

indexed(startIndex?: number): AsyncStream<[number, T]>;

Parameters

NameTypeDescription
startIndexnumber(optional) an alternative start index to use
example
await AsyncStream.of(1, 2, 3).indexed().toArray()
// => [[0, 1], [1, 2], [2, 3]]
note

O(1)

indexOf

Returns the index of the given occurrance (1-based, default: the first) of the given searchValue in the AsyncStream, using the given eq function, or undefined if no such value is found.

Definition

indexOf(searchValue: T, occurrance?: number, eq?: Eq<T>): Promise<number | undefined>;

Parameters

Name | Type | Description
searchValue | T | the element to search for
occurrance | number | (default: 1) the occurrence to search for
eq | Eq<T> | (optional) the equality function to use
example
const source = AsyncStream.from('marmot')
await source.indexOf('m') // => 0
await source.indexOf('m', 2) // => 3
await source.indexOf('m', 3) // => undefined
await source.indexOf('q') // => undefined
note

O(N)

indexWhere

Returns the index of the given occurrance instance of the element in the AsyncStream that satisfies given pred function, or undefined if no such instance is found.

Definition

indexWhere(pred: (value: T, index: number) => MaybePromise<boolean>, occurrance?: number): Promise<number | undefined>;

Parameters

Name | Type | Description
pred | (value: T, index: number) => MaybePromise<boolean> | a potentially asynchronous predicate function taking an element and its index
occurrance | number | (default: 1) the occurrence to search for
example
await AsyncStream.of(1, 2, 3).indexWhere((v, i) => v + i > 2)      // => 1
await AsyncStream.of(1, 2, 3).indexWhere(async (v, i) => v + i > 2, 2) // => 2
note

O(N)

indicesOf

Returns an AsyncStream containing the indices of the occurrences of the given searchValue, according to the given eq function.

Definition

indicesOf(searchValue: T, eq?: Eq<T>): AsyncStream<number>;

Parameters

NameTypeDescription
searchValueTthe value to search for
eqEq<T>(optional) the equality function to use
example
await AsyncStream.from('marmot').indicesOf('m').toArray()
// => [0, 3]
note

O(N)

indicesWhere

Returns an AsyncStream containing the indices of the elements for which the given pred function returns true.

Definition

indicesWhere(pred: (value: T) => MaybePromise<boolean>): AsyncStream<number>;

Parameters

NameTypeDescription
pred(value: T) => MaybePromise<boolean>a potentially asynchronous predicate function taking an element
example
await AsyncStream.of(1, 2, 3).indicesWhere(v => v !== 2).toArray()
// => [0, 2]
note

O(N)

intersperse

Returns an AsyncStream with all elements from the given sep AsyncStreamSource between two elements of this stream.

Definition

intersperse(sep: AsyncStreamSource<T>): AsyncStream<T>;

Parameters

NameTypeDescription
sepAsyncStreamSource<T>the AsyncStreamSource to insert between each element of this Stream
example
await AsyncStream.of(1, 2, 3).intersperse("ab").toArray()
// => [1, 'a', 'b', 2, 'a', 'b', 3]
note

O(1)

join

Returns a string resulting from converting each element to string with options.valueToString, interspersed with options.sep, starting with options.start and ending with options.end.

Definition

join(options?: {
    sep?: string;
    start?: string;
    end?: string;
    valueToString?: (value: T) => MaybePromise<string>;
    ifEmpty?: string;
  }): Promise<string>;

Parameters

NameTypeDescription
options{
    sep?: string;
    start?: string;
    end?: string;
    valueToString?: (value: T) => MaybePromise<string>;
    ifEmpty?: string;
  }
(optional) object specifying the following properties
- sep: (optional) a separator to insert between each Stream element
- start: (optional) a start string to prepend at the start
- end: (optional) an end string to append at the end
- valueToString: (default: String) a potentially asynchronous function converting a Stream element to a string
- ifEmpty: (optional) a string to return instead of the start and end tag if the stream is empty
example
await AsyncStream.of(1, 2, 3).join({ start: '<', sep: ', ', end: '>' })
// => '<1, 2, 3>'
note

O(N)
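
How the join options combine can be sketched with a plain async generator. `joinSketch` and `JoinOptionsSketch` are hypothetical names, and the sketch assumes that ifEmpty, when given, replaces the whole result (start and end included) for an empty stream.

```typescript
interface JoinOptionsSketch<T> {
  sep?: string;
  start?: string;
  end?: string;
  valueToString?: (value: T) => string | Promise<string>;
  ifEmpty?: string;
}

async function joinSketch<T>(
  source: AsyncIterable<T>,
  options: JoinOptionsSketch<T> = {}
): Promise<string> {
  const { sep = '', start = '', end = '', ifEmpty } = options;
  const valueToString =
    options.valueToString ?? ((value: T): string => String(value));
  let body = '';
  let empty = true;
  for await (const value of source) {
    // The separator goes between elements, never before the first one.
    if (!empty) body += sep;
    body += await valueToString(value);
    empty = false;
  }
  // Assumption: ifEmpty replaces start and end when there are no elements.
  if (empty && ifEmpty !== undefined) return ifEmpty;
  return start + body + end;
}

async function joinDemo(): Promise<string[]> {
  async function* numbers(): AsyncGenerator<number> {
    yield* [1, 2, 3];
  }
  async function* nothing(): AsyncGenerator<number> {}
  return [
    await joinSketch(numbers(), { start: '<', sep: ', ', end: '>' }), // '<1, 2, 3>'
    await joinSketch(nothing(), { start: '<', end: '>' }),            // '<>'
    await joinSketch(nothing(), { start: '<', end: '>', ifEmpty: '(none)' }), // '(none)'
  ];
}
```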

last

Returns the last element of the AsyncStream, or a fallback value (default undefined) if the stream is empty.

Definitions

last(): Promise<T | undefined>;

last<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;

example
await AsyncStream.of(1, 2, 3).last()      // => 3
await AsyncStream.empty<number>().last() // => undefined
await AsyncStream.empty<number>().last(0) // => 0
await AsyncStream.empty<number>().last(async () => 0) // => 0
note

O(N)

map

Returns an AsyncStream where mapFun is applied to each element.

Definition

map<T2>(mapFun: (value: T, index: number) => MaybePromise<T2>): AsyncStream<T2>;

Type parameters

NameDescription
T2the resulting element type

Parameters

NameTypeDescription
mapFun(value: T, index: number) => MaybePromise<T2>a potentially asynchronous function taking an element and its index, and returning some new element
example
await AsyncStream.of(1, 2, 3).map(async (v, i) => `[${i}]: ${v}`).toArray()
// => ['[0]: 1', '[1]: 2', '[2]: 3']
note

O(1)

mapPure

Returns an AsyncStream where the given mapFun is applied to each value in the stream, with optionally as extra arguments the given args.

Definition

mapPure<T2, A extends readonly unknown[]>(mapFun: (value: T, ...args: A) => MaybePromise<T2>, ...args: A): AsyncStream<T2>;

Type parameters

NameConstraintsDescription
T2the result value type
Areadonly unknown[]the type of arguments to be supplied to the mapFun after each element

Parameters

NameTypeDescription
mapFun(value: T, ...args: A) => MaybePromise<T2>a potentially asynchronous function taking an element and the given args, and returning the resulting stream value
argsA(optional) the extra arguments to pass to the given mapFun
note

mostly aimed at increasing performance, since no extra wrapper function is required

note

can be used with functions that expect exactly one argument, because, unlike the normal map, no index is passed as an extra argument

example
const s = AsyncStream.of({ a: 1 }, { a: 2, c: { d: true } })
const s2 = s.mapPure(JSON.stringify, ['a'], 5)
// when stream is evaluated, will call JSON.stringify on each stream element with the given extra arguments
console.log(await s2.toArray())
// => ["{\n     \"a\": 1\n}", "{\n     \"a\": 2\n}"]

max

Returns the maximum element of the AsyncStream according to a default compare function, or the provided otherwise fallback value if the stream is empty.

Definitions

max(): Promise<T | undefined>;

max<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;

example
await AsyncStream.of(5, 1, 3).max()         // => 5
await AsyncStream.empty<number>().max() // => undefined
await AsyncStream.empty<number>().max('a') // => 'a'
note

O(N)

maxBy

Returns the maximum element of the AsyncStream according to the provided compare function, or the provided otherwise fallback value if the stream is empty.

Definitions

maxBy(compare: (v1: T, v2: T) => number): Promise<T | undefined>;

maxBy<O>(compare: (v1: T, v2: T) => number, otherwise: AsyncOptLazy<O>): Promise<T | O>;

Parameters

NameTypeDescription
compare(v1: T, v2: T) => number
example
function compareLength(a: string, b: string): number { return a.length - b.length };
await AsyncStream.of('abc', 'a', 'ab').maxBy(compareLength) // => 'abc'
await AsyncStream.empty<string>().maxBy(compareLength) // => undefined
await AsyncStream.empty<string>().maxBy(compareLength, 'a') // => 'a'
note

O(N)
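
The sketch below shows how a compare function drives the selection, assuming the usual comparator convention in which a positive result means that v1 orders after v2. `maxBySketch` and `byLength` are hypothetical names, and the fallback overload is left out.

```typescript
async function maxBySketch<T>(
  source: AsyncIterable<T>,
  compare: (v1: T, v2: T) => number
): Promise<T | undefined> {
  let found = false;
  let best: T | undefined;
  for await (const value of source) {
    // Assumption: compare(v1, v2) > 0 means v1 is the larger element.
    if (!found || compare(value, best as T) > 0) {
      best = value;
      found = true;
    }
  }
  return best;
}

// Orders strings by ascending length.
const byLength = (a: string, b: string): number => a.length - b.length;

async function maxByDemo(): Promise<string | undefined> {
  async function* words(): AsyncGenerator<string> {
    yield* ['abc', 'a', 'ab'];
  }
  return maxBySketch(words(), byLength); // 'abc'
}
```

Under the same convention, a minBy sketch would only flip the comparison to `< 0`.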

min

Returns the minimum element of the AsyncStream according to a default compare function, or the provided otherwise fallback value if the stream is empty.

Definitions

min(): Promise<T | undefined>;

min<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;

example
await AsyncStream.of(5, 1, 3).min()         // => 1
await AsyncStream.empty<number>().min() // => undefined
await AsyncStream.empty<number>().min('a') // => 'a'
await AsyncStream.empty<number>().min(async () => 'a') // => 'a'
note

O(N)

minBy

Returns the minimum element of the AsyncStream according to the provided compare function, or the provided otherwise fallback value if the stream is empty.

Definitions

minBy(compare: (v1: T, v2: T) => number): Promise<T | undefined>;

minBy<O>(compare: (v1: T, v2: T) => number, otherwise: AsyncOptLazy<O>): Promise<T | O>;

Parameters

NameTypeDescription
compare(v1: T, v2: T) => number
example
function compareLength(a: string, b: string): number { return a.length - b.length };
await AsyncStream.of('abc', 'a', 'ab').minBy(compareLength) // => 'a'
await AsyncStream.empty<string>().minBy(compareLength) // => undefined
await AsyncStream.empty<string>().minBy(compareLength, 'a') // => 'a'
note

O(N)

mkGroup

Returns an AsyncStream starting with options.start, then returning the elements of this Stream interspersed with options.sep, and ending with options.end.

Definition

mkGroup(options: {
    sep?: AsyncStreamSource<T>;
    start?: AsyncStreamSource<T>;
    end?: AsyncStreamSource<T>;
  }): AsyncStream<T>;

Parameters

NameTypeDescription
options{
    sep?: AsyncStreamSource<T>;
    start?: AsyncStreamSource<T>;
    end?: AsyncStreamSource<T>;
  }
object specifying the following properties
- sep: (optional) a separator StreamSource to insert between each Stream element
- start: (optional) a start StreamSource to prepend
- end: (optional) an end StreamSource to append
example
await AsyncStream.of(1, 2, 3).mkGroup({ start: '<<', sep: '-', end: '>>' }).toArray()
// => ['<', '<', 1, '-', 2, '-', 3, '>', '>']
note

O(N)

prepend

Returns the current stream preceded by the given value

Definition

prepend(value: AsyncOptLazy<T>): AsyncStream.NonEmpty<T>;

Parameters

NameTypeDescription
valueAsyncOptLazy<T>the value to prepend
example
await AsyncStream.of(1, 2, 3).prepend(0).toArray()
// => [0, 1, 2, 3]
note

O(1)

reduce

Applies the given (Async)Reducer to each element in the AsyncStream, and returns the final result.

Definition

reduce<R>(reducer: AsyncReducer<T, R>): Promise<R>;

Type parameters

NameDescription
Rthe resulting type

Parameters

NameTypeDescription
reducerAsyncReducer<T, R>the (Async)Reducer instance to use to apply to all stream elements.
example
console.log(await AsyncStream.of(1, 2, 4).reduce(Reducer.sum))
// => 7
console.log(await AsyncStream.of(1, 2, 4).reduce(Reducer.product))
// => 8

reduceAll

Returns a tuple where each tuple element corresponds to result of applying all AsyncStream elements to the corresponding (Async)Reducer instance of the given reducers.

Definition

reduceAll<R extends [unknown, unknown, ...unknown[]]>(...reducers: {
    [K in keyof R]: AsyncReducer<T, R[K]>;
  }): Promise<R>;

Type parameters

NameConstraintsDescription
R[unknown, unknown, ...unknown[]]the resulting tuple type

Parameters

NameTypeDescription
reducers{
    [K in keyof R]: AsyncReducer<T, R[K]>;
  }
a non-empty array of (Async)Reducer instances to use to apply to all stream elements.
note

all reducers are processed in parallel, thus only one traversal is needed

example
console.log(await AsyncStream.of(1, 2, 4).reduceAll(Reducer.sum, Reducer.product))
// => [7, 8]
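
Feeding several reducers during one traversal can be sketched as follows. `SimpleReducer`, `sumReducer`, `productReducer`, and `reduceBothSketch` are hypothetical simplifications and not Rimbu's Reducer types. The sketch handles exactly two synchronous reducers.

```typescript
// Hypothetical, simplified reducer shape: an initial state plus a step function.
interface SimpleReducer<T, R> {
  init: R;
  next(state: R, value: T): R;
}

const sumReducer: SimpleReducer<number, number> = {
  init: 0,
  next: (state, value) => state + value,
};

const productReducer: SimpleReducer<number, number> = {
  init: 1,
  next: (state, value) => state * value,
};

async function reduceBothSketch<T, R1, R2>(
  source: AsyncIterable<T>,
  first: SimpleReducer<T, R1>,
  second: SimpleReducer<T, R2>
): Promise<[R1, R2]> {
  let state1 = first.init;
  let state2 = second.init;
  // Each element is pulled from the source once and handed to both reducers.
  for await (const value of source) {
    state1 = first.next(state1, value);
    state2 = second.next(state2, value);
  }
  return [state1, state2];
}

async function reduceBothDemo(): Promise<[number, number]> {
  async function* source(): AsyncGenerator<number> {
    yield* [1, 2, 4];
  }
  return reduceBothSketch(source(), sumReducer, productReducer); // [7, 8]
}
```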

reduceAllStream

Returns an AsyncStream of tuples where each tuple element corresponds to result of applying all stream elements to the corresponding (Async)Reducer instance of the given reducers. Returns one element per input stream element.

Definition

reduceAllStream<R extends [unknown, unknown, ...unknown[]]>(...reducers: {
    [K in keyof R]: AsyncReducer<T, R[K]>;
  }): AsyncStream<R>;

Type parameters

NameConstraintsDescription
R[unknown, unknown, ...unknown[]]the resulting tuple element type

Parameters

NameTypeDescription
reducers{
    [K in keyof R]: AsyncReducer<T, R[K]>;
  }
a non-empty array of (Async)Reducer instances to use to apply to all stream elements.
note

all reducers are processed in parallel, thus only one traversal is needed

example
console.log(
await AsyncStream.of(1, 2, 4)
.reduceAllStream(Reducer.sum, Reducer.product)
.toArray()
)
// => [[1, 1], [3, 2], [7, 8]]

reduceStream

Returns an AsyncStream where the given (Async)Reducer is applied to each element in the stream.

Definition

reduceStream<R>(reducer: AsyncReducer<T, R>): AsyncStream<R>;

Type parameters

NameDescription
Rthe resulting element type

Parameters

NameTypeDescription
reducerAsyncReducer<T, R>the (Async)Reducer instance to use to apply to all stream elements.
example
console.log(
await AsyncStream.of(1, 2, 4)
.reduceStream(Reducer.sum)
.toArray()
)
// => [1, 3, 7]
console.log(
await AsyncStream.of(1, 2, 4)
.reduceStream(Reducer.product)
.toArray()
)
// => [1, 2, 8]

repeat

Returns an AsyncStream that returns the elements from this stream given amount of times.

Definition

repeat(amount?: number): AsyncStream<T>;

Parameters

NameTypeDescription
amountnumber(default: undefined) the amount of times to return this Stream
example
const source = AsyncStream.of(1, 2, 3)
source.repeat() // => AsyncStream(1, 2, 3, 1, 2, 3, 1, 2, ...)
await source.repeat(1).toArray() // => [1, 2, 3]
await source.repeat(3).toArray() // => [1, 2, 3, 1, 2, 3, 1, 2, 3]
await source.repeat(-3).toArray() // => [1, 2, 3]
note

amount = undefined means that the AsyncStream is repeated indefinitely

note

amount = 1 means that the Stream is not repeated

note

amount < 1 will be normalized to amount = 1

note

O(1)
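
The normalization of amount can be sketched with a plain async generator. `repeatSketch` and `takeToArray` are hypothetical helpers. The sketch takes a factory function because a plain async generator can only be consumed once.

```typescript
async function* repeatSketch<T>(
  makeSource: () => AsyncIterable<T>,
  amount?: number
): AsyncGenerator<T> {
  // undefined repeats forever; anything below 1 is normalized to 1.
  const times = amount === undefined ? Infinity : Math.max(1, amount);
  for (let i = 0; i < times; i++) {
    yield* makeSource();
  }
}

// Hypothetical helper: collects at most `limit` elements, then stops pulling.
async function takeToArray<T>(
  source: AsyncIterable<T>,
  limit: number
): Promise<T[]> {
  const out: T[] = [];
  if (limit <= 0) return out;
  for await (const value of source) {
    out.push(value);
    if (out.length >= limit) break;
  }
  return out;
}

async function repeatDemo(): Promise<number[][]> {
  async function* source(): AsyncGenerator<number> {
    yield* [1, 2, 3];
  }
  return [
    await takeToArray(repeatSketch(source), 7),       // [1, 2, 3, 1, 2, 3, 1]
    await takeToArray(repeatSketch(source, 3), 100),  // [1, 2, 3, 1, 2, 3, 1, 2, 3]
    await takeToArray(repeatSketch(source, -3), 100), // [1, 2, 3]
  ];
}
```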

single

Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value.

Definitions

single(): Promise<T | undefined>;

single<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;

example
await AsyncStream.empty<number>().single()  // => undefined
await AsyncStream.of(1, 2, 3).single() // => undefined
await AsyncStream.of(1).single() // => 1
await AsyncStream.of(1, 2, 3).single(0) // => 0

some

Returns true if any element of the AsyncStream satisfies the given pred function.

Definition

some(pred: (value: T, index: number) => MaybePromise<boolean>): Promise<boolean>;

Parameters

NameTypeDescription
pred(value: T, index: number) => MaybePromise<boolean>a potentially asynchronous predicate function taking an element and its index
example
await AsyncStream.of(1, 2, 3).some((v, i) => v + i > 10) // => false
await AsyncStream.of(1, 2, 3).some(async (v, i) => v + i > 1) // => true
note

O(N)

splitOn

Returns an AsyncStream of arrays of stream elements, where each array is filled with elements of this stream up to the next element that equals the given sepElem according to the given eq function.

Definition

splitOn(sepElem: T, eq?: Eq<T>): AsyncStream<T[]>;

Parameters

NameTypeDescription
sepElemTthe separator element to look for
eqEq<T>(optional) the equality function to use
example
await AsyncStream.from('marmot').splitOn('m').toArray()  // => [[], ['a', 'r'], ['o', 't']]
note

O(1)

splitWhere

Returns an AsyncStream of arrays of stream elements, where each array is filled with elements of this stream up to the next element that satisfies the given pred function.

Definition

splitWhere(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T[]>;

Parameters

NameTypeDescription
pred(value: T, index: number) => MaybePromise<boolean>a potentially asynchronous predicate function taking an element and its index
example
await AsyncStream.of(1, 2, 3, 4).splitWhere(async v => v === 3).toArray()
// => [[1, 2], [4]]
note

O(1)
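
The splitting behaviour of splitOn and splitWhere can be sketched with one async generator. The helper names (`splitWhereSketch`, `splitOnSketch`, `fromValues`, `collectAll`) are hypothetical, and two details are assumptions of this sketch rather than documented behaviour: the default equality is taken to be `Object.is`, and a trailing chunk is only emitted when it is non-empty.

```typescript
// Hypothetical helper: expose a list of values as an async iterable.
async function* fromValues<T>(...values: T[]): AsyncGenerator<T> {
  for (const v of values) yield v;
}

// Hypothetical helper: drain an async iterable into an array.
async function collectAll<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of source) out.push(v);
  return out;
}

// Sketch: close the current chunk whenever an element satisfies `pred`.
async function* splitWhereSketch<T>(
  source: AsyncIterable<T>,
  pred: (value: T, index: number) => boolean | Promise<boolean>
): AsyncGenerator<T[]> {
  let chunk: T[] = [];
  let index = 0;
  for await (const value of source) {
    if (await pred(value, index++)) {
      yield chunk; // the separator itself is dropped; the chunk may be empty
      chunk = [];
    } else {
      chunk.push(value);
    }
  }
  // Assumption of this sketch: emit the remainder only if it has elements.
  if (chunk.length > 0) yield chunk;
}

// Sketch: splitOn expressed as splitWhere with an equality test.
function splitOnSketch<T>(
  source: AsyncIterable<T>,
  sepElem: T,
  eq: (a: T, b: T) => boolean = Object.is
): AsyncGenerator<T[]> {
  return splitWhereSketch(source, (value) => eq(value, sepElem));
}

async function demoSplit(): Promise<void> {
  const letters = fromValues(...'marmot'.split(''));
  console.log(await collectAll(splitOnSketch(letters, 'm')));
  // [[], ['a', 'r'], ['o', 't']]
  console.log(await collectAll(splitWhereSketch(fromValues(1, 2, 3, 4), (v) => v === 3)));
  // [[1, 2], [4]]
}

void demoSplit();
```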

take

Returns an AsyncStream that contains the elements of this stream up to a maximum of amount elements.

Definition

take(amount: number): AsyncStream<T>;

Parameters

NameTypeDescription
amountnumberthe maximum amount of elements to return from the resulting Stream
example
await AsyncStream.of(1, 2, 3).take(2).toArray()   // => [1, 2]
note

O(N)

takeWhile

Returns an AsyncStream that contains the elements of this stream up to the first element that does not satisfy the given pred function.

Definition

takeWhile(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T>;

Parameters

NameTypeDescription
pred(value: T, index: number) => MaybePromise<boolean>a potentially asynchronous predicate function taking an element and its index
example
await AsyncStream.of(1, 2, 3).takeWhile(async v => v < 3).toArray()
// => [1, 2]
note

O(N)
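
As an illustration of why takeWhile is usable on unbounded streams, the following sketch stops pulling from the source at the first element that fails the predicate. `takeWhileSketch`, `naturals`, and `collectAll` are hypothetical helpers written with plain async generators and do not reflect the library's internals.

```typescript
// Hypothetical helper: an unbounded source 0, 1, 2, ...
async function* naturals(): AsyncGenerator<number> {
  for (let n = 0; ; n++) yield n;
}

// Hypothetical helper: drain an async iterable into an array.
async function collectAll<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of source) out.push(v);
  return out;
}

// Sketch: pass elements through until `pred` first returns false, then stop.
async function* takeWhileSketch<T>(
  source: AsyncIterable<T>,
  pred: (value: T, index: number) => boolean | Promise<boolean>
): AsyncGenerator<T> {
  let index = 0;
  for await (const value of source) {
    if (!(await pred(value, index++))) return; // ends iteration of the source
    yield value;
  }
}

async function demoTakeWhile(): Promise<void> {
  const firstFew = await collectAll(takeWhileSketch(naturals(), async (v) => v < 3));
  console.log(firstFew); // [0, 1, 2]
}

void demoTakeWhile();
```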

toArray

Returns an Array containing all elements in the AsyncStream.

Definition

toArray(): Promise<T[]>;

example
await AsyncStream.of(1, 2, 3).toArray()   // => [1, 2, 3]

toJSON

Returns a JSON representation of the AsyncStream.

Definition

toJSON(): Promise<ToJSON<T[], 'AsyncStream'>>;

note

take care not to call on infinite Streams

example
await AsyncStream.of(1, 2, 3).toJSON()   // => { dataType: 'AsyncStream', value: [1, 2, 3] }

toString

Returns a string representation of the AsyncStream.

Definition

toString(): string;

note

to avoid issues with potentially infinite streams, this method does not list the stream elements. To do this, use join.

example
AsyncStream.of(1, 2, 3).toString()   // => 'AsyncStream(...<potentially empty>)'

transform

Returns an AsyncStream consisting of the concatenation of AsyncStreamSource elements resulting from applying the given transformer to each element.

Definition

transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R>;

Type parameters

NameDescription
Rthe resulting element type

Parameters

NameTypeDescription
transformerAsyncTransformer<T, R>an async reducer taking elements of type T as input, and returning an AsyncStreamSource of element type R
note

O(1)

example
await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray()
// => [[1, 2, 3], [4, 5, 6]]

window

Returns an AsyncStream containing windows of windowSize consecutive elements of the source stream, with each window starting skipAmount elements after the previous one.

Definitions

window(windowSize: number, skipAmount?: number): AsyncStream<T[]>;

window<R>(windowSize: number, skipAmount?: number, collector?: AsyncReducer<T, R>): AsyncStream<R>;

Parameters

NameTypeDescription
windowSizenumberthe size in elements of the windows
skipAmountnumber(default: windowSize) the amount of elements to skip to start the next window
example
await AsyncStream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray()
// => [[1, 2, 3], [4, 5, 6]]
await AsyncStream.of(1, 2, 3, 4, 5).window(3, 1).toArray()
// => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
await AsyncStream.of(1, 2, 3, 4).window(2, 2, AsyncReducer.toJSSet()).toArray()
// => [Set(1, 2), Set(3, 4)]
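
How windowSize and skipAmount interact can be sketched with plain async generators. This is an illustrative model only: `windowSketch`, `fromValues`, and `collectAll` are hypothetical helpers, both parameters are assumed to be positive integers, the optional collector is left out, and incomplete trailing windows are dropped to match the first example above.

```typescript
// Hypothetical helper: expose a list of values as an async iterable.
async function* fromValues<T>(...values: T[]): AsyncGenerator<T> {
  for (const v of values) yield v;
}

// Hypothetical helper: drain an async iterable into an array.
async function collectAll<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of source) out.push(v);
  return out;
}

// Sketch: start a new window every `skipAmount` elements and emit each
// window once it holds `windowSize` elements.
async function* windowSketch<T>(
  source: AsyncIterable<T>,
  windowSize: number,
  skipAmount: number = windowSize
): AsyncGenerator<T[]> {
  const pending: T[][] = []; // windows still being filled, oldest first
  let index = 0;
  for await (const value of source) {
    if (index % skipAmount === 0) pending.push([]);
    for (const w of pending) w.push(value);
    const oldest = pending[0];
    if (oldest !== undefined && oldest.length === windowSize) {
      pending.shift();
      yield oldest;
    }
    index++;
  }
  // Windows that never reached windowSize are discarded.
}

async function demoWindow(): Promise<void> {
  console.log(await collectAll(windowSketch(fromValues(1, 2, 3, 4, 5, 6, 7), 3)));
  // [[1, 2, 3], [4, 5, 6]]
  console.log(await collectAll(windowSketch(fromValues(1, 2, 3, 4, 5), 3, 1)));
  // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
}

void demoWindow();
```

With skipAmount smaller than windowSize the windows overlap, so several windows are filled at once; with skipAmount equal to windowSize (the default) they tile the stream without overlap.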