Skip to content

Documentation / @eventkit/base / SingletonAsyncObservable

Class: SingletonAsyncObservable<T>

An extension of AsyncObservable that implements PromiseLike, allowing it to be used with await syntax.

This class is designed for observables that are expected to emit exactly one value, such as those created by operators like reduce or count. It provides a convenient way to await the single emitted value without having to manually set up a subscription.

Example

ts
// instead of
let value: T | undefined;
await observable.subscribe((v) => {
  // this observable only emits one value, so this will only be called once
  value = v;
});

// you can do
const value: T = await observable;

// like with `reduce` for instance
const result = await from([1, 2, 3]).pipe(reduce((acc, val) => acc + val, 0));
console.log(result); // 6

Extends

Type Parameters

Type ParameterDescription
TThe type of the value emitted by the observable

Implements

  • PromiseLike<T>

Constructors

Constructor

ts
new SingletonAsyncObservable<T>(generator?): SingletonAsyncObservable<T>

Parameters

ParameterTypeDescription
generator?(this, sub) => AsyncGenerator<T>The function that returns the async generator that will be used to emit values. This function will be called every time a new subscriber is created.

Returns

SingletonAsyncObservable<T>

Inherited from

ts
AsyncObservable<T>.constructor

Accessors

subscribers

Get Signature

ts
get subscribers(): Subscriber<T>[]
Returns

Subscriber<T>[]

Inherited from

AsyncObservable.subscribers

Methods

cancel()

ts
cancel(): Promise<void>

Cancels all subscribers from this AsyncObservable. This will stop the execution of all active subscribers and remove them from the internal subscriber list. While drain will resolve when all subscribers have completed, this method will send an early interrupt signal to all subscribers, causing them to exit their generator prematurely.

This is useful when you want to clean up all subscriptions at once, rather than cancelling from each subscriber individually. This method is also the implementation of the standard disposer symbols, which means that it will be called when the AsyncObservable is disposed either by calling the dispose method directly or using explicit resource management.

Returns

Promise<void>

A Promise that resolves when all subscribers have been cancelled.

Inherited from

AsyncObservable.cancel


drain()

ts
drain(): Promise<void>

Returns a promise that resolves when all the work scheduled against the observable has completed (i.e. subscriber callbacks or cleanup handlers).

Returns

Promise<void>

Inherited from

AsyncObservable.drain


finally()

ts
finally(onfinally?): Promise<any>

Schedules a cleanup action that gets executed when the observable is disposed of. Optionally, a callback can be provided to inform the behavior of the created action.

Parameters

ParameterTypeDescription
onfinally?null | () => voidOptional callback to execute after completion or error

Returns

Promise<any>

A promise that resolves when the action has completed

Inherited from

AsyncObservable.finally


stub()

ts
stub(): AsyncObservable<T>

Returns a bound AsyncObservable that will emit values from this AsyncObservable in order. This effectively creates a distinct "dummy" observable that acts as a generic wrapper around the current AsyncObservable.

Returns

AsyncObservable<T>

A new AsyncObservable that wraps this AsyncObservable and emits the same values.

Inherited from

AsyncObservable.stub


subscribe()

ts
subscribe(callback?): Subscriber<T>

Invokes an execution of an AsyncObservable and registers a new Subscriber that will call the provided callback for each value emitted by the generator. The callback will be passed the value of the current value as an argument.

subscribe is not a regular operator, but a method that calls AsyncObservable's internal generator function and returns a new Subscriber. It might be misinterpreted that AsyncObservable works like an event emitter where the callback is the event handler that is called any time a hypothetical push method is called on an instance. This is not the case (but this can be achieved using a Stream). It is a library implementation which defines what will be emitted by an AsyncObservable, and when it will be emitted. This means that calling subscribe is actually the moment when AsyncObservable starts its work, not when it is created, as it is often the thought.

Apart from starting the execution of an AsyncObservable, this method allows you to listen for values that an AsyncObservable emits, as well as waiting for the execution of the AsyncObservable to complete by using the returned Subscriber instance like you would with a Promise.

You can also subscribe without providing a callback. This may be the case where you're not interested in the values emitted by the generator, but you want to wait for the execution of the AsyncObservable to complete.

The returned Subscriber object also acts like a Promise which can be awaited to wait for the AsyncObservable's execution to complete. Any errors that are thrown by this function will be propagated to the promise's rejection handler.

Parameters

ParameterTypeDescription
callback?SubscriberCallback<T>The callback to execute for each value emitted by the generator. This callback will be passed the value as an argument.

Returns

Subscriber<T>

A new Subscriber that can be used to unsubscribe from the AsyncObservable.

Inherited from

AsyncObservable.subscribe


then()

ts
then<TResult1, TResult2>(onfulfilled?, onrejected?): Promise<TResult1 | TResult2>

Returns a promise that will subscribe to the observable and resolve when the subscriber emits its first value. This is useful in cases where you know the observable will emit one and only one value (like the result of a reduce or count operator), but you want to wait for the value to be emitted using await syntax instead of a subscription callback.

When the first value is emitted, the subscriber will immediately be cancelled and all cleanup work will be performed before the promise resolves.

Type Parameters

Type ParameterDefault type
TResult1T
TResult2never

Parameters

ParameterTypeDescription
onfulfilled?null | (value) => TResult1 | PromiseLike<TResult1>Optional callback to execute when the promise resolves successfully
onrejected?null | (reason) => TResult2 | PromiseLike<TResult2>Optional callback to execute when the promise rejects with an error

Returns

Promise<TResult1 | TResult2>

A promise that resolves with the result of the onfulfilled/onrejected handlers

Throws

If the observable completes without emitting any values.

Implementation of

ts
PromiseLike.then

Utilities

from()

ts
static from: <O>(input) => AsyncObservable<ObservedValueOf<O>>;

Method to expose the utility function #from as a static method on AsyncObservable. This is useful for creating an AsyncObservable from a common iterable or stream-like type.

Creates an AsyncObservable from an AsyncObservableInput like object

Type Parameters

Type Parameter
O extends AsyncObservableInput<any>

Parameters

ParameterTypeDescription
inputOThe source to create an AsyncObservable from

Returns

AsyncObservable<ObservedValueOf<O>>

An AsyncObservable that emits the values from the source

Param

The source to create an AsyncObservable from

Returns

An AsyncObservable that emits the values from the source

Inherited from

AsyncObservable.from

Released under the MIT License.