// Shared namespace object; an already existing hyrrokkin_engine object is reused.
var hyrrokkin_engine = hyrrokkin_engine || {};

/**
 * Merge several async iterators into a single stream of iterator results.
 *
 * One next() call is kept pending on every input iterator that has not finished;
 * the combiner's own next() hands out whichever pending result settles first.
 */
hyrrokkin_engine.IteratorCombiner = class {

    /**
     * @param {AsyncIterator[]} input_iterators iterators to merge; next() is called on each one immediately
     */
    constructor(input_iterators) {
        // map from iterator index to a promise for the outcome of that iterator's pending next() call
        this.promises = {};
        this.input_iterators = input_iterators;
        for (let idx = 0; idx < this.input_iterators.length; idx++) {
            this.promises[idx] = this.create_promise(idx);
        }
    }

    /**
     * Request the next result from one input iterator.
     *
     * The returned promise never rejects: a failure of the underlying next() call
     * (rejection or synchronous throw) is captured in the outcome record instead, so a
     * pending outcome can never turn into an unhandled rejection or stall Promise.race.
     *
     * @param {number} index position of the iterator in input_iterators
     * @return {Promise<Object>} resolves to {index, result, failed: false} or {index, error, failed: true}
     */
    create_promise(index) {
        // the async wrapper turns a synchronous throw from next() into a rejection as well
        const pull = async () => this.input_iterators[index].next();
        return pull().then(
            result => ({index: index, result: result, failed: false}),
            error => ({index: index, error: error, failed: true})
        );
    }

    /**
     * Return the next available result from any of the input iterators.
     *
     * A returned (non-done) result is the object produced by the source iterator, with an
     * added `index` property naming the iterator it came from.
     *
     * @return {Promise<Object>} the next result, or {"done": true} once every input is finished
     * @throws whatever an input iterator's next() failed with; that iterator is then dropped,
     *         and later calls continue with the remaining iterators
     */
    async next() {
        while (Object.keys(this.promises).length > 0) {
            const outcome = await Promise.race(Object.values(this.promises));
            if (outcome.failed) {
                // forget the failed source so the same error is not reported again
                delete this.promises[outcome.index];
                throw outcome.error;
            }
            const current = outcome.result;
            if (current.done) {
                // this source is finished; keep racing the others
                delete this.promises[outcome.index];
            } else {
                // queue up the following value from the same source before handing this one out
                this.promises[outcome.index] = this.create_promise(outcome.index);
                current.index = outcome.index;
                return current;
            }
        }
        return {"done": true};
    }
};
/**
 * Shares one input iterator between several subscribers that consume values in lock-step.
 *
 * A value is fetched from the input once, passed through parent_iterable.transform, and handed
 * to every subscriber; the next value is only fetched after lockstep_threshold subscribers
 * have collected the current one.
 *
 * NOTE(review): relies on hyrrokkin_engine.AsyncLock, defined outside this block. The logic
 * assumes acquire() resolves only once the lock is free and that release() on a free lock is
 * harmless - confirm against the AsyncLock implementation.
 */
