Skip to content

Topology api

Creating, loading and running topologies

Topology

Source code in hyrrokkin/api/topology.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
class Topology:


    def __init__(self, execution_folder:str, package_list: list[str]):
        """
        Construct a topology, loading the schemas of the supplied packages

        Args:
            execution_folder: the folder used to store the topology definition and files.
                              If set (truthy), it is created when it does not already exist.
            package_list: a list of the paths to python packages containing schemas (a schema.json)
        """
        self.execution_folder = execution_folder
        self.logger = logging.getLogger("topology")

        # a falsy execution folder (None or "") means no folder is created here
        if execution_folder:
            os.makedirs(execution_folder, exist_ok=True)

        # load every package into the schema before the network is built on top of it
        schema = Schema()
        for package_path in package_list:
            schema.load_package(package_path)
        self.schema = schema

        self.network = Network(schema, execution_folder)

        # True while the topology contains no nodes and no package
        # properties or package data have been assigned
        self.empty = True

        # runners that have been opened and not yet closed
        self.read_only_runners = set()
        self.read_write_runners = set()  # holds at most one runner

    def load_zip(self, from_file: io.BytesIO) -> dict:
        """
        Load a topology from a binary stream

        The loaded content is merged into the existing topology unless this topology is still empty.

        Args:
            from_file: a binary stream, opened for reading

        Returns:
            a dictionary containing any node renamings performed to avoid id collisions with existing nodes
        """
        merging = not self.empty
        # only the renamings are reported to the caller; the added node and link ids are discarded
        _added_node_ids, _added_link_ids, node_renamings = self.network.load_zip(from_file, merging=merging)
        self.empty = False
        return node_renamings

    def load_dir(self):
        """
        Load a topology from the execution folder and mark this topology as non-empty
        """
        # the loader's results (added node ids, added link ids, node renamings) are not used here
        _added_node_ids, _added_link_ids, _node_renamings = self.network.load_dir({})
        self.empty = False

    def save_zip(self, to_file: io.BufferedWriter=None) ->Union[None,bytes]:
        """
        Save the topology, either to a binary stream or to an in-memory bytes object

        Args:
            to_file: an opened binary file to which the topology will be saved, if provided

        Returns:
            if to_file is not provided, returns a bytes object containing the saved topology
        """
        saved = self.network.save_zip(to_file)
        return saved

    def open_runner(self, status_event_handler: Callable[[str, str, str, str], None] = None,
                 execution_event_handler: Callable[[Union[float,None], str, str, Union[Dict, Exception, None], bool], None] = None,
                 engine_launcher: Union[EngineLauncher,None]=None,
                 read_only: bool = False) -> TopologyRunner:
        """
        Create a runner to run the topology

        Any number of read-only runners may be open at once, but a read-write runner
        can only be opened when no other runner (of either kind) is currently open.

        Args:
            status_event_handler: specify a function to call when a node/configuration sets its status
                                 passing parameters target_id, target_type, msg, status
            execution_event_handler: specify a function to call when a node changes its execution status
                                passing parameters timestamp, node_id, state, exception, is_manual
            engine_launcher: the engine_launcher to use to run the topology in a remote process.  if not specified, select an appropriate one
                             for the packages loaded
            read_only: if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology

        Returns:
            the newly created TopologyRunner instance

        Raises:
            ValueError: if read_only is False and this topology has no execution folder
            ValueError: if a read-write runner is already open
            ValueError: if read_only is False and one or more read-only runners are open
        """
        if not read_only and not self.execution_folder:
            raise ValueError("An execution folder must be associated with this topology for it to be run with read_only set to False")

        # an open read-write runner excludes every other runner
        if len(self.read_write_runners) > 0:
            raise ValueError("An open runner with read-write access exists")

        # a new read-write runner cannot coexist with open read-only runners
        if not read_only and len(self.read_only_runners) > 0:
            raise ValueError("Open runner(s) with read-only access exists")

        runner = TopologyRunner(network=self.network, schema=self.schema, execution_folder=self.execution_folder,
                                         engine_launcher=engine_launcher, status_event_handler=status_event_handler,
                                         execution_event_handler=execution_event_handler, read_only=read_only)

        # track the runner until it is closed; discard (rather than remove) keeps the
        # close callback safe to invoke more than once
        open_runners = self.read_only_runners if read_only else self.read_write_runners
        open_runners.add(runner)
        runner.set_close_callback(lambda: open_runners.discard(runner))

        return runner

    def set_metadata(self, metadata: dict[str, str]):
        """
        Assign the metadata of this topology

        Args:
            metadata: a dictionary containing metadata, consisting of string keys and values.

        Notes:
            the following keys will be understood by hyrrokkin based tools - version, description, authors
        """
        network = self.network
        network.set_metadata(metadata)

    @check_not_running
    def set_configuration(self, package_id: str, properties: dict):
        """
        Set the properties of a package's configuration and mark the topology as non-empty

        Args:
            package_id: the id of the package
            properties: a dictionary containing the configuration properties, must be JSON serialisable.
        """
        self.network.get_configuration_datastore(package_id).set_properties(properties)
        self.empty = False

    def add_node(self, node_id: str, node_type: str, properties: Union[dict[str, JsonType], None]=None,
                 metadata: Union[dict[str, JsonType], None]=None, x:int=0, y:int=0) -> str:
        """
        Add a node to the topology

        Args:
            node_id: the node's unique identifier, must not already exist within the topology
            node_type: the type of the node, a string of the form package_id:node_type_id
            properties: dictionary containing the node's property names and values, must be JSON serialisable.
                        Defaults to an empty dictionary.
            metadata: a dictionary containing the node's metadata. Defaults to an empty dictionary.
            x: the node's x-coordinate value
            y: the node's y-coordinate value

        Returns:
            the id of the added node

        Raises:
            InvalidNodeError: if a node with this id already exists
        """
        if self.network.get_node(node_id) is not None:
            raise InvalidNodeError(f"Node with id {node_id} already exists")

        # build fresh dictionaries per call: a shared mutable default would be aliased
        # by every node created without explicit properties/metadata
        if properties is None:
            properties = {}
        if metadata is None:
            metadata = {}

        node = Node(node_id, node_type, x=x, y=y, metadata=metadata)
        self.network.add_node(node)
        self.network.get_node_datastore(node_id).set_properties(properties)
        self.empty = False
        return node_id

    def remove_node(self, node_id: str):
        """
        Remove an existing node from the topology

        Args:
            node_id: the node's unique identifier

        Raises:
            InvalidNodeError: if no node with this id exists
        """
        network = self.network
        existing_node = network.get_node(node_id)
        if existing_node is None:
            raise InvalidNodeError(f"Node with id {node_id} does not exist")
        network.remove_node(node_id)

    def update_node_position(self, node_id:str, x:int, y:int):
        """
        Move a node to new coordinates

        Args:
            node_id: the id of the node to update
            x: the new x-coordinate value
            y: the new y-coordinate value
        """
        network = self.network
        network.move_node(node_id, x, y)

    @check_not_running
    def get_node_property(self, node_id: str, property_name: str) -> JsonType:
        """
        Look up a single property of a node

        Args:
            node_id: the node's identifier
            property_name: the name of the property

        Returns:
            the property value, or None if the node's property is not set
        """
        return self.network.get_node_datastore(node_id).get_property(property_name)

    @check_not_running
    def set_node_property(self, node_id: str, property_name: str, property_value: JsonType):
        """
        Assign a single property of a node

        Args:
            node_id: the node's identifier
            property_name: the name of the property
            property_value: the value of the property, must be JSON serialisable
        """
        self.network.get_node_datastore(node_id).set_property(property_name, property_value)

    @check_not_running
    def get_node_data(self, node_id: str, key: str) -> Union[bytes, str, None]:
        """
        Read binary or string data stored for a node.

        Args:
            node_id: node identifier
            key: a key to locate the data (can only contain alphanumeric characters and underscores)

        Returns:
            data or None if no data is associated with the key
        """
        return self.network.get_node_datastore(node_id).get_data_sync(key)

    @check_not_running
    def set_node_data(self, node_id: str, key: str, data: Union[bytes, str, None]):
        """
        Store binary or string data for a node.

        Args:
            node_id: node identifier
            key: a key to locate the data (can only contain alphanumeric characters and underscores)
            data: data to be stored
        """
        self.network.get_node_datastore(node_id).set_data_sync(key, data)

    @check_not_running
    def get_package_property(self, package_id: str, property_name: str) -> JsonType:
        """
        Look up a single property of a package's configuration

        Args:
            package_id: the package's identifier
            property_name: the name of the property

        Returns:
            property value or None if no such property exists
        """
        return self.network.get_configuration_datastore(package_id).get_property(property_name)

    @check_not_running
    def set_package_property(self, package_id: str, property_name: str, property_value: JsonType):
        """
        Assign a single property of a package and mark the topology as non-empty

        Args:
            package_id: the package's identifier
            property_name: the name of the property
            property_value: the value of the property, must be JSON serialisable
        """
        self.network.get_configuration_datastore(package_id).set_property(property_name, property_value)
        self.empty = False

    @check_not_running
    def get_package_data(self, package_id: str, key: str) -> Union[bytes, str, None]:
        """
        Read binary or string data stored for a package configuration.

        Args:
            package_id: package identifier
            key: a key to locate the data (can only contain alphanumeric characters and underscores)

        Returns:
            data or None if no data is associated with the key
        """
        return self.network.get_configuration_datastore(package_id).get_data_sync(key)

    @check_not_running
    def set_package_data(self, package_id: str, key: str, data: Union[bytes, str, None]):
        """
        Store binary or string data for a package configuration and mark the topology as non-empty.

        Args:
            package_id: package identifier
            key: a key to locate the data (can only contain alphanumeric characters and underscores)
            data: data to be stored
        """
        self.network.get_configuration_datastore(package_id).set_data_sync(key, data)
        self.empty = False

    def add_link(self, link_id: str, from_node_id: str, from_port: Union[str, None], to_node_id: str,
                 to_port: Union[str, None]):
        """
        Add a link to the topology

        The link is validated before it is added: both nodes must exist, both ports must be
        valid for their node types, the link types of the two ports must match, and ports that
        do not allow multiple connections must not already be connected.

        Args:
            link_id: a unique identifier for the link
            from_node_id: node id of the source node
            from_port: port name on the source node, can be omitted if the "from" node has only one output port
            to_node_id: node id of the destination node
            to_port: port name on the destination node, can be omitted if the "to" node has only one input port

        Raises:
            InvalidLinkError: if the link cannot be added
        """

        if self.network.get_link(link_id) is not None:
            raise InvalidLinkError(f"Link with id {link_id} already exists")

        from_node = self.network.get_node(from_node_id)
        if from_node is None:
            raise InvalidLinkError(f"{from_node_id} does not exist")

        from_node_type_name = from_node.get_node_type()
        from_node_type = self.schema.get_node_type(from_node_type_name)

        to_node = self.network.get_node(to_node_id)
        if to_node is None:
            raise InvalidLinkError(f"{to_node_id} does not exist")
        to_node_type_name = to_node.get_node_type()
        to_node_type = self.schema.get_node_type(to_node_type_name)

        # resolve or validate the source port
        if from_port is None:
            if len(from_node_type.output_ports) == 1:
                # unambiguous: use the node type's only output port
                from_port = next(iter(from_node_type.output_ports))
            else:
                output_port_names = ",".join(from_node_type.output_ports.keys())
                raise InvalidLinkError(f"from_port not specified for link, should be one of ({output_port_names})")
        elif from_port not in from_node_type.output_ports:
            raise InvalidLinkError(f"{from_port} is not a valid output port for node {from_node_id}")

        # resolve or validate the destination port
        if to_port is None:
            if len(to_node_type.input_ports) == 1:
                # unambiguous: use the node type's only input port
                to_port = next(iter(to_node_type.input_ports))
            else:
                input_port_names = ",".join(to_node_type.input_ports.keys())
                raise InvalidLinkError(f"to_port not specified for link, should be one of ({input_port_names})")
        elif to_port not in to_node_type.input_ports:
            # the destination port is an input port (the message previously said "output port")
            raise InvalidLinkError(f"{to_port} is not a valid input port for node {to_node_id}")

        from_link_type = from_node_type.output_ports[from_port].link_type

        to_link_type = to_node_type.input_ports[to_port].link_type

        if from_link_type != to_link_type:
            raise InvalidLinkError(f"incompatible link types (from: {from_link_type}, to: {to_link_type})")

        if not from_node_type.output_ports[from_port].allows_multiple_connections():
            if len(self.network.get_outputs_from(from_node_id, from_port)) > 0:
                raise InvalidLinkError(
                    f"output port {from_node_id}/{from_port} is already connected and does not allow multiple connections")

        if not to_node_type.input_ports[to_port].allows_multiple_connections():
            if len(self.network.get_inputs_to(to_node_id, to_port)) > 0:
                raise InvalidLinkError(
                    f"input port {to_node_id}/{to_port} is already connected and does not allow multiple connections")

        link = Link(link_id, from_node_id, from_port, to_node_id, to_port, from_link_type)
        self.network.add_link(link)

    def remove_link(self, link_id: str):
        """
        Remove an existing link from the topology

        Args:
            link_id: the link's unique identifier

        Raises:
            InvalidNodeError: if no link with this id exists
        """
        network = self.network
        if network.get_link(link_id) is None:
            # NOTE(review): InvalidNodeError is raised for a missing *link*; InvalidLinkError may
            # have been intended - confirm before changing, since callers may catch this type
            raise InvalidNodeError(f"Link with id {link_id} does not exist")
        network.remove_link(link_id)

    def get_package_ids(self) -> list[str]:
        """
        List the ids of all packages known to the schema

        Returns:
             list of package ids
        """
        packages = self.schema.get_packages()
        return list(packages.keys())

    def get_node_ids(self) -> list[str]:
        """
        List the ids of all nodes in the topology

        Returns:
            list of node ids
        """
        node_ids = self.network.get_node_ids()
        return node_ids

    def get_node_type(self, node_id: str) -> tuple[str, str]:
        """
        Look up the package and node type of a given node

        Args:
            node_id: the id of the node to retrieve

        Returns:
            tuple (package_id, node_type_id)
        """
        # the node stores a combined descriptor which the schema splits into its two parts
        descriptor = self.network.get_node(node_id).get_node_type()
        return self.schema.split_descriptor(descriptor)

    def serialise_node(self, node_id:str) -> dict[str, JsonType]:
        """
        Describe a node as a dictionary

        Args:
            node_id: the id of the node to serialise

        Returns:
            a dictionary with the keys node_id, node_type, x, y and metadata
        """
        node = self.network.get_node(node_id)
        node_type = node.get_node_type()
        x, y = node.get_xy()
        return {
            "node_id": node_id,
            "node_type": node_type,
            "x": x,
            "y": y,
            "metadata": node.get_metadata(),
        }

    def serialise_link(self, link_id:str) -> dict[str, JsonType]:
        """
        Describe a link as a dictionary

        Args:
            link_id: the id of the link to serialise

        Returns:
            a dictionary with the keys link_id, link_type, from_node, from_port, to_node and to_port
        """
        link = self.network.get_link(link_id)
        return {
            "link_id": link_id,
            "link_type": link.get_link_type(),
            "from_node": link.from_node_id,
            "from_port": link.from_port,
            "to_node": link.to_node_id,
            "to_port": link.to_port,
        }

    def serialise(self) -> dict[str,JsonType]:
        """
        Serialise the topology to a dictionary without data/properties

        Returns:
            a dictionary describing the topology
        """
        serialised = self.network.save()
        return serialised

    def get_node_metadata(self, node_id: str) -> dict[str, JsonType]:
        """
        Fetch the metadata of a node

        Args:
            node_id: the id of the node

        Returns:
            A dictionary containing the metadata
        """
        node = self.network.get_node(node_id)
        return node.get_metadata()

    def update_node_metadata(self, node_id:str, metadata:dict[str,JsonType]):
        """
        Update the metadata of a node

        Args:
            node_id: the id of the node
            metadata: a dictionary containing the new metadata
        """
        self.network.get_node(node_id).update_metadata(metadata)

    def get_link_ids(self) -> list[str]:
        """
        List the ids of all links in the topology

        Returns:
            list of link ids
        """
        link_ids = self.network.get_link_ids()
        return link_ids

    def get_link(self, link_id:str) -> tuple[str, str, str, str]:
        """
        Fetch the endpoints of a given link

        Args:
            link_id: the id of the link to retrieve

        Returns:
            tuple (from_node_id,from_port,to_node_id,to_port)
        """
        link = self.network.get_link(link_id)
        endpoints = (link.from_node_id, link.from_port, link.to_node_id, link.to_port)
        return endpoints

    def get_output_port_names(self, node_id: str) -> list[str]:
        """
        List the output port names of a given node

        Args:
            node_id: the id of the node

        Returns:
            list of output port names
        """
        type_name = self.network.get_node(node_id).get_node_type()
        node_type = self.schema.get_node_type(type_name)
        # each entry is a (name, port) pair; only the name is kept
        port_names = []
        for port_name, _port in node_type.get_output_ports():
            port_names.append(port_name)
        return port_names

    def get_input_port_names(self, node_id: str) -> list[str]:
        """
        List the input port names of a given node

        Args:
            node_id: the id of the node

        Returns:
            list of input port names
        """
        type_name = self.network.get_node(node_id).get_node_type()
        node_type = self.schema.get_node_type(type_name)
        # each entry is a (name, port) pair; only the name is kept
        port_names = []
        for port_name, _port in node_type.get_input_ports():
            port_names.append(port_name)
        return port_names

    def get_metadata(self) -> dict[str, JsonType]:
        """
        Fetch the metadata of the topology

        Returns:
            A dictionary containing the metadata
        """
        metadata = self.network.get_metadata()
        return metadata

    @check_not_running
    def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
        """
        Fetch all properties of the specified node

        Args:
            node_id: the node identifier

        Returns:
            A dictionary containing the properties defined for that node
        """
        return self.network.get_node_datastore(node_id).get_properties()

    @check_not_running
    def get_package_properties(self, package_id: str) -> dict[str, JsonType]:
        """
        Fetch all properties of the specified package

        Args:
            package_id: the package identifier

        Returns:
            A dictionary containing the properties defined for that package
        """
        return self.network.get_configuration_datastore(package_id).get_properties()

    def clear(self):
        """
        Remove all nodes and links from the topology
        """
        # NOTE(review): self.empty is left unchanged here - confirm whether a cleared
        # topology should be treated as empty again by a later load_zip
        network = self.network
        network.clear()

