hyperstream.workflow package¶
Submodules¶
hyperstream.workflow.factor module¶
hyperstream.workflow.meta_data_manager module¶
hyperstream.workflow.node module¶
hyperstream.workflow.plate module¶
hyperstream.workflow.plate_manager module¶
hyperstream.workflow.workflow module¶
Workflow and WorkflowMonitor definitions.
-
class
hyperstream.workflow.workflow.
Workflow
(workflow_id, name, description, owner, online=False, monitor=False)[source]¶ Bases:
hyperstream.utils.containers.Printable
Workflow. This defines the graph of operations through “nodes” and “factors”.
-
static
check_multi_output_plate_compatibility
(source_plates, sink_plate)[source]¶ Check multi-output plate compatibility. This ensures that the source plates and sink plate match for a multi-output plate.
Parameters: - source_plates – The source plates
- sink_plate – The sink plate
Returns: True if the plates are compatible
-
static
check_plate_compatibility
(tool, source_plate, sink_plate)[source]¶ Checks whether the source and sink plate are compatible given the tool
Parameters: - tool (Tool) – The tool
- source_plate (Plate) – The source plate
- sink_plate (Plate) – The sink plate
Returns: An error message if the plates are incompatible, otherwise None
Return type: None | str
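The None | str return type suggests that callers test for an error message instead of catching an exception. The following is a minimal sketch of that calling pattern only. The names check_compat and ensure_compatible are hypothetical, and the equality rule used here is an assumption made for illustration, not the HyperStream compatibility logic.

```python
def check_compat(source_plate, sink_plate):
    """Hypothetical stand-in: return None when compatible, else an error string."""
    # Assumption for illustration: plates are compatible only when identical
    if source_plate == sink_plate:
        return None
    return "Plate mismatch: {} vs {}".format(source_plate, sink_plate)


def ensure_compatible(source_plate, sink_plate):
    """Convert the error-string convention into an exception."""
    error = check_compat(source_plate, sink_plate)
    if error is not None:
        raise ValueError(error)
```

A caller that prefers exceptions can wrap the check as in ensure_compatible; a caller that wants to collect several problems can accumulate the returned strings instead.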
-
create_factor
(tool, sources, sink, alignment_node=None)[source]¶ Creates a factor. Instantiates a single tool for all of the plates, and connects the source and sink nodes with that tool.
Note that the tool parameters are currently fixed over a plate. For parameters that vary over a plate, an extra input stream should be used.
Parameters: - alignment_node (Node | None) –
- tool (Tool | dict) – The tool to use. This is either an instantiated Tool object or a dict with “name” and “parameters”
- sources (list[Node] | tuple[Node] | None) – The source nodes
- sink (Node) – The sink node
Returns: The factor object
Return type: Factor
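The tool argument above accepts either an instantiated object or a dict with "name" and "parameters". One way such an argument could be normalised is sketched below. ToyTool and resolve_tool are hypothetical names introduced for illustration and are not part of the documented API.

```python
class ToyTool(object):
    """Hypothetical stand-in for a Tool: a name plus keyword parameters."""

    def __init__(self, name, **parameters):
        self.name = name
        self.parameters = parameters


def resolve_tool(tool):
    """Accept either a ToyTool instance or a dict with "name" and "parameters"."""
    if isinstance(tool, dict):
        # Build the tool from its specification; parameters default to empty
        return ToyTool(tool["name"], **tool.get("parameters", {}))
    # Already instantiated: pass it through unchanged
    return tool


spec = {"name": "example_tool", "parameters": {"threshold": 0.5}}
tool = resolve_tool(spec)
```

Because the parameters live on the single tool instance, they are the same for every plate value, which matches the note above about parameters being fixed over a plate.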
-
create_factor_general
(*args, **kwargs)[source]¶ General signature for factor creation that tries each of the factor creation types using duck typing
Parameters: - args – The positional arguments
- kwargs – The named arguments
Returns: The created factor
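As a rough illustration of duck typing on call signatures, the sketch below tries each creation function in turn and keeps the first one that accepts the arguments. The functions make_single, make_multi_output and create_general are hypothetical stand-ins, and the error handling is an assumption, not the library's behaviour.

```python
def make_single(tool, sources, sink):
    """Hypothetical creator taking a list of source nodes."""
    return ("factor", tool, tuple(sources), sink)


def make_multi_output(tool, source, splitting_node, sink):
    """Hypothetical creator taking one source and a splitting node."""
    return ("multi_output_factor", tool, source, splitting_node, sink)


def create_general(*args, **kwargs):
    """Try each creator in turn; the first that accepts the arguments wins."""
    for creator in (make_single, make_multi_output):
        try:
            return creator(*args, **kwargs)
        except TypeError:
            # Signature mismatch (or a TypeError raised inside the creator,
            # which this simple sketch cannot tell apart): try the next one
            continue
    raise TypeError("No factor creation signature matches the arguments")


single = create_general("tool", ["a", "b"], "sink")
multi = create_general(tool="tool", source="a", splitting_node="s", sink="sink")
```

The trade-off of this approach is that a genuine TypeError inside a creator is indistinguishable from a signature mismatch, so calling the specific creation method directly gives clearer errors.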
-
create_multi_output_factor
(tool, source, splitting_node, sink)[source]¶ Creates a multi-output factor. This takes a single node and applies a MultiOutputTool to create multiple nodes on a new plate. Instantiates a single tool for all of the input plate values, and connects the source and sink nodes with that tool.
Note that the tool parameters are currently fixed over a plate. For parameters that vary over a plate, an extra input stream should be used.
Parameters: - tool (MultiOutputTool | dict) – The tool to use. This is either an instantiated Tool object or a dict with “name” and “parameters”
- source (Node | None) – The source node
- splitting_node – The node over which to split
- sink (Node) – The sink node
Returns: The factor object
Return type: Factor
-
create_node
(stream_name, channel, plates)[source]¶ Create a node in the graph. Note: assumes that the streams already exist
Parameters: - stream_name – The name of the stream
- channel – The channel where this stream lives
- plates – The plates. The stream meta-data will be auto-generated from these
Returns: The streams associated with this node
-
create_node_creation_factor
(tool, source, output_plate, plate_manager)[source]¶ Creates a factor that itself creates an output node, and ensures that the plate for the output node exists along with all relevant meta-data
Parameters: - tool – The tool
- source – The source node
- output_plate (dict) – The details of the plate that will be created
- plate_manager (PlateManager) – The hyperstream plate manager
Returns: The created factor
-
execute
(time_interval)[source]¶ Execute the factors over the streams in the workflow. The factors are executed in reverse order. Executing only the last factor is not enough, because there may be multiple “leaf” factors that aren’t triggered by upstream computations.
Parameters: time_interval – The time interval to execute this workflow over
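The reasoning for reverse-order execution can be sketched with a toy pull-based graph, in which executing a factor first executes whatever it depends on. ToyFactor and execute_all are hypothetical names, and the pull-based model is an assumption made for illustration.

```python
class ToyFactor(object):
    """Hypothetical pull-based factor: executing it first executes its inputs."""

    def __init__(self, name, upstream=()):
        self.name = name
        self.upstream = list(upstream)
        self.done = False

    def execute(self, log):
        if self.done:
            return  # already computed by an earlier pull
        for factor in self.upstream:
            factor.execute(log)
        log.append(self.name)
        self.done = True


def execute_all(factors):
    """Walk the factors in reverse creation order so every leaf is triggered."""
    log = []
    for factor in reversed(factors):
        factor.execute(log)
    return log


a = ToyFactor("a")
b = ToyFactor("b", upstream=[a])  # first leaf
c = ToyFactor("c", upstream=[a])  # second leaf, independent of b
order = execute_all([a, b, c])    # ["a", "c", "b"]
```

Executing only the last factor c would compute a and c but never b, which is why every factor is visited; factors already computed by a later one are skipped.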
-
static
factorgraph_viz
(d)[source]¶ Map the dictionary into factorgraph-viz format. See https://github.com/mbforbes/factorgraph-viz
Parameters: d – The dictionary
Returns: The formatted dictionary
-
requested_intervals
¶ Get the requested intervals (from the database)
Returns: The requested intervals
-
to_dict
(tool_long_names=True)[source]¶ Get a representation of the workflow as a dictionary for display purposes
Parameters: tool_long_names (bool) – Indicates whether to use long names, such as SplitterFromStream(element=None, use_mapping_keys_only=True) or short names, such as splitter_from_stream
Returns: The dictionary of nodes, factors and plates
-
to_json
(formatter=None, tool_long_names=True, **kwargs)[source]¶ Get a JSON representation of the workflow
Parameters: - tool_long_names – Indicates whether to use long names, such as SplitterFromStream(element=None, use_mapping_keys_only=True) or short names, such as splitter_from_stream
- formatter – The formatting function
- kwargs – Keyword arguments for the json output
Returns: A JSON string
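How the three arguments of to_json might interact is sketched below with a toy class: the dictionary form is built first, an optional formatter transforms it, and the remaining keyword arguments go to the JSON encoder. ToyWorkflow is a hypothetical stand-in, and forwarding kwargs to json.dumps is an assumption.

```python
import json


class ToyWorkflow(object):
    """Hypothetical stand-in mirroring the to_dict / to_json signatures."""

    def __init__(self, factors):
        # In this sketch each factor is a (short_name, long_name) pair
        self.factors = factors

    def to_dict(self, tool_long_names=True):
        index = 1 if tool_long_names else 0
        return {"factors": [names[index] for names in self.factors]}

    def to_json(self, formatter=None, tool_long_names=True, **kwargs):
        d = self.to_dict(tool_long_names=tool_long_names)
        if formatter is not None:
            d = formatter(d)  # e.g. reshape for a visualisation library
        return json.dumps(d, **kwargs)


workflow = ToyWorkflow([
    ("splitter_from_stream",
     "SplitterFromStream(element=None, use_mapping_keys_only=True)"),
])
short_json = workflow.to_json(tool_long_names=False, sort_keys=True)
```

A static helper such as factorgraph_viz has the shape of a formatter: it takes the dictionary and returns a reformatted dictionary.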
hyperstream.workflow.workflow_manager module¶
-
class
hyperstream.workflow.workflow_manager.
WorkflowManager
(channel_manager, plate_manager)[source]¶ Bases:
hyperstream.utils.containers.Printable
Workflow manager. Reads workflows from and writes workflows to the database, and can execute all of the workflows.
-
add_workflow
(workflow, commit=False)[source]¶ Add a new workflow and optionally commit it to the database
Parameters: - workflow (Workflow) – The workflow
- commit (bool) – Whether to commit the workflow to the database
Returns: None
-
commit_workflow
(workflow_id)[source]¶ Commit the workflow to the database
Parameters: workflow_id – The workflow id
Returns: None
-
delete_workflow
(workflow_id)[source]¶ Delete a workflow from the database
Parameters: workflow_id – The workflow id
Returns: None
-
load_workflow
(workflow_id)[source]¶ Load workflow from the database and store in memory
Parameters: workflow_id – The workflow id
Returns: The workflow
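The split between workflows held in memory and workflows committed to the database can be sketched as follows. ToyWorkflowManager and ToyWorkflow are hypothetical, and a plain dict stands in for the database, so this shows only the add / commit / delete / load life cycle, not the real storage layer.

```python
from collections import namedtuple

# Hypothetical minimal workflow record; only workflow_id matters here
ToyWorkflow = namedtuple("ToyWorkflow", ["workflow_id", "name"])


class ToyWorkflowManager(object):
    """Hypothetical in-memory manager; a dict stands in for the database."""

    def __init__(self):
        self.workflows = {}  # workflows held in memory, keyed by id
        self.database = {}   # stand-in for persistent storage

    def add_workflow(self, workflow, commit=False):
        self.workflows[workflow.workflow_id] = workflow
        if commit:
            self.commit_workflow(workflow.workflow_id)

    def commit_workflow(self, workflow_id):
        self.database[workflow_id] = self.workflows[workflow_id]

    def delete_workflow(self, workflow_id):
        self.database.pop(workflow_id, None)

    def load_workflow(self, workflow_id):
        workflow = self.database[workflow_id]
        self.workflows[workflow_id] = workflow
        return workflow


manager = ToyWorkflowManager()
manager.add_workflow(ToyWorkflow("w1", "first"))               # memory only
manager.add_workflow(ToyWorkflow("w2", "second"), commit=True)  # also stored
```

With commit left at its default of False, a workflow exists only in memory until commit_workflow is called for its id.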
-