Skip to content

Transforming Data

Transforming data is a core part of eventkit. It happens through the use of operators, which are powerful utility functions that allow you to manipulate, transform, filter, and combine observables (like Streams) in a declarative way. They form a crucial part of the reactive programming patterns that eventkit employs, and enables you to create complex data processing pipelines.

Introduction to Operators

In eventkit, operators are functions that take an observable as input, and return a new observable that (most commonly) subscribes to the provided observable, transforms the data in some way, and yield that transformed data to the subscriber.

For example, the map operator is synonymous to the Array method with the same name.

ts
import { AsyncObservable, map } from "@eventkit/base";

// Array method
[1, 2, 3].map((x) => x * x); // [1, 4, 9]

// Using operators
await AsyncObservable.from([1, 2, 3])
  .pipe(map((x) => x * x))
  .subscribe((x) => console.log("squared", x));

// outputs:
// squared 1
// squared 4
// squared 9
ts
import { Stream, map } from "@eventkit/base";

// Array method
[1, 2, 3].map((x) => x * x); // [1, 4, 9]

// Using operators
const stream = new Stream<number>();
stream.pipe(map((x) => x * x)).subscribe((x) => console.log("squared", x));

stream.push(1);
stream.push(2);
stream.push(3);

await stream.drain();

// outputs:
// squared 1
// squared 4
// squared 9

Chaining Operators

Operators are in-itself ordinary functions, so they could be used like op()(obs). In practice, we tend to pipe many operators together which becomes quickly unreadable.

ts
// yuck 🤮
op4()(op3()(op2()(op1()(obs))));

For that reason AsyncObservable provides a pipe method that allows you to chain operators together. Each operator in the chain takes the output of the previous operator as its input.

ts
obs.pipe(op1(), op2(), op3(), op4());

To give a practical example of what multiple operators chained together looks like, let's say we want to square all the numbers in an observable, filter out the odd numbers, and then sum them all up:

ts
import { AsyncObservable, map, filter, reduce } from "@eventkit/base";

const obs = AsyncObservable.from([-1, 2, 3, 0, 5]);

const processed$ = obs.pipe(
  map((num) => num * num), // Square each number
  filter((num) => num % 2 === 0), // Filter out odd numbers
  reduce((sum, num) => sum + num, 0) // Sum them all
);

await processed$.subscribe(console.log);

// outputs:
// "4"

// calculation:
// [-1, 2, 3, 0, 5] -> [1, 4, 9, 0, 25] (after map)
// -> [4, 0] (after filter, only even numbers remain)
// -> 4 (after reduce, 4 + 0 = 4)

Singleton Operators

The output of some operators is an observable that only emits a single value. For those special cases, the observable that is returned is what is known as a "singleton" observable. (See SingletonAsyncObservable)

All a singleton observable does is extend AsyncObservable class and implement the PromiseLike interface, which means that when it's used in an await statement (or with the then method) it will return a promise that will subscribe to the observable and resolve with the first (and only) value emitted.

This means that when dealing with a singleton observable, all you need to do is await the observable to get the emitted value instead of calling subscribe and hoisting the value out of the callback.

For example, the first operator is a singleton operator that emits the first value of an observable and then completes.

ts
import { AsyncObservable, first } from "@eventkit/base";

const obs = AsyncObservable.from([1, 2, 3]);
const singleton = obs.pipe(first());

// instead of this:
let firstValue: number | undefined;
await obs.subscribe((value) => {
  firstValue = value;
});
console.log(firstValue); // 1

// you can just do this:
console.log(await singleton); // 1

Singleton observables are meant to provide a shorthand for dealing with observables that only emit a single value. They are still observables in every other way, so you can still use methods like pipe, subscribe, drain, etc.

Available Operators

Eventkit provides a variety of built-in operators to handle common transformations. Below is a complete list of all the operators that ship as standard with eventkit.

Join Operators

Transformation Operators

Filtering Operators

Error Handling Operators

Scheduling Operators

Boolean Operators

Aggregation Operators

Creating Custom Operators

Use the pipe() operator to make new operators

If there is a commonly used sequence of operators in your code, you can use the pipe() function to extract the sequence into a new operator. Even if a sequence is not that common, breaking it out into a single operator can improve readability and reusability.

For example, you could make a function that discarded odd values and doubled even values like this:

ts
function doubleEvens() {
  return pipe(
    filter((num: number) => num % 2 === 0),
    map((num) => num * 2)
  );
}

The pipe() operator is analogous to the .pipe() method on AsyncObservable.

TIP

When you use the pipe method, you should provide a type parameter to the first operator in the chainthat represents the type of data that will be emitted by the observable that gets operated on. This allows you to keep end-to-end type safety when hoisting transforms like this.

Creating custom operators

While eventkit provides many built-in operators, you can also create your own custom operators to implement your own transformations. An operator is most commonly a function that takes a source observable and returns a new observable that performs some transformation on the data. You'll likely never need to do this as you can achieve just about any data transformation by chaining multiple of the built-in operators together.

ts
import { AsyncObservable, type OperatorFunction } from "@eventkit/base";

// Custom operator that doubles even numbers and filters out odd numbers
function doubleEvens<T extends number>(): OperatorFunction<T, T> {
  return (source) =>
    // see note below this block on what `source.AsyncObservable` is
    new source.AsyncObservable<T>(async function* () {
      for await (const value of source) {
        if (value % 2 === 0) {
          yield (value * 2) as T;
        }
      }
    });
}

// Usage
const numbers = AsyncObservable.from([1, 2, 3, 4]);
const doubledEvens = numbers.pipe(doubleEvens());

await doubledEvens.subscribe((value) => {
  console.log(`Doubled even: ${value}`);
});

// outputs:
// first yield - nothing logged (odd)
// second yield - "Doubled even: 4"
// third yield - nothing logged (odd)
// fourth yield - "Doubled even: 8"

INFO

source.AsyncObservable is a pattern used to declare that the returned observable is a child of the source observable. This is used for scheduling purposes and allows us to add any work of the child observable to the parent observable's scheduler. More details can be found here.

TIP

In case you get stuck trying to implement your own operator, the source code of all the existing operators are available here.

Released under the MIT License.