/*
Hyrrokkin - a library for building and running executable graphs
MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* Hyrrokkin Engine Namespace
* @namespace
*/
var hyrrokkin_engine = hyrrokkin_engine || {};
hyrrokkin_engine.AsyncLock = class {
/**
* Implement a simple asynchronous mutex-like lock. Call acquire() to obtain the lock and release() to release it.
*/
constructor () {
this.releases = [];
this.locked = false;
}
dispatch() {
if (this.releases.length > 0 && this.locked === false) {
this.releases.shift()();
this.locked = true;
}
}
/**
* Release this lock so that another may acquire it
*/
release() {
this.locked = false;
this.dispatch();
}
/**
* Acquire this lock, blocking others from acquiring it
*
* @return {Promise<unknown>}
*/
acquire() {
return new Promise(resolve => {
this.releases.push(resolve);
this.dispatch();
});
}
}
/**
* A callback that is invoked for a subscriber when a value is received from a stream
*
* @callback hyrrokkin_engine.ValueStream~subscriberCallback
*
* @param {*} value the value received from the stream
*
* @return {Promise<undefined>}
*/
/**
* A callback that is invoked for a subscriber when a stream is closed
*
* @callback hyrrokkin_engine.ValueStream~closeCallback
*
* @param {boolean} was_cancelled set to true if the stream was interrupted due to an error
*
* @return {Promise<undefined>} a transformed value
*/
/**
* Class representing a Value Stream
*
* @typedef hyrrokkin_engine.ValueStream
*/
hyrrokkin_engine.ValueStream = class {
/**
* Create a value stream base class. Do not call this directly, use the static methods create_from_streams
* or create_source to construct an appropriate subclass
*
* @param {int} activation_threshold if supplied, block publishing onto this stream until this many subscribers are active
*/
constructor(activation_threshold) {
// map from subscriber_id to an async function that receives published values
this.subscribers = {};
// map from subscriber_id to an async function that is called when the stream closes
this.close_fns = {};
// count of subscriptions issued on this stream
this.subscriber_count = 0;
// block publication on this stream until this many subscribers are attached
this.activation_threshold = activation_threshold;
this.active = false;
this.is_closed = false;
this.was_cancelled = false
this.publication_lock = new hyrrokkin_engine.AsyncLock();
this.completion_lock = new hyrrokkin_engine.AsyncLock();
}
async prepare() {
if (this.activation_threshold) {
await this.publication_lock.acquire();
} else {
this.active = true;
}
await this.completion_lock.acquire();
}
/**
* Create a value stream based on 1 or more other value streams. The resulting stream will publish events received on
* any of the input streams.
*
* @param {hyrrokkin_engine.ValueStream[]} input_streams a list of input streams.
* @param {(transformFunction|asyncTransformFunction)=} transform_function an optional function to transform values received on the input streams. May be async or non-async.
* @param {int=} activation_threshold if supplied, block publishing onto this stream until this many subscribers are subscribed
*
* @return {Promise<hyrrokkin_engine.ValueStream>}
*/
static
async create_from_streams(input_streams, transform_function, activation_threshold) {
let stream = new hyrrokkin_engine.TransformStream(activation_threshold, input_streams, transform_function);
await stream.prepare();
return stream;
}
/**
* Create a value source stream. Call the publish method to publish values to this stream.
*
* @param {int=} activation_threshold if supplied, block publishing onto this stream until this many subscribers are active
* @return {Promise<hyrrokkin_engine.ValueStream>}
*/
static
async create_source(activation_threshold) {
let stream = new hyrrokkin_engine.SourceStream(activation_threshold);
await stream.prepare()
return stream;
}
/**
* Activate this stream, unblocking the publish method.
*
* This is not usually called directly, but automatically once enough subscribers are subscribed.
*/
activate() {
if (this.active === false) {
this.active = true;
this.publication_lock.release();
}
}
/**
* Check that values can be published to this stream
*
* @return {boolean} true iff a caller can use the publish method without being blocked or raising an exception
*
* @throws {Error} if this stream does not allow the publish method to be called
*/
can_publish() {
return (this.active && !this.is_closed);
}
/**
* Subscribe to values in this stream, passing in two callback functions
*
* @param {hyrrokkin_engine.ValueStream~subscriberCallback=} subscriber an async function that is invoked with a value published on the stream
* @param {hyrrokkin_engine.ValueStream~closeCallback=} close_fn an optional function that is called when th stream is closed
*
* @return {string} a subscriber-id that is unique to this stream. Pass this to the unsubscribe method to unsubscribe these functions from further values published on the stream
*/
subscribe(subscriber, close_fn) {
let subscriber_id = "s" + this.subscriber_count;
this.subscriber_count += 1;
this.subscribers[subscriber_id] = subscriber;
if (close_fn) {
this.close_fns[subscriber_id] = close_fn;
}
if (this.active === false && (this.activation_threshold > 0) &&
(Object.keys(this.subscribers).length >= this.activation_threshold)) {
this.activate();
}
return subscriber_id;
}
/**
* Unsubscribe from further values published on this stream
*
* @param {string} subscriber_id a subscriber id returned from a call to the subscribe method
*/
unsubscribe(subscriber_id) {
if (subscriber_id in this.subscribers) {
delete this.subscribers[subscriber_id];
}
if (subscriber_id in this.close_fns) {
delete this.close_fns[subscriber_id];
}
}
/**
* Publish a value onto the stream. This call will block until the stream becomes active.
*
* @param {*} value the value to be published
* @return {Promise<boolean>} true if the value was published, false if the stream was closed
*
* @throws {Error} if this stream does not allow the publish method to be called
*/
async publish(value) {
if (this.is_closed) {
return false;
}
await this.publication_lock.acquire();
try {
let promises = [];
for (let subscriber_id in this.subscribers) {
let subscriber = this.subscribers[subscriber_id];
promises.push(subscriber(value));
}
await Promise.all(promises);
} finally {
this.publication_lock.release();
}
return true;
}
/**
* Close this stream. Closing will prevent further values from being published to subscribers. Subscribers will be
* notified if they registered a callback for the close_fn parameter when they called the subscribe method
*
* @param {boolean} was_cancelled True iff this stream is being closed abnormally (due to an error)
*
* @return {Promise<void>}
*/
async close(was_cancelled) {
if (!this.is_closed) {
this.is_closed = true;
this.was_cancelled = was_cancelled;
this.completion_lock.release();
for (let subscrber_id in this.close_fns) {
let close_fn = this.close_fns[subscrber_id];
await close_fn(was_cancelled);
}
}
}
/**
* Block until the stream has closed
*
* @return {Promise<boolean>} return True if the stream was closed abnormally, False if the stream was closed normally.
*/
async waitfor_close() {
await this.completion_lock.acquire();
this.completion_lock.release();
return this.was_cancelled;
}
}
hyrrokkin_engine.TransformStream = class extends hyrrokkin_engine.ValueStream {
constructor(activation_threshold, input_streams, transform_fn) {
super(activation_threshold);
this.input_streams = [];
this.closed_count = 0;
this.cancelled_count = 0;
this.input_stream_count = 0;
this.subscriber_ids = [];
input_streams.forEach(input_stream => {
this.attach_to(input_stream, transform_fn);
});
}
/**
* Attach an input stream and optionally, a transform function which will transform values received from that stream
*
* @param {hyrrokkin_engine.ValueStream} input_stream an input stream
* @param {transformFunction|asyncTransformFunction} transform_fn an optional function to transform values received on the input streams. May be async or non-async.
* @return {string} the subscriber id used to subscribe to the input stream
*/
attach_to(input_stream, transform_fn) {
let subscriber_fn = async (value) => {
if (transform_fn) {
value = await transform_fn(value);
}
await super.publish(value);
}
let close_fn = async (was_cancelled) => {
this.closed_count += 1
if (was_cancelled) {
this.cancelled_count += 1;
}
if (this.closed_count === this.input_stream_count) {
await this.close(was_cancelled = (this.cancelled_count > 0));
}
}
let subscriber_id = input_stream.subscribe(subscriber_fn, close_fn);
this.subscriber_ids.push(subscriber_id);
this.input_streams.push(input_stream);
this.input_stream_count += 1;
return subscriber_id;
}
/**
* Detach from all input streams.
*/
detach() {
for (let idx = 0; idx < this.input_stream_count; idx++) {
this.input_streams[idx].unsubscribe(this.subscriber_ids[idx]);
}
this.input_streams = [];
this.subscriber_ids = [];
this.input_stream_count = 0;
}
/**
* Check that values can be published to this stream
*
* @throws {Error} Always throws an Error - this kind of stream does not allow values to be published to it
*/
can_publish() {
throw new Error("Cannot publish directly to this kind of stream");
}
/**
* Publish a value to this stream
*
* @throws {Error} Always throws an Error - this kind of stream does not allow values to be published to it
*/
async publish(value) {
throw new Error("Cannot publish directly to this kind of stream");
}
}
hyrrokkin_engine.SourceStream = class extends hyrrokkin_engine.ValueStream {
/**
* Create a value stream which enables the caller to then publish values
*
* @param {int=} activation_threshold if supplied, block publishing onto this stream until this many subscribers are active
*/
constructor(activation_threshold) {
super(activation_threshold);
}
}