Skip to content

API Reference

This section provides detailed documentation of the core classes and utility functions used in GenSphere.


Core Classes


GenFlow

Module: genflow.py

The GenFlow class is responsible for parsing YAML workflow definitions, constructing an execution graph, and executing nodes in the correct order. It manages the overall workflow execution process.

Class Definition

class GenFlow:
    def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):
        # Initialization code

    def parse_yaml(self):
        # Parses the YAML data and constructs nodes

    def build_graph(self):
        # Builds the execution graph

    def run(self):
        # Executes the nodes in topological order

Constructor

def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):

Parameters:

  • yaml_file (str): Path to the main YAML file defining the workflow.
  • functions_filepath (str, optional): Path to the Python file containing custom function definitions.
  • structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas.

Description:

Initializes the GenFlow instance by loading the YAML data and preparing the environment for execution. It verifies the validity of provided file paths and ensures that the necessary files are accessible.

Raises:

  • FileNotFoundError: If the provided functions_filepath or structured_output_schema_filepath does not exist.
  • ValueError: If the provided file paths are not .py files.

Methods

parse_yaml
def parse_yaml(self):

Description:

Parses the YAML data from the main workflow file and constructs the nodes for execution. It also checks for the presence of nested workflows (yml_flow nodes) and composes them using YamlCompose if necessary. Validates the YAML file for consistency before parsing.

Raises:

  • Exception: If the YAML file fails consistency checks.

Example Usage:

flow = GenFlow('workflow.yaml', 'functions.py', 'schemas.py')
flow.parse_yaml()
build_graph
def build_graph(self):

Description:

Builds a directed acyclic graph (DAG) representing the execution order of nodes based on their dependencies. It adds nodes and edges to the graph according to the dependencies identified during parsing.

Raises:

  • ValueError: If the graph contains cycles or if a node depends on an undefined node or variable.

Example Usage:

flow.build_graph()
run
def run(self):

Description:

Executes the nodes in the order determined by the topological sort of the execution graph. It renders the parameters for each node using the outputs of previously executed nodes and handles iterative execution for nodes processing lists.

Raises:

  • Exception: If there are cycles in the graph or if an error occurs during node execution.

Example Usage:

flow.run()

After execution, the outputs from each node are stored in the outputs attribute of the GenFlow instance.


Node

Module: genflow.py

The Node class represents an individual operation or step within the workflow. It encapsulates the logic required to execute that step, including parameter rendering and function execution.

Class Definition

class Node:
    def __init__(self, node_data):
        # Initialization code

    def set_flow(self, flow):
        # Sets reference to the GenFlow instance

    def get_dependencies(self, node_names):
        # Retrieves the dependencies of the node

    def render_params(self, outputs, env):
        # Renders the parameters using previous outputs

    def execute(self, params):
        # Executes the node based on its type and parameters

Constructor

def __init__(self, node_data):

Parameters:

  • node_data (dict): Dictionary containing the node's configuration from the YAML file.

Description:

Initializes the Node instance with the given configuration. It extracts essential information such as the node's name, type, outputs, and parameters.

Methods

set_flow
def set_flow(self, flow):

Parameters:

  • flow (GenFlow): Reference to the GenFlow instance managing the workflow execution.

Description:

Sets the reference to the GenFlow instance, allowing the node to access shared resources and configurations during execution.

get_dependencies
def get_dependencies(self, node_names):

Parameters:

  • node_names (Iterable[str]): Iterable of all node names in the workflow.

Returns:

  • dependencies (Set[str]): Set of node names that the current node depends on.

Description:

Analyzes the node's parameters to determine which other nodes it depends on. This is used to build the execution graph and ensure correct execution order.

Example Usage:

dependencies = node.get_dependencies(flow.nodes.keys())
render_params
def render_params(self, outputs, env):

Parameters:

  • outputs (dict): Outputs from previously executed nodes.
  • env (jinja2.Environment): Jinja2 environment used for templating.

Returns:

  • rendered_params (dict or list of dicts): Parameters with values rendered using the outputs of previous nodes.

Description:

Renders the node's parameters by substituting placeholders with actual values from previous outputs. Supports handling of indexed parameters and lists for iterative processing.

Raises:

  • ValueError: If a referenced variable is not found or is not iterable when expected.
execute
def execute(self, params):

Parameters:

  • params (dict): Parameters to be used for the node execution.

Returns:

  • outputs (dict): Dictionary of outputs produced by the node execution.

Description:

Executes the node based on its type:

  • For function_call nodes, it executes a Python function.
  • For llm_service nodes, it interacts with an LLM service like OpenAI.

Delegates to specific execution methods depending on the node type.

Raises:

  • NotImplementedError: If the node type is not supported.
  • Exception: If an error occurs during execution.

Example Usage:

outputs = node.execute(rendered_params)

YamlCompose

Module: yaml_utils.py

The YamlCompose class is responsible for composing multiple YAML workflow files into a single unified workflow. It resolves references to nested workflows (yml_flow nodes) and adjusts node names and parameters to ensure uniqueness and consistency.

Class Definition

class YamlCompose:
    def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):
        # Initialization code

    def compose(self, save_combined_yaml=False, output_file='combined.yaml'):
        # Starts the composition process and returns the combined YAML data

Constructor

def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):

Parameters:

  • yaml_file (str): Path to the root YAML file to be composed.
  • functions_filepath (str): Path to the Python file containing custom functions.
  • structured_output_schema_filepath (str): Path to the Python file containing structured output schemas.

Description:

Initializes the YamlCompose instance and prepares for the composition process by validating the provided file paths.

Raises:

  • FileNotFoundError: If the provided file paths do not exist.
  • ValueError: If the provided file paths are not .py files.

Methods

compose
def compose(self, save_combined_yaml=False, output_file='combined.yaml'):

Parameters:

  • save_combined_yaml (bool, optional): If True, saves the combined YAML data to a file.
  • output_file (str, optional): Filename to save the combined YAML data.

Returns:

  • combined_data (dict): The combined YAML data after composition.

Description:

Starts the composition process by recursively processing the root YAML file and any nested sub-flows. Adjusts node names and parameter references to ensure uniqueness across the combined workflow.

Raises:

  • Exception: If validation fails during composition.

Example Usage:

composer = YamlCompose('main_workflow.yaml', 'functions.py', 'schemas.py')
combined_yaml_data = composer.compose(save_combined_yaml=True, output_file='combined.yaml')

After composition, the combined YAML file can be executed as a single workflow.


Visualizer

Module: visualizer.py

The Visualizer class provides a graphical representation of GenSphere workflows using a web-based interface powered by Dash and Cytoscape. It allows users to visualize nodes, their types, dependencies, and inspect details of each node interactively.

Class Definition

class Visualizer:
    def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):
        # Initialization code

    def start_visualization(self):
        # Starts the Dash application for visualization

Constructor

def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):

Parameters:

  • yaml_file (str, optional): Path to the YAML file defining the workflow.
  • functions_filepath (str, optional): Path to the Python file containing custom function definitions.
  • structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas.
  • address (str, optional): The IP address to host the Dash app (default: '127.0.0.1').
  • port (int, optional): The port to host the Dash app (default: 8050).

Description:

Initializes the Visualizer instance by setting up the necessary file paths and loading the user-provided functions and schemas. It validates the existence and correctness of the provided files and prepares the environment for visualization.

Raises:

  • FileNotFoundError: If any of the provided file paths do not exist.
  • ValueError: If the provided files are not .py files.

Example Usage:

from gensphere.visualizer import Visualizer

viz = Visualizer(
    yaml_file='workflow.yaml',
    functions_filepath='functions.py',
    structured_output_schema_filepath='schemas.py',
    address='127.0.0.1',
    port=8050
)

Methods

start_visualization

def start_visualization(self):

Description:

Starts the Dash application for visualizing the GenSphere workflow. The application provides an interactive interface where nodes are displayed graphically, and users can click on nodes to view detailed information such as parameters, outputs, functions, and schemas.