hyrrokkin_engine.ValueIteratorCombined = class {

    /**
     * @param parent_iterable object providing an async transform(value) method
     * @param {AsyncIterator[]} input_iterators iterators supplying the input values
     * @param {number} lockstep_threshold number of subscribers that must collect a value before the next fetch
     */
    constructor(parent_iterable, input_iterators, lockstep_threshold) {
        this.parent_iterable = parent_iterable;
        this.subscriber_locks = {};      // subscriber id -> lock that subscriber waits on in fetch()
        this.value = null;               // most recently fetched (transformed) value
        this.value_available = false;    // becomes true once the first value has been fetched
        this.fetch_required = true;      // true when the next fetch() call must pull from the input
        this.lockstep_threshold = lockstep_threshold;
        this.subscriber_count = 0;       // running counter used to build subscriber ids
        this.fetched_count = 0;          // number of subscribers that collected the current value
        this.input_exhausted = false;    // terminal flag: input finished or failed
        this.exn = null;                 // the error that ended the input, if any
        if (input_iterators.length > 1) {
            this.input_iterator = new hyrrokkin_engine.IteratorCombiner(input_iterators);
        } else {
            this.input_iterator = input_iterators[0];
        }
    }

    /**
     * Register a new subscriber.
     *
     * If no value has been fetched yet, the subscriber's lock is acquired up front so that
     * its first fetch() waits until a value is published.
     *
     * @return {Promise<string>} the id to pass to fetch() and unsubscribe()
     */
    async subscribe() {
        this.subscriber_count += 1;
        let subscriber_id = "s" + this.subscriber_count;
        let lock = new hyrrokkin_engine.AsyncLock();
        if (!this.value_available) {
            await lock.acquire();
            // a value may have been published while waiting for acquire()
            if (this.value_available) {
                lock.release();
            }
        }
        this.subscriber_locks[subscriber_id] = lock;
        return subscriber_id;
    }

    /**
     * Remove a subscriber and lower the lock-step threshold by one.
     *
     * A null/undefined id stands for an iterator that never subscribed: the threshold is still
     * lowered, but there is no lock to release. An id that is not (or no longer) registered is
     * ignored, so unsubscribing twice cannot lower the threshold twice.
     *
     * @param {?string} subscriber_id id returned by subscribe(), or null if subscribe() was never called
     */
    unsubscribe(subscriber_id) {
        const never_subscribed = (subscriber_id === null || subscriber_id === undefined);
        const lock = never_subscribed ? undefined : this.subscriber_locks[subscriber_id];
        if (!never_subscribed && lock === undefined) {
            return;
        }
        this.lockstep_threshold -= 1;
        if (lock !== undefined) {
            lock.release();
            delete this.subscriber_locks[subscriber_id];
        }
        // the remaining subscribers may already all have collected the current value
        if (this.fetched_count === this.lockstep_threshold) {
            this.fetch_required = true;
        }
    }

    /**
     * Return the current value to a subscriber, pulling a new one from the input when required.
     *
     * @param {string} subscriber_id id returned by subscribe()
     * @return {Promise<Object>} {"value": v}, or {"done": true} once the input is exhausted
     * @throws the error raised by the input iterator or by transform; it is re-thrown on every
     *         call made after the failure
     */
    async fetch(subscriber_id) {
        // Once the input has ended nothing releases the locks any more, so only fetch/wait
        // while the stream is still live; otherwise a repeated call would wait forever.
        if (!this.input_exhausted) {
            if (this.fetch_required) {
                this.fetch_required = false;
                try {
                    let v = await this.input_iterator.next();
                    if (!v.done) {
                        this.value = await this.parent_iterable.transform(v.value);
                        this.value_available = true;
                        this.fetched_count = 0;
                    } else {
                        this.input_exhausted = true;
                    }
                } catch (ex) {
                    console.log(ex);
                    this.input_exhausted = true;
                    this.exn = ex;
                }
                // wake every subscriber waiting for this value (or for the end of the stream)
                Object.values(this.subscriber_locks).forEach(lock => lock.release());
            }
            await this.subscriber_locks[subscriber_id].acquire();
        }
        // A failure also marks the input as exhausted, so the error must be checked first;
        // otherwise it would be reported as a normal end of stream.
        if (this.exn !== null) {
            throw this.exn;
        }
        if (this.input_exhausted) {
            return {"done": true};
        }
        this.fetched_count += 1;
        if (this.fetched_count >= this.lockstep_threshold) {
            this.fetch_required = true;
        }
        return {"value": this.value};
    }
};
/**
 * Empty base class that SyncValueIterator and DedicatedValueIterator extend.
 * It defines no state or behaviour of its own.
 */
hyrrokkin_engine.ValueIterator = class {
};
/**
 * Iterator that reads its values from a shared combined iterator.
 *
 * It subscribes to the combined iterator on the first next() call and unsubscribes in close().
 */
hyrrokkin_engine.SyncValueIterator = class extends hyrrokkin_engine.ValueIterator {

    /**
     * @param combined_iterator shared iterator providing subscribe(), fetch(id) and unsubscribe(id)
     */
    constructor(combined_iterator) {
        super();
        this.combined_iterator = combined_iterator;
        this.subscriber_id = null;   // assigned by the first next() call
        this.closed = false;         // set by close() so that unsubscribing happens at most once
    }

    /**
     * Fetch the next result, subscribing to the combined iterator first if necessary.
     *
     * @return {Promise<Object>} whatever combined_iterator.fetch resolves to
     */
    async next() {
        if (this.subscriber_id === null) {
            this.subscriber_id = await this.combined_iterator.subscribe();
        }
        return await this.combined_iterator.fetch(this.subscriber_id);
    }

    /**
     * Unsubscribe from the combined iterator.
     *
     * Safe to call more than once: only the first call reaches unsubscribe(), because that
     * method lowers the shared lock-step threshold each time it is invoked.
     */
    close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.combined_iterator.unsubscribe(this.subscriber_id);
    }
};
/**
 * Iterator that owns its input iterators outright and passes every input value
 * through the parent iterable's transform before yielding it.
 */