__init__(execution_folder, package_list)

Create a topology

Parameters:

Name Type Description Default
execution_folder str

the folder used to store the topology definition and files

required
package_list list[str]

a list of the paths to python packages containing schemas (a schema.json)

required
Source code in hyrrokkin/api/topology.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(self, execution_folder:str, package_list: list[str]):
    """
    Construct a topology, loading the schemas of the supplied packages

    Args:
        execution_folder: the folder used to store the topology definition and files.
                          If set (truthy), it is created when it does not already exist.
        package_list: a list of the paths to python packages containing schemas (a schema.json)
    """
    self.execution_folder = execution_folder
    self.logger = logging.getLogger("topology")

    # a falsy execution folder (None or "") means no folder is created here
    if execution_folder:
        os.makedirs(execution_folder, exist_ok=True)

    # load every package into the schema before the network is built on top of it
    schema = Schema()
    for package_path in package_list:
        schema.load_package(package_path)
    self.schema = schema

    self.network = Network(schema, execution_folder)

    # True while the topology contains no nodes and no package
    # properties or package data have been assigned
    self.empty = True

    # runners that have been opened and not yet closed
    self.read_only_runners = set()
    self.read_write_runners = set()  # holds at most one runner

Add a link to the topology

Parameters:

Name Type Description Default
link_id str

