@eventkit/base
@eventkit/base is the primary package in the eventkit project.
Installation
```sh
npm i @eventkit/base
```
Basic Example
This is a basic example of how to use an eventkit stream. To get started, you should check out the Getting Started guide.
```typescript
import { Stream, filter } from "@eventkit/base";

// Create a stream of events
const stream = new Stream<{ type: string; payload: any }>();

// Filter for specific event types
const userEvents = stream.pipe(filter((event) => event.type.startsWith("user.")));

// Subscribe to the filtered stream
userEvents.subscribe((event) => {
  console.log(`Received user event: ${event.type}`);
});

// Push events to the stream
stream.push({ type: "user.login", payload: { userId: "123" } });
stream.push({ type: "system.update", payload: { version: "1.0.1" } }); // This won't be logged

// Wait for all events to be processed
await stream.drain();
```
Related Resources
Classes
Class | Description |
---|---|
AsyncObservable | Represents any number of values over any amount of time by way of an async generator that can be subscribed to and cancelled from. |
CallbackSubscriber | A specialized Subscriber that invokes a callback function for each value emitted by the observable. |
SingletonAsyncObservable | An extension of AsyncObservable that implements PromiseLike, allowing it to be used with await syntax. |
Stream | A Stream is a special type of AsyncObservable that allows values to be multicasted to many observers. Streams are like EventEmitters. |
Subscriber | Represents an active execution of an observable. |
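The SingletonAsyncObservable entry relies on a general JavaScript rule: `await` works on any object that has a compatible `then` method. The sketch below illustrates only that rule. It is a standalone example, not eventkit's implementation, and `SingleValueSketch` and `totalSketch` are hypothetical names that do not exist in the package.

```typescript
// Hypothetical stand-in: NOT part of @eventkit/base.
// Any object with a compatible `then` method can be awaited.
class SingleValueSketch<T> implements PromiseLike<T> {
  private readonly produce: () => Promise<T>;

  constructor(produce: () => Promise<T>) {
    this.produce = produce;
  }

  then<TResult1 = T, TResult2 = never>(
    onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | null,
    onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | null,
  ): PromiseLike<TResult1 | TResult2> {
    // The work starts only when the object is awaited (or `then` is called).
    return this.produce().then(onfulfilled, onrejected);
  }
}

async function totalSketch(): Promise<number> {
  // `await` calls `then` on the object, so no explicit subscription is needed here.
  const total = await new SingleValueSketch(async () => 1 + 2 + 3);
  return total;
}
```

Whether the real class defers its work in the same way is not stated in the table above, so treat the laziness shown here as an assumption of the sketch.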
Interfaces
Interface | Description |
---|---|
SchedulerLike | The interface that defines the core scheduling capabilities in eventkit. A scheduler is the logical unit that coordinates all work associated with a subject. |
SubscriptionLike | - |
Type Aliases
Type Alias | Description |
---|---|
AsyncObservableInput | Describes what types can be used as observable inputs in the from function. |
MonoTypeOperatorFunction | A function type interface that describes a function that accepts and returns a parameter of the same type. |
ObservedValueOf | Extracts the type from an AsyncObservableInput<any>. If you have O extends AsyncObservableInput<any> and you pass in AsyncObservable<number>, or Promise<number>, etc., it will type as number. |
OperatorFunction | A function type interface that represents an operator that transforms an AsyncObservable of type T into an AsyncObservable of type R. |
ReadableStreamLike | The base signature eventkit will look for to identify and use a ReadableStream as an AsyncObservableInput source. |
RetryBackoff | Defines how the delay between retry attempts increases. |
RetryStrategy | Configuration options for the retry operator. |
StreamInit | Configuration options for creating a Stream. |
SubscriberCallback | The function signature for a subscriber callback. |
UnaryFunction | A function type interface that describes a function that accepts one parameter T and returns another parameter R. |
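To make the relationship between the function-type aliases concrete, here is a rough sketch of how such aliases could be shaped. It assumes the shapes implied by the descriptions above and uses the built-in `AsyncIterable` as a stand-in for `AsyncObservable`. Every name ending in `Sketch`, plus `keepIf`, `numbers`, and `evens`, is hypothetical; the real definitions live in the package.

```typescript
// Hypothetical local shapes; the real aliases are defined by @eventkit/base
// in terms of AsyncObservable, not AsyncIterable.
type UnaryFunctionSketch<T, R> = (source: T) => R;
type OperatorFunctionSketch<T, R> = UnaryFunctionSketch<AsyncIterable<T>, AsyncIterable<R>>;
type MonoTypeOperatorFunctionSketch<T> = OperatorFunctionSketch<T, T>;

// Extracts the element type from an async iterable or a promise-like value.
type ObservedValueOfSketch<O> =
  O extends AsyncIterable<infer T> ? T : O extends PromiseLike<infer T> ? T : never;

// A same-type operator: the input and output element types are both T.
function keepIf<T>(predicate: (value: T) => boolean): MonoTypeOperatorFunctionSketch<T> {
  return async function* (source) {
    for await (const value of source) {
      if (predicate(value)) yield value;
    }
  };
}

async function* numbers(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
  yield 4;
}

// Resolves to `number` because `numbers()` is an async iterable of numbers.
type ElementSketch = ObservedValueOfSketch<ReturnType<typeof numbers>>;

async function evens(): Promise<ElementSketch[]> {
  const out: ElementSketch[] = [];
  for await (const n of keepIf<number>((v) => v % 2 === 0)(numbers())) out.push(n);
  return out;
}
```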
Errors
Class | Description |
---|---|
ArgumentOutOfRangeError | An error thrown when an element was queried at a certain index of an observable, but no such index or position exists in that sequence. |
InvalidConcurrencyLimitError | An error thrown when an invalid concurrency limit is provided to an operator. |
NoValuesError | An error thrown when an observable completes without emitting any valid values. |
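As an illustration of when an out-of-range error of this kind would surface, the sketch below reimplements the behavior described for `elementAt` over a plain async iterable. It is not the library's code: `ArgumentOutOfRangeErrorSketch`, `elementAtSketch`, and `letters` are hypothetical local names, and the optional default value is modeled as a rest argument purely so the sketch can tell "no default" apart from "default is undefined".

```typescript
// Local stand-in for illustration; the real class is exported by @eventkit/base.
class ArgumentOutOfRangeErrorSketch extends Error {
  constructor() {
    super("argument out of range");
    this.name = "ArgumentOutOfRangeErrorSketch";
  }
}

// Resolves with the value at `index`, falls back to the default value when one
// was supplied, and otherwise rejects with the out-of-range error.
async function elementAtSketch<T>(
  source: AsyncIterable<T>,
  index: number,
  ...rest: [defaultValue?: T]
): Promise<T> {
  let position = 0;
  for await (const value of source) {
    if (position === index) return value;
    position += 1;
  }
  if (rest.length > 0) return rest[0] as T;
  throw new ArgumentOutOfRangeErrorSketch();
}

async function* letters(): AsyncGenerator<string> {
  yield "a";
  yield "b";
  yield "c";
}
```

With these definitions, `elementAtSketch(letters(), 1)` resolves to `"b"`, `elementAtSketch(letters(), 5, "z")` resolves to `"z"`, and `elementAtSketch(letters(), 5)` rejects.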
Operators
Function | Description |
---|---|
buffer | Buffers the source observable until the pushNotifier observable emits a value. Each time the pushNotifier observable emits a value, the current buffer is emitted and a new buffer is started. |
bufferCount | Buffers the source observable until the size hits the maximum bufferSize given. |
concat | Merges the values from all provided observables into a single observable. When subscribed to, it will subscribe to the provided observables in a serial fashion, emitting each observable's values, and waiting for each one to complete before subscribing to the next. The output observable will complete when all provided observables have completed, and error when any provided observable errors. |
concatAll | Converts an observable that yields observables (called a higher-order observable) into a first-order observable which delivers all the values from the inner observables in order. It subscribes to an inner observable only after the previous inner observable has completed. All values emitted by the inner observables are emitted in order. |
concatMap | Applies a predicate function to each value yielded by the source observable, which returns a different observable that will be merged in a serialized fashion, waiting for each one to complete before subscribing to the next. |
count | Counts the number of items emitted by the source observable, and emits that number when the source observable completes. |
dlq | Returns an array with two observables with the purpose of imposing a dead letter queue on the source observable; the first observable being the values that are emitted on the source, and the second one representing errors that were thrown when executing callback actions. |
elementAt | Emits the single value at the specified index in the source observable, or the default value provided in the defaultValue argument if the index is out of range. If the index is out of range and no default is given, an ArgumentOutOfRangeError is thrown. |
every | Determines whether all items emitted by the source observable satisfy a specified condition. Emits true if all values pass the condition, or false immediately if any value fails. |
filter | Filters items emitted by the source observable by only emitting those that satisfy a specified predicate. |
find | Emits the first value emitted by the source observable that satisfies a specified condition. If no such value is found, emits undefined when the source observable completes. |
findIndex | Emits the index of the first value emitted by the source observable that satisfies a specified condition. If no such value is found, emits -1 when the source observable completes. |
first | Emits the first value emitted by the source observable that satisfies a specified condition. If no such value is found when the source observable completes, the defaultValue is emitted if it's provided. If it isn't, a NoValuesError is thrown. |
isEmpty | Emits false if the source observable emits any values, or emits true if the source observable completes without emitting any values. |
last | Emits the last value emitted by the source observable that satisfies a specified condition. If no such value is found when the source observable completes, the defaultValue is emitted if it's provided. If it isn't, a NoValuesError is thrown. |
map | Applies a given predicate function to each value emitted by the source observable. |
max | Emits the maximum value emitted by the source observable. The source observable must emit a comparable type (numbers, strings, dates, etc.), or any type when a comparer function is provided. |
merge | Merges the values from all provided observables into a single observable. When subscribed to, it will subscribe to all provided observables and yield all values yielded by all of the provided observables. The output observable will complete when all provided observables have completed, and error when any provided observable errors. |
mergeAll | Converts an observable that yields observables (called a higher-order observable) into a first-order observable which concurrently delivers all values that are yielded on the inner observables. Each time an inner observable gets yielded, it subscribes to it and yields all the values from the inner observable. The output observable only completes when all inner observables have completed. Any error delivered by an inner observable will be immediately thrown on the output observable. |
mergeMap | Applies a predicate function to each value yielded by the source observable, which returns a different observable that will be merged into the output observable using mergeAll. |
min | Emits the minimum value emitted by the source observable. The source observable must emit a comparable type (numbers, strings, dates, etc.), or any type when a comparer function is provided. |
pairwise | Groups pairs of consecutive emissions together and emits them as a tuple of two values. In other words, it will take the current value and the previous value and emit them as a pair. |
partition | Returns an array with two observables that act as a split of the source observable; one with values that satisfy the predicate, and another with values that don't satisfy the predicate. |
reduce | Applies an accumulator function over the source observable, and returns the accumulated result when the source completes, given an optional seed value. |
retry | Returns an observable that will retry a callback action according to the provided retry strategy if an error occurs. |
skip | Returns an observable that skips the first count values emitted by the source observable. |
takeUntil | Emits values from the source observable until the stopNotifier observable emits a value. Once the stopNotifier emits, the resulting observable completes and no more values will be emitted. |
withOwnScheduler | Applies an independent Scheduler to an observable. Use this when you want to separate side effects from the source observable entirely. |
withScheduler | Applies a scheduler to an observable that passes side effects to the source observable, but defers the execution to the scheduler provided in the parameters. Use this when you want to control the execution of side effects independently of the source observable. |
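Two of the operator descriptions above, `pairwise` and `concat`, are easier to picture with a small example. The following is a standalone sketch written with plain async generators, not eventkit's implementation. The function names are hypothetical, and the `[previous, current]` ordering of each pair is an assumption of the sketch.

```typescript
// Standalone async-generator versions, for illustration only.

// Emits [previous, current] for every value after the first.
async function* pairwiseSketch<T>(source: AsyncIterable<T>): AsyncGenerator<[T, T]> {
  let hasPrevious = false;
  let previous: T | undefined;
  for await (const current of source) {
    if (hasPrevious) yield [previous as T, current];
    previous = current;
    hasPrevious = true;
  }
}

// Drains each source completely before moving on to the next one.
async function* concatSketch<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T> {
  for (const source of sources) {
    yield* source;
  }
}

// Helper that turns an array into an async iterable.
async function* fromArraySketch<T>(values: T[]): AsyncGenerator<T> {
  for (const value of values) yield value;
}

// Helper that gathers every emitted value into an array.
async function collectSketch<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const value of source) out.push(value);
  return out;
}
```

For example, `collectSketch(pairwiseSketch(fromArraySketch([1, 2, 3])))` resolves to `[[1, 2], [2, 3]]`, and concatenating `[1, 2]` with `[3]` resolves to `[1, 2, 3]`.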
Scheduling
Name | Description |
---|---|
CallbackAction | Represents an action that will be executed as a result of an observable yielding a value. |
CleanupAction | Represents an action that will be executed after some other work has been done. |
DeferredPassthroughScheduler | A scheduler that defers the execution of scheduled actions to a specified deferred scheduler, while still passing through work to a parent scheduler for tracking purposes. |
PassthroughScheduler | A scheduler that passes through all of its work to a parent scheduler, and also defers the execution of scheduled actions to the parent scheduler. Optionally, a subject can be passed in the constructor to attach any work given to this scheduler to the subject in the parent scheduler. |
QueueScheduler | A scheduler that limits the number of actions that can execute concurrently. |
ScheduledAction | Represents an action that will be executed later. The callback that's passed in will be called at most once whenever execute() is called. |
Scheduler | Responsible for managing and observing any execution associated with a set of subjects (like an AsyncObservable or Subscriber). This is largely what enables eventkit objects to observe the asynchronous work that's performed as a result of creating a subscription. |
SubjectQueueScheduler | A scheduler that maintains separate queues for each subject with a concurrency limit applied independently to each queue. |
QueueSchedulerInit | Configuration options for initializing queue schedulers. |
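The idea behind a queue scheduler, limiting how many actions execute at once, can be sketched without any eventkit types. The class below is a hypothetical illustration of that idea only. It does not mirror the API of `QueueScheduler` or `SubjectQueueScheduler`, and `LimitedQueueSketch` and its `run` method are names invented for this example.

```typescript
// Hypothetical illustration of a concurrency-limited queue; this is not
// eventkit's QueueScheduler and does not share its API.
class LimitedQueueSketch {
  private running = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly limit: number;

  constructor(limit: number) {
    if (!Number.isInteger(limit) || limit < 1) {
      throw new RangeError("limit must be a positive integer");
    }
    this.limit = limit;
  }

  // Runs `action` once a slot is free and resolves with its result.
  async run<T>(action: () => Promise<T>): Promise<T> {
    if (this.running >= this.limit) {
      // Park until a finishing action hands over its slot.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.running += 1;
    }
    try {
      return await action();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // the slot is transferred, so `running` stays the same
      } else {
        this.running -= 1;
      }
    }
  }
}
```

Handing the slot directly to the next waiter, rather than decrementing and re-checking, keeps a newly arriving action from jumping ahead of ones that are already queued.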
Utilities
Function | Description |
---|---|
from | Creates an AsyncObservable from an AsyncObservableInput-like object. |
isAsyncObservable | Tests to see if the object can be used as an AsyncObservable. |
pipe | Creates a new function that pipes the value through a series of functions. |
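The `pipe` utility is described as building one function out of a series of functions. A minimal standalone sketch of that pattern is shown below. It assumes left-to-right application, which is the usual convention for helpers of this kind. `pipeSketch`, `Fn`, and `format` are hypothetical names, and the real `pipe` may accept a different number of arguments.

```typescript
type Fn<A, B> = (value: A) => B;

// Overloads keep the types flowing for the first few arities.
function pipeSketch<A, B>(f1: Fn<A, B>): Fn<A, B>;
function pipeSketch<A, B, C>(f1: Fn<A, B>, f2: Fn<B, C>): Fn<A, C>;
function pipeSketch<A, B, C, D>(f1: Fn<A, B>, f2: Fn<B, C>, f3: Fn<C, D>): Fn<A, D>;
function pipeSketch(...fns: Array<Fn<unknown, unknown>>): Fn<unknown, unknown> {
  // Feed the output of each function into the next, left to right.
  return (input) => fns.reduce<unknown>((acc, fn) => fn(acc), input);
}

const format = pipeSketch(
  (n: number) => n + 1,
  (n: number) => n * 2,
  (n: number) => `result: ${n}`,
);

console.log(format(3)); // (3 + 1) * 2 = 8, so this prints "result: 8"
```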