Source: hyrrokkin_engine_utils/value_iterable.js

var hyrrokkin_engine = hyrrokkin_engine || {};

hyrrokkin_engine.IteratorCombiner = class {

    constructor(input_iterators) {
        this.promises = {}; // map from iterator index to a promise returning the next value from that iterator
        this.input_iterators = input_iterators;
        for(let idx=0; idx<this.input_iterators.length; idx++) {
            this.promises[idx] = this.create_promise(idx);
        }
    }

    create_promise(index) {
        return new Promise((resolve, reject) => {
            this.input_iterators[index].next().then(
                value => {
                    value.index = index;
                    resolve(value);
                }
            );
        }, index);
    }

    async next() {
        while(Object.keys(this.promises).length > 0) {
            let current = await Promise.race(Object.values(this.promises));
            if (current.done) {
                delete this.promises[current.index];
            } else {
                this.promises[current.index] = this.create_promise(current.index);
                return current;
            }
        }
        return {"done":true}
    }
}

hyrrokkin_engine.ValueIteratorCombined = class {

    constructor(parent_iterable, input_iterators, lockstep_threshold) {
        this.parent_iterable = parent_iterable;
        this.subscriber_locks = {};
        this.value = null;
        this.value_available = false;
        this.fetch_required = true;
        this.lockstep_threshold = lockstep_threshold;
        this.subscriber_count = 0;
        this.fetched_count = 0;
        this.input_exhausted = false;
        this.exn = null;

        if (input_iterators.length > 1) {
            this.input_iterator = new hyrrokkin_engine.IteratorCombiner(input_iterators);
        } else {
            this.input_iterator = input_iterators[0];
        }
    }

    async subscribe() {
        this.subscriber_count += 1;
        let subscriber_id = "s" + this.subscriber_count;
        let lock = new hyrrokkin_engine.AsyncLock();
        if (!this.value_available) {
            await lock.acquire();
            if (this.value_available) {
                lock.release();
            }
        }
        this.subscriber_locks[subscriber_id] = lock;
        return subscriber_id;
    }

    unsubscribe(subscriber_id) {
        this.lockstep_threshold -= 1;
        this.subscriber_locks[subscriber_id].release();
        delete this.subscriber_locks[subscriber_id];
        if (this.fetched_count === this.lockstep_threshold) {
            this.fetch_required = true;
        }
    }

    async fetch(subscriber_id) {

        if (this.fetch_required) {
            this.fetch_required = false;
            try {
                let v = await this.input_iterator.next();
                if (!v.done) {
                    this.value = await this.parent_iterable.transform(v.value);
                    this.value_available = true;
                    this.fetched_count = 0;
                } else {
                    this.input_exhausted = true;
                }
            } catch (ex) {
                console.log(ex);
                this.input_exhausted = true;
                this.exn = ex;
            }

            Object.values(this.subscriber_locks).map(lock => lock.release());

        }
        await this.subscriber_locks[subscriber_id].acquire();
        if (this.input_exhausted) {
            return {"done": true};
        }
        if (this.exn !== null) {
            throw this.exn;
        }
        this.fetched_count += 1;
        if (this.fetched_count >= this.lockstep_threshold) {
            this.fetch_required = true;
        }
        return {"value":this.value}
    }
}

hyrrokkin_engine.ValueIterator = class {

    constructor() {
    }
}

hyrrokkin_engine.SyncValueIterator = class extends hyrrokkin_engine.ValueIterator {

    constructor(combined_iterator) {
        super();
        this.combined_iterator = combined_iterator;
        this.subscriber_id = null;
    }

    async next() {
        if (this.subscriber_id === null) {
            this.subscriber_id = await this.combined_iterator.subscribe();
        }
        return await this.combined_iterator.fetch(this.subscriber_id);
    }

    close() {
        this.combined_iterator.unsubscribe(this.subscriber_id);
    }
}

hyrrokkin_engine.DedicatedValueIterator = class extends hyrrokkin_engine.ValueIterator {

    constructor(parent_iterable, input_iterators) {
        super();
        this.parent_iterable = parent_iterable;
        if (input_iterators.length > 1) {
            this.input_iterator = new hyrrokkin_engine.IteratorCombiner(input_iterators);
        } else {
            this.input_iterator = input_iterators[0];
        }
    }

    async next() {
        let r = await this.input_iterator.next();
        if (r.done) {
            return r;
        }

        return {"value": await this.parent_iterable.transform(r.value)};
    }

    close() {
    }
}

/**
 * Implement an async iterable of values with some added functionality.
 *
 * Call hyrrokkin_engine.ValueIterable.create_from_iterables to create an instance of this type.
 *
 * @implements {AsyncIterable}
 *
 * @type {hyrrokkin_engine.ValueIterable}
 *
 */
hyrrokkin_engine.ValueIterable = class {

    constructor(input_iterables, transform_fn, lockstep_threshold) {
        this.input_iterables = input_iterables;
        this.lockstep_threshold = lockstep_threshold;
        if (this.lockstep_threshold > 1) {
            this.combined_iterator = new hyrrokkin_engine.ValueIteratorCombined(this,
                this.input_iterables.map(iterable => iterable[Symbol.asyncIterator]()),
                this.lockstep_threshold);
        } else {
            this.combined_iterator = null;
        }

        this.iterator_count = 0;
        this.transform_fn = transform_fn;
    }

    async transform(value) {
        if (this.transform_fn === null) {
            return value;
        }
        return await this.transform_fn(value);
    }

    /**
     * Return an async iteratable based on a set of input iterables.
     *
     * @param {AsyncIterable[]} input_iterables a list of one or more async iterables that provide input values
     * @param {(transformFunction|asyncTransformFunction)=} transform_fn a function to transform values from the input iterables
     * @param {int=} lockstep_threshold Specify that this many iterators opened over this iterable will be optimised to share a
     *                               single set of iterators obtained from the input iterables.
     *                               However, note that these iterators will yield values in lock-step.
     *                               If more than this many iterators are opened, each subsequent iterator will
     *                               open new input iterators.
     * @return {hyrrokkin_engine.ValueIterable} An async iterable
     */
    static create_from_iterables(input_iterables, transform_fn, lockstep_threshold) {
        return new hyrrokkin_engine.ValueIterable(input_iterables, transform_fn, lockstep_threshold);
    }

    [Symbol.asyncIterator]() {
        this.iterator_count += 1;
        if (this.combined_iterator !== null && this.iterator_count <= this.lockstep_threshold) {
            return new hyrrokkin_engine.SyncValueIterator(this.combined_iterator);
        } else {
            return new hyrrokkin_engine.DedicatedValueIterator(this, this.input_iterables.map(iterable => iterable[Symbol.asyncIterator]()));
        }
    }
}