Bitspace: Reactive graphs with RxJS

emilwidlund

Earlier this week, I announced Bitspace, a new kind of playground for creative endeavours. I'd like to go a bit deeper into the reactivity model of Bitspace, and how it uses RxJS Observables, Observers, Subjects and Operators to produce complex computations.

RxJS in a nutshell

RxJS is a library for composing asynchronous and event-based programs using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array methods (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.

Observables

Observables are lazy push collections of multiple values. An Observable is similar to a stream where new values can be emitted at any time, and nothing is produced until an Observer subscribes.

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

This Observable pushes the values 1, 2, 3 immediately (synchronously) when subscribed, and the value 4 after one second has passed since the subscribe call, then completes.

Observers

An Observer consumes values delivered by an Observable. The Observer consists of callbacks, one for each type of notification delivered by the Observable: next, error, and complete.

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
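To tie the two pieces together, here is a minimal sketch of handing an Observer to an Observable's subscribe method. The names numbers$, received and loggingObserver are made up for illustration; the point is that nothing runs until subscribe is called, and notifications then arrive in order.

```typescript
import { Observable, Observer } from 'rxjs';

// Emits three values synchronously on subscribe, then completes.
const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Collect notifications so the delivery order is easy to inspect.
const received: string[] = [];

const loggingObserver: Observer<number> = {
  next: (x) => received.push(`next: ${x}`),
  error: (err) => received.push(`error: ${err}`),
  complete: () => received.push('complete'),
};

// Observables are lazy: nothing has been produced yet.
const countBeforeSubscribe = received.length;

// Subscribing runs the producer function and delivers the notifications.
numbers$.subscribe(loggingObserver);

console.log(countBeforeSubscribe, received);
```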

Operators

RxJS is most useful for its operators. Operators are pieces that allow complex asynchronous code to be easily composed in a declarative manner.

import { of, map } from 'rxjs';

of(1, 2, 3)
  .pipe(map((x) => x * x))
  .subscribe((v) => console.log(`value: ${v}`));

For example, the operator called map is analogous to the Array method of the same name. Just as [1, 2, 3].map(x => x * x) will yield [1, 4, 9], the Observable created above will emit 1, 4, 9.

Subjects

An RxJS Subject is a special type of Observable that allows values to be multicast to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast: every subscribed Observer receives the same emitted values.
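The difference between unicast and multicast can be sketched roughly like this. The executions counter and the array names are only there for illustration: the plain Observable runs its producer once per subscriber, while the Subject shares a single emission with everyone.

```typescript
import { Observable, Subject } from 'rxjs';

// Counts how many times the producer function runs.
let executions = 0;

const unicast$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const unicastValues: number[] = [];
unicast$.subscribe((v) => unicastValues.push(v));
unicast$.subscribe((v) => unicastValues.push(v));
// Each subscription triggered its own execution, so the values differ.

const subject = new Subject<string>();
const first: string[] = [];
const second: string[] = [];
subject.subscribe((v) => first.push(v));
subject.subscribe((v) => second.push(v));

// A single next() call reaches both Observers.
subject.next('hello');

console.log(unicastValues, first, second);
```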

RxJS in Bitspace

Bitspace uses all of these concepts to varying degrees. Let me walk through the different Bitspace graph concepts, and how they integrate RxJS Observables, Observers, Operators and Subjects.

Inputs

Inputs are so-called BehaviorSubjects, which have a notion of "the current value". A BehaviorSubject stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it immediately receives the "current value" from the BehaviorSubject.

Inputs can emit values on their own, or receive values from incoming connections. An incoming connection automatically uses the Input's BehaviorSubject to emit the values it carries.
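The "current value" behaviour is easy to see with a plain BehaviorSubject. This is a sketch using RxJS directly, not Bitspace's Input class; the hue name and the numbers are arbitrary.

```typescript
import { BehaviorSubject } from 'rxjs';

// A BehaviorSubject always needs an initial value.
const hue = new BehaviorSubject<number>(0);

// Emitted before anyone is listening; it becomes the current value.
hue.next(120);

const seen: number[] = [];

// A late subscriber immediately receives the current value (120).
hue.subscribe((v) => seen.push(v));

// Later values are delivered as usual.
hue.next(240);

// The current value can also be read synchronously.
const current = hue.getValue();

console.log(seen, current);
```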

Outputs

Outputs, on the other hand, are ReplaySubjects, a kind of Subject that keeps an internal buffer of previous values and replays them to new subscribers.
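As a rough illustration of that buffer, here is a plain ReplaySubject in isolation. The buffer size of 2 is an arbitrary choice for the demo; it is not taken from Bitspace's Output implementation.

```typescript
import { ReplaySubject } from 'rxjs';

// Keep the last two values for late subscribers (size chosen for the demo).
const output = new ReplaySubject<number>(2);

output.next(1);
output.next(2);
output.next(3);

const late: number[] = [];

// The late subscriber is replayed the buffered values 2 and 3 on subscribe.
output.subscribe((v) => late.push(v));

// New values keep flowing after the replay.
output.next(4);

console.log(late);
```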

An output is initialized by passing a source observable. Take a look at Bitspace's ToHSV Node for example.

import { z } from 'zod';
import { Node, Input, Output, schema } from '@bitspace/circuit';
import { combineLatest, map } from 'rxjs';
import { HSVSchema } from '../../../schemas/HSVSchema';

const NumberSchema = schema('Number', z.number());

export class ToHSV extends Node {
    static displayName = 'To HSV';

    inputs = {
        hue: new Input({
            name: 'Hue',
            type: NumberSchema,
            defaultValue: 0
        }),
        saturation: new Input({
            name: 'Saturation',
            type: NumberSchema,
            defaultValue: 0.5
        }),
        value: new Input({
            name: 'Value',
            type: NumberSchema,
            defaultValue: 1
        })
    };

    outputs = {
        color: new Output({
            name: 'Color',
            type: HSVSchema,
            observable: combineLatest([this.inputs.hue, this.inputs.saturation, this.inputs.value]).pipe(
                map(([hue, saturation, value]) => ({ hue: Math.abs(hue % 360), saturation, value }))
            )
        })
    };
}

As you can see, we're creating an observable via the combineLatest function from RxJS. It takes an array of source observables (here, the three inputs) and combines them into a single observable that emits an array of the latest value from each source. Whenever any input changes, for example the hue, it emits a new array with the latest values from all three. We then use the pipe method to transform the result with the map operator.

The map operator is very similar to map for Arrays: it uses a callback function to transform every emitted value. In the example above, we take the hue and wrap it around 360 degrees using a modulo operation, which gives us the remainder of the division.
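To watch the same pipeline emit without the Node machinery, the sketch below swaps the Inputs for plain BehaviorSubjects. The HSV interface and the colors array are assumptions made for the example, not part of @bitspace/circuit.

```typescript
import { BehaviorSubject, combineLatest, map } from 'rxjs';

// Illustrative shape for the emitted color.
interface HSV {
  hue: number;
  saturation: number;
  value: number;
}

// Stand-ins for the three Inputs, with the same default values.
const hue = new BehaviorSubject<number>(0);
const saturation = new BehaviorSubject<number>(0.5);
const value = new BehaviorSubject<number>(1);

const color$ = combineLatest([hue, saturation, value]).pipe(
  map(([h, s, v]): HSV => ({ hue: Math.abs(h % 360), saturation: s, value: v }))
);

const colors: HSV[] = [];

// Emits once right away, because every source already has a current value.
color$.subscribe((color) => colors.push(color));

// 450 % 360 = 90, so the hue wraps around.
hue.next(450);

// Changing any single source re-emits with the latest value of the others.
saturation.next(1);

console.log(colors);
```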

Another example is the Prompt Node which can be used to interact with an LLM like ChatGPT.

import { z } from "zod";
import { Node, Input, Output, schema } from "@bitspace/circuit";
import { from, switchMap } from "rxjs";

/** Declare a zod schema for value validation */
const StringSchema = schema(z.string());

export class Prompt extends Node {
  static displayName = "Prompt";

  inputs = {
    prompt: new Input({
      name: "Prompt",
      type: StringSchema,
      defaultValue: "What is 56 * 300?",
    }),
  };

  outputs = {
    output: new Output({
      name: "Output",
      type: StringSchema,
      observable: this.inputs.prompt.pipe(
        switchMap((prompt) =>
          from(
            // Send the prompt to an endpoint which can carry out the LLM queries
            fetch("/api/prompt", {
              method: "POST",
              body: JSON.stringify({ prompt }),
              headers: {
                "Content-Type": "application/json",
              },
            }).then((res) => res.json()),
          ),
        ),
      ),
    }),
  };
}

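By using the switchMap operator, we can kick off a fetch request based on the input prompt with a cancelling effect. If the prompt changes before the fetch request completes, switchMap unsubscribes from the pending request, so a stale response is never emitted. Note that because the request is a plain Promise wrapped with from, the underlying HTTP request itself keeps running and only its result is discarded; aborting it for real requires something like fromFetch from rxjs/fetch or an AbortController.

The from function turns a Promise (among other things, such as arrays and iterables) into an Observable source, which makes it a convenient way to turn fetch requests into Observable sources.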
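The switching behaviour can be shown without a network by letting manually controlled Subjects play the role of pending responses. Everything here (prompt$, pending, answers) is a hypothetical stand-in; no real endpoint is involved.

```typescript
import { Subject, switchMap } from 'rxjs';

const prompt$ = new Subject<string>();

// One fake "pending response" per prompt, resolved by hand further down.
const pending = new Map<string, Subject<string>>();

const answers: string[] = [];

prompt$
  .pipe(
    switchMap((prompt) => {
      const response = new Subject<string>();
      pending.set(prompt, response);
      return response;
    })
  )
  .subscribe((answer) => answers.push(answer));

prompt$.next('first');
// A new prompt arrives before the first response: switchMap drops the first.
prompt$.next('second');

// The late response to the first prompt is ignored.
pending.get('first')!.next('answer to first');

// Only the response to the most recent prompt gets through.
pending.get('second')!.next('answer to second');

console.log(answers);
```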

Connections

Finally, we have Connections, which are responsible for carrying values between outputs and inputs. They're very simple.

A Connection establishes a Subscription to the given Output and pushes values to the Input.

// From is the given output which we want to connect to an input
this.subscription = this.from.subscribe((value) => {
  try {
    // To is the given input
    this.to.type.validator.parse(value);
    this.to.next(value);
  } catch (err) {
    this.dispose();
    throw new Error("Received a value with an incompatible type");
  }
});

In this example, we are also doing value validation, making sure that the transported value has the correct type and shape. If validation fails, the connection is disposed and an error is thrown.

When connections are removed, we dispose of the Subscription to free up resources and avoid memory leaks.
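The lifecycle of a connection can be approximated with plain RxJS types. This is only a sketch: parseNumber is a hypothetical stand-in for a schema validator, and the ReplaySubject and BehaviorSubject stand in for an Output and an Input rather than using Bitspace's own classes.

```typescript
import { BehaviorSubject, ReplaySubject, Subscription } from 'rxjs';

// Hypothetical validator: returns the value or throws if it is not a number.
const parseNumber = (value: unknown): number => {
  if (typeof value !== 'number') {
    throw new Error('Expected a number');
  }
  return value;
};

const output = new ReplaySubject<unknown>(1); // stand-in for an Output
const input = new BehaviorSubject<number>(0); // stand-in for an Input

// "Connecting" means subscribing to the output and forwarding valid values.
const subscription: Subscription = output.subscribe((value) => {
  input.next(parseNumber(value));
});

output.next(42);
const whileConnected = input.getValue();

// "Disposing" means unsubscribing, so nothing is forwarded afterwards.
subscription.unsubscribe();
output.next(7);
const afterDispose = input.getValue();

console.log(whileConnected, afterDispose, subscription.closed);
```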

Wrapping Up

RxJS is one of my favourite packages in the JavaScript ecosystem, and it fits perfectly in Bitspace. I hope you can draw some inspiration from this. Learn more about RxJS here or on their official webpage.

If you like my content, please consider supporting me with a paid Subscription.