Skip to content

🔁 Workflow

evoagentx.workflow

WorkFlowGenerator

WorkFlowGenerator(**kwargs)

Bases: BaseModule

Automated workflow generation system based on high-level goals.

The WorkFlowGenerator is responsible for creating complete workflow graphs from high-level goals or task descriptions. It breaks down the goal into subtasks, creates the necessary dependency connections between tasks, and assigns or generates appropriate agents for each task.

Attributes:

Name Type Description
llm Optional[BaseLLM]

Language model used for generation and planning

task_planner Optional[TaskPlanner]

Component responsible for breaking down goals into subtasks

agent_generator Optional[AgentGenerator]

Component responsible for agent assignment or creation

workflow_reviewer Optional[WorkFlowReviewer]

Component for reviewing and improving workflows

num_turns Optional[PositiveInt]

Number of refinement iterations for the workflow

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Every keyword argument that names a declared model field and carries a truthy
    value is first passed through `self._process_data`. The resulting kwargs are
    forwarded to the parent initializer, after which `self.init_module()` runs.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization

    Note:
        Errors are re-raised only when no "exception_buffer" callback is registered;
        otherwise the exception is handed to that callback and not propagated.
    """
    try:
        for name in type(self).model_fields:
            raw_value = kwargs.get(name)
            if not raw_value:
                # missing or falsy values are left exactly as the caller passed them
                continue
            kwargs[name] = self._process_data(raw_value)
        super().__init__(**kwargs)
        self.init_module()
    except (ValidationError, Exception) as e:
        buffer = callback_manager.get_callback("exception_buffer")
        if buffer is not None:
            # a registered buffer collects the error instead of letting it propagate
            buffer.add(e)
        else:
            logger.error(
                get_base_module_init_error_message(
                    cls=self.__class__,
                    data=kwargs,
                    errors=e
                )
            )
            raise

WorkFlowGraph

WorkFlowGraph(**kwargs)

Bases: BaseModule

Represents a complete workflow as a directed graph.

WorkFlowGraph models a workflow as a directed graph where nodes represent tasks and edges represent dependencies and data flow between tasks. It provides methods for constructing, validating, traversing, and executing workflows.

The graph structure supports advanced features like detecting and handling loops, determining execution order, and tracking execution state.

Attributes:

Name Type Description
goal str

The high-level objective of this workflow

nodes Optional[List[WorkFlowNode]]

List of WorkFlowNode instances representing tasks

edges Optional[List[WorkFlowEdge]]

List of WorkFlowEdge instances representing dependencies

graph Optional[Union[MultiDiGraph, WorkFlowGraph]]

Internal NetworkX MultiDiGraph or another WorkFlowGraph

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Every keyword argument that names a declared model field and carries a truthy
    value is first passed through `self._process_data`. The resulting kwargs are
    forwarded to the parent initializer, after which `self.init_module()` runs.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization

    Note:
        Errors are re-raised only when no "exception_buffer" callback is registered;
        otherwise the exception is handed to that callback and not propagated.
    """
    try:
        for name in type(self).model_fields:
            raw_value = kwargs.get(name)
            if not raw_value:
                # missing or falsy values are left exactly as the caller passed them
                continue
            kwargs[name] = self._process_data(raw_value)
        super().__init__(**kwargs)
        self.init_module()
    except (ValidationError, Exception) as e:
        buffer = callback_manager.get_callback("exception_buffer")
        if buffer is not None:
            # a registered buffer collects the error instead of letting it propagate
            buffer.add(e)
        else:
            logger.error(
                get_base_module_init_error_message(
                    cls=self.__class__,
                    data=kwargs,
                    errors=e
                )
            )
            raise

edge_exists

edge_exists(edge: Union[Tuple[str, str], WorkFlowEdge], **attr_filters) -> bool

