Skip to main content

interface AsyncReducer.Impl<I,O,S>

The AsyncReducer implementation interface defining the required methods.

Type parameters

NameDescription
Ithe input type
Othe output type
Sthe state type

Properties

init

The initial state value for the reducer algorithm.

Definition

readonly init: (initHalt: () => void) => MaybePromise<S>;

mapInput

Returns an AsyncReducer instance that converts its input values using given mapFun before passing them to the reducer.

Definition

mapInput: <I2>(mapFun: (value: I2, index: number) => MaybePromise<I>) => AsyncReducer<I2, O>;

example
AsyncReducer
.createMono(0, async (c, v) => c + v)
.mapInput(async v => v * 2)
// this reducer will double all input values before summing them

onClose

An optional function that is called when the reducer will no longer receive values.

Definition

onClose?: ((state: S, error?: unknown) => MaybePromise<void>) | undefined;

Methods

chain

Returns a reducer that applies the given nextReducers sequentially after this reducer has halted, and moving on to the next provided reducer until it is halted. Optionally, it provides the last output value of the previous reducer.

Definition

chain<O2 extends O>(nextReducers: AsyncStreamSource<AsyncOptLazy<AsyncReducer.Accept<I, O2>, [O2]>>): AsyncReducer<I, O2>;

Type parameters

NameConstraintsDescription
O2O

Parameters

NameTypeDescription
nextReducersAsyncStreamSource<AsyncOptLazy<AsyncReducer.Accept<I, O2>, [O2]>>an number of reducers consuming and producing the same types as the current reducer.
example
const result = await AsyncStream.range({ amount: 6 })
.reduce(
Reducer.sum
.takeInput(3)
.chain(
v => v > 10 ? Reducer.product : Reducer.sum
)
)
console.log(result)
// => 21

collectInput

Returns an AsyncReducer instance that converts or filters its input values using given collectFun before passing them to the reducer.

Definition

collectInput<I2>(collectFun: AsyncCollectFun<I2, I>): AsyncReducer<I2, O>;

Type parameters

NameDescription
I2the new input type

Parameters

NameTypeDescription
collectFunAsyncCollectFun<I2, I>a (potentially async) function receiving
- value: the next value
- index: the value index
- skip: a token that, when returned, will not add a value to the resulting collection
- halt: a function that, when called, ensures no next elements are passed
example
AsyncReducer
.createMono(0, async (c, v) => c + v)
.collectInput(async (v, _, skip) => v <= 10 ? skip : v * 2)
// this reducer will double all input values larger thant 10 before summing them,
// and will skip all values smaller than 10

compile

Returns a promise that resolves to a 'runnable' instance of the current reducer specification. This instance maintains its own state and indices, so that the instance only needs to be provided the input values, and output values can be retrieved when needed. The state is kept private.

Definition

compile(): Promise<AsyncReducer.Instance<I, O>>;

example
const reducer = AsyncReducer.from(Reducer.sum.mapOutput(v => v * 2));
const instance = reducer.compile();
await instance.next(3);
await instance.next(5);
console.log(await instance.getOutput());
// => 16

dropInput

Returns an AsyncReducer instance that skips the first given amount of input elements, and will process subsequent elements.

Definition

dropInput(amount: number): AsyncReducer<I, O>;

Parameters

NameTypeDescription
amountnumberthe amount of elements to skip
example
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.dropInput(9)
)
// => 19

filterInput

Returns an AsyncReducer instance that only passes values to the reducer that satisy the given pred predicate.

Definitions

filterInput<IF extends I>(pred: (value: I, index: number, halt: () => void) => value is IF, options?: {
      negate?: false | undefined;
    }): AsyncReducer<IF, O>;

filterInput<IF extends I>(pred: (value: I, index: number, halt: () => void) => value is IF, options: {
      negate: true;
    }): AsyncReducer<Exclude<I, IF>, O>;

filterInput(pred: (value: I, index: number, halt: () => void) => MaybePromise<boolean>, options?: {
      negate?: boolean | undefined;
    }): AsyncReducer<I, O>;

Type parameters

NameConstraintsDescription
IFI

Parameters