a unique identifier for the link

required
from_node_id str

node id of the source node

required
from_port Union[str, None]

port name on the source node, can be omitted if the "from" node has only one output port

required
to_node_id str

node id of the destination node

required
to_port Union[str, None]

port name on the destination node, can be omitted if the "to" node has only one input port

required

Raises:

Type Description
InvalidLinkError

if the link cannot be added

Source code in hyrrokkin/api/topology.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def add_link(self, link_id: str, from_node_id: str, from_port: Union[str, None], to_node_id: str,
             to_port: Union[str, None]):
    """
    Add a link to the topology

    The link is validated before it is added: both nodes must exist, both ports must be
    valid for their node types, the link types of the two ports must match, and ports that
    do not allow multiple connections must not already be connected.

    Args:
        link_id: a unique identifier for the link
        from_node_id: node id of the source node
        from_port: port name on the source node, can be omitted if the "from" node has only one output port
        to_node_id: node id of the destination node
        to_port: port name on the destination node, can be omitted if the "to" node has only one input port

    Raises:
        InvalidLinkError: if the link cannot be added
    """

    if self.network.get_link(link_id) is not None:
        raise InvalidLinkError(f"Link with id {link_id} already exists")

    from_node = self.network.get_node(from_node_id)
    if from_node is None:
        raise InvalidLinkError(f"{from_node_id} does not exist")

    from_node_type_name = from_node.get_node_type()
    from_node_type = self.schema.get_node_type(from_node_type_name)

    to_node = self.network.get_node(to_node_id)
    if to_node is None:
        raise InvalidLinkError(f"{to_node_id} does not exist")
    to_node_type_name = to_node.get_node_type()
    to_node_type = self.schema.get_node_type(to_node_type_name)

    # resolve or validate the source port
    if from_port is None:
        if len(from_node_type.output_ports) == 1:
            # unambiguous: use the node type's only output port
            from_port = next(iter(from_node_type.output_ports))
        else:
            output_port_names = ",".join(from_node_type.output_ports.keys())
            raise InvalidLinkError(f"from_port not specified for link, should be one of ({output_port_names})")
    elif from_port not in from_node_type.output_ports:
        raise InvalidLinkError(f"{from_port} is not a valid output port for node {from_node_id}")

    # resolve or validate the destination port
    if to_port is None:
        if len(to_node_type.input_ports) == 1:
            # unambiguous: use the node type's only input port
            to_port = next(iter(to_node_type.input_ports))
        else:
            input_port_names = ",".join(to_node_type.input_ports.keys())
            raise InvalidLinkError(f"to_port not specified for link, should be one of ({input_port_names})")
    elif to_port not in to_node_type.input_ports:
        # the destination port is an input port (the message previously said "output port")
        raise InvalidLinkError(f"{to_port} is not a valid input port for node {to_node_id}")

    from_link_type = from_node_type.output_ports[from_port].link_type

    to_link_type = to_node_type.input_ports[to_port].link_type

    if from_link_type != to_link_type:
        raise InvalidLinkError(f"incompatible link types (from: {from_link_type}, to: {to_link_type})")

    if not from_node_type.output_ports[from_port].allows_multiple_connections():
        if len(self.network.get_outputs_from(from_node_id, from_port)) > 0:
            raise InvalidLinkError(
                f"output port {from_node_id}/{from_port} is already connected and does not allow multiple connections")

    if not to_node_type.input_ports[to_port].allows_multiple_connections():
        if len(self.network.get_inputs_to(to_node_id, to_port)) > 0:
            raise InvalidLinkError(
                f"input port {to_node_id}/{to_port} is already connected and does not allow multiple connections")

    link = Link(link_id, from_node_id, from_port, to_node_id, to_port, from_link_type)
    self.network.add_link(link)