Check whether an edge exists in the workflow graph. The input edge can either be a tuple or a WorkFlowEdge instance.

  1. If a tuple is passed, it should be (source, target). The function will only determine whether there is an edge between the source node and the target node. If attr_filters is passed, they will also be used to match the edge attributes.
  2. If a WorkFlowEdge is passed, the __eq__ method of WorkFlowEdge is used to determine whether the edge exists.

edge (Union[Tuple[str, str], WorkFlowEdge]):
    - If a tuple is provided, it should be in the format `(source, target)`. 
    The method will check whether there is an edge between the source and target nodes.
    If `attr_filters` are provided, they will be used to match edge attributes.
    - If a WorkFlowEdge instance is provided, the method will use the `__eq__` method in WorkFlowEdge 
    to determine whether the edge exists.

attr_filters (dict, optional):
    Additional attributes to filter edges when `edge` is a tuple.

bool: True if the edge exists and matches the filters (if provided); False otherwise.
Source code in evoagentx/workflow/workflow_graph.py
def edge_exists(self, edge: Union[Tuple[str, str], WorkFlowEdge], **attr_filters) -> bool:
    """
    Check whether an edge exists in the workflow graph.

    Args:
        edge (Union[Tuple[str, str], WorkFlowEdge]):
            - A `(source, target)` tuple: the lookup is delegated to `self._edge_exists`,
              which also receives `attr_filters` for matching edge attributes.
            - A WorkFlowEdge instance: membership in `self.edges` is tested, which relies
              on the `__eq__` method of WorkFlowEdge.
        **attr_filters: Additional attributes to filter edges when `edge` is a tuple.

    Returns:
        bool: True if the edge exists and matches the filters (if provided); False otherwise.

    Raises:
        AssertionError: If a tuple is given that does not have exactly two items.
        TypeError: If `edge` is neither a tuple nor a WorkFlowEdge instance.
    """
    if isinstance(edge, tuple):
        assert len(edge) == 2, "edge must be a tuple (source, target) or WorkFlowEdge instance"
        src, dst = edge
        return self._edge_exists(src, dst, **attr_filters)

    if isinstance(edge, WorkFlowEdge):
        return edge in self.edges

    raise TypeError("edge must be a tuple (source, target) or WorkFlowEdge instance")

list_nodes

list_nodes() -> List[str]

return the names of all nodes

Source code in evoagentx/workflow/workflow_graph.py
def list_nodes(self) -> List[str]:
    """
    Collect the `name` of every node in `self.nodes`, in list order.

    Returns:
        List[str]: The node names.
    """
    names = []
    for wf_node in self.nodes:
        names.append(wf_node.name)
    return names

get_node

get_node(node_name: str) -> WorkFlowNode

return a WorkFlowNode instance based on its name.

Source code in evoagentx/workflow/workflow_graph.py
def get_node(self, node_name: str) -> WorkFlowNode:
    """
    Look up a WorkFlowNode instance by its name.

    Args:
        node_name (str): The name of the node to fetch.

    Returns:
        WorkFlowNode: The object stored under the "ref" key of the graph node data.

    Raises:
        KeyError: If `node_exists` reports that no node with this name exists.
    """
    if self.node_exists(node=node_name):
        node_data = self.graph.nodes[node_name]
        return node_data["ref"]
    raise KeyError(f"{node_name} is an invalid node name. Currently available node names: {self.list_nodes()}")

reset_graph

reset_graph()

set the status of all nodes to pending

Source code in evoagentx/workflow/workflow_graph.py
def reset_graph(self):
    """
    Put every node in `self.nodes` back into the pending state.

    Each node receives `set_status(WorkFlowNodeState.PENDING)`; nothing is returned.
    """
    for wf_node in self.nodes:
        wf_node.set_status(WorkFlowNodeState.PENDING)

set_node_status

set_node_status(node: Union[str, WorkFlowNode], new_state: WorkFlowNodeState) -> bool

Update the state of a specific node.

Parameters:

Name Type Description Default
node Union[str, WorkFlowNode]

The name of a node or the node instance.

required
new_state WorkFlowNodeState

The new state to set.

required

Returns:

Name Type Description
bool bool