hyrrokkin_engine.DedicatedValueIterator = class extends hyrrokkin_engine.ValueIterator {

    /**
     * @param parent_iterable object providing an async transform(value) method
     * @param {AsyncIterator[]} input_iterators iterators supplying the input values
     */
    constructor(parent_iterable, input_iterators) {
        super();
        this.parent_iterable = parent_iterable;
        // several inputs are merged through an IteratorCombiner; a single input is used directly
        const needs_combiner = input_iterators.length > 1;
        this.input_iterator = needs_combiner
            ? new hyrrokkin_engine.IteratorCombiner(input_iterators)
            : input_iterators[0];
    }

    /**
     * @return {Promise<Object>} {"value": transformed} for each input value; the input's own
     *                           result object once it reports done
     */
    async next() {
        const upstream = await this.input_iterator.next();
        if (!upstream.done) {
            const transformed = await this.parent_iterable.transform(upstream.value);
            return {"value": transformed};
        }
        return upstream;
    }

    /**
     * Nothing to release for a dedicated iterator.
     */
    close() {
    }
};
/**
 * Implement an async iterable of values with some added functionality.
 *
 * Call hyrrokkin_engine.ValueIterable.create_from_iterables to create an instance of this type.
 *
 * @implements {AsyncIterable}
 *
 * @type {hyrrokkin_engine.ValueIterable}
 *
 */
hyrrokkin_engine.ValueIterable = class {

    /**
     * @param {AsyncIterable[]} input_iterables async iterables that provide the input values
     * @param {?Function=} transform_fn optional (possibly async) function applied to every value
     * @param {number=} lockstep_threshold when greater than 1, this many iterators share one set of input iterators
     */
    constructor(input_iterables, transform_fn, lockstep_threshold) {
        this.input_iterables = input_iterables;
        this.lockstep_threshold = lockstep_threshold;
        if (this.lockstep_threshold > 1) {
            // the shared input iterators are opened here, when the iterable is constructed
            this.combined_iterator = new hyrrokkin_engine.ValueIteratorCombined(this,
                this.input_iterables.map(iterable => iterable[Symbol.asyncIterator]()),
                this.lockstep_threshold);
        } else {
            this.combined_iterator = null;
        }
        this.iterator_count = 0;   // number of iterators handed out so far
        this.transform_fn = transform_fn;
    }

    /**
     * Apply the transform function to a value.
     *
     * @param value a value obtained from an input iterable
     * @return {Promise} the transformed value, or the value itself when no transform function
     *                   was supplied (null or omitted)
     */
    async transform(value) {
        // transform_fn is an optional parameter, so it may be undefined as well as null
        if (this.transform_fn === null || this.transform_fn === undefined) {
            return value;
        }
        return await this.transform_fn(value);
    }

    /**
     * Return an async iterable based on a set of input iterables.
     *
     * @param {AsyncIterable[]} input_iterables a list of one or more async iterables that provide input values
     * @param {(transformFunction|asyncTransformFunction)=} transform_fn a function to transform values from the input iterables
     * @param {int=} lockstep_threshold Specify that this many iterators opened over this iterable will be optimised to share a
     *                                 single set of iterators obtained from the input iterables.
     *                                 However, note that these iterators will yield values in lock-step.
     *                                 If more than this many iterators are opened, each subsequent iterator will
     *                                 open new input iterators.
     * @return {hyrrokkin_engine.ValueIterable} An async iterable
     */
    static create_from_iterables(input_iterables, transform_fn, lockstep_threshold) {
        return new hyrrokkin_engine.ValueIterable(input_iterables, transform_fn, lockstep_threshold);
    }

    /**
     * Open an iterator over this iterable.
     *
     * The first lockstep_threshold iterators (when a combined iterator exists) share the combined
     * iterator; every other iterator opens its own input iterators.
     *
     * @return {hyrrokkin_engine.ValueIterator} a SyncValueIterator or a DedicatedValueIterator
     */
    [Symbol.asyncIterator]() {
        this.iterator_count += 1;
        if (this.combined_iterator !== null && this.iterator_count <= this.lockstep_threshold) {
            return new hyrrokkin_engine.SyncValueIterator(this.combined_iterator);
        } else {
            return new hyrrokkin_engine.DedicatedValueIterator(this, this.input_iterables.map(iterable => iterable[Symbol.asyncIterator]()));
        }
    }
};