add_node(node_id, node_type, properties={}, metadata={}, x=0, y=0)

Add a node to the topology

Parameters:

Name Type Description Default
node_id str

the node's unique identifier, must not already exist within the topology

required
node_type str

the type of the node, a string of the form package_id:node_type_id

required
properties dict[str, JsonType]

dictionary containing the node's property names and values, must be JSON serialisable

{}
metadata dict[str, JsonType]

a dictionary containing the node's metadata

{}
x int

the node's x-coordinate value

0
y int

the node's y-coordinate value

0
Source code in hyrrokkin/api/topology.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def add_node(self, node_id: str, node_type: str, properties: Union[dict[str, JsonType], None]=None,
             metadata: Union[dict[str, JsonType], None]=None, x:int=0, y:int=0) -> str:
    """
    Add a node to the topology

    Args:
        node_id: the node's unique identifier, must not already exist within the topology
        node_type: the type of the node, a string of the form package_id:node_type_id
        properties: dictionary containing the node's property names and values, must be JSON serialisable.
                    Defaults to an empty dictionary.
        metadata: a dictionary containing the node's metadata. Defaults to an empty dictionary.
        x: the node's x-coordinate value
        y: the node's y-coordinate value

    Returns:
        the id of the added node

    Raises:
        InvalidNodeError: if a node with this id already exists
    """
    if self.network.get_node(node_id) is not None:
        raise InvalidNodeError(f"Node with id {node_id} already exists")

    # build fresh dictionaries per call: a shared mutable default would be aliased
    # by every node created without explicit properties/metadata
    if properties is None:
        properties = {}
    if metadata is None:
        metadata = {}

    node = Node(node_id, node_type, x=x, y=y, metadata=metadata)
    self.network.add_node(node)
    self.network.get_node_datastore(node_id).set_properties(properties)
    self.empty = False
    return node_id