True if the state was updated successfully, False otherwise.

Source code in evoagentx/workflow/workflow_graph.py
def set_node_status(self, node: Union[str, WorkFlowNode], new_state: WorkFlowNodeState) -> bool:
    """
    Update the state of a specific node.

    Args:
        node (Union[str, WorkFlowNode]): The name of a node or the node instance.
            A name is resolved through `self.get_node`.
        new_state (WorkFlowNodeState): The new state to set.

    Returns:
        bool: Always True. A failed update is never reported as False; it raises instead.

    Raises:
        ValueError: If the node name cannot be resolved or `set_status` fails.
            The original exception is attached as the cause.
    """
    try:
        target = self.get_node(node_name=node) if isinstance(node, str) else node
        target.set_status(new_state)
    except Exception as e:
        # chain explicitly so the root cause is kept on the ValueError
        raise ValueError(f"An error occurs when setting node status: {e}") from e
    return True

filter_completed_nodes

filter_completed_nodes(nodes: List[Union[str, WorkFlowNode]]) -> List[str]

remove completed nodes from nodes

Source code in evoagentx/workflow/workflow_graph.py
def filter_completed_nodes(self, nodes: List[Union[str, WorkFlowNode]]) -> List[str]:
    """
    Drop the completed nodes from `nodes` and return the names of the remaining ones.

    Args:
        nodes (List[Union[str, WorkFlowNode]]): Node names or node instances.

    Returns:
        List[str]: Names of the nodes whose `is_complete` is falsy, in input order.
    """
    # normalise everything to names first, then look each node up through get_node
    names = [item if isinstance(item, str) else item.name for item in nodes]
    return [name for name in names if not self.get_node(name).is_complete]

get_candidate_children_nodes

get_candidate_children_nodes(completed_nodes: List[Union[str, WorkFlowNode]]) -> List[str]

Return the next set of possible tasks to execute. If there are no loops in the graph, consider only the uncompleted children. If there exists loops, also consider the previous completed tasks.

Parameters:

Name Type Description Default
completed_nodes List[Union[str, WorkFlowNode]]

A list of completed nodes.

required

Returns:

Type Description
List[str]

List[str]: List of node names that are candidates for execution.

Source code in evoagentx/workflow/workflow_graph.py
def get_candidate_children_nodes(self, completed_nodes: List[Union[str, WorkFlowNode]]) -> List[str]:
    """
    Return the next set of possible tasks to execute.

    Without loops in the graph only the uncompleted children are considered. When loops
    exist, a child that starts a loop is offered again after a loop-end node, even if
    that child has already been completed.

    Args:
        completed_nodes (List[Union[str, WorkFlowNode]]): A list of completed nodes.

    Returns:
        List[str]: List of node names that are candidates for execution.
    """
    names = [item if isinstance(item, str) else item.name for item in completed_nodes]

    if len(self._loops) == 0:
        # no loops: take all children at once and drop the ones that are already completed
        all_children = self.get_all_children_nodes(nodes=names)
        return self.filter_completed_nodes(nodes=all_children)

    # loops exist: inspect each completed node together with its own children
    candidates = []
    for name in names:
        children = self.get_all_children_nodes(nodes=[name])
        if not self.is_loop_end(node=name):
            pending = self.filter_completed_nodes(nodes=children)
        else:
            pending = []
            for child in children:
                if self.is_loop_start(node=child):
                    # `name` ends a loop and `child` starts one: the child is a candidate
                    # regardless of whether it has been completed before
                    pending.append(child)
                else:
                    # `name` ends a loop but `child` does not start one: only keep the
                    # child when it has not been completed yet
                    pending.extend(self.filter_completed_nodes(nodes=[child]))
        # merge while keeping first-seen order and avoiding duplicates
        for child in pending:
            if child not in candidates:
                candidates.append(child)

    return candidates

are_dependencies_complete

are_dependencies_complete(node_name: str) -> bool

Check if all predecessors for a node are complete.

Parameters:

Name Type Description Default
node_name str

The name of the task/node to check.