NameTypeDescription
pred(value: I, index: number, halt: () => void) => value is IFa potaentially asynchronous function that returns true if the value should be passed to the reducer based on the following inputs:
- value: the current input value
- index: the current input index
- halt: function that, when called, ensures no more new values are passed to the reducer
options{
      negate?: false | undefined;
    }
(optional) an object containing the following properties:
- negate: (default: false) when true will invert the given predicate
note

if the predicate is a type guard, the return type is automatically inferred

example
AsyncReducer
.createMono(0, async (c, v) => c + v)
.filterInput(async v => v > 10)
// this reducer will only sum values larger than 10

flatMapInput

Returns an AsyncReducer instance that converts its input values using given flatMapFun before passing them to the reducer.

Definition

flatMapInput<I2>(flatMapFun: (value: I2, index: number) => MaybePromise<AsyncStreamSource<I>>): AsyncReducer<I2, O>;

Type parameters

NameDescription
I2the new input type

Parameters

NameTypeDescription
flatMapFun(value: I2, index: number) => MaybePromise<AsyncStreamSource<I>>a potentially asynchronous function that returns am arbitrary number of new values to pass to the reducer based on the following inputs:
- value: the current input value
- index: the current input index
example
AsyncReducer
.createMono(0, async (c, v) => c + v)
.flatMapInput(async v => [v, v])
// this reducer will include all input values twice before summing them

mapOutput

Returns an AsyncReducer instance that converts its output values using given mapFun.

Definition

mapOutput<O2>(mapFun: (value: O, index: number, halted: boolean) => MaybePromise<O2>): AsyncReducer<I, O2>;

Type parameters

NameDescription
O2the new output type

Parameters

NameTypeDescription
mapFun(value: O, index: number, halted: boolean) => MaybePromise<O2>a potentially asynchronous function that takes the current output value and converts it to a new output value
example
AsyncReducer
.createMono(0, async (c, v) => c + v)
.mapOutput(async v => String(v))
// this reducer will convert all its results to string before returning them

next

Returns the next state based on the given input values

Definition

next(state: S, elem: I, index: number, halt: () => void): MaybePromise<S>;

Parameters

NameTypeDescription
stateSthe current state
elemIthe current input value
indexnumberthe current input index
halt() => voida function that, when called, ensures no more values are passed to the reducer

sliceInput

Returns an AsyncReducer instance that takes given amount of elements starting at given from index, and ignores other elements.

Definition

sliceInput(from?: number |undefined, amount?: number|undefined):AsyncReducer<I, O>;

Parameters

NameTypeDescription
fromnumber | undefined(default: 0) the index at which to start processing elements
amountnumber | undefined(optional) the amount of elements to process, if not given, processes all elements from the from index
example
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.sliceInput(1, 2)
)
// => 3

stateToResult

Returns the output value based on the given state

Definition

stateToResult(state: S, index: number, halted: boolean): MaybePromise<O>;

Parameters

NameTypeDescription
stateSthe current state
indexnumberthe value index
haltedbooleana boolean indicating whether the reducer is halted

takeInput

Returns an AsyncReducer instance that takes at most the given amount of input elements, and will ignore subsequent elements.

Definition

takeInput(amount: number): AsyncReducer<I, O>;

Parameters

NameTypeDescription
amountnumberthe amount of elements to accept
example
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.takeInput(2)
)
// => 1

takeOutput

Returns an 'AsyncReducerinstance that produces at mostamount` values.

Definition

takeOutput(amount: number): AsyncReducer<I, O>;

Parameters

NameTypeDescription
amountnumberthe maximum amount of values to produce.

takeOutputUntil

Returns an 'AsyncReducerinstance that produces until the givenpred` predicate returns true for the output value.

Definition

takeOutputUntil(pred: (value: O, index: number) => MaybePromise<boolean>, options?: {
      negate?: boolean | undefined;
    }): AsyncReducer<I, O>;

Parameters

NameTypeDescription
pred(value: O, index: number) => MaybePromise<boolean>a potaentially asynchronous function that returns true if the value should be passed to the reducer based on the following inputs:
- value: the current input value
- index: the current input index
- halt: function that, when called, ensures no more new values are passed to the reducer
options{
      negate?: boolean | undefined;
    }
(optional) an object containing the following properties:
- negate: (default: false) when true will invert the given predicate