clear()

Remove all nodes and links from the topology

Source code in hyrrokkin/api/topology.py
616
617
618
619
620
def clear(self):
    """
    Remove all nodes and links from the topology
    """
    # NOTE(review): self.empty is left unchanged here - confirm whether a cleared
    # topology should be treated as empty again by a later load_zip
    network = self.network
    network.clear()

get_input_port_names(node_id)

Get the input port names for a given node

Parameters:

Name Type Description Default
node_id str

the id of the node

required

Returns:

Type Description
list[str]

list of input port names

Source code in hyrrokkin/api/topology.py
565
566
567
568
569
570
571
572
573
574
575
576
577
def get_input_port_names(self, node_id: str) -> list[str]:
    """
    List the input port names of a given node

    Args:
        node_id: the id of the node

    Returns:
        list of input port names
    """
    type_name = self.network.get_node(node_id).get_node_type()
    node_type = self.schema.get_node_type(type_name)
    # each entry is a (name, port) pair; only the name is kept
    port_names = []
    for port_name, _port in node_type.get_input_ports():
        port_names.append(port_name)
    return port_names

Get the link details for a given link

Parameters:

Name Type Description Default
link_id str

the id of the link to retrieve

required

Returns:

Type Description
tuple[str, str, str, str]

tuple (from_node_id,from_port,to_node_id,to_port)

Source code in hyrrokkin/api/topology.py
538
539
540
541
542
543
544
545
546
547
548
549
def get_link(self, link_id:str) -> tuple[str, str, str, str]:
    """
    Look up a link and return its two endpoints

    Args:
        link_id: the id of the link to retrieve

    Returns:
        tuple (from_node_id,from_port,to_node_id,to_port)
    """
    details = self.network.get_link(link_id)
    source = (details.from_node_id, details.from_port)
    destination = (details.to_node_id, details.to_port)
    return source + destination

get_link_ids()

Get the ids of all links in the topology

Returns:

Type Description
list[str]

list of link ids

Source code in hyrrokkin/api/topology.py
529
530
531
532
533
534
535
536
def get_link_ids(self) -> list[str]:
    """
    List the identifiers of every link held by the topology

    Returns:
        list of link ids
    """
    link_ids = self.network.get_link_ids()
    return link_ids

get_metadata()

Get the metadata of the topology

Returns:

Type Description
dict[str, JsonType]

A dictionary containing the metadata

Source code in hyrrokkin/api/topology.py
579
580
581
582
583
584
585
586
def get_metadata(self) -> dict[str, JsonType]:
    """
    Fetch the topology-level metadata from the underlying network

    Returns:
        A dictionary containing the metadata
    """
    metadata = self.network.get_metadata()
    return metadata

get_node_data(node_id, key)

Get binary or string data associated with this node.

Parameters:

Name Type Description Default
node_id str

node identifier

required
key str

a key to locate the data (can only contain alphanumeric characters and underscores)

required

Returns:

Type Description
Union[bytes, str, None]

data or None if no data is associated with the key

Source code in hyrrokkin/api/topology.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
@check_not_running
def get_node_data(self, node_id: str, key: str) -> Union[bytes, str, None]:
    """
    Read the binary or string data stored for a node under a key

    Args:
        node_id: node identifier
        key: a key to locate the data (can only contain alphanumeric characters and underscores)

    Returns:
        data or None if no data is associated with the key
    """
    datastore = self.network.get_node_datastore(node_id)
    return datastore.get_data_sync(key)

get_node_ids()

Get the ids of all nodes in the topology

Returns:

Type Description
list[str]

list of node ids

Source code in hyrrokkin/api/topology.py
433
434
435
436
437
438
439
440
def get_node_ids(self) -> list[str]:
    """
    List the identifiers of every node held by the topology

    Returns:
        list of node ids
    """
    node_ids = self.network.get_node_ids()
    return node_ids

get_node_metadata(node_id)

Get the metadata of a node

Parameters:

Name Type Description Default
node_id str

the id of the node

required

Returns:

Type Description
dict[str, JsonType]

A dictionary containing the metadata

Source code in hyrrokkin/api/topology.py
506
507
508
509
510
511
512
513
514
515
516
def get_node_metadata(self, node_id: str) -> dict[str, JsonType]:
    """
    Fetch the metadata attached to a single node

    Args:
        node_id: the id of the node

    Returns:
        A dictionary containing the metadata
    """
    node = self.network.get_node(node_id)
    return node.get_metadata()

get_node_properties(node_id)

Get the properties for the specified node

Parameters:

Name Type Description Default
node_id str

the node identifier

required

Returns:

Type Description
dict[str, JsonType]

A dictionary containing the properties defined for that node

Source code in hyrrokkin/api/topology.py
588
589
590
591
592
593
594
595
596
597
598
599
600
@check_not_running
def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
    """
    Fetch all properties held in a node's datastore

    Args:
        node_id: the node identifier

    Returns:
        A dictionary containing the properties defined for that node
    """
    datastore = self.network.get_node_datastore(node_id)
    return datastore.get_properties()

get_node_property(node_id, property_name)

Gets the property of a node, or None if the node's property is not set

Parameters:

Name Type Description Default
node_id str

the node's identifier

required
property_name str

the name of the property

required
Source code in hyrrokkin/api/topology.py
229
230
231
232
233
234
235
236
237
238
239
@check_not_running
def get_node_property(self, node_id: str, property_name: str) -> JsonType:
    """
    Fetch a single named property from a node's datastore

    Args:
        node_id: the node's identifier
        property_name: the name of the property

    Returns:
        the property value, or None if the node's property is not set
    """
    datastore = self.network.get_node_datastore(node_id)
    return datastore.get_property(property_name)

get_node_type(node_id)

Get the node package and type for a given node

Parameters:

Name Type Description Default
node_id str

the id of the node to retrieve

required

Returns:

Type Description
tuple[str, str]

tuple (package_id, node_type_id)