required

Returns:

Name Type Description
bool bool

True if all predecessors are complete, False otherwise.

Source code in evoagentx/workflow/workflow_graph.py
def are_dependencies_complete(self, node_name: str) -> bool:
    """
    Check if all predecessors for a node are complete.

    For a node that starts a loop (in a graph that has loops), predecessors that are
    loop-end nodes are ignored, so only the remaining predecessors must be complete.

    Args:
        node_name (str): The name of the task/node to check.

    Returns:
        bool: True if all relevant predecessors are complete, False otherwise.
    """
    loops_present = len(self._loops) > 0
    preds = self.get_node_predecessors(node=node_name)

    if not (loops_present and self.is_loop_start(node=node_name)):
        return all(self.get_node(pre).is_complete for pre in preds)

    # loop start: every non-loop-end predecessor is looked up before the verdict is formed
    states = [self.get_node(pre).is_complete for pre in preds if not self.is_loop_end(pre)]
    return all(states)

display

display()

Display the workflow graph with node and edge attributes. Nodes are colored based on their status.

Source code in evoagentx/workflow/workflow_graph.py
def display(self):
    """
    Display the workflow graph with node and edge attributes.
    Nodes are colored based on their status.

    The figure is drawn with networkx on a matplotlib figure and shown through
    `plt.show()`. If the graph has no nodes, a message is printed and nothing is drawn.
    Node labels come from `self.get_node_description`, edge labels from the `priority`
    edge attribute, and a legend maps each status to its color.
    """
    # imported inside the method, so matplotlib is only needed when display() is called
    import matplotlib.pyplot as plt

    # Define colors for node statuses
    status_colors = {
        WorkFlowNodeState.PENDING: 'lightgray',
        WorkFlowNodeState.RUNNING: 'orange',
        WorkFlowNodeState.COMPLETED: 'green',
        WorkFlowNodeState.FAILED: 'red'
    }

    # Nothing to draw for an empty graph
    if not self.graph.nodes:
        print("Graph is empty. No nodes to display.")
        return

    # Get node colors based on their statuses (unknown statuses fall back to 'lightgray')
    node_colors = [status_colors.get(self.get_node_status(node), 'lightgray') for node in self.graph.nodes]

    # Prepare node labels with additional information
    node_labels = {node: self.get_node_description(data["ref"]) for node, data in self.graph.nodes(data=True)}

    # Compute node positions; a single node is positioned explicitly instead of
    # going through shell_layout
    if len(self.graph.nodes) == 1:
        single_node = list(self.graph.nodes)[0]
        pos = {single_node: (0, 0)}  # Place the single node at the center
    else:
        pos = nx.shell_layout(self.graph)

    # Draw nodes and edges only; labels are placed manually below (with_labels=False)
    plt.figure(figsize=(12, 8))
    nx.draw(
        self.graph, pos, with_labels=False, node_color=node_colors, edge_color='black',
        node_size=1500, font_size=8, font_color='black', font_weight='bold'
    )

    if len(self.graph.nodes) == 1:
        # Single node: put the label just to the right of the node position
        for node, (x, y) in pos.items():
            plt.text(x+0.005, y, node_labels[node], ha='left', va='center', fontsize=9, bbox=dict(facecolor='white', alpha=0.7))
    else:
        # Draw node labels next to the nodes (left-aligned)
        # Nodes whose y coordinate lies in the lowest third of the layout get their
        # label above the node, all others below it
        y_positions = [y for _, y in pos.values()]
        y_min, y_max = min(y_positions), max(y_positions)
        lower_third_boundary = y_min + (y_max - y_min) / 3

        # Adjust text offsets based on node position in the graph
        text_offsets = {}
        for node, (x, y) in pos.items():
            if y < lower_third_boundary:  # If in the lower third, display label above the node
                text_offsets[node] = (x-0.2, y + 0.23)
            else:  # Otherwise, display label below the node
                text_offsets[node] = (x-0.2, y - 0.23)

        for node, (x, y) in text_offsets.items():
            plt.text(x, y, node_labels[node], ha='left', va='center', fontsize=9, bbox=dict(facecolor='white', alpha=0.7))

    # Draw edge labels for priorities
    # NOTE(review): for a multigraph, get_edge_attributes keys edges as (u, v, key) —
    # confirm draw_networkx_edge_labels accepts such keys in the networkx version in use
    edge_labels = nx.get_edge_attributes(self.graph, 'priority')
    nx.draw_networkx_edge_labels(self.graph, pos, edge_labels=edge_labels)

    # Add a legend to show node status colors
    legend_elements = [
        plt.Line2D([0], [0], marker='o', color='w', label=status.name, markersize=10, markerfacecolor=color)
        for status, color in status_colors.items()
    ]
    plt.legend(handles=legend_elements, title="Workflow Node Status", loc='upper left', fontsize='medium')

    plt.title("Workflow Graph")
    plt.show()

