Hyrrokkin User Guide
Installation
Hyrrokkin is open source software released under the MIT License and is installable as a Python package. For installation instructions and to view the source code, see https://codeberg.org/visual-topology/hyrrokkin.
Introduction - Basic Concepts
Topologies
Hyrrokkin is a Python library that manages the execution of computational graphs, termed topologies.
Each topology consists of executable components called nodes.
Topologies are executed using an engine. Two engines are currently provided:
Python Engine
This engine supports nodes written in Python.
Javascript Engine
This engine supports nodes written in Javascript. It can be used if deno (https://deno.com/) is installed or within web-browsers as part of skadi (see https://codeberg.org/visual-topology/skadi).
Nodes, Ports and Links
Nodes communicate with each other via input and output ports.
Each node is associated with a Python or JavaScript class which implements the node's behaviour according to the hyrrokkin package API.
Most notably, the node's class will implement a run method which transforms a set of values received through that node's input ports into a set of values sent out via the node's output ports.
A topology will also contain links, which connect the output port of one node to the input port of another.
Packages and Package Configurations
Node types and link types are bundled together into a package containing related functionality.
A package must also define a configuration class, a single instance of which will be created when a topology is loaded into an engine for execution.
Configuration instances can be accessed by any node or any other configuration instance. They must also define a factory method called create_node which will be called to create instances of nodes which belong to the package.
Persistent storage
As well as persisting the structure of a topology, Hyrrokkin manages the persistent storage of key-value pairs for each node and configuration within a topology.
There are two types of persistent storage:
- properties - key-value pairs where the values are JSON-serialisable
- data - key-value pairs where the values are binary data
Clients
Clients may be attached to specific nodes and configurations and interact with them by exchanging messages. Messages consist of one or more parts; each part may be binary, text or any JSON-serialisable value.
An Example Package - textgraph
Consider a very simple example package, textgraph. Textgraph implements some simple code for calculating the frequencies of words within text documents.
The textgraph package consists of a configuration class and four node classes:
- TextgraphConfiguration
The configuration class is responsible for creating instances of the nodes defined in this package.
- TextInputNode
Stores input text and outputs this value via its output port data_out.
Clients can send a new value to this node to trigger re-execution of downstream parts of the topology.
- WordFrequencyNode
Expects text input values via its input port data_in, computes the frequencies of each word in the text and outputs this information in table form via output port data_out.
- MergeFrequenciesNode
Merges two frequency tables, either by adding or subtracting the frequencies of each word.
- TableDisplayNode
Receives tabular data via its input port data_in and communicates it to any attached clients.
Implementing a hyrrokkin package using the package API
The Package Schema
The schema is the central file defining a hyrrokkin package.
Each Package, and the Links and Nodes it contains, is specified in a JSON formatted document that represents the schema of that Package.
{
"id": "textgraph",
"metadata": {
"name": "Text Graph",
"version": "0.0.1",
"description": "a toy example package for analysing text"
},
"node_types": {
"text_input_node": {
"metadata": {
"name": "Text Input",
"description": "Input a text document to be analysed"
},
"output_ports": {
"data_out": {
"link_type": "textgraph:text",
"allow_multiple_connections": true
}
}
},
"word_frequency_node": {
"metadata": {
"name": "Word Frequency",
"description": "Calculate the frequencies of words in the input text"
},
"input_ports": {
"data_in": {
"link_type": "textgraph:text",
"allow_multiple_connections": false
}
},
"output_ports": {
"data_out": {
"link_type": "textgraph:frequency_table",
"allow_multiple_connections": true
}
}
},
"merge_frequencies_node": {
"metadata": {
"name": "Merge Word Frequencies",
"description": "Merge two merge frequencies tables"
},
"input_ports": {
"data_in0": {
"link_type": "textgraph:frequency_table",
"allow_multiple_connections": false
},
"data_in1": {
"link_type": "textgraph:frequency_table",
"allow_multiple_connections": false
}
},
"output_ports": {
"data_out": {
"link_type": "textgraph:frequency_table",
"allow_multiple_connections": true
}
}
},
"table_display_node": {
"metadata": {
"name": "Table Display",
"description": "Display tabular data containing the results of analysing text"
},
"input_ports": {
"data_in": {
"link_type": "textgraph:frequency_table",
"allow_multiple_connections": false
}
}
}
},
"link_types": {
"text": {
"metadata": {
"name": "Text",
"description": "This type of link carries text values"
}
},
"frequency_table": {
"metadata": {
"name": "Frequency Table",
"description": "This type of link carries tables containing word,frequency pairs"
}
}
}
}
When referring to a link type, the package id should be used as a prefix, <package-id>:<link-type-id>. In this example, textgraph:text refers to the link type text defined in the textgraph example package. This allows packages to refer to link types defined in other packages when defining nodes.
The package itself defines:
- id, a succinct unique identifier for the package
- metadata provides descriptive information, including name, version and description attributes
- a node_types section declaring the nodes that this package contains
- a link_types section declaring names for the values that nodes input and output via their ports
Each node declared in the node_types section is associated with the following information:
- input_ports and output_ports specify the names and link types of the ports attached to a node
- ports cannot accept multiple connections unless allow_multiple_connections is set to true
- metadata provides descriptive information, including name and description attributes
Each link declared in the link_types section is associated with the following information:
- metadata provides descriptive information, including name and description attributes
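The structure described above lends itself to a simple consistency check. The following is a minimal sketch, not part of Hyrrokkin, that walks a schema dictionary and reports ports whose link type is not declared by the package itself. The helper name check_schema and the abbreviated schema (with one deliberate mistake) are assumptions made for illustration; link types owned by other packages are skipped because they cannot be checked locally.

```python
# Illustrative sketch only: check_schema is a hypothetical helper, not a Hyrrokkin API.

def check_schema(schema):
    """Return a list of problems found in a package schema dictionary."""
    problems = []
    package_id = schema["id"]
    declared = set(schema.get("link_types", {}))
    for node_type_id, node_type in schema.get("node_types", {}).items():
        for direction in ("input_ports", "output_ports"):
            for port_name, port in node_type.get(direction, {}).items():
                # link types are written as <package-id>:<link-type-id>
                owner, _, link_type_id = port["link_type"].partition(":")
                if owner != package_id:
                    continue  # declared by another package, cannot be checked here
                if link_type_id not in declared:
                    problems.append(
                        f"{node_type_id}.{port_name}: unknown link type {link_type_id!r}")
    return problems

# abbreviated version of the textgraph schema, with a misspelt link type
schema = {
    "id": "textgraph",
    "node_types": {
        "text_input_node": {
            "output_ports": {"data_out": {"link_type": "textgraph:text"}}
        },
        "table_display_node": {
            "input_ports": {"data_in": {"link_type": "textgraph:frequency_tabel"}}
        },
    },
    "link_types": {"text": {}, "frequency_table": {}},
}

problems = check_schema(schema)
print(problems)
```

Running a check of this kind before loading a package can catch misspelt link type references early.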
Filesystem layout
All files that comprise a package are stored under a root directory which contains the package schema, named schema.json.
Hyrrokkin currently supports packages which implement nodes and configurations using JavaScript or Python; the interfaces are consistent across both.
schema.json
python.json
python/
    text_input_node.py
    word_frequency_node.py
    merge_frequencies_node.py
    table_display_node.py
    textgraph_configuration.py

schema.json
javascript.json
javascript/
    text_input_node.js
    word_frequency_node.js
    merge_frequencies_node.js
    table_display_node.js
    textgraph_configuration.js
The file python.json / javascript.json defines how the engine will load the package configuration.
{
"configuration_class": ".python.textgraph_configuration.TextgraphConfiguration"
}
{
"source_paths": [
"javascript/textgraph_configuration.js",
"javascript/text_input_node.js",
"javascript/word_frequency_node.js",
"javascript/merge_frequencies_node.js",
"javascript/table_display_node.js"
]
}
TextgraphConfiguration
A package configuration is implemented as a class with a constructor accepting a services object.
The configuration is required to implement a method for creating node instances.
# Hyrrokkin - a library for building and running executable graphs
#
# MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
from hyrrokkin_engine.configuration_interface import ConfigurationInterface
import json
import re
from .text_input_node import TextInputNode
from .word_frequency_node import WordFrequencyNode
from .table_display_node import TableDisplayNode
from .merge_frequencies_node import MergeFrequenciesNode

class TextgraphConfiguration(ConfigurationInterface):

    # https://gist.github.com/sebleier/554280 with modifications
    DEFAULT_STOP_WORDS = ["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "can", "will", "just", "dont", "should", "now"]

    def __init__(self, services):
        self.services = services
        self.clients = set()
        self.properties = None
        self.update_listeners = set()
        self.stop_words = []

    async def load(self):
        # use the persisted stop words if present, otherwise fall back to the defaults
        keys = await self.services.get_data_keys()
        if "stop_words" in keys:
            self.stop_words = json.loads((await self.services.get_data("stop_words")).decode())
        else:
            self.stop_words = TextgraphConfiguration.DEFAULT_STOP_WORDS

    def get_stop_words(self):
        return self.stop_words

    def add_update_listener(self, listener):
        self.update_listeners.add(listener)
        return listener

    def remove_update_listener(self, listener):
        self.update_listeners.remove(listener)

    async def notify_update_listeners(self):
        for update_listener in self.update_listeners:
            await update_listener()

    async def create_node(self, node_type_id, node_services):
        match node_type_id:
            case "text_input_node": return TextInputNode(node_services)
            case "word_frequency_node": return WordFrequencyNode(node_services)
            case "merge_frequencies_node": return MergeFrequenciesNode(node_services)
            case "table_display_node": return TableDisplayNode(node_services)
            case _: return None

    async def encode(self, value, link_type):
        if value is not None:
            if link_type == "text":
                return value.encode("utf-8")
            elif link_type == "frequency_table":
                return json.dumps(value).encode("utf-8")
        return None

    async def decode(self, encoded_bytes, link_type):
        if encoded_bytes is not None:
            if link_type == "text":
                return encoded_bytes.decode("utf-8")
            elif link_type == "frequency_table":
                return json.loads(encoded_bytes.decode("utf-8"))
        return None

    async def open_client(self, client):
        self.clients.add(client)

        async def handle_message(stop_words):
            # persist the new stop words, inform other clients and listening nodes
            self.stop_words = stop_words
            await self.services.set_data("stop_words", json.dumps(self.stop_words).encode())
            for other_client in self.clients:
                if other_client != client:
                    other_client.send_message(stop_words)
            await self.notify_update_listeners()

        client.set_message_handler(handle_message)
        client.send_message(self.stop_words)

    async def close_client(self, client):
        self.clients.remove(client)
// Hyrrokkin - a library for building and running executable graphs
//
// MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
var textgraph = textgraph || {};
textgraph.TextgraphConfiguration = class {
// https://gist.github.com/sebleier/554280 with modifications
static DEFAULT_STOP_WORDS = ["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "can", "will", "just", "dont", "should", "now"]
constructor(services) {
this.services = services;
this.clients = new Set();
this.update_listeners = new Set();
}
async load() {
let keys = await this.services.get_data_keys();
if (keys.includes("stop_words")) {
this.stop_words = JSON.parse((new TextDecoder()).decode(await this.services.get_data("stop_words")));
} else {
this.stop_words = textgraph.TextgraphConfiguration.DEFAULT_STOP_WORDS;
}
}
get_stop_words() {
return this.stop_words;
}
add_update_listener(listener) {
this.update_listeners.add(listener);
return listener;
}
remove_update_listener(listener) {
this.update_listeners.delete(listener);
}
async notify_update_listeners() {
const arr = Array.from(this.update_listeners);
for(let idx=0; idx<arr.length; idx++) {
await arr[idx]();
}
}
async create_node(node_type_id, node_services) {
switch (node_type_id) {
case "text_input_node": return new textgraph.TextInputNode(node_services);
case "word_frequency_node": return new textgraph.WordFrequencyNode(node_services);
case "merge_frequencies_node": return new textgraph.MergeFrequenciesNode(node_services);
case "table_display_node": return new textgraph.TableDisplayNode(node_services);
default: return null;
}
}
async open_client(client) {
this.clients.add(client);
let handle_message = async (stop_words) => {
this.stop_words = stop_words;
await this.services.set_data("stop_words", (new TextEncoder()).encode(JSON.stringify(this.stop_words)).buffer);
this.clients.forEach((other_client) => {
if (other_client !== client) {
other_client.send_message(stop_words);
}
});
await this.notify_update_listeners();
}
client.set_message_handler(handle_message);
client.send_message(this.stop_words);
}
async encode(value, link_type) {
if (value !== null) {
if (link_type === "text") {
return (new TextEncoder()).encode(value).buffer;
} else if (link_type === "frequency_table") {
return (new TextEncoder()).encode(JSON.stringify(value)).buffer;
}
}
return null;
}
async decode(encoded_bytes, link_type) {
if (encoded_bytes !== null) {
let txt = (new TextDecoder()).decode(encoded_bytes);
if (link_type === "text") {
return txt;
} else if (link_type === "frequency_table") {
return JSON.parse(txt);
}
}
return null;
}
async close_client(client) {
this.clients.delete(client);
}
}
hyrrokkin_engine.registry.register_configuration_factory("textgraph",(configuration_services) => new textgraph.TextgraphConfiguration(configuration_services));
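The encode and decode methods of the configuration convert link values to and from bytes. As a minimal sketch, the Python logic from the listing above is repeated here as standalone coroutines so that the round trip can be tried without a running engine. The round_trip helper is an assumption made for illustration. As in the listing, link types are compared without the package prefix.

```python
import asyncio
import json

# Standalone copies of the encode/decode logic shown in the Python configuration,
# detached from the class so they can be exercised in isolation.

async def encode(value, link_type):
    if value is not None:
        if link_type == "text":
            return value.encode("utf-8")
        elif link_type == "frequency_table":
            return json.dumps(value).encode("utf-8")
    return None

async def decode(encoded_bytes, link_type):
    if encoded_bytes is not None:
        if link_type == "text":
            return encoded_bytes.decode("utf-8")
        elif link_type == "frequency_table":
            return json.loads(encoded_bytes.decode("utf-8"))
    return None

async def round_trip(value, link_type):
    """Hypothetical helper: encode a value, then decode the resulting bytes."""
    return await decode(await encode(value, link_type), link_type)

text_result = asyncio.run(round_trip("some text", "text"))
table_result = asyncio.run(round_trip({"hat": 2, "cat": 1}, "frequency_table"))
print(text_result, table_result)
```

Both values should survive the round trip unchanged, and a value of None is passed through as None in both directions.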
TextInputNode
When a node is constructed, the constructor is passed a service API object, providing various useful services. This services API is very similar to that passed to a configuration constructor.
Consider the TextInputNode:
# Hyrrokkin - a library for building and running executable graphs
#
# MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
import asyncio
from hyrrokkin_engine.node_interface import NodeInterface

class TextInputNode(NodeInterface):

    def __init__(self, services):
        self.services = services
        self.clients = set()
        self.text = ""

    async def load(self):
        # restore the stored text, if any
        data = await self.services.get_data("value")
        if data is None:
            self.text = ""
        else:
            self.text = data.decode()

    async def open_client(self, client):
        self.clients.add(client)

        async def handle_message(value):
            if value != self.text:
                self.text = value
                await self.services.set_data("value", self.text.encode())
                for other_client in self.clients:
                    if other_client != client:
                        other_client.send_message(self.text)
                await self.services.request_run()

        client.set_message_handler(handle_message)
        client.send_message(self.text)

    async def close_client(self, client):
        self.clients.remove(client)

    async def run(self, inputs):
        if self.text:
            return {"data_out":self.text}
        else:
            return {}
// Hyrrokkin - a library for building and running executable graphs
//
// MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
var textgraph = textgraph || {};
textgraph.TextInputNode = class {
constructor(services) {
this.services = services;
this.clients = new Set();
this.text = "";
}
async load() {
let data = await this.services.get_data("value");
if (data === null) {
this.text = "";
} else {
this.text = (new TextDecoder()).decode(data);
}
}
async open_client(client) {
this.clients.add(client);
client.set_message_handler(async (...msg) => await this.handle_message(client, ...msg));
client.send_message(this.text);
}
async close_client(client) {
this.clients.delete(client);
}
async handle_message(from_client, value) {
if (value !== this.text) {
this.text = value;
await this.services.set_data("value", (new TextEncoder()).encode(this.text).buffer);
await this.services.request_run();
this.clients.forEach((other_client) => {
if (other_client !== from_client) {
other_client.send_message(this.text);
}
});
}
}
async run(inputs) {
if (this.text) {
return {"data_out": this.text}
} else {
return {};
}
}
}
This node stores the text to output in a binary data object named value. Nodes use the service APIs get_data and set_data to read and write these data objects.
To communicate with clients, nodes (or configurations) implement open_client and close_client methods. In the example above, the TextInputNode expects messages consisting of a single string value, used to refresh the text stored by the node.
When the node is run, its stored value is output on port data_out.
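The interplay of data storage, client messages and run can be tried out without an engine. The following is a minimal sketch that exercises the TextInputNode logic against in-memory stand-ins. FakeServices and FakeClient are hypothetical test doubles written for this example and are not Hyrrokkin classes; the node is repeated here without its NodeInterface base class so that the block is self-contained.

```python
import asyncio

class FakeServices:
    """Hypothetical in-memory stand-in for the services object."""
    def __init__(self):
        self.data = {}
        self.run_requests = 0
    async def get_data(self, key):
        return self.data.get(key)
    async def set_data(self, key, value):
        self.data[key] = value
    async def request_run(self):
        self.run_requests += 1

class FakeClient:
    """Hypothetical stand-in for a client: records messages sent by the node."""
    def __init__(self):
        self.received = []
        self.handler = None
    def set_message_handler(self, handler):
        self.handler = handler
    def send_message(self, *parts):
        self.received.append(parts)

class TextInputNode:
    # same logic as the Python listing above, minus the NodeInterface base class
    def __init__(self, services):
        self.services = services
        self.clients = set()
        self.text = ""
    async def load(self):
        data = await self.services.get_data("value")
        self.text = "" if data is None else data.decode()
    async def open_client(self, client):
        self.clients.add(client)
        async def handle_message(value):
            if value != self.text:
                self.text = value
                await self.services.set_data("value", self.text.encode())
                for other_client in self.clients:
                    if other_client != client:
                        other_client.send_message(self.text)
                await self.services.request_run()
        client.set_message_handler(handle_message)
        client.send_message(self.text)
    async def run(self, inputs):
        return {"data_out": self.text} if self.text else {}

async def demo():
    services = FakeServices()
    node = TextInputNode(services)
    await node.load()
    first, second = FakeClient(), FakeClient()
    await node.open_client(first)
    await node.open_client(second)
    await first.handler("hello world")  # the first client sends new text
    outputs = await node.run({})
    return services, first, second, outputs

services, first, second, outputs = asyncio.run(demo())
print(outputs)
```

In this sketch the new text is persisted as bytes under the key value, forwarded only to the other client, and a re-run is requested once.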
Nodes can also report their state via the service API set_status; for example, the TableDisplayNode described later issues a warning when it receives no input. The following service APIs relate to status updates:
| service API | Purpose |
|---|---|
| set_status(msg,"info") | sets the status as INFORMATIONAL accompanied by message msg |
| set_status(msg,"warning") | sets the status as WARNING accompanied by message msg |
| set_status(msg,"error") | sets the status as ERROR accompanied by message msg |
| set_status("") | clears the status associated with this node |
WordFrequencyNode
This node processes an input text value to produce a simple table data structure mapping words to their frequencies. Sorting the rows in order of decreasing frequency is left to the TableDisplayNode.
# Hyrrokkin - a library for building and running executable graphs
#
# MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
import re
from hyrrokkin_engine.node_interface import NodeInterface

class WordFrequencyNode(NodeInterface):

    def __init__(self, services):
        self.services = services
        self.clients = set()
        self.properties = None

        # re-run this node whenever the configuration's stop words change
        async def configuration_updated():
            await self.services.request_run()

        self.update_listener = self.services.get_configuration().add_update_listener(configuration_updated)

    async def load(self):
        self.properties = await self.services.get_properties()
        if "threshold" not in self.properties:
            self.properties["threshold"] = 1

    async def open_client(self, client):
        self.clients.add(client)

        async def handle_message(value):
            self.properties["threshold"] = value
            await self.services.set_properties(self.properties)
            for other_client in self.clients:
                if other_client != client:
                    other_client.send_message(value)
            await self.services.request_run()

        client.set_message_handler(handle_message)
        client.send_message(self.properties["threshold"])

    async def close_client(self, client):
        self.clients.remove(client)

    async def run(self, inputs):
        if "data_in" in inputs:
            input_text = inputs["data_in"]
            input_text = input_text.replace("'","")
            frequencies = {}
            stop_words = self.services.get_configuration().get_stop_words()
            words = re.sub(r'[^\w\s]', ' ', input_text).split(' ')
            for word in words:
                word = word.strip().lower()
                if word and word not in stop_words:
                    if word not in frequencies:
                        frequencies[word] = 0
                    frequencies[word] += 1
            # keep only words that meet the frequency threshold
            output = {}
            for word in frequencies:
                if frequencies[word] >= self.properties["threshold"]:
                    output[word] = frequencies[word]
            return {"data_out": output}
        else:
            return {}

    def remove(self):
        self.services.get_configuration().remove_update_listener(self.update_listener)
// Hyrrokkin - a library for building and running executable graphs
//
// MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
var textgraph = textgraph || {};
textgraph.WordFrequencyNode = class {
constructor(services) {
this.services = services;
this.clients = new Set();
this.properties = null;
let configuration_updated = async () => {
await this.services.request_run();
}
this.update_listener = this.services.get_configuration().add_update_listener(configuration_updated);
}
async load() {
this.properties = await this.services.get_properties();
if (!("threshold" in this.properties)) {
this.properties["threshold"] = 1;
}
}
async open_client(client) {
this.clients.add(client);
client.set_message_handler(async (...msg) => await this.handle_message(client, ...msg));
client.send_message(this.properties["threshold"]);
}
async close_client(client) {
this.clients.delete(client);
}
async handle_message(from_client, value) {
this.properties["threshold"] = value;
await this.services.set_properties(this.properties);
this.clients.forEach((other_client) => {
if (other_client !== from_client) {
other_client.send_message(value);
}
});
await this.services.request_run();
}
async run(inputs) {
if ("data_in" in inputs) {
let input_text = inputs["data_in"];
input_text = input_text.replaceAll("'","");
let words = input_text.replace(/[^\w\s]/g," ").split(" ");
let frequencies = {};
let stop_words = this.services.get_configuration().get_stop_words();
words.forEach((word) => {
word = word.trim().toLowerCase();
if (word && !stop_words.includes(word)) {
if (!(word in frequencies)) {
frequencies[word] = 0;
}
frequencies[word] += 1;
}
});
let output = {};
for (let word in frequencies) {
if (word) {
if (frequencies[word] >= this.properties.threshold) {
output[word] = frequencies[word];
}
}
}
return {"data_out": output};
} else {
return {};
}
}
remove() {
this.services.get_configuration().remove_update_listener(this.update_listener);
}
}
The node uses an integer stored in a threshold property to ignore low-frequency words. The services API methods get_properties() and set_properties(properties) are used to retrieve and update the properties.
Clients of this node can update this property by sending a new integer value as a message.
Property names must be strings and values must be JSON-serialisable objects.
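The counting logic in the run method can be isolated as a plain function, which makes the effect of the stop words and the threshold easier to see. This is a minimal sketch that mirrors the Python listing above; the function name word_frequencies and the sample text are assumptions made for illustration.

```python
import re

def word_frequencies(text, stop_words, threshold=1):
    """Count words in text, skipping stop words and words rarer than threshold."""
    text = text.replace("'", "")                      # "cat's" becomes "cats"
    words = re.sub(r"[^\w\s]", " ", text).split(" ")  # punctuation becomes spaces
    frequencies = {}
    for word in words:
        word = word.strip().lower()
        if word and word not in stop_words:
            frequencies[word] = frequencies.get(word, 0) + 1
    # drop words whose count is below the threshold
    return {w: n for w, n in frequencies.items() if n >= threshold}

sample = "The cat and the hat. The cat's hat!"
all_words = word_frequencies(sample, ["the", "and"])
frequent = word_frequencies(sample, ["the", "and"], threshold=2)
print(all_words)
print(frequent)  # {'hat': 2}
```

Note that removing apostrophes before tokenising means "cat's" is counted as "cats", which is why the default stop word list contains "dont" rather than "don't".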
MergeFrequenciesNode
This node merges two frequency tables. A property called mode is set to either "add" or "subtract" to control its behaviour. Clients of this node can send new values as messages to update the property.
# Hyrrokkin - a library for building and running executable graphs
#
# MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
from hyrrokkin_engine.node_interface import NodeInterface

class MergeFrequenciesNode(NodeInterface):

    def __init__(self, services):
        self.services = services
        self.clients = set()
        self.properties = None

    async def load(self):
        self.properties = await self.services.get_properties()
        if "mode" not in self.properties:
            self.properties["mode"] = "add"

    async def open_client(self, client):
        self.clients.add(client)

        async def handle_message(value):
            self.properties["mode"] = value
            await self.services.set_properties(self.properties)
            for other_client in self.clients:
                if other_client != client:
                    other_client.send_message(value)
            await self.services.request_run()

        client.set_message_handler(handle_message)
        client.send_message(self.properties["mode"])

    async def close_client(self, client):
        self.clients.remove(client)

    async def run(self, inputs):
        if ("data_in0" in inputs or "data_in1" in inputs):
            # a missing input is treated as an empty table
            input_0 = inputs.get("data_in0",{})
            input_1 = inputs.get("data_in1",{})
            output = {}
            mode = self.properties["mode"]
            for word in input_0:
                output[word] = input_0[word]
            for word in input_1:
                if mode == "add":
                    output[word] = output.get(word,0) + input_1[word]
                else:
                    output[word] = output.get(word,0) - input_1[word]
            return { "data_out":output }
        return {}
// Hyrrokkin - a library for building and running executable graphs
//
// MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
var textgraph = textgraph || {};
textgraph.MergeFrequenciesNode = class {
constructor(services) {
this.services = services;
this.clients = new Set();
this.properties = null;
}
async load() {
this.properties = await this.services.get_properties();
if (!("mode" in this.properties)) {
this.properties["mode"] = "add";
}
}
async open_client(client) {
this.clients.add(client);
let handle_message = async (value) => {
this.properties["mode"] = value;
await this.services.set_properties(this.properties);
this.clients.forEach((other_client) => {
if (other_client !== client) {
other_client.send_message(value);
}
});
await this.services.request_run();
}
client.set_message_handler(handle_message);
client.send_message(this.properties["mode"]);
}
async close_client(client) {
this.clients.delete(client);
}
async run(inputs) {
if ("data_in0" in inputs || "data_in1" in inputs) {
let input_0 = inputs["data_in0"] || {};
let input_1 = inputs["data_in1"] || {};
let output = {};
let mode = this.properties["mode"];
for(let word in input_0) {
output[word] = input_0[word];
}
for(let word in input_1) {
if (mode === "add") {
output[word] = (output[word] || 0) + input_1[word];
} else {
output[word] = (output[word] || 0) - input_1[word];
}
}
return {"data_out": output};
}
return {};
}
}
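The merge performed in the run method can be written as a small standalone function. This is a minimal sketch mirroring the listings above; the name merge_frequencies and the sample tables are assumptions made for illustration. As in the listings, any mode other than "add" results in subtraction, and words that appear only in the second table start from zero.

```python
def merge_frequencies(table_0, table_1, mode="add"):
    """Combine two word -> frequency tables by adding or subtracting counts."""
    output = dict(table_0)  # start from a copy of the first table
    for word, count in table_1.items():
        if mode == "add":
            output[word] = output.get(word, 0) + count
        else:
            output[word] = output.get(word, 0) - count
    return output

first_table = {"hat": 2, "cat": 1}
second_table = {"cat": 3, "dog": 1}
added = merge_frequencies(first_table, second_table, "add")
subtracted = merge_frequencies(first_table, second_table, "subtract")
print(added)
print(subtracted)
```

Subtraction can produce negative frequencies, as for "cat" and "dog" in this example, so downstream nodes should not assume that counts are positive.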
TableDisplayNode
The TableDisplayNode implements some code to read an input table data structure and send it to connected clients.
# Hyrrokkin - a library for building and running executable graphs
#
# MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
import json
from hyrrokkin_engine.node_interface import NodeInterface

class TableDisplayNode(NodeInterface):

    def __init__(self, services):
        self.services = services
        self.clients = set()
        self.table = None

    async def reset_run(self):
        # tell clients that the current table is no longer valid
        self.table = None
        for client in self.clients:
            client.send_message(self.table)

    async def run(self, inputs):
        self.table = None
        if "data_in" in inputs:
            input_value = inputs["data_in"]
            self.table = []
            for word in input_value:
                self.table.append([word, input_value[word]])
            # sort rows by decreasing frequency
            self.table = sorted(self.table, key=lambda r: r[1], reverse=True)
            self.services.set_status(f"{len(self.table)} " + "{{rows}}", "info")
        else:
            self.services.set_status("{{no_data}}", "warning")
        for client in self.clients:
            client.send_message(self.table)

    async def open_client(self, client):
        self.clients.add(client)
        client.send_message(self.table)

    async def close_client(self, client):
        self.clients.remove(client)
// Hyrrokkin - a library for building and running executable graphs
//
// MIT License - Copyright (C) 2022-2025 Visual Topology Ltd
var textgraph = textgraph || {};
textgraph.TableDisplayNode = class {
constructor(services) {
this.services = services;
this.clients = new Set();
this.table = null;
}
async reset_run() {
this.table = null;
this.services.set_status("","info");
this.clients.forEach((client) => {
client.send_message(this.table);
});
}
async run(inputs) {
this.table = null;
if ("data_in" in inputs) {
let input_value = inputs["data_in"];
this.table = [];
for(let word in input_value) {
this.table.push([word,input_value[word]]);
}
this.table.sort(function(r1, r2) {
return r2[1] - r1[1];
});
this.services.set_status(`${this.table.length} {{rows}}`, "info");
} else {
this.services.set_status("{{no_data}}", "warning");
}
this.clients.forEach((client) => {
client.send_message(this.table);
});
}
open_client(client) {
this.clients.add(client);
client.send_message(this.table);
}
close_client(client) {
this.clients.delete(client);
}
}
Node lifecycle - the load, reset_run, run and remove methods
When a topology is loaded, or when any upstream node in the topology is re-run, the node will be constructed and its load method, if implemented, will be called.
As the topology is executed, the node's inputs will be collected and its run method will be called. But, before this happens, the node's reset_run method will be called, if it is implemented. A node can implement this method to inform any clients that the node's current results are invalid and the node will soon be re-run.
The reset_run method is called as soon as the framework is aware that the node's run method will need to be called.
A node may implement a remove method to receive notifications when the node is removed from a topology.
Nodes access their package's configuration instance via the get_configuration service method.
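The order of the lifecycle calls can be illustrated with a toy driver. This is only a sketch of the sequence described in this section: the drive function and RecordingNode class are hypothetical, and the real engine schedules these calls itself and may interleave them with other work.

```python
import asyncio

class RecordingNode:
    """Toy node that records the order in which its lifecycle methods are called."""
    def __init__(self, services):
        self.calls = ["construct"]
    async def load(self):
        self.calls.append("load")
    async def reset_run(self):
        self.calls.append("reset_run")
    async def run(self, inputs):
        self.calls.append("run")
        return {}
    def remove(self):
        self.calls.append("remove")

async def drive(node_class, inputs):
    """Hypothetical driver imitating the documented call order; not the real engine."""
    node = node_class(None)          # no services object is needed by the toy node
    if hasattr(node, "load"):        # optional method
        await node.load()
    if hasattr(node, "reset_run"):   # optional method, called before run
        await node.reset_run()
    await node.run(inputs)
    if hasattr(node, "remove"):      # optional method, called on removal
        node.remove()
    return node

node = asyncio.run(drive(RecordingNode, {}))
print(node.calls)
```

The optional methods are looked up with hasattr here because, as described above, a node only needs to implement the lifecycle methods it is interested in.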
For more details on the methods that a node or configuration can implement and on the services API passed to node or configuration constructors, see:
Creating, loading and running topologies using the Hyrrokkin Topology API
Hyrrokkin provides a Python API for creating, running, loading and saving topologies.
For example, let's create and run a simple topology using the textgraph package.
import tempfile
from hyrrokkin.api.topology import Topology
# provide the resource path to the package containing the schema file
textgraph_package = "hyrrokkin.example_packages.textgraph"
t = Topology(execution_folder=tempfile.mkdtemp(),package_list=[textgraph_package])
t.add_node("n0", "textgraph:text_input_node")
t.set_node_data("n0","value","This is some text")
t.add_node("n1", "textgraph:word_frequency_node", properties={"threshold":1})
t.add_node("n2", "textgraph:table_display_node")
t.add_link("l0", "n0", "data_out", "n1", "data_in")
t.add_link("l1", "n1", "data_out", "n2", "data_in")
runner = t.open_runner()
runner.run()
The same topology can be expressed using a YAML file:
metadata:
  name: test topology
nodes:
  n0:
    type: textgraph:text_input_node
    data:
      value: path/to/textfile.txt
  n1:
    type: textgraph:word_frequency_node
    properties:
      threshold: 1
  n2:
    type: textgraph:table_display_node
links:
  - n0:data_out => n1:data_in
  - n1:data_out => n2:data_in
This YAML file can then be imported using the following API calls:
import tempfile
from hyrrokkin.api.topology import Topology
from hyrrokkin.utils.yaml_importer import import_from_yaml
# provide the resource path to the package containing the schema file
textgraph_package = "hyrrokkin.example_packages.textgraph"
t = Topology(execution_folder=tempfile.mkdtemp(),package_list=[textgraph_package])
import_from_yaml(t,"topology.yaml")
Note that in the links section of the YAML file, the port name can be omitted for any node that has only one input or output port:
metadata:
  name: test topology
configuration:
  ...
nodes:
  ...
links:
  - n0 => n1
  - n1 => n2
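To make the two link notations concrete, here is a minimal sketch of how such link strings could be split into their parts. It is not the parser used by import_from_yaml; the function name parse_link is an assumption made for illustration, and an omitted port is simply reported as None, leaving it to the caller to resolve the port from the package schema.

```python
def parse_link(spec):
    """Split 'node[:port] => node[:port]' into (from_node, from_port, to_node, to_port)."""
    source, arrow, target = spec.partition("=>")
    if not arrow:
        raise ValueError(f"missing '=>' in link: {spec!r}")
    from_node, _, from_port = source.strip().partition(":")
    to_node, _, to_port = target.strip().partition(":")
    # an omitted port name is returned as None
    return (from_node, from_port or None, to_node, to_port or None)

full = parse_link("n0:data_out => n1:data_in")
short = parse_link("n0 => n1")
print(full)
print(short)
```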
Saving and loading topologies
A topology including its properties and data can be saved to and loaded from a serialised zip format file, using the following API calls. Saving first:
import tempfile
from hyrrokkin.api.topology import Topology
# provide the resource path to the package containing the schema file
textgraph_package = "hyrrokkin.example_packages.textgraph"
t = Topology(execution_folder=tempfile.mkdtemp(),package_list=[textgraph_package])
t.add_node("n0", "textgraph:text_input_node")
t.set_node_data("n0","value","this is some text")
t.add_node("n1", "textgraph:word_frequency_node",properties={"threshold": 1})
t.add_node("n2", "textgraph:table_display_node")
t.add_link("l0", "n0", "data_out", "n1", "data_in")
t.add_link("l1", "n1", "data_out", "n2", "data_in")
with open("topology.zip","wb") as f:
    t.save_zip(f)
To load from a saved topology:
import tempfile
from hyrrokkin.api.topology import Topology
# provide the resource path to the package containing the schema file
textgraph_package = "hyrrokkin_example_packages.textgraph"
t = Topology(execution_folder=tempfile.mkdtemp(),package_list=[textgraph_package])
with open("topology.zip","rb") as f:
    t.load_zip(f)
A utility function is also provided to export a topology to YAML format. Note that the exported YAML file contains node and configuration properties and contains paths to files containing any data stored by each node and configuration.
import tempfile
from hyrrokkin.api.topology import Topology
from hyrrokkin.utils.yaml_exporter import export_to_yaml
# provide the resource path to the package containing the schema file
textgraph_package = "hyrrokkin_example_packages.textgraph"
t = Topology(execution_folder=tempfile.mkdtemp(),package_list=[textgraph_package])
with open("topology.zip","rb") as f:
    t.load_zip(f)
with open("topology.yaml","w") as f:
    export_to_yaml(t,f)
For full details on the topology API, see:
Loading, saving and running topologies using the hyrrokkin CLI
The Hyrrokkin package will install a hyrrokkin CLI command. Some typical usages include:
Import a topology from zip and run it:
hyrrokkin --packages hyrrokkin_example_packages.textgraph \
--execution-folder /tmp/execution_test \
--import-path topology.zip --run
Import a topology from yaml, run it and save the topology (including data) to a zip file:
hyrrokkin --packages hyrrokkin_example_packages.textgraph \
--execution-folder /tmp/execution_test \
--import-path topology.yaml \
--run --export-path topology.zip
Convert a topology from zip format to yaml format, but do not run it:
hyrrokkin --packages hyrrokkin_example_packages.textgraph \
--execution-folder /tmp/execution_test \
--import-path topology.zip \
--export-path topology.yaml
Using the Hyrrokkin expression parser
Often nodes need to work with string-based expressions, for example:
r * sin(theta)
Hyrrokkin provides a simple expression parser which can be configured to parse string-based expressions into a parse tree.
from hyrrokkin_engine_utils.expression_parser import ExpressionParser
import json
ep = ExpressionParser()
ep.add_binary_operator("*",1)
print(json.dumps(ep.parse("10 * sin(pi)"),indent=2))
This program will print:
{
"operator": "*",
"args": [
{
"literal": 10
},
{
"function": "sin",
"args": [
{
"name": "pi"
}
]
}
]
}
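A parse tree of this shape can be evaluated with a short recursive function. The following is a minimal sketch that works on the printed tree as a plain dictionary, so it does not import Hyrrokkin. The evaluate function and the FUNCTIONS, NAMES and OPERATORS tables are assumptions made for illustration, and only the node kinds that appear in the output above (operator, function, name, literal) are handled.

```python
import math

# Lookup tables chosen for this example; a real node would supply its own.
FUNCTIONS = {"sin": math.sin, "cos": math.cos}
NAMES = {"pi": math.pi}
OPERATORS = {"*": lambda a, b: a * b, "+": lambda a, b: a + b}

def evaluate(tree):
    """Recursively evaluate a parse tree of the shape printed above."""
    if "literal" in tree:
        return tree["literal"]
    if "name" in tree:
        return NAMES[tree["name"]]
    args = [evaluate(arg) for arg in tree.get("args", [])]
    if "function" in tree:
        return FUNCTIONS[tree["function"]](*args)
    if "operator" in tree:
        return OPERATORS[tree["operator"]](*args)
    raise ValueError(f"unrecognised node: {tree!r}")

# the tree printed above for "10 * sin(pi)"
tree = {
    "operator": "*",
    "args": [
        {"literal": 10},
        {"function": "sin", "args": [{"name": "pi"}]},
    ],
}
result = evaluate(tree)
print(result)
```

Because of floating point rounding, the result is a very small number close to zero rather than exactly zero.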
Parser limitations
- unary and binary operators must be explicitly registered with the parser
- unary operators have higher precedence than binary operators
- binary operators must be registered with a precedence