interface AsyncReducer.Impl<I,O,S>
The AsyncReducer implementation interface defining the required methods.
Type parameters
Name | Description |
---|---|
I | the input type |
O | the output type |
S | the state type |
Properties
init
The initial state value for the reducer algorithm.
init
Definition
readonly init: (initHalt: () => void) =>
MaybePromise
<S>;
mapInput
Returns an AsyncReducer
instance that converts its input values using given mapFun
before passing them to the reducer.
mapInput
AsyncReducer
instance that converts its input values using given mapFun
before passing them to the reducer.Definition
mapInput: <I2>(mapFun: (value: I2, index: number) =>
MaybePromise
<I>) =>
AsyncReducer
<I2, O>;
AsyncReducer
.createMono(0, async (c, v) => c + v)
.mapInput(async v => v * 2)
// this reducer will double all input values before summing them
onClose
An optional function that is called when the reducer will no longer receive values.
onClose
Definition
onClose?: ((state: S, error?: unknown) =>
MaybePromise
<void>)
|
undefined;
Methods
chain
Returns a reducer that applies the given nextReducers
sequentially after this reducer has halted, and moving on to the next provided reducer until it is halted. Optionally, it provides the last output value of the previous reducer.
chain
nextReducers
sequentially after this reducer has halted, and moving on to the next provided reducer until it is halted. Optionally, it provides the last output value of the previous reducer.Definition
chain<O2 extends O>(nextReducers:
AsyncStreamSource
<AsyncOptLazy<
AsyncReducer.Accept
<I, O2>, [O2]>>):
AsyncReducer
<I, O2>;
Type parameters
Name | Constraints | Description |
---|---|---|
O2 | O |
Parameters
Name | Type | Description |
---|---|---|
nextReducers | AsyncStreamSource <AsyncOptLazy< AsyncReducer.Accept <I, O2>, [O2]>> | an number of reducers consuming and producing the same types as the current reducer. |
const result = await AsyncStream.range({ amount: 6 })
.reduce(
Reducer.sum
.takeInput(3)
.chain(
v => v > 10 ? Reducer.product : Reducer.sum
)
)
console.log(result)
// => 21
collectInput
Returns an AsyncReducer
instance that converts or filters its input values using given collectFun
before passing them to the reducer.
collectInput
AsyncReducer
instance that converts or filters its input values using given collectFun
before passing them to the reducer.Definition
collectInput<I2>(collectFun:
AsyncCollectFun
<I2, I>):
AsyncReducer
<I2, O>;
Type parameters
Name | Description |
---|---|
I2 | the new input type |
Parameters
Name | Type | Description |
---|---|---|
collectFun | AsyncCollectFun <I2, I> | a (potentially async) function receiving - value : the next value- index : the value index- skip : a token that, when returned, will not add a value to the resulting collection- halt : a function that, when called, ensures no next elements are passed |
AsyncReducer
.createMono(0, async (c, v) => c + v)
.collectInput(async (v, _, skip) => v <= 10 ? skip : v * 2)
// this reducer will double all input values larger thant 10 before summing them,
// and will skip all values smaller than 10
compile
Returns a promise that resolves to a 'runnable' instance of the current reducer specification. This instance maintains its own state and indices, so that the instance only needs to be provided the input values, and output values can be retrieved when needed. The state is kept private.
compile
Definition
compile(): Promise<
AsyncReducer.Instance
<I, O>>;
const reducer = AsyncReducer.from(Reducer.sum.mapOutput(v => v * 2));
const instance = reducer.compile();
await instance.next(3);
await instance.next(5);
console.log(await instance.getOutput());
// => 16
dropInput
Returns an AsyncReducer
instance that skips the first given amount
of input elements, and will process subsequent elements.
dropInput
AsyncReducer
instance that skips the first given amount
of input elements, and will process subsequent elements.Definition
dropInput(amount: number):
AsyncReducer
<I, O>;
Parameters
Name | Type | Description |
---|---|---|
amount | number | the amount of elements to skip |
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.dropInput(9)
)
// => 19
filterInput
Returns an AsyncReducer
instance that only passes values to the reducer that satisy the given pred
predicate.
filterInput
AsyncReducer
instance that only passes values to the reducer that satisy the given pred
predicate.Definitions
filterInput<IF extends I>(pred: (value: I, index: number, halt: () => void) => value is IF, options?: {
negate?: false
|
undefined;
}):
AsyncReducer
<IF, O>;
filterInput<IF extends I>(pred: (value: I, index: number, halt: () => void) => value is IF, options: {
negate: true;
}):
AsyncReducer
<Exclude<I, IF>, O>;
filterInput(pred: (value: I, index: number, halt: () => void) =>
MaybePromise
<boolean>, options?: {
negate?: boolean
|
undefined;
}):
AsyncReducer
<I, O>;
Type parameters
Name | Constraints | Description |
---|---|---|
IF | I |
Parameters
Name | Type | Description |
---|---|---|
pred | (value: I, index: number, halt: () => void) => value is IF | a potaentially asynchronous function that returns true if the value should be passed to the reducer based on the following inputs: - value: the current input value - index: the current input index - halt: function that, when called, ensures no more new values are passed to the reducer |
options | { negate?: false | undefined; } | (optional) an object containing the following properties: - negate: (default: false) when true will invert the given predicate |
if the predicate is a type guard, the return type is automatically inferred
AsyncReducer
.createMono(0, async (c, v) => c + v)
.filterInput(async v => v > 10)
// this reducer will only sum values larger than 10
flatMapInput
Returns an AsyncReducer
instance that converts its input values using given flatMapFun
before passing them to the reducer.
flatMapInput
AsyncReducer
instance that converts its input values using given flatMapFun
before passing them to the reducer.Definition
flatMapInput<I2>(flatMapFun: (value: I2, index: number) =>
MaybePromise
<
AsyncStreamSource
<I>>):
AsyncReducer
<I2, O>;
Type parameters
Name | Description |
---|---|
I2 | the new input type |
Parameters
Name | Type | Description |
---|---|---|
flatMapFun | (value: I2, index: number) => MaybePromise < AsyncStreamSource <I>> | a potentially asynchronous function that returns am arbitrary number of new values to pass to the reducer based on the following inputs: - value: the current input value - index: the current input index |
AsyncReducer
.createMono(0, async (c, v) => c + v)
.flatMapInput(async v => [v, v])
// this reducer will include all input values twice before summing them
mapOutput
Returns an AsyncReducer
instance that converts its output values using given mapFun
.
mapOutput
AsyncReducer
instance that converts its output values using given mapFun
.Definition
mapOutput<O2>(mapFun: (value: O, index: number, halted: boolean) =>
MaybePromise
<O2>):
AsyncReducer
<I, O2>;
Type parameters
Name | Description |
---|---|
O2 | the new output type |
Parameters
Name | Type | Description |
---|---|---|
mapFun | (value: O, index: number, halted: boolean) => MaybePromise <O2> | a potentially asynchronous function that takes the current output value and converts it to a new output value |
AsyncReducer
.createMono(0, async (c, v) => c + v)
.mapOutput(async v => String(v))
// this reducer will convert all its results to string before returning them
next
Returns the next state based on the given input values
next
Definition
next(state: S, elem: I, index: number, halt: () => void):
MaybePromise
<S>;
Parameters
Name | Type | Description |
---|---|---|
state | S | the current state |
elem | I | the current input value |
index | number | the current input index |
halt | () => void | a function that, when called, ensures no more values are passed to the reducer |
sliceInput
Returns an AsyncReducer
instance that takes given amount
of elements starting at given from
index, and ignores other elements.
sliceInput
AsyncReducer
instance that takes given amount
of elements starting at given from
index, and ignores other elements.Definition
sliceInput(from?: number
|
undefined, amount?: number
|
undefined):
AsyncReducer
<I, O>;
Parameters
Name | Type | Description |
---|---|---|
from | number | undefined | (default: 0) the index at which to start processing elements |
amount | number | undefined | (optional) the amount of elements to process, if not given, processes all elements from the from index |
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.sliceInput(1, 2)
)
// => 3
stateToResult
Returns the output value based on the given state
stateToResult
state
Definition
stateToResult(state: S, index: number, halted: boolean):
MaybePromise
<O>;
Parameters
Name | Type | Description |
---|---|---|
state | S | the current state |
index | number | the value index |
halted | boolean | a boolean indicating whether the reducer is halted |
takeInput
Returns an AsyncReducer
instance that takes at most the given amount
of input elements, and will ignore subsequent elements.
takeInput
AsyncReducer
instance that takes at most the given amount
of input elements, and will ignore subsequent elements.Definition
takeInput(amount: number):
AsyncReducer
<I, O>;
Parameters
Name | Type | Description |
---|---|---|
amount | number | the amount of elements to accept |
await AsyncStream
.from(Stream.range({ end: 10 }))
.reduce(
AsyncReducer
.createMono(0, async (c, v) => c + v)
.takeInput(2)
)
// => 1
takeOutput
Returns an 'AsyncReducerinstance that produces at most
amount` values.
takeOutput
instance that produces at most
amount` values.Definition
takeOutput(amount: number):
AsyncReducer
<I, O>;
Parameters
Name | Type | Description |
---|---|---|
amount | number | the maximum amount of values to produce. |
takeOutputUntil
Returns an 'AsyncReducerinstance that produces until the given
pred` predicate returns true for the output value.
takeOutputUntil
instance that produces until the given
pred` predicate returns true for the output value.Definition
takeOutputUntil(pred: (value: O, index: number) =>
MaybePromise
<boolean>, options?: {
negate?: boolean
|
undefined;
}):
AsyncReducer
<I, O>;
Parameters
Name | Type | Description |
---|---|---|
pred | (value: O, index: number) => MaybePromise <boolean> | a potaentially asynchronous function that returns true if the value should be passed to the reducer based on the following inputs: - value: the current input value - index: the current input index - halt: function that, when called, ensures no more new values are passed to the reducer |
options | { negate?: boolean | undefined; } | (optional) an object containing the following properties: - negate: (default: false) when true will invert the given predicate |