SequentialWorkFlowGraph

SequentialWorkFlowGraph(goal: str, tasks: List[dict], **kwargs)

Bases: WorkFlowGraph

A linear workflow graph with a single path from start to end.

Parameters:

Name Type Description Default
goal str

The goal of the workflow.

required
tasks List[dict]

A list of tasks with their descriptions and inputs. Each task should have the following format: { "name": str, "description": str, "inputs": [{"name": str, "type": str, "required": bool, "description": str}, ...], "outputs": [{"name": str, "type": str, "required": bool, "description": str}, ...], "prompt": str, "system_prompt" (optional): str, default is DEFAULT_SYSTEM_PROMPT, "output_parser" (optional): Type[ActionOutput], "parse_mode" (optional): str, default is "str" "parse_func" (optional): Callable, "parse_title" (optional): str }

required
Source code in evoagentx/workflow/workflow_graph.py
def __init__(self, goal: str, tasks: List[dict], **kwargs):
    """
    Build the graph from a goal and a list of task dictionaries.

    Nodes are derived from `tasks` via `_infer_nodes_from_tasks`, edges are derived from
    those nodes via `_infer_edges_from_nodes`, and both are passed to the parent initializer.

    Args:
        goal (str): The goal of the workflow.
        tasks (List[dict]): The task descriptions the nodes are inferred from.
        **kwargs: Forwarded unchanged to the parent initializer.
    """
    # both helpers run before the parent initializer is invoked
    task_nodes = self._infer_nodes_from_tasks(tasks=tasks)
    task_edges = self._infer_edges_from_nodes(nodes=task_nodes)
    super().__init__(goal=goal, nodes=task_nodes, edges=task_edges, **kwargs)

get_graph_info

get_graph_info(**kwargs) -> dict

Get the information of the workflow graph.

Source code in evoagentx/workflow/workflow_graph.py
def get_graph_info(self, **kwargs) -> dict:
    """
    Get the information of the workflow graph.

    Returns:
        dict: The class name, the goal and one entry per node under "tasks". Each task
        entry holds the node's name, description, inputs and outputs (as dictionaries
        without "class_name") and the prompt/parse settings read from the node's
        first agent.
    """
    task_infos = []
    for node in self.nodes:
        # only the first agent of a node is consulted for prompt and parsing settings
        agent = node.agents[0]
        parse_func = agent.get("parse_func", None)
        task_infos.append(
            {
                "name": node.name,
                "description": node.description,
                "inputs": [param.to_dict(ignore=["class_name"]) for param in node.inputs],
                "outputs": [param.to_dict(ignore=["class_name"]) for param in node.outputs],
                "prompt": agent.get("prompt", None),
                "system_prompt": agent.get("system_prompt", None),
                "parse_mode": agent.get("parse_mode", "str"),
                # a callable is recorded by its name
                "parse_func": parse_func.__name__ if parse_func else None,
                "parse_title": agent.get("parse_title", None)
            }
        )

    return {
        "class_name": self.__class__.__name__,
        "goal": self.goal,
        "tasks": task_infos
    }