Features:

  • Graph Visualization: Uses Cytoscape to render the workflow graph.
  • Interactive Nodes: Clicking on a node displays detailed information.
  • Legend: Includes a legend explaining node types and edge styles.
  • Dynamic Loading: Users can input a different YAML file path and reload the graph.

Example Usage:

viz.start_visualization()

After running this method, navigate to http://127.0.0.1:8050 in your web browser to view the visualization.

Notes:

  • Ensure that the YAML file and any referenced functions or schemas are correctly specified.
  • The visualization runs a local web server; make sure the specified address and port are accessible.

Hub

Module: hub.py

The Hub class provides an interface to interact with the GenSphere Hub platform. It allows users to push workflows to the hub, pull workflows from the hub, and check the number of times a workflow has been pulled.

Class Definition

class Hub:
    def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):
        # Initialization code

    def push(self, push_name=None):
        # Pushes the workflow to the GenSphere Hub

    def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):
        # Pulls a workflow from the GenSphere Hub

    def count_pulls(self, push_id):
        # Retrieves the total number of times a push has been pulled

Constructor

def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):

Parameters:

  • yaml_file (str, optional): Path to the YAML file to be pushed.
  • functions_file (str, optional): Path to the functions file to be pushed.
  • schema_file (str, optional): Path to the schema file to be pushed.
  • api_base_url (str, optional): Base URL for the GenSphere Hub API.

Description:

Initializes the Hub instance with the provided file paths and API base URL. Prepares the instance for pushing and pulling workflows to and from the GenSphere Hub platform.

Example Usage:

from gensphere.hub import Hub

hub = Hub(
    yaml_file='workflow.yaml',
    functions_file='functions.py',
    schema_file='schemas.py'
)

Methods

push

def push(self, push_name=None):

Parameters:

  • push_name (str, optional): A descriptive name for the workflow being pushed.

Returns:

  • result (dict): A dictionary containing the push_id and a list of uploaded files.

Description:

Pushes the specified workflow files to the GenSphere Hub. Validates the YAML file for consistency before pushing. The push_id returned can be used to pull the workflow or check its pull count.

Raises:

  • ValueError: If no yaml_file is provided or if the functions or schema files are not .py files.
  • Exception: If validation fails or if an error occurs during the push.

Example Usage:

result = hub.push(push_name='My Awesome Workflow')
push_id = result.get('push_id')
print(f"Workflow pushed with push_id: {push_id}")

pull

def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):

Parameters:

  • push_id (str): The push_id of the workflow to pull.
  • save_to_disk (bool, optional): If True, saves the pulled files to disk (default: True).
  • yaml_filename (str, optional): Custom filename for the YAML file.
  • functions_filename (str, optional): Custom filename for the functions file.
  • schema_filename (str, optional): Custom filename for the schema file.
  • download_path (str, optional): Directory to save the pulled files (default: ".").

Returns:

  • files_content (dict): A dictionary containing the contents of the pulled files.

Description:

Pulls a workflow from the GenSphere Hub using the provided push_id. Optionally saves the files to disk with custom filenames. Ensures that existing files are not overwritten by appending a counter if necessary.

Raises:

  • Exception: If an error occurs during the pull operation.

Example Usage:

files = hub.pull(
    push_id=push_id,
    save_to_disk=True,
    yaml_filename='downloaded_workflow.yaml',
    functions_filename='downloaded_functions.py',
    schema_filename='downloaded_schemas.py'
)

count_pulls

def count_pulls(self, push_id):

Parameters:

  • push_id (str): The push_id of the workflow to check.

Returns:

  • pull_count (int): The total number of times the workflow has been pulled.

Description:

Retrieves the total number of times a workflow has been pulled from the GenSphere Hub using the provided push_id.

Raises:

  • Exception: If an error occurs during the request.

Example Usage:

pull_count = hub.count_pulls(push_id=push_id)
print(f"The workflow has been pulled {pull_count} times.")

Utility Functions

This section documents the utility functions used within GenSphere, primarily for internal processing and validation.


get_function_schema

Module: genflow.py

def get_function_schema(func):

Parameters:

  • func (function): The Python function object to generate a schema for.

Returns:

  • function_def (dict): A dictionary representing the function definition, including name, description, and parameters.