Source code in hyrrokkin/api/topology.py
442
443
444
445
446
447
448
449
450
451
452
453
454
def get_node_type(self, node_id: str) -> tuple[str, str]:
    """
    Resolve a node's type descriptor into its package and node type parts

    Args:
        node_id: the id of the node to retrieve

    Returns:
        tuple (package_id, node_type_id)
    """
    descriptor = self.network.get_node(node_id).get_node_type()
    # the schema is responsible for splitting the descriptor string
    return self.schema.split_descriptor(descriptor)

get_output_port_names(node_id)

Get the output port names for a given node

Parameters:

Name Type Description Default
node_id str

the id of the node

required

Returns:

Type Description
list[str]

list of output port names

Source code in hyrrokkin/api/topology.py
551
552
553
554
555
556
557
558
559
560
561
562
563
def get_output_port_names(self, node_id: str) -> list[str]:
    """
    List the names of the output ports declared by a node's type

    Args:
        node_id: the id of the node

    Returns:
        list of output port names
    """
    type_descriptor = self.network.get_node(node_id).get_node_type()
    type_definition = self.schema.get_node_type(type_descriptor)
    port_names = []
    # each port entry is a pair; only the first element (the name) is reported
    for (port_name, _port) in type_definition.get_output_ports():
        port_names.append(port_name)
    return port_names

get_package_data(package_id, key)

Get binary or string data associated with a package configuration.

Parameters:

Name Type Description Default
package_id str

package identifier

required
key str

a key to locate the data (can only contain alphanumeric characters and underscores)

required

Returns:

Type Description
Union[bytes, str, None]

data or None if no data is associated with the key

Source code in hyrrokkin/api/topology.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
@check_not_running
def get_package_data(self, package_id: str, key: str) -> Union[bytes, str, None]:
    """
    Read the binary or string data stored for a package configuration under a key

    Args:
        package_id: package identifier
        key: a key to locate the data (can only contain alphanumeric characters and underscores)

    Returns:
        data or None if no data is associated with the key
    """
    datastore = self.network.get_configuration_datastore(package_id)
    return datastore.get_data_sync(key)

get_package_ids()

Gets the ids of all packages

Returns:

Type Description
list[str]

list of package ids

Source code in hyrrokkin/api/topology.py
424
425
426
427
428
429
430
431
def get_package_ids(self) -> list[str]:
    """
    List the identifiers of the packages known to the schema

    Returns:
         list of package ids
    """
    package_keys = self.schema.get_packages().keys()
    return [*package_keys]

get_package_properties(package_id)

Get the properties for the specified package

Parameters:

Name Type Description Default
package_id str

the package identifier

required

Returns:

Type Description
dict[str, JsonType]

A dictionary containing the properties defined for that package

Source code in hyrrokkin/api/topology.py
602
603
604
605
606
607
608
609
610
611
612
613
614
@check_not_running
def get_package_properties(self, package_id: str) -> dict[str, JsonType]:
    """
    Fetch all properties held in a package's configuration datastore

    Args:
        package_id: the package identifier

    Returns:
        A dictionary containing the properties defined for that package
    """
    datastore = self.network.get_configuration_datastore(package_id)
    return datastore.get_properties()

get_package_property(package_id, property_name)

Gets the property of a package, or None if the package or package property is not set

Parameters:

Name Type Description Default
package_id str

the package's identifier

required
property_name str

the name of the property

required

Returns:

Type Description
JsonType

property value or None if no such property exists

Source code in hyrrokkin/api/topology.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
@check_not_running
def get_package_property(self, package_id: str, property_name: str) -> JsonType:
    """
    Fetch a single named property from a package's configuration datastore

    Args:
        package_id: the package's identifier
        property_name: the name of the property

    Returns:
        property value or None if no such property exists
    """
    datastore = self.network.get_configuration_datastore(package_id)
    return datastore.get_property(property_name)

load_dir()

Load a topology from the execution folder

Source code in hyrrokkin/api/topology.py
 99
100
101
102
103
104
def load_dir(self):
    """
    Load a topology from the execution folder

    The network's load result is a 3-tuple (added node ids, added link ids,
    node renamings); none of its parts are used here.  Afterwards the topology
    is flagged as no longer empty.
    """
    load_result = self.network.load_dir({})
    # unpack to keep the check that the result really is a 3-tuple
    (_added_node_ids, _added_link_ids, _node_renamings) = load_result
    self.empty = False

load_zip(from_file)

Load a topology from a binary stream

Parameters:

Name Type Description Default
from_file BytesIO

a binary stream, opened for reading

required

Returns:

Type Description
dict

a dictionary containing any node renamings performed to avoid id collisions with existing nodes

Source code in hyrrokkin/api/topology.py
85
86
87
88
89
90
91
92
93
94
95
96
97
def load_zip(self, from_file: io.BytesIO) -> dict:
    """
    Load a topology from a binary stream

    The network is asked to merge the loaded content whenever this topology
    is not flagged as empty.  Afterwards the topology is flagged as non-empty.

    Args:
        from_file: a binary stream, opened for reading

    Returns:
        a dictionary containing any node renamings performed to avoid id collisions with existing nodes
    """
    is_merge = not self.empty
    load_result = self.network.load_zip(from_file, merging=is_merge)
    (_added_node_ids, _added_link_ids, renamings) = load_result
    self.empty = False
    return renamings

open_runner(status_event_handler=None, execution_event_handler=None, engine_launcher=None, read_only=False)

Create a runner to run the topology

Parameters:

Name Type Description Default
status_event_handler Callable[[str, str, str, str], None]

specify a function to call when a node/configuration sets its status passing parameters target_id, target_type, msg, status

None
execution_event_handler Callable[[Union[float, None], str, str, Union[Dict, Exception, None], bool], None]

specify a function to call when a node changes its execution status passing parameters timestamp, node_id, state, exception, is_manual

None
engine_launcher Union[EngineLauncher, None]

the engine_launcher to use to run the topology in a remote process. if not specified, select an appropriate one for the packages loaded

None
read_only bool

if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology

False

Returns: a TopologyRunner instance that allows the execution to be stopped and clients to be attached and detached