save_module

save_module(path: str, ignore: List[str] = [], **kwargs)

Save the workflow graph to a module file.

Source code in evoagentx/workflow/workflow_graph.py
def save_module(self, path: str, ignore: List[str] = [], **kwargs):
    """
    Save the workflow graph to a module file.

    The dictionary returned by `get_graph_info` is written to `path` as JSON with an
    indentation of 4, after the top-level keys listed in `ignore` have been removed.

    Args:
        path (str): Destination file; its parent folder is prepared via `make_parent_folder`.
        ignore (List[str]): Top-level keys to leave out of the saved file.
        **kwargs: Accepted but not used by this implementation.

    Returns:
        str: The `path` that was written.
    """
    logger.info("Saving {} to {}", self.__class__.__name__, path)
    graph_info = self.get_graph_info()
    for skipped_key in ignore:
        # keys that are not present are skipped silently
        graph_info.pop(skipped_key, None)
    make_parent_folder(path)
    with open(path, "w", encoding="utf-8") as out_file:
        json.dump(graph_info, out_file, indent=4)
    return path

WorkFlow

WorkFlow(**kwargs)

Bases: BaseModule

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Every keyword argument that names a declared model field and carries a truthy
    value is first passed through `self._process_data`. The resulting kwargs are
    forwarded to the parent initializer, after which `self.init_module()` runs.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization

    Note:
        Errors are re-raised only when no "exception_buffer" callback is registered;
        otherwise the exception is handed to that callback and not propagated.
    """
    try:
        for name in type(self).model_fields:
            raw_value = kwargs.get(name)
            if not raw_value:
                # missing or falsy values are left exactly as the caller passed them
                continue
            kwargs[name] = self._process_data(raw_value)
        super().__init__(**kwargs)
        self.init_module()
    except (ValidationError, Exception) as e:
        buffer = callback_manager.get_callback("exception_buffer")
        if buffer is not None:
            # a registered buffer collects the error instead of letting it propagate
            buffer.add(e)
        else:
            logger.error(
                get_base_module_init_error_message(
                    cls=self.__class__,
                    data=kwargs,
                    errors=e
                )
            )
            raise

execute

execute(inputs: dict = {}, **kwargs) -> str

Synchronous wrapper for async_execute. Creates a new event loop and runs the async method.

Parameters:

Name Type Description Default
inputs dict

Dictionary of inputs for workflow execution

{}
**kwargs Any

Additional keyword arguments

{}

Returns:

Name Type Description
str str

The output of the workflow execution

Source code in evoagentx/workflow/workflow.py
def execute(self, inputs: dict = {}, **kwargs) -> str:
    """
    Run `async_execute` to completion from synchronous code.

    A fresh event loop is created, installed as the current loop, used to run the
    coroutine, and closed afterwards even if the run raises.

    Args:
        inputs: Dictionary of inputs for workflow execution
        **kwargs (Any): Additional keyword arguments

    Returns:
        str: The output of the workflow execution
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        result = event_loop.run_until_complete(self.async_execute(inputs, **kwargs))
    finally:
        # NOTE(review): the closed loop stays registered as the current event loop
        event_loop.close()
    return result

async_execute async

async_execute(inputs: dict = {}, **kwargs) -> str

Asynchronously execute the workflow.

Parameters:

Name Type Description Default
inputs dict

Dictionary of inputs for workflow execution

{}
**kwargs Any

Additional keyword arguments

{}

Returns:

Name Type Description
str str

The output of the workflow execution