Description:

Generates a schema for a given function by inspecting its signature and docstring. This schema is used for OpenAI's function calling feature in LLM service nodes. It ensures that the function parameters are properly typed and documented.

Raises:

  • ValueError: If a parameter lacks a type annotation or if the function lacks a docstring.

Example Usage:

Used internally when preparing function definitions for OpenAI's function calling.


validate_yaml

Module: yaml_utils.py

def validate_yaml(
    yaml_file,
    functions_filepath=None,
    structured_output_schema_filepath=None,
    parent_node_names=None,
    visited_files=None,
    parent_params=None,
    parent_node_outputs=None
):

Parameters:

  • yaml_file (str): Path to the YAML file being validated.
  • functions_filepath (str, optional): Path to the functions file.
  • structured_output_schema_filepath (str, optional): Path to the schemas file.
  • parent_node_names (Set[str], optional): Set of node names from the parent flow.
  • visited_files (Set[str], optional): Set of visited YAML files to prevent circular references.
  • parent_params (Set[str], optional): Set of parameter names passed from the parent flow.
  • parent_node_outputs (Dict[str, List[str]], optional): Dictionary of node outputs from parent flows.

Returns:

  • validated (bool): True if validation passes, False otherwise.
  • error_msgs (List[str]): List of error messages encountered during validation.
  • node_outputs (Dict[str, List[str]]): Dictionary of node outputs in the current flow.

Description:

Validates a YAML workflow file and any associated sub-flows for consistency and correctness. Checks for issues such as:

  • Missing required fields (name, type).
  • Duplicate node names.
  • Undefined or duplicate outputs.
  • References to undefined nodes or outputs.
  • Cycles in the execution graph.
  • Validity of functions and schemas.

Raises:

  • FileNotFoundError: If referenced files do not exist.
  • ValueError: If the YAML structure is invalid.

Example Usage:

Used internally before executing or composing workflows to ensure they are valid.


collect_referenced_nodes_and_outputs

Module: yaml_utils.py

def collect_referenced_nodes_and_outputs(params):

Parameters:

  • params (dict): Parameters dictionary from a node.

Returns:

  • referenced_nodes_outputs (Set[Tuple[str, str]]): A set of tuples containing referenced node names and outputs.

Description:

Analyzes the parameters of a node to identify all referenced nodes and their outputs, which is essential for validating dependencies and ensuring that all references are valid.


collect_used_params

Module: yaml_utils.py

def collect_used_params(yaml_data):

Parameters:

  • yaml_data (dict): The YAML data of a workflow.

Returns:

  • used_params (Set[str]): A set of parameter names used within the workflow.

Description:

Collects all parameter names that are used in the workflow, particularly in the context of nested workflows (yml_flow nodes). This helps in validating that all required parameters are provided.


collect_referenced_params

Module: yaml_utils.py

def collect_referenced_params(params):

Parameters:

  • params (dict): Parameters dictionary from a node.

Returns:

  • referenced_params (Set[str]): A set of parameter names referenced in the parameters.

Description:

Identifies all parameter names that are referenced within the node's parameters, usually in templated strings. This is used to ensure that all referenced parameters are defined.


collect_referenced_nodes

Module: yaml_utils.py

def collect_referenced_nodes(params):

Parameters:

  • params (dict): Parameters dictionary from a node.

Returns:

  • referenced_nodes (Set[str]): A set of node names referenced in the parameters.

Description:

Identifies all node names that are referenced within the node's parameters. This is crucial for building the execution graph and determining the correct execution order.


load_yaml_file

Module: yaml_utils.py

def load_yaml_file(yaml_file):

Parameters:

  • yaml_file (str): Path to the YAML file to load.

Returns:

  • data (dict): The loaded YAML data.

Description:

Loads the YAML data from a file and handles parsing errors. Ensures that the file exists and contains valid YAML.

Raises:

  • FileNotFoundError: If the YAML file does not exist.
  • ValueError: If there is an error parsing the YAML file.

has_yml_flow_nodes

Module: yaml_utils.py

def has_yml_flow_nodes(yaml_data):

