API Reference¶
This section provides detailed documentation of the core classes and utility functions used in GenSphere.
Core Classes¶
GenFlow¶
Module: genflow.py
The `GenFlow` class is responsible for parsing YAML workflow definitions, constructing an execution graph, and executing nodes in the correct order. It manages the overall workflow execution process.
Class Definition¶
```python
class GenFlow:
    def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):
        # Initialization code

    def parse_yaml(self):
        # Parses the YAML data and constructs nodes

    def build_graph(self):
        # Builds the execution graph

    def run(self):
        # Executes the nodes in topological order
```
Constructor¶
```python
def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):
```
Parameters:
- `yaml_file` (str): Path to the main YAML file defining the workflow.
- `functions_filepath` (str, optional): Path to the Python file containing custom function definitions.
- `structured_output_schema_filepath` (str, optional): Path to the Python file containing structured output schemas.
Description:
Initializes the `GenFlow` instance by loading the YAML data and preparing the environment for execution. It verifies the validity of the provided file paths and ensures that the necessary files are accessible.
Raises:
- `FileNotFoundError`: If the provided `functions_filepath` or `structured_output_schema_filepath` does not exist.
- `ValueError`: If the provided file paths are not `.py` files.
Methods¶
parse_yaml¶
```python
def parse_yaml(self):
```
Description:
Parses the YAML data from the main workflow file and constructs the nodes for execution. It also checks for the presence of nested workflows (`yml_flow` nodes) and composes them using `YamlCompose` if necessary. Validates the YAML file for consistency before parsing.
Raises:
- `Exception`: If the YAML file fails consistency checks.
Example Usage:
```python
flow = GenFlow('workflow.yaml', 'functions.py', 'schemas.py')
flow.parse_yaml()
```
build_graph¶
```python
def build_graph(self):
```
Description:
Builds a directed acyclic graph (DAG) representing the execution order of nodes based on their dependencies. It adds nodes and edges to the graph according to the dependencies identified during parsing.
Raises:
- `ValueError`: If the graph contains cycles or if a node depends on an undefined node or variable.
Example Usage:
```python
flow.build_graph()
```
run¶
```python
def run(self):
```
Description:
Executes the nodes in the order determined by a topological sort of the execution graph. It renders the parameters for each node using the outputs of previously executed nodes and handles iterative execution for nodes that process lists.
Raises:
- `Exception`: If there are cycles in the graph or if an error occurs during node execution.
Example Usage:
```python
flow.run()
```
After execution, the outputs from each node are stored in the `outputs` attribute of the `GenFlow` instance.
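The build-then-run sequence can be sketched with the standard library's `graphlib`. This is an illustrative outline of topological execution, not GenSphere's actual implementation, and the node names below are invented:

```python
# Illustrative only: executing nodes in dependency order, as run() does
# after build_graph() has assembled the DAG. Node names are hypothetical.
from graphlib import TopologicalSorter

# Each node maps to the set of nodes whose outputs it needs.
dependencies = {
    "read_file": set(),
    "summarize": {"read_file"},
    "save_result": {"summarize"},
}

outputs = {}
for node_name in TopologicalSorter(dependencies).static_order():
    # GenFlow would render this node's params from `outputs`, then
    # execute it; here we only record a placeholder result.
    outputs[node_name] = f"result of {node_name}"

print(list(outputs))  # every node appears after its dependencies
```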
Node¶
Module: genflow.py
The `Node` class represents an individual operation or step within the workflow. It encapsulates the logic required to execute that step, including parameter rendering and function execution.
Class Definition¶
```python
class Node:
    def __init__(self, node_data):
        # Initialization code

    def set_flow(self, flow):
        # Sets reference to the GenFlow instance

    def get_dependencies(self, node_names):
        # Retrieves the dependencies of the node

    def render_params(self, outputs, env):
        # Renders the parameters using previous outputs

    def execute(self, params):
        # Executes the node based on its type and parameters
```
Constructor¶
```python
def __init__(self, node_data):
```
Parameters:
- `node_data` (dict): Dictionary containing the node's configuration from the YAML file.
Description:
Initializes the `Node` instance with the given configuration. It extracts essential information such as the node's name, type, outputs, and parameters.
Methods¶
set_flow¶
```python
def set_flow(self, flow):
```
Parameters:
- `flow` (GenFlow): Reference to the `GenFlow` instance managing the workflow execution.
Description:
Sets the reference to the `GenFlow` instance, allowing the node to access shared resources and configurations during execution.
get_dependencies¶
```python
def get_dependencies(self, node_names):
```
Parameters:
- `node_names` (Iterable[str]): Iterable of all node names in the workflow.
Returns:
- `dependencies` (Set[str]): Set of node names that the current node depends on.
Description:
Analyzes the node's parameters to determine which other nodes it depends on. This information is used to build the execution graph and ensure the correct execution order.
Example Usage:
```python
dependencies = node.get_dependencies(flow.nodes.keys())
```
render_params¶
```python
def render_params(self, outputs, env):
```
Parameters:
- `outputs` (dict): Outputs from previously executed nodes.
- `env` (jinja2.Environment): Jinja2 environment used for templating.
Returns:
- `rendered_params` (dict or list of dicts): Parameters with values rendered using the outputs of previous nodes.
Description:
Renders the node's parameters by substituting placeholders with actual values from previous outputs. Supports indexed parameters and lists for iterative processing.
Raises:
- `ValueError`: If a referenced variable is not found, or is not iterable when expected.
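To make the substitution concrete, here is a deliberately simplified stand-in for the templating step. GenSphere uses a Jinja2 environment; this regex version, with made-up node names, only illustrates the idea of replacing `{{ node.output }}` placeholders with prior results:

```python
import re

def render_placeholder_string(value, outputs):
    """Replace '{{ node.output }}' placeholders with prior results."""
    def substitute(match):
        node, output = match.group(1), match.group(2)
        return str(outputs[node][output])
    return re.sub(r"\{\{\s*(\w+)\.(\w+)\s*\}\}", substitute, value)

outputs = {"get_name": {"name": "Alice"}}
print(render_placeholder_string("Hello, {{ get_name.name }}!", outputs))
# Hello, Alice!
```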
execute¶
```python
def execute(self, params):
```
Parameters:
- `params` (dict): Parameters to be used for the node execution.
Returns:
- `outputs` (dict): Dictionary of outputs produced by the node execution.
Description:
Executes the node based on its type:
- For `function_call` nodes, it executes a Python function.
- For `llm_service` nodes, it interacts with an LLM service such as OpenAI.
Delegates to specific execution methods depending on the node type.
Raises:
- `NotImplementedError`: If the node type is not supported.
- `Exception`: If an error occurs during execution.
Example Usage:
```python
outputs = node.execute(rendered_params)
```
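The type-based dispatch described above can be sketched as follows; the handler bodies are placeholders, not GenSphere's real execution methods:

```python
def execute_node(node_type, params):
    """Dispatch execution by node type (placeholder handlers only)."""
    handlers = {
        "function_call": lambda p: {"result": p["func"](**p.get("args", {}))},
        "llm_service": lambda p: {"response": f"<reply to: {p['prompt']}>"},
    }
    if node_type not in handlers:
        raise NotImplementedError(f"Unsupported node type: {node_type}")
    return handlers[node_type](params)

print(execute_node("function_call",
                   {"func": lambda text: text.upper(),
                    "args": {"text": "abc"}}))  # {'result': 'ABC'}
```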
YamlCompose¶
Module: yaml_utils.py
The `YamlCompose` class is responsible for composing multiple YAML workflow files into a single unified workflow. It resolves references to nested workflows (`yml_flow` nodes) and adjusts node names and parameters to ensure uniqueness and consistency.
Class Definition¶
```python
class YamlCompose:
    def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):
        # Initialization code

    def compose(self, save_combined_yaml=False, output_file='combined.yaml'):
        # Starts the composition process and returns the combined YAML data
```
Constructor¶
```python
def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):
```
Parameters:
- `yaml_file` (str): Path to the root YAML file to be composed.
- `functions_filepath` (str): Path to the Python file containing custom functions.
- `structured_output_schema_filepath` (str): Path to the Python file containing structured output schemas.
Description:
Initializes the `YamlCompose` instance and prepares for the composition process by validating the provided file paths.
Raises:
- `FileNotFoundError`: If the provided file paths do not exist.
- `ValueError`: If the provided file paths are not `.py` files.
Methods¶
compose¶
```python
def compose(self, save_combined_yaml=False, output_file='combined.yaml'):
```
Parameters:
- `save_combined_yaml` (bool, optional): If `True`, saves the combined YAML data to a file.
- `output_file` (str, optional): Filename to save the combined YAML data.
Returns:
- `combined_data` (dict): The combined YAML data after composition.
Description:
Starts the composition process by recursively processing the root YAML file and any nested sub-flows. Adjusts node names and parameter references to ensure uniqueness across the combined workflow.
Raises:
- `Exception`: If validation fails during composition.
Example Usage:
```python
composer = YamlCompose('main_workflow.yaml', 'functions.py', 'schemas.py')
combined_yaml_data = composer.compose(save_combined_yaml=True, output_file='combined.yaml')
```
After composition, the combined YAML file can be executed as a single workflow.
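The renaming step can be illustrated with a small sketch: sub-flow node names receive a prefix, and templated references are rewritten to match. This is an illustration of the idea only, not YamlCompose's actual code; the node data and prefix format are invented:

```python
import re

def prefix_nodes(nodes, prefix):
    """Prefix node names and rewrite '{{ node.' references to match."""
    renamed = []
    for node in nodes:
        node = dict(node, name=f"{prefix}__{node['name']}")
        if "params" in node:
            node["params"] = {
                key: re.sub(r"\{\{\s*(\w+)\.", rf"{{{{ {prefix}__\1.", value)
                if isinstance(value, str) else value
                for key, value in node["params"].items()
            }
        renamed.append(node)
    return renamed

nodes = [
    {"name": "step1", "type": "function_call"},
    {"name": "step2", "type": "function_call",
     "params": {"text": "{{ step1.out }}"}},
]
renamed = prefix_nodes(nodes, "sub")
print(renamed[1]["params"]["text"])  # {{ sub__step1.out }}
```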
Visualizer¶
Module: visualizer.py
The `Visualizer` class provides a graphical representation of GenSphere workflows through a web-based interface powered by Dash and Cytoscape. It lets users visualize nodes, their types, and their dependencies, and inspect the details of each node interactively.
Class Definition¶
```python
class Visualizer:
    def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):
        # Initialization code

    def start_visualization(self):
        # Starts the Dash application for visualization
```
Constructor¶
```python
def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):
```
Parameters:
- `yaml_file` (str, optional): Path to the YAML file defining the workflow.
- `functions_filepath` (str, optional): Path to the Python file containing custom function definitions.
- `structured_output_schema_filepath` (str, optional): Path to the Python file containing structured output schemas.
- `address` (str, optional): The IP address on which to host the Dash app (default: `'127.0.0.1'`).
- `port` (int, optional): The port on which to host the Dash app (default: `8050`).
Description:
Initializes the `Visualizer` instance by setting up the necessary file paths and loading the user-provided functions and schemas. It validates the existence and correctness of the provided files and prepares the environment for visualization.
Raises:
- `FileNotFoundError`: If any of the provided file paths do not exist.
- `ValueError`: If the provided files are not `.py` files.
Example Usage:
```python
from gensphere.visualizer import Visualizer

viz = Visualizer(
    yaml_file='workflow.yaml',
    functions_filepath='functions.py',
    structured_output_schema_filepath='schemas.py',
    address='127.0.0.1',
    port=8050
)
```
Methods¶
start_visualization¶
```python
def start_visualization(self):
```
Description:
Starts the Dash application for visualizing the GenSphere workflow. The application provides an interactive interface in which nodes are displayed graphically, and users can click on nodes to view detailed information such as parameters, outputs, functions, and schemas.
Features:
- Graph Visualization: Uses Cytoscape to render the workflow graph.
- Interactive Nodes: Clicking on a node displays detailed information.
- Legend: Includes a legend explaining node types and edge styles.
- Dynamic Loading: Users can input a different YAML file path and reload the graph.
Example Usage:
```python
viz.start_visualization()
```
After running this method, navigate to `http://127.0.0.1:8050` in your web browser to view the visualization.
Notes:
- Ensure that the YAML file and any referenced functions or schemas are correctly specified.
- The visualization runs a local web server; make sure the specified `address` and `port` are accessible.
Hub¶
Module: hub.py
The `Hub` class provides an interface for interacting with the GenSphere Hub platform. It allows users to push workflows to the hub, pull workflows from the hub, and check the number of times a workflow has been pulled.
Class Definition¶
```python
class Hub:
    def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):
        # Initialization code

    def push(self, push_name=None):
        # Pushes the workflow to the GenSphere Hub

    def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):
        # Pulls a workflow from the GenSphere Hub

    def count_pulls(self, push_id):
        # Retrieves the total number of times a push has been pulled
```
Constructor¶
```python
def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):
```
Parameters:
- `yaml_file` (str, optional): Path to the YAML file to be pushed.
- `functions_file` (str, optional): Path to the functions file to be pushed.
- `schema_file` (str, optional): Path to the schema file to be pushed.
- `api_base_url` (str, optional): Base URL for the GenSphere Hub API.
Description:
Initializes the `Hub` instance with the provided file paths and API base URL, preparing it to push workflows to, and pull workflows from, the GenSphere Hub platform.
Example Usage:
```python
from gensphere.hub import Hub

hub = Hub(
    yaml_file='workflow.yaml',
    functions_file='functions.py',
    schema_file='schemas.py'
)
```
Methods¶
push¶
```python
def push(self, push_name=None):
```
Parameters:
- `push_name` (str, optional): A descriptive name for the workflow being pushed.
Returns:
- `result` (dict): A dictionary containing the `push_id` and a list of uploaded files.
Description:
Pushes the specified workflow files to the GenSphere Hub. Validates the YAML file for consistency before pushing. The returned `push_id` can be used to pull the workflow or check its pull count.
Raises:
- `ValueError`: If no `yaml_file` is provided, or if the functions or schema files are not `.py` files.
- `Exception`: If validation fails or if an error occurs during the push.
Example Usage:
```python
result = hub.push(push_name='My Awesome Workflow')
push_id = result.get('push_id')
print(f"Workflow pushed with push_id: {push_id}")
```
pull¶
```python
def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):
```
Parameters:
- `push_id` (str): The `push_id` of the workflow to pull.
- `save_to_disk` (bool, optional): If `True`, saves the pulled files to disk (default: `True`).
- `yaml_filename` (str, optional): Custom filename for the YAML file.
- `functions_filename` (str, optional): Custom filename for the functions file.
- `schema_filename` (str, optional): Custom filename for the schema file.
- `download_path` (str, optional): Directory in which to save the pulled files (default: `"."`).
Returns:
- `files_content` (dict): A dictionary containing the contents of the pulled files.
Description:
Pulls a workflow from the GenSphere Hub using the provided `push_id`. Optionally saves the files to disk with custom filenames. Ensures that existing files are not overwritten by appending a counter to the filename if necessary.
Raises:
- `Exception`: If an error occurs during the pull operation.
Example Usage:
```python
files = hub.pull(
    push_id=push_id,
    save_to_disk=True,
    yaml_filename='downloaded_workflow.yaml',
    functions_filename='downloaded_functions.py',
    schema_filename='downloaded_schemas.py'
)
```
count_pulls¶
```python
def count_pulls(self, push_id):
```
Parameters:
- `push_id` (str): The `push_id` of the workflow to check.
Returns:
- `pull_count` (int): The total number of times the workflow has been pulled.
Description:
Retrieves the total number of times a workflow has been pulled from the GenSphere Hub, using the provided `push_id`.
Raises:
- `Exception`: If an error occurs during the request.
Example Usage:
```python
pull_count = hub.count_pulls(push_id=push_id)
print(f"The workflow has been pulled {pull_count} times.")
```
Utility Functions¶
This section documents the utility functions used within GenSphere, primarily for internal processing and validation.
get_function_schema¶
Module: genflow.py
```python
def get_function_schema(func):
```
Parameters:
- `func` (function): The Python function object to generate a schema for.
Returns:
- `function_def` (dict): A dictionary representing the function definition, including its name, description, and parameters.
Description:
Generates a schema for a given function by inspecting its signature and docstring. This schema is used for OpenAI's function-calling feature in LLM service nodes. It ensures that the function parameters are properly typed and documented.
Raises:
- `ValueError`: If a parameter lacks a type annotation or the function lacks a docstring.
Example Usage:
Used internally when preparing function definitions for OpenAI's function calling.
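A sketch of how such a schema can be derived with `inspect`; the exact dictionary shape GenSphere emits may differ, and `greet` is an invented example:

```python
import inspect

def build_function_schema(func):
    """Derive a name/description/parameters dict from a function."""
    if func.__doc__ is None:
        raise ValueError(f"{func.__name__} lacks a docstring")
    parameters = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is inspect.Parameter.empty:
            raise ValueError(f"parameter '{name}' lacks a type annotation")
        parameters[name] = param.annotation.__name__
    return {"name": func.__name__,
            "description": inspect.getdoc(func),
            "parameters": parameters}

def greet(name: str, times: int) -> str:
    """Return a repeated greeting."""
    return f"Hello, {name}! " * times

print(build_function_schema(greet))
# {'name': 'greet', 'description': 'Return a repeated greeting.',
#  'parameters': {'name': 'str', 'times': 'int'}}
```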
validate_yaml¶
Module: yaml_utils.py
```python
def validate_yaml(
    yaml_file,
    functions_filepath=None,
    structured_output_schema_filepath=None,
    parent_node_names=None,
    visited_files=None,
    parent_params=None,
    parent_node_outputs=None
):
```
Parameters:
- `yaml_file` (str): Path to the YAML file being validated.
- `functions_filepath` (str, optional): Path to the functions file.
- `structured_output_schema_filepath` (str, optional): Path to the schemas file.
- `parent_node_names` (Set[str], optional): Set of node names from the parent flow.
- `visited_files` (Set[str], optional): Set of visited YAML files, used to prevent circular references.
- `parent_params` (Set[str], optional): Set of parameter names passed from the parent flow.
- `parent_node_outputs` (Dict[str, List[str]], optional): Dictionary of node outputs from parent flows.
Returns:
- `validated` (bool): `True` if validation passes, `False` otherwise.
- `error_msgs` (List[str]): List of error messages encountered during validation.
- `node_outputs` (Dict[str, List[str]]): Dictionary of node outputs in the current flow.
Description:
Validates a YAML workflow file and any associated sub-flows for consistency and correctness. Checks for issues such as:
- Missing required fields (`name`, `type`).
- Duplicate node names.
- Undefined or duplicate outputs.
- References to undefined nodes or outputs.
- Cycles in the execution graph.
- Validity of functions and schemas.
Raises:
- `FileNotFoundError`: If referenced files do not exist.
- `ValueError`: If the YAML structure is invalid.
Example Usage:
Used internally before executing or composing workflows to ensure they are valid.
collect_referenced_nodes_and_outputs¶
Module: yaml_utils.py
```python
def collect_referenced_nodes_and_outputs(params):
```
Parameters:
- `params` (dict): Parameters dictionary from a node.
Returns:
- `referenced_nodes_outputs` (Set[Tuple[str, str]]): A set of tuples containing referenced node names and outputs.
Description:
Analyzes the parameters of a node to identify all referenced nodes and their outputs, which is essential for validating dependencies and ensuring that all references are valid.
collect_used_params¶
Module: yaml_utils.py
```python
def collect_used_params(yaml_data):
```
Parameters:
- `yaml_data` (dict): The YAML data of a workflow.
Returns:
- `used_params` (Set[str]): A set of parameter names used within the workflow.
Description:
Collects all parameter names used in the workflow, particularly in the context of nested workflows (`yml_flow` nodes). This helps in validating that all required parameters are provided.
collect_referenced_params¶
Module: yaml_utils.py
```python
def collect_referenced_params(params):
```
Parameters:
- `params` (dict): Parameters dictionary from a node.
Returns:
- `referenced_params` (Set[str]): A set of parameter names referenced in the parameters.
Description:
Identifies all parameter names referenced within the node's parameters, usually in templated strings. This is used to ensure that all referenced parameters are defined.
collect_referenced_nodes¶
Module: yaml_utils.py
```python
def collect_referenced_nodes(params):
```
Parameters:
- `params` (dict): Parameters dictionary from a node.
Returns:
- `referenced_nodes` (Set[str]): A set of node names referenced in the parameters.
Description:
Identifies all node names referenced within the node's parameters. This is crucial for building the execution graph and determining the correct execution order.
load_yaml_file¶
Module: yaml_utils.py
```python
def load_yaml_file(yaml_file):
```
Parameters:
- `yaml_file` (str): Path to the YAML file to load.
Returns:
- `data` (dict): The loaded YAML data.
Description:
Loads the YAML data from a file and handles parsing errors. Ensures that the file exists and contains valid YAML.
Raises:
- `FileNotFoundError`: If the YAML file does not exist.
- `ValueError`: If there is an error parsing the YAML file.
has_yml_flow_nodes¶
Module: yaml_utils.py
```python
def has_yml_flow_nodes(yaml_data):
```
Parameters:
- `yaml_data` (dict): The YAML data of a workflow.
Returns:
- `bool`: `True` if the workflow contains any `yml_flow` nodes, `False` otherwise.
Description:
Checks whether the given YAML data contains any nested workflows (`yml_flow` nodes). This helps determine whether composition is necessary before execution.
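A minimal sketch of the check, assuming the YAML data keeps its nodes in a top-level `nodes` list where each node carries a `type` field (the real function may differ):

```python
def contains_yml_flow_nodes(yaml_data):
    """Return True if any node in the workflow has type 'yml_flow'."""
    return any(node.get("type") == "yml_flow"
               for node in yaml_data.get("nodes", []))

data = {"nodes": [{"name": "step1", "type": "function_call"},
                  {"name": "sub", "type": "yml_flow"}]}
print(contains_yml_flow_nodes(data))  # True
```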
get_base_output_name¶
Module: yaml_utils.py
```python
def get_base_output_name(output_reference):
```
Parameters:
- `output_reference` (str): A string representing an output reference (e.g., `'countries_list[i]'`).
Returns:
- `base_output_name` (str): The base output name extracted from the reference.
Description:
Extracts the base output name from a complex output reference that may include indexing or attribute access. Used during validation to identify the actual outputs being referenced.
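The extraction can be sketched with a short regex (an illustration, not the library's exact implementation):

```python
import re

def base_output_name(output_reference):
    """Strip indexing/attribute access: 'countries_list[i]' -> 'countries_list'."""
    match = re.match(r"\w+", output_reference)
    return match.group(0) if match else output_reference

print(base_output_name("countries_list[i]"))  # countries_list
print(base_output_name("result.field"))       # result
```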
parse_yaml¶
Module: graph_builder.py
```python
def parse_yaml(yaml_file):
```
Parameters:
- `yaml_file` (str): Path to the YAML file to parse.
Returns:
- `data` (dict): Parsed YAML data.
Description:
Parses a YAML file and returns its content as a dictionary. Validates the existence of the file and handles parsing errors.
Raises:
- `FileNotFoundError`: If the YAML file does not exist.
- `yaml.YAMLError`: If an error occurs during YAML parsing.
Example Usage:
```python
data = parse_yaml('workflow.yaml')
```
extract_referenced_nodes¶
Module: graph_builder.py
```python
def extract_referenced_nodes(template_str):
```
Parameters:
- `template_str` (str): A templated string containing references to other nodes (e.g., `"{{ node.output }}"`).
Returns:
- `referenced_nodes` (Set[str]): A set of referenced node names.
Description:
Extracts all referenced node names from a templated string using regular expressions. Useful for identifying dependencies between nodes in a workflow.
Example Usage:
```python
template_str = "{{ node1.output }} and {{ node2.output }}"
referenced_nodes = extract_referenced_nodes(template_str)
# referenced_nodes will be {'node1', 'node2'}
```
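A regex-based sketch consistent with the example above (the module's actual pattern may be more permissive):

```python
import re

def find_referenced_nodes(template_str):
    """Collect node names appearing as '{{ node.something }}'."""
    return set(re.findall(r"\{\{\s*(\w+)\.", template_str))

template_str = "{{ node1.output }} and {{ node2.output }}"
print(find_referenced_nodes(template_str))  # {'node1', 'node2'} (set order varies)
```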
traverse_node_fields¶
Module: graph_builder.py
```python
def traverse_node_fields(node_value):
```
Parameters:
- `node_value` (Union[str, dict, list]): The node value to traverse.
Returns:
- `referenced_nodes` (Set[str]): A set of referenced node names found within the node value.
Description:
Recursively traverses a node's fields to find all referenced node names. Handles strings, dictionaries, and lists. Used to identify all of a node's dependencies.
Example Usage:
```python
node_params = {
    'param1': '{{ node1.output }}',
    'param2': {
        'subparam': '{{ node2.output }}'
    }
}
referenced_nodes = traverse_node_fields(node_params)
# referenced_nodes will be {'node1', 'node2'}
```
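The recursion can be sketched as follows, reusing the same regex idea (illustrative only; the real helper may differ in detail):

```python
import re

def collect_refs(node_value):
    """Recursively collect '{{ node.x }}' references from nested fields."""
    if isinstance(node_value, str):
        return set(re.findall(r"\{\{\s*(\w+)\.", node_value))
    if isinstance(node_value, dict):
        children = node_value.values()
    elif isinstance(node_value, list):
        children = node_value
    else:
        return set()
    refs = set()
    for child in children:
        refs |= collect_refs(child)
    return refs

node_params = {
    "param1": "{{ node1.output }}",
    "param2": {"subparam": "{{ node2.output }}"},
}
print(collect_refs(node_params))  # {'node1', 'node2'} (set order varies)
```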
identify_and_style_entrypoints_outputs¶
Module: graph_builder.py
```python
def identify_and_style_entrypoints_outputs(elements):
```
Parameters:
- `elements` (list): List of Cytoscape elements (nodes and edges).
Returns:
- `elements` (list): Updated list of Cytoscape elements with styled entrypoint and output nodes.
Description:
Identifies entrypoint nodes (nodes with no incoming edges) and output nodes (nodes with no outgoing edges) in the workflow graph and styles them accordingly for visualization purposes.
Example Usage:
```python
elements = identify_and_style_entrypoints_outputs(elements)
```
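The classification half of this can be sketched by computing in- and out-degrees over Cytoscape-style elements. The element shapes below follow Cytoscape's `{'data': {...}}` convention; the styling applied by the real function is omitted:

```python
def classify_endpoints(elements):
    """Return (entrypoints, outputs) from Cytoscape-style nodes/edges."""
    nodes = {e["data"]["id"] for e in elements if "source" not in e["data"]}
    has_in = {e["data"]["target"] for e in elements if "source" in e["data"]}
    has_out = {e["data"]["source"] for e in elements if "source" in e["data"]}
    return nodes - has_in, nodes - has_out

elements = [
    {"data": {"id": "a"}}, {"data": {"id": "b"}}, {"data": {"id": "c"}},
    {"data": {"source": "a", "target": "b"}},
    {"data": {"source": "b", "target": "c"}},
]
print(classify_endpoints(elements))  # ({'a'}, {'c'})
```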
build_graph_data¶
Module: graph_builder.py
```python
def build_graph_data(yaml_file):
```
Parameters:
- `yaml_file` (str): Path to the YAML file defining the workflow.
Returns:
- `elements` (list): List of Cytoscape elements (nodes and edges) representing the workflow graph.
Description:
Builds graph data compatible with Cytoscape from a YAML workflow definition. It processes nodes and edges, identifies dependencies, and prepares the data for visualization.
Raises:
- `ValueError`: If a node lacks a name or if there are duplicate node names.
Example Usage:
```python
elements = build_graph_data('workflow.yaml')
```
Additional Information¶
These utility functions are primarily used internally by GenSphere to process and validate workflows. Understanding them can be helpful for advanced users who wish to extend or debug the framework.
Note: When developing custom functions or schemas for use in GenSphere workflows, ensure that:
- Functions have proper docstrings and type annotations.
- Schemas are defined using Pydantic models.
- Functions and schemas are placed in the files specified when initializing `GenFlow` or `YamlCompose`.
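For instance, a custom function in a functions file might look like the following (a hypothetical example; the docstring and type annotations are what the schema generation relies on, and an accompanying Pydantic schema is omitted here):

```python
def word_count(text: str) -> dict:
    """Count the words in a piece of text."""
    # The docstring becomes the function description; the annotations
    # become the parameter types in the generated schema.
    return {"word_count": len(text.split())}

print(word_count("GenSphere composes workflows"))  # {'word_count': 3}
```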
Conclusion¶
For more examples and usage instructions, refer to the Tutorials.
If you have any questions or need further assistance, reach out on our GitHub Issues page.