Source code in hyrrokkin/api/topology.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def open_runner(self, status_event_handler: Callable[[str, str, str, str], None] = None,
             execution_event_handler: Callable[[Union[float,None], str, str, Union[Dict, Exception, None], bool], None] = None,
             engine_launcher: Union[EngineLauncher,None]=None,
             read_only: bool = False) -> TopologyRunner:
    """
    Create a runner to run the topology

    At most one read-write runner may be open at a time, and a read-write runner
    cannot be opened while read-only runners are open.  The new runner is tracked
    until its close callback fires.

    Args:
        status_event_handler: specify a function to call when a node/configuration sets its status
                             passing parameters target_id, target_type, msg, status
        execution_event_handler: specify a function to call when a node changes its execution status
                            passing parameters timestamp, node_id, state, exception, is_manual
        engine_launcher: the engine_launcher to use to run the topology in a remote process.  if not specified, select an appropriate one
                         for the packages loaded
        read_only: if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology

    Returns:
        a TopologyRunner instance that allows the execution to be stopped and clients to be attached and detached

    Raises:
        ValueError: if read_only is False and no execution folder is associated with the topology,
                    if a read-write runner is already open, or if read_only is False while
                    read-only runners are open
    """
    if not read_only:
        if not self.execution_folder:
            raise ValueError("An execution folder must be associated with this topology for it to be run with read_only set to False")

    if len(self.read_write_runners) > 0:
        raise ValueError("An open runner with read-write access exists")

    if not read_only:
        if len(self.read_only_runners) > 0:
            raise ValueError("Open runner(s) with read-only access exists")

    runner = TopologyRunner(network=self.network, schema=self.schema, execution_folder=self.execution_folder,
                                     engine_launcher=engine_launcher, status_event_handler=status_event_handler,
                                     execution_event_handler=execution_event_handler, read_only=read_only)

    # track the runner in the set matching its access mode
    tracked_runners = self.read_only_runners if read_only else self.read_write_runners
    tracked_runners.add(runner)
    # discard rather than remove, so a close callback that fires more than once
    # does not raise KeyError
    runner.set_close_callback(lambda: tracked_runners.discard(runner))

    return runner

remove_link(link_id)

Remove a link from the topology

Parameters:

Name Type Description Default
link_id str

the link's unique identifier

required
Source code in hyrrokkin/api/topology.py
412
413
414
415
416
417
418
419
420
421
422
def remove_link(self, link_id: str):
    """
    Delete an existing link from the topology

    Args:
        link_id: the link's unique identifier

    Raises:
        InvalidNodeError: if no link with the given id exists
    """
    existing_link = self.network.get_link(link_id)
    if existing_link is None:
        # NOTE(review): a node error type is raised for a missing link - confirm this is intended
        raise InvalidNodeError(f"Link with id {link_id} does not exist")
    self.network.remove_link(link_id)

remove_node(node_id)

Remove a node from the topology

Parameters:

Name Type Description Default
node_id str

the node's unique identifier

required
Source code in hyrrokkin/api/topology.py
207
208
209
210
211
212
213
214
215
216
def remove_node(self, node_id: str):
    """
    Delete an existing node from the topology

    Args:
        node_id: the node's unique identifier

    Raises:
        InvalidNodeError: if no node with the given id exists
    """
    existing_node = self.network.get_node(node_id)
    if existing_node is None:
        raise InvalidNodeError(f"Node with id {node_id} does not exist")
    self.network.remove_node(node_id)

save_zip(to_file=None)

Save a topology to a binary stream

Parameters:

Name Type Description Default
to_file BufferedWriter

an opened binary file to which the topology will be saved, if provided

None

Returns:

Type Description
Union[None, bytes]

if to_file is not provided, returns a bytes object containing the saved topology

Source code in hyrrokkin/api/topology.py
106
107
108
109
110
111
112
113
114
115
116
def save_zip(self, to_file: io.BufferedWriter=None) ->Union[None,bytes]:
    """
    Save a topology to a binary stream

    Delegates to the underlying network, passing to_file through unchanged.

    Args:
        to_file: an opened binary file to which the topology will be saved, if provided

    Returns:
        if to_file is not provided, returns a bytes object containing the saved topology
    """
    saved = self.network.save_zip(to_file)
    return saved

serialise()

Serialise the topology to a dictionary without data/properties

Returns:

Type Description
dict[str, JsonType]

a dictionary describing the topology

Source code in hyrrokkin/api/topology.py
496
497
498
499
500
501
502
503
504
def serialise(self) -> dict[str,JsonType]:
    """
    Serialise the topology to a dictionary without data/properties

    Returns:
        a dictionary describing the topology
    """
    serialised = self.network.save()
    return serialised

serialise_link(link_id)

Serialise a link to a dictionary with self-explanatory keys

Parameters:

Name Type Description Default
link_id str

the id of the link to serialise

required

Returns:

Type Description
dict[str, JsonType]

a dictionary describing the link

Source code in hyrrokkin/api/topology.py
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def serialise_link(self, link_id:str) -> dict[str, JsonType]:
    """
    Describe a link as a plain dictionary

    The result has the keys link_id, link_type, from_node, from_port,
    to_node and to_port.

    Args:
        link_id: the id of the link to serialise

    Returns:
        a dictionary describing the link
    """
    link = self.network.get_link(link_id)
    return {
        "link_id": link_id,
        "link_type": link.get_link_type(),
        "from_node": link.from_node_id,
        "from_port": link.from_port,
        "to_node": link.to_node_id,
        "to_port": link.to_port,
    }

serialise_node(node_id)

Serialise a node to a dictionary with self-explanatory keys

Parameters:

Name Type Description Default
node_id str

the id of the node to serialise

required

Returns:

Type Description
dict[str, JsonType]

a dictionary describing the node

Source code in hyrrokkin/api/topology.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def serialise_node(self, node_id:str) -> dict[str, JsonType]:
    """
    Describe a node as a plain dictionary

    The result has the keys node_id, node_type, x, y and metadata.

    Args:
        node_id: the id of the node to serialise

    Returns:
        a dictionary describing the node
    """
    node = self.network.get_node(node_id)
    type_descriptor = node.get_node_type()
    (x_pos, y_pos) = node.get_xy()
    return {
        "node_id": node_id,
        "node_type": type_descriptor,
        "x": x_pos,
        "y": y_pos,
        "metadata": node.get_metadata(),
    }

set_configuration(package_id, properties)

Set the properties of a package's configuration

Parameters:

Name Type Description Default
package_id str

the id of the package

required
properties dict

a dictionary containing the configuration properties, must be JSON serialisable.