Source code in evoagentx/workflow/workflow.py
async def async_execute(self, inputs: dict = {}, **kwargs) -> str:
    """
    Asynchronously execute the workflow.

    The prepared inputs are validated and recorded in the environment. Tasks are then
    fetched through `get_next_task` and executed one at a time until the graph reports
    completion or no further task is returned. The first exception raised while
    fetching or executing a task is recorded in the environment and ends the run.

    Args:
        inputs: Dictionary of inputs for workflow execution
        **kwargs (Any): Additional keyword arguments

    Returns:
        str: The output of the workflow execution, or "Workflow Execution Failed"
        when a task raised.
    """
    goal = self.graph.goal
    inputs = self._prepare_inputs(inputs)

    # check the inputs and outputs of the task
    self._validate_workflow_structure(inputs=inputs, **kwargs)
    input_record = Message(content=inputs, msg_type=MessageType.INPUT, wf_goal=goal)
    self.environment.update(message=input_record, state=TrajectoryState.COMPLETED)

    while not self.graph.is_complete:
        try:
            task: WorkFlowNode = await self.get_next_task()
            if task is None:
                break
            logger.info(f"Executing subtask: {task.name}")
            await self.execute_task(task=task)
        except Exception as e:
            # record the failure in the environment, log it and stop immediately
            failure = Message(
                content=f"An Error occurs when executing the workflow: {e}",
                msg_type=MessageType.ERROR,
                wf_goal=goal
            )
            self.environment.update(message=failure, state=TrajectoryState.FAILED, error=str(e))
            logger.error(failure.content)
            return "Workflow Execution Failed"

    logger.info("Extracting WorkFlow Output ...")
    return await self.workflow_manager.extract_output(graph=self.graph, env=self.environment)

execute_task async

execute_task(task: WorkFlowNode)

Asynchronously execute a workflow task.

Parameters:

Name Type Description Default
task WorkFlowNode

The workflow node to execute

required
Source code in evoagentx/workflow/workflow.py
async def execute_task(self, task: WorkFlowNode):
    """
    Asynchronously execute a workflow task.

    The graph is stepped from the last executed task to `task`, and the workflow manager
    is asked for the next action. The task is run through the action graph when the
    action carries one, otherwise through agents. Finally the task is marked as
    completed in the graph.

    Args:
        task: The workflow node to execute
    """
    previous_task = self.environment.get_last_executed_task()
    self.graph.step(source_node=previous_task, target_node=task)

    next_action: NextAction = await self.workflow_manager.schedule_next_action(
        goal=self.graph.goal,
        task=task,
        agent_manager=self.agent_manager,
        env=self.environment
    )

    # pick the executor that matches the scheduled action
    if next_action.action_graph is None:
        run_task = self._async_execute_task_by_agents
    else:
        run_task = self._async_execute_task_by_action_graph
    await run_task(task=task, next_action=next_action)

    self.graph.completed(node=task)

ActionGraph

ActionGraph(**kwargs)

