Bitspace: Reactive graphs with RxJS
emilwidlund
Earlier this week, I announced Bitspace, a new kind of playground for creative endeavours. I'd like to go a bit deeper into Bitspace's reactivity model and how it uses RxJS Observables, Observers, Subjects & Operators to produce complex computations.
RxJS in a nutshell
RxJS is a library for composing asynchronous and event-based programs using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array methods (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.
Observables
Observables are lazy push collections of multiple values. They're similar to streams, where new values can be emitted at any time.
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
This Observable pushes the values 1, 2, 3 immediately (synchronously) when subscribed, and the value 4 after one second has passed since the subscribe call, then completes.
Observers
An Observer consumes values delivered by an Observable. The Observer consists of callbacks, one for each type of notification delivered by the Observable: next, error, and complete.
const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
Operators
RxJS is most useful for its operators. Operators are pieces that allow complex asynchronous code to be easily composed in a declarative manner.
import { of, map } from 'rxjs';
of(1, 2, 3)
  .pipe(map((x) => x * x))
  .subscribe((v) => console.log(`value: ${v}`));
For example, the operator called map is analogous to the Array method of the same name. Just as [1, 2, 3].map(x => x * x) will yield [1, 4, 9], the Observable created above will emit 1, 4, 9.
Subjects
An RxJS Subject is a special type of Observable that allows values to be multicast to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
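To make the unicast/multicast distinction concrete, here is a rough model in plain TypeScript. It deliberately does not use RxJS: `TinyObservable` and `TinySubject` are hypothetical names invented for this sketch, and they only mimic the delivery behaviour described above.

```typescript
// Plain-TypeScript model of unicast vs multicast delivery (not RxJS itself).
type Listener<T> = (value: T) => void;

// Unicast: every subscribe call runs the producer again.
class TinyObservable<T> {
  constructor(private producer: (emit: Listener<T>) => void) {}

  subscribe(listener: Listener<T>): void {
    this.producer(listener);
  }
}

// Multicast: one shared list of listeners receives each value.
class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

let executions = 0;
const unicast = new TinyObservable<number>((emit) => {
  executions += 1; // counts how often the producer runs
  emit(executions);
});

const unicastLog: number[] = [];
unicast.subscribe((v) => unicastLog.push(v));
unicast.subscribe((v) => unicastLog.push(v));
// unicastLog is [1, 2]: two subscribers, two independent executions

const subject = new TinySubject<number>();
const multicastLog: string[] = [];
subject.subscribe((v) => multicastLog.push("a:" + v));
subject.subscribe((v) => multicastLog.push("b:" + v));
subject.next(42);
// multicastLog is ["a:42", "b:42"]: one emission shared by both subscribers
```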
RxJS in Bitspace
Bitspace uses all these concepts to varying degrees. Let me walk through the different Bitspace graph concepts, and how they integrate RxJS Observables, Observers, Operators & Subjects.
Inputs
Inputs are BehaviorSubjects, which have a notion of "the current value". A BehaviorSubject stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it immediately receives that current value.
An Input can emit values on its own or receive them from incoming connections. An incoming connection pushes the values it carries into the Input by calling next on it.
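The "current value" behaviour can be sketched in a few lines of plain TypeScript. This is only a model of the semantics, not RxJS's BehaviorSubject and not Bitspace's Input class; `TinyBehaviorSubject` is a hypothetical name used for illustration.

```typescript
// Plain-TypeScript model of "current value" semantics (not RxJS itself).
type Listener<T> = (value: T) => void;

class TinyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];

  constructor(private current: T) {}

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current); // a new subscriber immediately gets the current value
  }

  next(value: T): void {
    this.current = value; // remember the latest value for future subscribers
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

const hueInput = new TinyBehaviorSubject<number>(0); // 0 plays the role of defaultValue

const early: number[] = [];
hueInput.subscribe((v) => early.push(v)); // receives 0 right away
hueInput.next(120);

const late: number[] = [];
hueInput.subscribe((v) => late.push(v)); // receives 120, the current value
hueInput.next(240);
// early is [0, 120, 240]; late is [120, 240]
```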
Outputs
Outputs, on the other hand, are ReplaySubjects: a kind of Subject that keeps an internal buffer of previous values and replays them to new subscribers.
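As a rough illustration of what such a buffer does, here is a plain-TypeScript model. `TinyReplaySubject` is a hypothetical name, and the buffer size of 2 is an arbitrary choice for the example; the size Bitspace actually uses is not stated here.

```typescript
// Plain-TypeScript model of a replay buffer (not RxJS itself).
type Listener<T> = (value: T) => void;

class TinyReplaySubject<T> {
  private buffer: T[] = [];
  private listeners: Listener<T>[] = [];

  constructor(private bufferSize: number) {}

  subscribe(listener: Listener<T>): void {
    // Replay buffered values to the new subscriber first
    for (const past of this.buffer) {
      listener(past);
    }
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value once the buffer is full
    }
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

const colorOutput = new TinyReplaySubject<string>(2); // buffer size is an assumption
colorOutput.next("red");
colorOutput.next("green");
colorOutput.next("blue");

const seen: string[] = [];
colorOutput.subscribe((v) => seen.push(v));
// seen is ["green", "blue"]: the late subscriber gets the last two values
```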
An Output is initialized by passing it a source observable. Take Bitspace's ToHSV Node, for example.
import { z } from 'zod';
import { Node, Input, Output, schema } from '@bitspace/circuit';
import { combineLatest, map } from 'rxjs';
import { HSVSchema } from '../../../schemas/HSVSchema';
const NumberSchema = schema('Number', z.number());
export class ToHSV extends Node {
  static displayName = 'To HSV';
  inputs = {
    hue: new Input({
      name: 'Hue',
      type: NumberSchema,
      defaultValue: 0
    }),
    saturation: new Input({
      name: 'Saturation',
      type: NumberSchema,
      defaultValue: 0.5
    }),
    value: new Input({
      name: 'Value',
      type: NumberSchema,
      defaultValue: 1
    })
  };
  outputs = {
    color: new Output({
      name: 'Color',
      type: HSVSchema,
      observable: combineLatest([this.inputs.hue, this.inputs.saturation, this.inputs.value]).pipe(
        map(([hue, saturation, value]) => ({ hue: Math.abs(hue % 360), saturation, value }))
      )
    })
  };
}
As you can see, we're creating an observable with the combineLatest function from RxJS. It takes an array of source observables (here, the three inputs) and combines them into a single observable that emits an array of the latest value from each source. Whenever any one of them changes, say the hue, it emits a fresh array with the latest values from all sources. We then use the pipe method to transform that array with a map operator.
The map operator is very similar to map for Arrays: it uses a callback function to transform every value the Observable emits. In the example above, we take hue and reduce it modulo 360 (the remainder after dividing by 360), then apply Math.abs so the result is never negative.
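A few worked values show what this arithmetic does. `mirrorHue` below wraps the same expression as the ToHSV map callback; `wrapHue` is a hypothetical alternative, not something ToHSV uses, included only to show how the two differ for negative angles.

```typescript
// Same expression as in the ToHSV map callback.
function mirrorHue(hue: number): number {
  return Math.abs(hue % 360);
}

// Hypothetical alternative (not what ToHSV does): wrap negative angles around the circle.
function wrapHue(hue: number): number {
  return ((hue % 360) + 360) % 360;
}

const a = mirrorHue(370); // 10: one full turn plus 10 degrees
const b = mirrorHue(720); // 0: exactly two full turns
const c = mirrorHue(-90); // 90: the remainder is -90, and Math.abs flips its sign
const d = wrapHue(-90);   // 270: the same angle measured in the positive direction
```

For non-negative hues both functions agree; they only differ once the hue goes below zero.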
Another example is the Prompt Node, which can be used to interact with an LLM like ChatGPT.
import { z } from "zod";
import { Node, Input, Output, schema } from "@bitspace/circuit";
import { from, switchMap } from "rxjs";
/** Declare a zod schema for value validation */
const StringSchema = schema("String", z.string());
export class Prompt extends Node {
  static displayName = "Prompt";
  inputs = {
    prompt: new Input({
      name: "Prompt",
      type: StringSchema,
      defaultValue: "What is 56 * 300?",
    }),
  };
  outputs = {
    output: new Output({
      name: "Output",
      type: StringSchema,
      observable: this.inputs.prompt.pipe(
        switchMap((prompt) =>
          from(
            // Send the prompt to an endpoint which can carry out the LLM queries
            fetch("/api/prompt", {
              method: "POST",
              body: JSON.stringify({ prompt }),
              headers: {
                "Content-Type": "application/json",
              },
            }).then((res) => res.json()),
          ),
        ),
      ),
    }),
  };
}
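By using the switchMap operator, we can kick off a fetch request for each incoming prompt with a cancelling effect. If the prompt changes before the previous request completes, switchMap unsubscribes from it and switches to the new one, so a stale response never reaches the output. Note that the underlying HTTP request is only aborted if it is tied to an AbortSignal, which RxJS's fromFetch does on unsubscribe; with a plain Promise, the late response is simply ignored.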
The from function turns a Promise into an Observable source, which makes it a good fit for wrapping fetch requests.
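The "latest request wins" idea behind switchMap can be modelled without any asynchronous code. The sketch below is plain TypeScript, not RxJS; `LatestOnly` and `RequestToken` are hypothetical names, and responses are delivered by hand to keep the example deterministic.

```typescript
// Plain-TypeScript model of "latest request wins" (not RxJS itself).
type RequestToken = { cancelled: boolean };

class LatestOnly<T> {
  private current: RequestToken | null = null;

  constructor(private sink: (value: T) => void) {}

  // Called when a new prompt arrives: invalidates the previous request.
  begin(): RequestToken {
    if (this.current) {
      this.current.cancelled = true;
    }
    this.current = { cancelled: false };
    return this.current;
  }

  // Called when a response arrives: forwards it only if its request is still the latest.
  deliver(token: RequestToken, value: T): boolean {
    if (token.cancelled) {
      return false;
    }
    this.sink(value);
    return true;
  }
}

const outputs: string[] = [];
const latest = new LatestOnly<string>((v) => outputs.push(v));

const first = latest.begin();  // request for the original prompt
const second = latest.begin(); // prompt changed before the first response arrived

const firstDelivered = latest.deliver(first, "response to the original prompt");
const secondDelivered = latest.deliver(second, "response to the edited prompt");
// outputs is ["response to the edited prompt"]: the stale response was dropped
```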
Connections
Finally, we have Connections, which are responsible for carrying values between Outputs & Inputs. They're very simple.
A Connection establishes a Subscription to the given Output and pushes its values to the Input.
// From is the given output which we want to connect to an input
this.subscription = this.from.subscribe((value) => {
  try {
    // To is the given input
    this.to.type.validator.parse(value);
    this.to.next(value);
  } catch (err) {
    this.dispose();
    throw new Error("Received a value with an incompatible type");
  }
});
In this example, we also validate each value, making sure that what is transported has the correct type & shape.
When a Connection is removed, we dispose of its Subscription to free up resources and avoid memory leaks.
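The validate, forward, and dispose cycle can be tried out in isolation with a small plain-TypeScript model. Everything here is hypothetical and stands in for the real pieces: `TinySubject` replaces the RxJS Subjects, `connect` replaces Bitspace's Connection class, and a type guard replaces the zod validator.

```typescript
// Plain-TypeScript model of a validating connection (not Bitspace's Connection class).
type Listener<T> = (value: T) => void;

class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  // Returns a function that removes the listener again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  next(value: T): void {
    for (const listener of this.listeners.slice()) {
      listener(value);
    }
  }
}

// Hypothetical helper: forwards validated values from `from` to `to`.
function connect<T>(
  from: TinySubject<unknown>,
  to: TinySubject<T>,
  isValid: (value: unknown) => value is T
): () => void {
  const unsubscribe = from.subscribe((value) => {
    if (!isValid(value)) {
      unsubscribe(); // dispose the connection before reporting the error
      throw new Error("Received a value with an incompatible type");
    }
    to.next(value);
  });
  return unsubscribe;
}

const output = new TinySubject<unknown>();
const input = new TinySubject<number>();
const received: number[] = [];
input.subscribe((v) => received.push(v));

const isNumber = (v: unknown): v is number => typeof v === "number";
const dispose = connect(output, input, isNumber);

output.next(1); // valid, forwarded to the input
let rejected = false;
try {
  output.next("not a number"); // invalid: the connection disposes itself and throws
} catch (err) {
  rejected = true;
}
output.next(2); // ignored, because the connection no longer exists
dispose();      // calling dispose again is harmless
// received is [1]
```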
Wrapping Up
RxJS is one of my favourite packages in the JavaScript ecosystem, and it fits perfectly in Bitspace. I hope you can draw some inspiration from this. Learn more about RxJS here or on their official webpage.