Documentation / @eventkit/base / AsyncObservable

Class: AsyncObservable<T>

Represents any number of values delivered over any amount of time, produced by an async generator that can be subscribed to and cancelled.

AsyncObservable instances can be created from common iterable and stream-like types by using the from method.

Example

```ts
const observable = new AsyncObservable(async function* () {
  yield 1;
  yield 2;
  yield 3;
});
```

See

Observable Pattern

Extended by

Type Parameters

| Type Parameter | Description |
| --- | --- |
| T | The type of the values emitted by the AsyncObservable. |

Implements

Accessors

subscribers

Get Signature

```ts
get subscribers(): Subscriber<T>[]
```

Returns

Subscriber<T>[]

Methods

cancel()

```ts
cancel(): Promise<void>
```

Cancels all subscribers of this AsyncObservable. This stops the execution of all active subscribers and removes them from the internal subscriber list. Whereas drain resolves only once all subscribers have completed, cancel sends an early interrupt signal to every subscriber, causing each to exit its generator prematurely.

This is useful when you want to clean up all subscriptions at once, rather than cancelling from each subscriber individually. This method is also the implementation of the standard disposer symbols, which means that it will be called when the AsyncObservable is disposed either by calling the dispose method directly or using explicit resource management.
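
To illustrate what exiting a generator prematurely can look like, here is a minimal sketch in plain TypeScript that does not use @eventkit/base. It relies only on the standard return() method of async generators. Whether cancel() uses this mechanism internally is an assumption, and ticker and cancelDemo are hypothetical names.

```ts
const events: string[] = [];

// Hypothetical endless source with a cleanup step in its finally block.
async function* ticker(): AsyncGenerator<number> {
  try {
    let n = 0;
    while (true) yield n++;
  } finally {
    // Runs when the generator is asked to stop early.
    events.push("cleanup");
  }
}

async function cancelDemo(): Promise<string[]> {
  const iterator = ticker();
  const first = await iterator.next();
  events.push(`got ${first.value}`);

  // Ask the suspended generator to finish now instead of yielding more values.
  await iterator.return(undefined);

  const after = await iterator.next();
  events.push(`done=${after.done}`);
  return events;
}
```

In this sketch the generator never reaches a natural end. The early return is what triggers its cleanup block, and every later call to next() reports that it is done.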

Returns

Promise<void>

A Promise that resolves when all subscribers have been cancelled.

Implementation of

SubscriptionLike.cancel


drain()

```ts
drain(): Promise<void>
```

Returns a promise that resolves when all the work scheduled against the observable (i.e. subscriber callbacks or cleanup handlers) has completed.

Returns

Promise<void>


finally()

```ts
finally(onfinally?): Promise<any>
```

Schedules a cleanup action that is executed when the observable is disposed of. An optional callback can be supplied to run as part of that action.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| onfinally? | null \| () => void | Optional callback to execute after completion or error |

Returns

Promise<any>

A promise that resolves when the action has completed


stub()

```ts
stub(): AsyncObservable<T>
```

Returns a bound AsyncObservable that will emit values from this AsyncObservable in order. This effectively creates a distinct "dummy" observable that acts as a generic wrapper around the current AsyncObservable.

Returns

AsyncObservable<T>

A new AsyncObservable that wraps this AsyncObservable and emits the same values.


subscribe()

```ts
subscribe(callback?): Subscriber<T>
```

Invokes an execution of an AsyncObservable and registers a new Subscriber that will call the provided callback for each value emitted by the generator. The callback receives each emitted value as its argument.

subscribe is not a regular operator, but a method that calls the AsyncObservable's internal generator function and returns a new Subscriber. AsyncObservable might be mistaken for an event emitter, where the callback is an event handler invoked whenever a hypothetical push method is called on an instance. This is not the case (although that behavior can be achieved using a Stream). What an AsyncObservable emits, and when, is defined by its generator function. This means the AsyncObservable starts its work when subscribe is called, not when it is created, as is often thought.
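
The point that work starts on subscribe rather than on creation can be shown with a small self-contained sketch. TinyObservable below is a hypothetical stand-in written for this example; it is not the @eventkit/base implementation and returns a plain Promise instead of a Subscriber.

```ts
type Callback<T> = (value: T) => void | Promise<void>;

// Hypothetical stand-in that only stores the generator function.
class TinyObservable<T> {
  constructor(private readonly generator: () => AsyncGenerator<T>) {}

  // Each call starts a fresh execution of the generator.
  subscribe(callback?: Callback<T>): Promise<void> {
    const run = async (): Promise<void> => {
      for await (const value of this.generator()) {
        if (callback) await callback(value);
      }
    };
    return run();
  }
}

const log: string[] = [];

const lazySource = new TinyObservable<number>(async function* () {
  log.push("generator started");
  yield 1;
  yield 2;
});

// Nothing has run yet: constructing the observable does not call the generator.
log.push("observable created");

async function lazyDemo(): Promise<string[]> {
  await lazySource.subscribe((value) => {
    log.push(`value ${value}`);
  });
  return log;
}
```

After lazyDemo() resolves, the log reads "observable created", "generator started", "value 1", "value 2", showing that the generator body did not run until subscribe was called.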

Apart from starting the execution of an AsyncObservable, this method lets you listen for the values it emits and wait for its execution to complete by using the returned Subscriber instance as you would a Promise.

You can also subscribe without providing a callback. This is useful when you are not interested in the values emitted by the generator but still want to wait for the execution of the AsyncObservable to complete.

The returned Subscriber object also acts like a Promise which can be awaited to wait for the AsyncObservable's execution to complete. Any errors that are thrown by this function will be propagated to the promise's rejection handler.
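
As a rough illustration of an object that both consumes values and can be awaited, the sketch below implements the standard PromiseLike interface. TinySubscriber, numbers, and rejectionDemo are hypothetical names, and the real Subscriber may be built quite differently. The sketch also assumes that "this function" in the paragraph above refers to the callback.

```ts
// Hypothetical stand-in: consumes a source and exposes completion via then().
class TinySubscriber<T> implements PromiseLike<void> {
  private readonly done: Promise<void>;

  constructor(source: AsyncIterable<T>, callback?: (value: T) => void) {
    // Start consuming immediately; an error thrown by the callback rejects `done`.
    this.done = (async () => {
      for await (const value of source) callback?.(value);
    })();
  }

  then<R1 = void, R2 = never>(
    onfulfilled?: ((value: void) => R1 | PromiseLike<R1>) | null,
    onrejected?: ((reason: any) => R2 | PromiseLike<R2>) | null,
  ): PromiseLike<R1 | R2> {
    return this.done.then(onfulfilled, onrejected);
  }
}

async function* numbers(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

async function rejectionDemo(): Promise<string[]> {
  const seen: string[] = [];
  try {
    // Awaiting works because the object has a then() method.
    await new TinySubscriber(numbers(), (value) => {
      if (value === 2) throw new Error("boom");
      seen.push(`value ${value}`);
    });
  } catch (err) {
    seen.push(`rejected: ${(err as Error).message}`);
  }
  return seen;
}
```

Here the error thrown while handling the second value surfaces in the catch block around the await, so rejectionDemo() resolves to "value 1" followed by "rejected: boom".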

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| callback? | SubscriberCallback<T> | The callback to execute for each value emitted by the generator. This callback will be passed the value as an argument. |

Returns

Subscriber<T>

A new Subscriber that can be used to unsubscribe from the AsyncObservable.

Utilities

from()

```ts
static from: <O>(input) => AsyncObservable<ObservedValueOf<O>>;
```

Exposes the utility function from as a static method on AsyncObservable. This is useful for creating an AsyncObservable from a common iterable or stream-like type.

Creates an AsyncObservable from an AsyncObservableInput-like object.

Type Parameters

Type Parameter
O extends AsyncObservableInput<any>

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| input | O | The source to create an AsyncObservable from |

Returns

AsyncObservable<ObservedValueOf<O>>

An AsyncObservable that emits the values from the source
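
The idea of turning different input types into one async sequence can be sketched without the library. fromIterable below is a hypothetical helper that handles only sync and async iterables, which may be narrower than what AsyncObservableInput accepts. It yields a plain async generator, not an AsyncObservable.

```ts
// Hypothetical helper, not the library's from().
async function* fromIterable<T>(
  input: Iterable<T> | AsyncIterable<T>,
): AsyncGenerator<T> {
  // for await accepts both sync and async iterables.
  for await (const value of input) {
    yield value;
  }
}

// Gathers every value from an async source into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const values: T[] = [];
  for await (const value of source) values.push(value);
  return values;
}

async function* countdown(): AsyncGenerator<number> {
  yield 3;
  yield 2;
  yield 1;
}

async function fromDemo(): Promise<(string | number)[][]> {
  const fromArray = await collect(fromIterable(["a", "b"]));
  const fromSet = await collect(fromIterable(new Set([1, 2])));
  const fromGenerator = await collect(fromIterable(countdown()));
  return [fromArray, fromSet, fromGenerator];
}
```

All three inputs come out in their original order, so fromDemo() resolves to [["a", "b"], [1, 2], [3, 2, 1]].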

Released under the MIT License.