Bases: BaseModule

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Every keyword argument that names a declared model field and carries a truthy
    value is first passed through `self._process_data`. The resulting kwargs are
    forwarded to the parent initializer, after which `self.init_module()` runs.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization

    Note:
        Errors are re-raised only when no "exception_buffer" callback is registered;
        otherwise the exception is handed to that callback and not propagated.
    """
    try:
        for name in type(self).model_fields:
            raw_value = kwargs.get(name)
            if not raw_value:
                # missing or falsy values are left exactly as the caller passed them
                continue
            kwargs[name] = self._process_data(raw_value)
        super().__init__(**kwargs)
        self.init_module()
    except (ValidationError, Exception) as e:
        buffer = callback_manager.get_callback("exception_buffer")
        if buffer is not None:
            # a registered buffer collects the error instead of letting it propagate
            buffer.add(e)
        else:
            logger.error(
                get_base_module_init_error_message(
                    cls=self.__class__,
                    data=kwargs,
                    errors=e
                )
            )
            raise

get_graph_info

get_graph_info(**kwargs) -> dict

Get the information of the action graph, including all operators from the instance.

Source code in evoagentx/workflow/action_graph.py
def get_graph_info(self, **kwargs) -> dict:
    """
    Get the information of the action graph, including all operators from the instance.

    Returns:
        dict: The class name, name and description of the graph plus, under "operators",
        one entry per Operator found among the instance's extra fields. Each entry
        records the operator's class name, name, description, interface and prompt.
    """
    # the extra fields are the fields that are not defined in the Pydantic model;
    # only those holding an Operator are reported
    operator_infos = {
        attr_name: {
            "class_name": attr_value.__class__.__name__,
            "name": attr_value.name,
            "description": attr_value.description,
            "interface": attr_value.interface,
            "prompt": attr_value.prompt
        }
        for attr_name, attr_value in self.__pydantic_extra__.items()
        if isinstance(attr_value, Operator)
    }

    return {
        "class_name": self.__class__.__name__,
        "name": self.name,
        "description": self.description,
        "operators": operator_infos
    }

load_module classmethod

load_module(path: str, llm_config: LLMConfig = None, **kwargs) -> Dict

Load the ActionGraph from a file.

Source code in evoagentx/workflow/action_graph.py
@classmethod
def load_module(cls, path: str, llm_config: LLMConfig = None, **kwargs) -> Dict:
    """
    Load the ActionGraph data from a file and attach the LLM configuration to it.

    Args:
        path (str): The file to load; handed to the parent `load_module`.
        llm_config (LLMConfig): Required despite the `None` default. Its `to_dict()`
            result is stored under the "llm_config" key of the loaded data.
        **kwargs: Forwarded to the parent `load_module`.

    Returns:
        Dict: The loaded data including the "llm_config" entry.

    Raises:
        AssertionError: If `llm_config` is None.
    """
    assert llm_config is not None, "must provide `llm_config` when using `load_module` or `from_file` to load the ActionGraph from local storage"
    loaded = super().load_module(path, **kwargs)
    loaded["llm_config"] = llm_config.to_dict()
    return loaded

from_dict classmethod

from_dict(data: Dict[str, Any], **kwargs) -> ActionGraph

Create an ActionGraph from a dictionary.

Source code in evoagentx/workflow/action_graph.py
@classmethod
def from_dict(cls, data: Dict[str, Any], **kwargs) -> "ActionGraph":
    """
    Create an ActionGraph from a dictionary.

    Args:
        data (Dict[str, Any]): The serialized graph. A truthy "class_name" selects the
            class to instantiate through `MODULE_REGISTRY`. An optional "operators"
            mapping holds per-operator settings that are applied after construction.
            The dictionary passed in is not modified.
        **kwargs: Accepted but not used by this implementation.

    Returns:
        ActionGraph: The instance produced by `_create_instance`, with
        `set_operator` applied to every Operator extra field named in "operators".
    """
    # work on a shallow copy so that removing "operators" does not alter the caller's dict
    data = dict(data)
    class_name = data.get("class_name", None)
    if class_name:
        cls = MODULE_REGISTRY.get_module(class_name)
    # operators are not handed to _create_instance; they are applied to the built instance
    operators_info = data.pop("operators", None)
    module = cls._create_instance(data)
    if operators_info:
        for extra_name, extra_value in module.__pydantic_extra__.items():
            if isinstance(extra_value, Operator) and extra_name in operators_info:
                extra_value.set_operator(operators_info[extra_name])
    return module

save_module

save_module(path: str, ignore: List[str] = [], **kwargs)

Save the action graph to a module file.

Source code in evoagentx/workflow/action_graph.py
def save_module(self, path: str, ignore: List[str] = [], **kwargs):
    """
    Save the action graph to a module file.

    The dictionary returned by `get_graph_info` is written to `path` as JSON with an
    indentation of 4, after the top-level keys listed in `ignore` have been removed.
    A missing parent directory of `path` is created first.

    Args:
        path (str): Destination file.
        ignore (List[str]): Top-level keys to leave out of the saved file.
        **kwargs: Accepted but not used by this implementation.

    Returns:
        str: The `path` that was written.
    """
    import os  # local import so this method does not depend on the module's import block

    logger.info("Saving {} to {}", self.__class__.__name__, path)
    config = self.get_graph_info()
    for ignore_key in ignore:
        # keys that are not present are skipped silently
        config.pop(ignore_key, None)

    # create the parent directory so that saving to a new folder does not fail
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=4)

    return path