interface AsyncReducer.Impl<I,O,S>
The AsyncReducer implementation interface defining the required methods.
Type parameters
| Name | Description |
|---|---|
| I | the input type |
| O | the output type |
| S | the state type |
Properties
init
The initial state value for the reducer algorithm.
initDefinition
readonly init: (initHalt: () => void) => MaybePromise<S>;
mapInput
Returns an AsyncReducer instance that converts its input values using given mapFun before passing them to the reducer.
mapInputAsyncReducer instance that converts its input values using given mapFun before passing them to the reducer.Definition
mapInput: <I2>(mapFun: (value: I2, index: number) => MaybePromise<I>) => AsyncReducer<I2, O>;
AsyncReducer
.createMono(0, async (c, v) => c + v)
.mapInput(async v => v * 2)
// this reducer will double all input values before summing them
onClose
An optional function that is called when the reducer will no longer receive values.
onCloseDefinition
onClose?: ((state: S, error?: unknown) => MaybePromise<void>) | undefined;
Methods
chain
Returns a reducer that applies the given nextReducers sequentially after this reducer has halted, and moving on to the next provided reducer until it is halted. Optionally, it provides the last output value of the previous reducer.
chainnextReducers sequentially after this reducer has halted, and moving on to the next provided reducer until it is halted. Optionally, it provides the last output value of the previous reducer.Definition
chain<O2 extends O>(nextReducers: AsyncStreamSource<AsyncOptLazy<AsyncReducer.Accept<I, O2>, [O2]>>): AsyncReducer<I, O2>;
Type parameters
| Name | Constraints | Description |
|---|---|---|
| O2 | O |
Parameters
| Name | Type | Description |
|---|---|---|
nextReducers | AsyncStreamSource<AsyncOptLazy<AsyncReducer.Accept<I, O2>, [O2]>> | an number of reducers consuming and producing the same types as the current reducer. |
const result = await AsyncStream.range({ amount: 6 })
.reduce(
Reducer.sum
.takeInput(3)
.chain(
v => v > 10 ? Reducer.product : Reducer.sum
)
)
console.log(result)
// => 21
collectInput
Returns an AsyncReducer instance that converts or filters its input values using given collectFun before passing them to the reducer.
collectInputAsyncReducer instance that converts or filters its input values using given collectFun before passing them to the reducer.Definition
collectInput<I2>(collectFun: AsyncCollectFun<I2, I>): AsyncReducer<I2, O>;
Type parameters
| Name | Description |
|---|---|
| I2 | the new input type |
Parameters
| Name | Type | Description |
|---|---|---|
collectFun | AsyncCollectFun<I2, I> | a (potentially async) function receiving - value: the next value- index: the value index- skip: a token that, when returned, will not add a value to the resulting collection- halt: a function that, when called, ensures no next elements are passed |
AsyncReducer
.createMono(0, async (c, v) => c + v)
.collectInput(async (v, _, skip) => v <= 10 ? skip : v * 2)
// this reducer will double all input values larger thant 10 before summing them,
// and will skip all values smaller than 10
compile
Returns a promise that resolves to a 'runnable' instance of the current reducer specification. This instance maintains its own state and indices, so that the instance only needs to be provided the input values, and output values can be retrieved when needed. The state is kept private.
compileDefinition
compile(): Promise<AsyncReducer.Instance<I, O>>;
const reducer = AsyncReducer.from(Reducer.sum.mapOutput(v => v * 2));
const instance = reducer.compile();
await instance.next(3);
await instance.next(5);
console.log(await instance.getOutput());
// => 16
dropInput
Returns an AsyncReducer instance that skips the first given amount of input elements, and will process subsequent elements.
dropInputAsyncReducer instance that skips the first given amount of input elements, and will process subsequent elements.Definition
dropInput(amount: number): AsyncReducer<I, O>;
Parameters
| Name | Type | Description |
|---|---|---|
amount | number | the amount of elements to skip |
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.dropInput(9)
)
// => 19
filterInput
Returns an AsyncReducer instance that only passes values to the reducer that satisy the given pred predicate.
filterInputAsyncReducer instance that only passes values to the reducer that satisy the given pred predicate.Definitions
filterInput<IF extends I>(pred: (value: I, index: number, halt: () => void) => value is IF, options?: {
negate?: false | undefined;
}): AsyncReducer<IF, O>;
filterInput<IF extends I>(pred: (value: I, index: number, halt: () => void) => value is IF, options: {
negate: true;
}): AsyncReducer<Exclude<I, IF>, O>;
filterInput(pred: (value: I, index: number, halt: () => void) => MaybePromise<boolean>, options?: {
negate?: boolean | undefined;
}): AsyncReducer<I, O>;
Type parameters
| Name | Constraints | Description |
|---|---|---|
| IF | I |
Parameters
| Name | Type | Description |
|---|---|---|
pred | (value: I, index: number, halt: () => void) => value is IF | a potaentially asynchronous function that returns true if the value should be passed to the reducer based on the following inputs: - value: the current input value - index: the current input index - halt: function that, when called, ensures no more new values are passed to the reducer |
options | {negate?: false | undefined;} | (optional) an object containing the following properties: - negate: (default: false) when true will invert the given predicate |
if the predicate is a type guard, the return type is automatically inferred
AsyncReducer
.createMono(0, async (c, v) => c + v)
.filterInput(async v => v > 10)
// this reducer will only sum values larger than 10
flatMapInput
Returns an AsyncReducer instance that converts its input values using given flatMapFun before passing them to the reducer.
flatMapInputAsyncReducer instance that converts its input values using given flatMapFun before passing them to the reducer.Definition
flatMapInput<I2>(flatMapFun: (value: I2, index: number) => MaybePromise<AsyncStreamSource<I>>): AsyncReducer<I2, O>;
Type parameters
| Name | Description |
|---|---|
| I2 | the new input type |
Parameters
| Name | Type | Description |
|---|---|---|
flatMapFun | (value: I2, index: number) => MaybePromise<AsyncStreamSource<I>> | a potentially asynchronous function that returns am arbitrary number of new values to pass to the reducer based on the following inputs: - value: the current input value - index: the current input index |
AsyncReducer
.createMono(0, async (c, v) => c + v)
.flatMapInput(async v => [v, v])
// this reducer will include all input values twice before summing them
mapOutput
Returns an AsyncReducer instance that converts its output values using given mapFun.
mapOutputAsyncReducer instance that converts its output values using given mapFun.Definition
mapOutput<O2>(mapFun: (value: O, index: number, halted: boolean) => MaybePromise<O2>): AsyncReducer<I, O2>;
Type parameters
| Name | Description |
|---|---|
| O2 | the new output type |
Parameters
| Name | Type | Description |
|---|---|---|
mapFun | (value: O, index: number, halted: boolean) => MaybePromise<O2> | a potentially asynchronous function that takes the current output value and converts it to a new output value |
AsyncReducer
.createMono(0, async (c, v) => c + v)
.mapOutput(async v => String(v))
// this reducer will convert all its results to string before returning them
next
Returns the next state based on the given input values
nextDefinition
next(state: S, elem: I, index: number, halt: () => void): MaybePromise<S>;
Parameters
| Name | Type | Description |
|---|---|---|
state | S | the current state |
elem | I | the current input value |
index | number | the current input index |
halt | () => void | a function that, when called, ensures no more values are passed to the reducer |
sliceInput
Returns an AsyncReducer instance that takes given amount of elements starting at given from index, and ignores other elements.
sliceInputAsyncReducer instance that takes given amount of elements starting at given from index, and ignores other elements.Definition
sliceInput(from?: number |undefined, amount?: number|undefined):AsyncReducer<I, O>;
Parameters
| Name | Type | Description |
|---|---|---|
from | number | undefined | (default: 0) the index at which to start processing elements |
amount | number | undefined | (optional) the amount of elements to process, if not given, processes all elements from the from index |
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.sliceInput(1, 2)
)
// => 3
stateToResult
Returns the output value based on the given state
stateToResultstateDefinition
stateToResult(state: S, index: number, halted: boolean): MaybePromise<O>;
Parameters
| Name | Type | Description |
|---|---|---|
state | S | the current state |
index | number | the value index |
halted | boolean | a boolean indicating whether the reducer is halted |
takeInput
Returns an AsyncReducer instance that takes at most the given amount of input elements, and will ignore subsequent elements.
takeInputAsyncReducer instance that takes at most the given amount of input elements, and will ignore subsequent elements.Definition
takeInput(amount: number): AsyncReducer<I, O>;
Parameters
| Name | Type | Description |
|---|---|---|
amount | number | the amount of elements to accept |
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.takeInput(2)
)
// => 1
takeOutput
Returns an 'AsyncReducerinstance that produces at mostamount` values.
takeOutputinstance that produces at mostamount` values.Definition
takeOutput(amount: number): AsyncReducer<I, O>;
Parameters
| Name | Type | Description |
|---|---|---|
amount | number | the maximum amount of values to produce. |
takeOutputUntil
Returns an 'AsyncReducerinstance that produces until the givenpred` predicate returns true for the output value.
takeOutputUntilinstance that produces until the givenpred` predicate returns true for the output value.Definition
takeOutputUntil(pred: (value: O, index: number) => MaybePromise<boolean>, options?: {
negate?: boolean | undefined;
}): AsyncReducer<I, O>;
Parameters
| Name | Type | Description |
|---|---|---|
pred | (value: O, index: number) => MaybePromise<boolean> | a potaentially asynchronous function that returns true if the value should be passed to the reducer based on the following inputs: - value: the current input value - index: the current input index - halt: function that, when called, ensures no more new values are passed to the reducer |
options | {negate?: boolean | undefined;} | (optional) an object containing the following properties: - negate: (default: false) when true will invert the given predicate |