Parameters:

  • yaml_data (dict): The YAML data of a workflow.

Returns:

  • bool: True if the workflow contains any yml_flow nodes, False otherwise.

Description:

Checks whether the given YAML data contains any nested workflows (yml_flow nodes). This helps determine if composition is necessary before execution.


get_base_output_name

Module: yaml_utils.py

def get_base_output_name(output_reference):

Parameters:

  • output_reference (str): A string representing an output reference (e.g., 'countries_list[i]').

Returns:

  • base_output_name (str): The base output name extracted from the reference.

Description:

Extracts the base output name from a complex output reference that may include indexing or attribute access. Used during validation to identify the actual outputs being referenced.


parse_yaml

Module: graph_builder.py

def parse_yaml(yaml_file):

Parameters:

  • yaml_file (str): Path to the YAML file to parse.

Returns:

  • data (dict): Parsed YAML data.

Description:

Parses a YAML file and returns its content as a dictionary. Validates the existence of the file and handles parsing errors.

Raises:

  • FileNotFoundError: If the YAML file does not exist.
  • yaml.YAMLError: If an error occurs during YAML parsing.

Example Usage:

data = parse_yaml('workflow.yaml')

extract_referenced_nodes

Module: graph_builder.py

def extract_referenced_nodes(template_str):

Parameters:

  • template_str (str): A templated string containing references to other nodes (e.g., "{{ node.output }}").

Returns:

  • referenced_nodes (Set[str]): A set of referenced node names.

Description:

Extracts all referenced node names from a templated string using regular expressions. Useful for identifying dependencies between nodes in a workflow.

Example Usage:

template_str = "{{ node1.output }} and {{ node2.output }}"
referenced_nodes = extract_referenced_nodes(template_str)
# referenced_nodes will be {'node1', 'node2'}

traverse_node_fields

Module: graph_builder.py

def traverse_node_fields(node_value):

Parameters:

  • node_value (Union[str, dict, list]): The node value to traverse.

Returns:

  • referenced_nodes (Set[str]): A set of referenced node names found within the node value.

Description:

Recursively traverses a node's fields to find all referenced node names. Handles strings, dictionaries, and lists. Used to identify all dependencies for a node.

Example Usage:

node_params = {
    'param1': '{{ node1.output }}',
    'param2': {
        'subparam': '{{ node2.output }}'
    }
}
referenced_nodes = traverse_node_fields(node_params)
# referenced_nodes will be {'node1', 'node2'}

identify_and_style_entrypoints_outputs

Module: graph_builder.py

def identify_and_style_entrypoints_outputs(elements):

Parameters:

  • elements (list): List of Cytoscape elements (nodes and edges).

Returns:

  • elements (list): Updated list of Cytoscape elements with styled entrypoints and output nodes.

Description:

Identifies entrypoint nodes (nodes with no incoming edges) and output nodes (nodes with no outgoing edges) in the workflow graph and styles them accordingly for visualization purposes.

Example Usage:

elements = identify_and_style_entrypoints_outputs(elements)

build_graph_data

Module: graph_builder.py

def build_graph_data(yaml_file):

Parameters:

  • yaml_file (str): Path to the YAML file defining the workflow.

Returns:

  • elements (list): List of Cytoscape elements (nodes and edges) representing the workflow graph.

Description:

Builds graph data compatible with Cytoscape from a YAML workflow definition. It processes nodes and edges, identifies dependencies, and prepares the data for visualization.

Raises:

  • ValueError: If a node lacks a name or if there are duplicate node names.

Example Usage:

elements = build_graph_data('workflow.yaml')

Additional Information

These utility functions are primarily used internally by GenSphere to process and validate workflows. Understanding them can be helpful for advanced users who wish to extend or debug the framework.


Note: When developing custom functions or schemas for use in GenSphere workflows, ensure that:

  • Functions have proper docstrings and type annotations.
  • Schemas are defined using Pydantic models.
  • Functions and schemas are placed in the files specified when initializing GenFlow or YamlCompose.

Conclusion

For more examples and usage instructions, refer to the Tutorials.


If you have any questions or need further assistance, reach out on our GitHub Issues page.