required
Source code in hyrrokkin/api/topology.py
172
173
174
175
176
177
178
179
180
181
182
183
@check_not_running
def set_configuration(self, package_id: str, properties: dict):
    """
    Replace the properties of a package's configuration

    The topology is flagged as non-empty afterwards.

    Args:
        package_id: the id of the package
        properties: a dictionary containing the configuration properties, must be JSON serialisable.
    """
    datastore = self.network.get_configuration_datastore(package_id)
    datastore.set_properties(properties)
    self.empty = False

set_metadata(metadata)

Set metadata for this topology

Parameters:

Name Type Description Default
metadata dict[str, str]

a dictionary containing metadata, consisting of string keys and values.

required
Notes

the following keys will be understood by hyrrokkin based tools - version, description, authors

Source code in hyrrokkin/api/topology.py
160
161
162
163
164
165
166
167
168
169
170
def set_metadata(self, metadata: dict[str, str]):
    """
    Store topology-level metadata on the underlying network

    Args:
        metadata: a dictionary containing metadata, consisting of string keys and values.

    Notes:
        the following keys will be understood by hyrrokkin based tools - version, description, authors
    """
    network = self.network
    network.set_metadata(metadata)

set_node_data(node_id, key, data)

Set binary or string data associated with this node.

Parameters:

Name Type Description Default
node_id str

node identifier

required
key str

a key to locate the data (can only contain alphanumeric characters and underscores)

required
data Union[bytes, str, None]

data to be stored

required
Source code in hyrrokkin/api/topology.py
269
270
271
272
273
274
275
276
277
278
279
280
@check_not_running
def set_node_data(self, node_id: str, key: str, data: Union[bytes, str, None]):
    """
    Store binary or string data for a node under a key

    Args:
        node_id: node identifier
        key: a key to locate the data (can only contain alphanumeric characters and underscores)
        data: data to be stored
    """
    datastore = self.network.get_node_datastore(node_id)
    datastore.set_data_sync(key, data)

set_node_property(node_id, property_name, property_value)

Update the property of a node

Parameters:

Name Type Description Default
node_id str

the node's identifier

required
property_name str

the name of the property

required
property_value JsonType

the value of the property, must be JSON serialisable

required
Source code in hyrrokkin/api/topology.py
241
242
243
244
245
246
247
248
249
250
251
252
@check_not_running
def set_node_property(self, node_id: str, property_name: str, property_value: JsonType):
    """
    Set a single named property in a node's datastore

    Args:
        node_id: the node's identifier
        property_name: the name of the property
        property_value: the value of the property, must be JSON serialisable
    """
    datastore = self.network.get_node_datastore(node_id)
    datastore.set_property(property_name, property_value)

set_package_data(package_id, key, data)

Set binary or string data associated with a package configuration.

Parameters:

Name Type Description Default
package_id str

package identifier

required
key str

a key to locate the data (can only contain alphanumeric characters and underscores)

required
data Union[bytes, str, None]

data to be stored

required
Source code in hyrrokkin/api/topology.py
326
327
328
329
330
331
332
333
334
335
336
337
338
@check_not_running
def set_package_data(self, package_id: str, key: str, data: Union[bytes, str, None]):
    """
    Store binary or string data for a package configuration under a key

    The topology is flagged as non-empty afterwards.

    Args:
        package_id: package identifier
        key: a key to locate the data (can only contain alphanumeric characters and underscores)
        data: data to be stored
    """
    datastore = self.network.get_configuration_datastore(package_id)
    datastore.set_data_sync(key, data)
    self.empty = False

set_package_property(package_id, property_name, property_value)

Update the property of a package

Parameters:

Name Type Description Default
package_id str

the package's identifier

required
property_name str

the name of the property

required
property_value JsonType

the value of the property, must be JSON serialisable

required
Source code in hyrrokkin/api/topology.py
297
298
299
300
301
302
303
304
305
306
307
308
309
@check_not_running
def set_package_property(self, package_id: str, property_name: str, property_value: JsonType):
    """
    Set a single named property in a package's configuration datastore

    The topology is flagged as non-empty afterwards.

    Args:
        package_id: the package's identifier
        property_name: the name of the property
        property_value: the value of the property, must be JSON serialisable
    """
    datastore = self.network.get_configuration_datastore(package_id)
    datastore.set_property(property_name, property_value)
    self.empty = False

update_node_metadata(node_id, metadata)

Updates the metadata of a node

Parameters:

Name Type Description Default
node_id str

the id of the node

required
metadata dict[str, JsonType]

a dictionary containing the new metadata

required
Source code in hyrrokkin/api/topology.py
518
519
520
521
522
523
524
525
526
527
def update_node_metadata(self, node_id:str, metadata:dict[str,JsonType]):
    """
    Pass new metadata to a node's update_metadata method

    Args:
        node_id: the id of the node
        metadata: a dictionary containing the new metadata
    """
    target_node = self.network.get_node(node_id)
    target_node.update_metadata(metadata)

update_node_position(node_id, x, y)

Update a node's position

Parameters:

Name Type Description Default
node_id str

the id of the node to update

required
x int

the new x-coordinate value

required
y int

the new y-coordinate value

required
Source code in hyrrokkin/api/topology.py
218
219
220
221
222
223
224
225
226
227
def update_node_position(self, node_id:str, x:int, y:int):
    """
    Move a node to new coordinates

    Args:
        node_id: the id of the node to update
        x: the new x-coordinate value
        y: the new y-coordinate value
    """
    network = self.network
    network.move_node(node_id, x, y)

check_not_running(func)

Decorator intended to prevent access to a method while any runners are open (the check is currently disabled in the source, so the call always proceeds). Parameter func: the method to be decorated. Returns: the wrapped method.

Source code in hyrrokkin/api/topology.py
39
40
41
42
43
44
45
46
47
48
49
def check_not_running(func):
    """
    Decorator marking a method that should not be used while runners are open

    The wrapper inspects the instance's read_only_runners and read_write_runners
    collections before delegating.  Enforcement is currently switched off: when
    runners are open the call still goes ahead rather than raising.

    :param func: the method to be decorated
    :return: wrapped method, carrying the name and docstring of func
    """
    import functools  # local import so this block does not rely on the module's import list

    @functools.wraps(func)
    def threadsafe_wrapper(self, *args, **kwargs):
        if len(self.read_only_runners) > 0 or len(self.read_write_runners) > 0:
            # deliberately a no-op for now; the disabled guard was:
            # raise Exception("Runners are active")
            pass
        return func(self, *args, **kwargs)
    return threadsafe_wrapper