
🧮 Optimizers

evoagentx.optimizers

SEWOptimizer

SEWOptimizer(**kwargs)

Bases: Optimizer

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        for field_name, _ in type(self).model_fields.items():
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
            # if field_value and isinstance(field_value, dict) and "class_name" in field_value:
            #     class_name = field_value.get("class_name")
            #     sub_cls = MODULE_REGISTRY.get_module(cls_name=class_name)
            #     kwargs[field_name] = sub_cls._create_instance(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except (ValidationError, Exception) as e:
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)

step

step(**kwargs) -> Union[SequentialWorkFlowGraph, ActionGraph]

Take a step of optimization and return the optimized graph.

Source code in evoagentx/optimizers/sew_optimizer.py
def step(self, **kwargs) -> Union[SequentialWorkFlowGraph, ActionGraph]:
    """
    Take a step of optimization and return the optimized graph.
    """
    graph = self._select_graph_with_highest_score(return_metrics=False)
    if isinstance(graph, SequentialWorkFlowGraph):
        new_graph = self._workflow_graph_step(graph)
    elif isinstance(graph, ActionGraph):
        new_graph = self._action_graph_step(graph)
    else:
        raise ValueError(f"Invalid graph type: {type(graph)}. The graph should be an instance of `WorkFlowGraph` or `ActionGraph`.")
    return new_graph
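
A minimal usage sketch for step, assuming a SEWOptimizer has already been constructed with a workflow graph, an evaluator, and an LLM (the import path and the constructor keywords below are assumptions inferred from attributes referenced elsewhere on this page, not part of the documented signature):

# Illustrative only: constructor keywords inferred from attributes referenced
# on this page (graph, evaluator, eval_rounds).
from evoagentx.optimizers import SEWOptimizer  # assumed import path

optimizer = SEWOptimizer(
    graph=workflow_graph,   # a SequentialWorkFlowGraph or ActionGraph built elsewhere
    evaluator=evaluator,    # hypothetical evaluator used by evaluate()
    eval_rounds=1,          # hypothetical: evaluation repetitions per evaluate() call
)

for i in range(5):
    # Each call selects the best-scoring graph seen so far and produces a revised one.
    new_graph = optimizer.step()
    print(f"Step {i + 1}: got a new {type(new_graph).__name__}")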

evaluate

evaluate(dataset: Benchmark, eval_mode: str = 'test', graph: Optional[Union[SequentialWorkFlowGraph, ActionGraph]] = None, indices: Optional[List[int]] = None, sample_k: Optional[int] = None, **kwargs) -> dict

Evaluate the workflow. If graph is provided, use the provided graph for evaluation. Otherwise, use the graph in the optimizer.

Parameters:

    dataset (Benchmark, required): The dataset to evaluate the workflow on.
    eval_mode (str, default 'test'): The evaluation mode. Choices: ["test", "dev", "train"].
    graph (Union[WorkFlowGraph, ActionGraph], default None): The graph to evaluate. If not provided, use the graph in the optimizer.
    indices (List[int], default None): The indices of the data to evaluate the workflow on.
    sample_k (int, default None): The number of examples to evaluate the workflow on. If provided, a random sample of size sample_k will be used.

Returns:

    dict: The metrics of the workflow evaluation.

Source code in evoagentx/optimizers/sew_optimizer.py
def evaluate(
    self, 
    dataset: Benchmark, 
    eval_mode: str = "test", 
    graph: Optional[Union[SequentialWorkFlowGraph, ActionGraph]] = None,
    indices: Optional[List[int]] = None,
    sample_k: Optional[int] = None,
    **kwargs
) -> dict:
    """
    Evaluate the workflow. If `graph` is provided, use the provided graph for evaluation. Otherwise, use the graph in the optimizer. 

    Args:
        dataset (Benchmark): The dataset to evaluate the workflow on.
        eval_mode (str): The evaluation mode. Choices: ["test", "dev", "train"].
        graph (Union[WorkFlowGraph, ActionGraph], optional): The graph to evaluate. If not provided, use the graph in the optimizer.
        indices (List[int], optional): The indices of the data to evaluate the workflow on.
        sample_k (int, optional): The number of data to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.

    Returns:
        dict: The metrics of the workflow evaluation.
    """
    graph = graph if graph is not None else self.graph
    metrics_list = []
    for i in range(self.eval_rounds):
        eval_info = [
            f"[{type(graph).__name__}]", 
            f"Evaluation round {i+1}/{self.eval_rounds}", 
            f"Mode: {eval_mode}"
        ]
        if indices is not None:
            eval_info.append(f"Indices: {len(indices)} samples")
        if sample_k is not None:
            eval_info.append(f"Sample size: {sample_k}")
        logger.info(" | ".join(eval_info))
        metrics = self.evaluator.evaluate(
            graph=graph, 
            benchmark=dataset, 
            eval_mode=eval_mode, 
            indices=indices, 
            sample_k=sample_k,
            **kwargs
        )
        metrics_list.append(metrics)
    avg_metrics = self.evaluator._calculate_average_score(metrics_list)

    return avg_metrics
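
For example, a hedged call that evaluates on the dev split using a random sample (the benchmark class and its import path are assumptions; any Benchmark subclass works):

from evoagentx.benchmark import HumanEval  # assumed import path

benchmark = HumanEval()  # hypothetical Benchmark instance
metrics = optimizer.evaluate(
    dataset=benchmark,
    eval_mode="dev",   # evaluate on the dev split instead of the default "test"
    sample_k=20,       # use a random sample of 20 examples
)
print(metrics)         # metric names and values depend on the benchmark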

save

save(path: str, ignore: List[str] = [])

Save the (optimized) workflow graph to a file.

Parameters:

    path (str, required): The path to save the workflow graph.
    ignore (List[str], default []): The keys to ignore when saving the workflow graph.
Source code in evoagentx/optimizers/sew_optimizer.py
def save(self, path: str, ignore: List[str] = []):
    """
    Save the (optimized) workflow graph to a file. 

    Args:
        path (str): The path to save the workflow graph.
        ignore (List[str]): The keys to ignore when saving the workflow graph.
    """
    self.graph.save_module(path, ignore=ignore)
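
Persisting the optimized graph is then a single call (the ignored key is purely illustrative):

# Save the optimizer's current graph; "llm_config" is a hypothetical key to omit.
optimizer.save("sew_optimized_graph.json", ignore=["llm_config"])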

AFlowOptimizer

AFlowOptimizer(**kwargs)

Bases: BaseModule

AFlow Optimizer for workflow optimization.

This optimizer iteratively improves workflows through multiple rounds of optimization using large language models. It evaluates workflow performance, identifies improvement opportunities, and applies optimizations based on experience and convergence metrics.

Attributes:

    question_type (str): Type of task to optimize for (e.g., qa, match, code).
    graph_path (str): Path to the workflow graph directory (must contain graph.py and prompt.py).
    optimized_path (str): Path to save optimized workflows (defaults to graph_path).
    initial_round (int): Starting round number for optimization.
    optimizer_llm (BaseLLM): LLM used for generating optimizations.
    executor_llm (BaseLLM): LLM used for executing the workflow.
    operators (List[str]): List of operators available for optimization.
    sample (int): Number of rounds to sample from for optimization.
    max_rounds (int): Maximum number of optimization rounds to perform.
    validation_rounds (int): Number of validation runs per optimization round.
    eval_rounds (int): Number of evaluation runs for test mode.
    check_convergence (bool): Whether to check for optimization convergence.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        for field_name, _ in type(self).model_fields.items():
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
            # if field_value and isinstance(field_value, dict) and "class_name" in field_value:
            #     class_name = field_value.get("class_name")
            #     sub_cls = MODULE_REGISTRY.get_module(cls_name=class_name)
            #     kwargs[field_name] = sub_cls._create_instance(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except (ValidationError, Exception) as e:
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)
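
A construction sketch based on the attributes listed above; the directory layout, operator names, and LLM instances are illustrative, and omitted fields fall back to their defaults:

from evoagentx.optimizers import AFlowOptimizer  # assumed import path

optimizer = AFlowOptimizer(
    question_type="code",                        # e.g. qa, match, or code
    graph_path="examples/aflow/humaneval",       # hypothetical dir containing graph.py and prompt.py
    optimized_path="examples/aflow/humaneval_optimized",  # defaults to graph_path if omitted
    optimizer_llm=optimizer_llm,                 # BaseLLM that generates optimizations
    executor_llm=executor_llm,                   # BaseLLM that executes the workflow
    operators=["Custom", "ScEnsemble"],          # hypothetical operator names
    max_rounds=10,
    validation_rounds=3,
)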

optimize

optimize(benchmark: Benchmark)

Run the optimization process on the workflow.

Performs multiple rounds of optimization, evaluating each round against the benchmark and checking for convergence. Continues until convergence is detected or the maximum number of rounds is reached.

Parameters:

    benchmark (Benchmark, required): The benchmark to evaluate the workflow against.
Source code in evoagentx/optimizers/aflow_optimizer.py
def optimize(self, benchmark: Benchmark):
    """Run the optimization process on the workflow.

    Performs multiple rounds of optimization, evaluating each round against
    the benchmark and checking for convergence. Continues until convergence
    is detected or the maximum number of rounds is reached.

    Args:
        benchmark: The benchmark to evaluate the workflow against
    """
    self.benchmark = benchmark
    for _ in range(self.max_rounds):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        score = loop.run_until_complete(self._execute_with_retry(self._optimize_graph))
        self.round += 1
        logger.info(f"Score for round {self.round}: {score}")
        if self._check_convergence():
            break
        if self.round >= self.max_rounds:
            logger.info(f"Max rounds reached: {self.max_rounds}, stopping optimization.")
            break
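
Running the optimization against a benchmark is then a single call (benchmark construction is assumed, as in the SEWOptimizer example above):

benchmark = HumanEval()        # hypothetical Benchmark subclass
optimizer.optimize(benchmark)  # runs up to max_rounds rounds, stopping early on convergence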

test

test(benchmark: Benchmark, test_rounds: List[int] = None)

Run the test evaluation on optimized workflows.

Evaluates specified rounds (or the best round if none specified) against the benchmark multiple times and logs the results.

Parameters:

    benchmark (Benchmark, required): The benchmark to evaluate against.
    test_rounds (List[int], default None): Specific round numbers to test, or None to use the best round.
Source code in evoagentx/optimizers/aflow_optimizer.py
def test(self, benchmark: Benchmark, test_rounds: List[int] = None):
    """Run the test evaluation on optimized workflows.

    Evaluates specified rounds (or the best round if none specified) against
    the benchmark multiple times and logs the results.

    Args:
        benchmark: The benchmark to evaluate against
        test_rounds: Specific round numbers to test, or None to use the best round
    """
    self.benchmark = benchmark
    if test_rounds is None:
        best_round = self._load_best_round()
        logger.info(f"No test rounds provided, using best round: {best_round}")
        test_rounds = [best_round]
    for _ in tqdm(range(self.eval_rounds)):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._run_test(test_rounds))
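
Testing either the best round or explicit rounds, as a sketch (the round numbers are illustrative):

# Evaluate the best round found during optimization ...
optimizer.test(benchmark)

# ... or evaluate specific optimization rounds.
optimizer.test(benchmark, test_rounds=[2, 4])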

TextGradOptimizer

TextGradOptimizer(**kwargs)

Bases: BaseModule

Optimizes the prompt templates and system prompts in a workflow using TextGrad. For more information, see https://github.com/zou-group/textgrad.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        for field_name, _ in type(self).model_fields.items():
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
            # if field_value and isinstance(field_value, dict) and "class_name" in field_value:
            #     class_name = field_value.get("class_name")
            #     sub_cls = MODULE_REGISTRY.get_module(cls_name=class_name)
            #     kwargs[field_name] = sub_cls._create_instance(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except (ValidationError, Exception) as e:
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)
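
A construction sketch; the keyword names below are inferred from attributes referenced elsewhere on this page (graph, evaluator, batch_size, max_steps, eval_interval, save_interval, save_path, rollback) and should be treated as assumptions rather than the definitive signature:

from evoagentx.optimizers import TextGradOptimizer  # assumed import path

optimizer = TextGradOptimizer(
    graph=workflow_graph,        # SequentialWorkFlowGraph whose prompts will be optimized
    evaluator=evaluator,         # hypothetical evaluator used by evaluate()
    batch_size=4,                # training examples per optimization step
    max_steps=50,                # total number of optimization steps
    eval_interval=10,            # evaluate and snapshot every 10 steps; None disables it
    save_interval=None,          # optionally save intermediate graphs every N steps
    save_path="textgrad_output", # where intermediate and final graphs are written
    rollback=True,               # roll back to the best snapshot when metrics regress
)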

optimize

optimize(dataset: Benchmark, use_answers: bool = True, seed: Optional[int] = None)

Optimizes self.graph using dataset.

Parameters:

    dataset (Benchmark, required): The dataset to use for optimization.
    use_answers (bool, default True): Whether to use the answers (labels) in the training set for optimization. If False, the dataset's training set does not need to have answers. If eval_every_n_steps is set to None, the workflow can be optimized without any labeled data.
    seed (Optional[int], default None): The random seed to use for shuffling the data.
Source code in evoagentx/optimizers/textgrad_optimizer.py
def optimize(self, dataset: Benchmark, use_answers: bool = True, seed: Optional[int] = None):
    """Optimizes self.graph using `dataset`.

    Args:
        dataset (Benchmark): The dataset to use for optimization.
        use_answers (bool): Whether to use the answers (labels) in the training set for optimization.
            If False, `dataset`'s training set does not need to have answers.
            If `eval_every_n_steps` is set to None, we can optimize the workflow without any labeled data.
        seed (Optional[int]): The random seed to use for shuffling the data.
    """
    self._init_textgrad(dataset, use_answers)

    def iterator():
        epoch = 0
        while True:
            # Shuffle train data every epoch
            effective_seed = seed + epoch if seed is not None else None
            train_data = dataset.get_train_data(sample_k=len(dataset._train_data), seed=effective_seed)
            for i in range(0, len(train_data), self.batch_size):
                batch = train_data[i:i + self.batch_size]
                inputs = [self.evaluator.collate_func(x) for x in batch]
                if use_answers:
                    labels = dataset.get_labels(batch)
                else:
                    labels = None
                yield inputs, labels
            epoch += 1

    data_iterator = iterator()

    for step in tqdm(range(self.max_steps)):
        inputs, labels = next(data_iterator)
        self.step(inputs, labels, dataset, use_answers)

        if self.eval_interval is not None and (step + 1) % self.eval_interval == 0:
            logger.info(f"Evaluating the workflow at step {step+1} ...")
            with suppress_logger_info():
                metrics = self.evaluate(dataset, **self.eval_config)
            self.log_snapshot(self.graph, metrics)
            logger.info(f"Step {step+1} metrics: {metrics}")

            # If rollback is enabled, keep track of the best snapshot
            if self.rollback:
                if len(self._snapshot) == 1:
                    best_snapshot = self._snapshot[-1]
                    best_average_score = np.mean(list(metrics.values()))
                else:
                    current_average_score = np.mean(list(metrics.values()))

                    if current_average_score >= best_average_score:
                        # If the current average score is better than the best average score, update the best snapshot
                        best_snapshot = self._snapshot[-1]
                        best_average_score = current_average_score
                    else:
                        # If the current average score is worse than the best average score, roll back to the best snapshot
                        logger.info(f"Metrics are worse than the best snapshot which has {best_snapshot['metrics']}. Rolling back to the best snapshot.")
                        best_graph = SequentialWorkFlowGraph.from_dict(best_snapshot["graph"])
                        self.graph = best_graph
                        self._create_textgrad_agents()

        if self.save_interval is not None and (step + 1) % self.save_interval == 0:
            logger.info(f"Saving the workflow at step {step+1} ...")
            self.save(os.path.join(self.save_path, f"{dataset.name}_textgrad_step_{step+1}.json"))

    logger.info(f"Reached the maximum number of steps {self.max_steps}. Optimization has finished.")
    self.save(os.path.join(self.save_path, f"{dataset.name}_textgrad_final.json"))

    # Saves the best graph
    if len(self._snapshot) > 0:
        best_graph = self._select_graph_with_highest_score()
        self.save(os.path.join(self.save_path, f"{dataset.name}_textgrad_best.json"), graph=best_graph)
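
Invoking the optimizer on a benchmark, as a sketch:

benchmark = HumanEval()  # hypothetical Benchmark subclass
optimizer.optimize(dataset=benchmark, use_answers=True, seed=42)

# Label-free variant: with use_answers=False (and eval_interval=None at construction
# time) the prompts can be optimized without any labeled training data.
# optimizer.optimize(dataset=benchmark, use_answers=False)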

step

step(inputs: List[dict[str, str]], labels: Optional[List[str | dict[str, str]]], dataset: Benchmark, use_answers: bool = True)

Performs one optimization step using a batch of data.

Source code in evoagentx/optimizers/textgrad_optimizer.py
def step(self, inputs: List[dict[str, str]], labels: Optional[List[str|dict[str, str]]], dataset: Benchmark, use_answers: bool = True):
    """Performs one optimization step using a batch of data."""

    if labels is None and use_answers:
        raise ValueError("Labels must be provided if `use_answers` is True.")

    losses = []

    if use_answers:
        for input, label in zip(inputs, labels):
            output = self.forward(input)
            if isinstance(label, str):
                label = Variable(label, requires_grad=False, role_description="correct answer for the query")
            elif isinstance(label, dict):
                if not isinstance(dataset, CodingBenchmark):
                    raise ValueError("Label must be a string for non-coding benchmarks.")
                end_node_name = self.graph.find_end_nodes()[0]
                end_node = self.graph.get_node(end_node_name)
                output_name = end_node.outputs[0].name
                code = output.parsed_outputs[output_name]
                label = self._format_code_label(code, label, dataset)
                label = Variable(label, requires_grad=False, role_description="the task, the test result, and the correct code")
            loss = self.loss_fn([output, label])
            losses.append(loss)
    else:
        for input in inputs:
            output = self.forward(input)
            loss = self.loss_fn(output)
            losses.append(loss)

    total_loss = tg.sum(losses)
    total_loss.backward(self.optimizer_engine)
    self.textgrad_optimizer.step()
    self.textgrad_optimizer.zero_grad()

    # Checks if all the prompt templates contain the required inputs.
    # If not, fix them by appending the input placeholders at the end.
    for node in self.graph.nodes:
        prompt_template = node.textgrad_agent.prompt_template.value
        prompt_template = self._add_missing_input_placeholder(prompt_template, node)
        node.textgrad_agent.prompt_template.value = prompt_template

    self._update_workflow_graph()
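
step can also be driven by hand; the sketch below mirrors how optimize() assembles a batch (collate_func and get_labels appear in the optimize() source above, while the batch size here is illustrative):

train_data = benchmark.get_train_data(sample_k=4)                    # small hypothetical batch
inputs = [optimizer.evaluator.collate_func(x) for x in train_data]   # same collation as optimize()
labels = benchmark.get_labels(train_data)                            # answers for the batch

optimizer.step(inputs, labels, benchmark, use_answers=True)          # one TextGrad update of the prompts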

forward

forward(inputs: dict[str, str]) -> Variable

Returns the final output from the workflow.

Source code in evoagentx/optimizers/textgrad_optimizer.py
def forward(self, inputs: dict[str, str]) -> Variable:
    """Returns the final output from the workflow."""
    self._visited_nodes = set()
    end_node = self.graph.find_end_nodes()[0]
    input_variables = self._initial_inputs_to_variables(inputs)
    output = self._compute_node(end_node, input_variables)
    return output
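
A single forward pass returns a textgrad Variable holding the workflow's final output; the input key below is hypothetical and must match the workflow's declared inputs:

output = optimizer.forward({"problem": "Reverse a string."})  # input name is an assumption
# `output` is a textgrad Variable; it can be inspected or passed to a TextGrad loss function.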

evaluate

evaluate(dataset: Benchmark, eval_mode: str = 'dev', graph: Optional[SequentialWorkFlowGraph] = None, indices: Optional[List[int]] = None, sample_k: Optional[int] = None, **kwargs) -> dict

Evaluate the workflow. If graph is provided, use the provided graph for evaluation. Otherwise, use the graph in the optimizer.

Parameters:

    dataset (Benchmark, required): The dataset to evaluate the workflow on.
    eval_mode (str, default 'dev'): The evaluation mode. Choices: ["test", "dev", "train"].
    graph (SequentialWorkFlowGraph, default None): The graph to evaluate. If not provided, use the graph in the optimizer.
    indices (List[int], default None): The indices of the data to evaluate the workflow on.
    sample_k (int, default None): The number of examples to evaluate the workflow on. If provided, a random sample of size sample_k will be used.

Returns:

    dict: The metrics of the workflow evaluation.

Source code in evoagentx/optimizers/textgrad_optimizer.py
def evaluate(
    self, 
    dataset: Benchmark, 
    eval_mode: str = "dev", 
    graph: Optional[SequentialWorkFlowGraph] = None,
    indices: Optional[List[int]] = None,
    sample_k: Optional[int] = None,
    **kwargs
) -> dict:
    """Evaluate the workflow. If `graph` is provided, use the provided graph for evaluation. Otherwise, use the graph in the optimizer. 

    Args:
        dataset (Benchmark): The dataset to evaluate the workflow on.
        eval_mode (str): The evaluation mode. Choices: ["test", "dev", "train"].
        graph (SequentialWorkFlowGraph, optional): The graph to evaluate. If not provided, use the graph in the optimizer.
        indices (List[int], optional): The indices of the data to evaluate the workflow on.
        sample_k (int, optional): The number of data to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.

    Returns:
        dict: The metrics of the workflow evaluation.
    """
    if graph is None:
        graph = self.graph

    metrics_list = []
    for i in range(self.eval_rounds):
        eval_info = [
            f"[{type(graph).__name__}]", 
            f"Evaluation round {i+1}/{self.eval_rounds}", 
            f"Mode: {eval_mode}"
        ]
        if indices is not None:
            eval_info.append(f"Indices: {len(indices)} samples")
        if sample_k is not None:
            eval_info.append(f"Sample size: {sample_k}")
        logger.info(" | ".join(eval_info))
        metrics = self.evaluator.evaluate(
            graph=graph, 
            benchmark=dataset, 
            eval_mode=eval_mode, 
            indices=indices, 
            sample_k=sample_k,
            update_agents=True, 
            **kwargs
        )
        metrics_list.append(metrics)
    avg_metrics = self.evaluator._calculate_average_score(metrics_list)

    return avg_metrics
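
Evaluation mirrors SEWOptimizer.evaluate; for example, a sketch restricted to the first fifty dev examples:

metrics = optimizer.evaluate(dataset=benchmark, eval_mode="dev", indices=list(range(50)))
print(metrics)  # metric names depend on the benchmark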

save

save(path: str, graph: Optional[SequentialWorkFlowGraph] = None, ignore: List[str] = [])

Save the workflow graph containing the optimized prompts to a file.

Parameters:

    path (str, required): The path to save the workflow graph.
    graph (SequentialWorkFlowGraph, default None): The graph to save. If not provided, use the graph in the optimizer.
    ignore (List[str], default []): The keys to ignore when saving the workflow graph.
Source code in evoagentx/optimizers/textgrad_optimizer.py
def save(self, path: str, graph: Optional[SequentialWorkFlowGraph] = None, ignore: List[str] = []):
    """Save the workflow graph containing the optimized prompts to a file. 

    Args:
        path (str): The path to save the workflow graph.
        graph (SequentialWorkFlowGraph, optional): The graph to save. If not provided, use the graph in the optimizer.
        ignore (List[str]): The keys to ignore when saving the workflow graph.
    """
    if graph is None:
        graph = self.graph
    graph.save_module(path, ignore=ignore)

log_snapshot

log_snapshot(graph: SequentialWorkFlowGraph, metrics: dict)

Log the snapshot of the workflow.

Source code in evoagentx/optimizers/textgrad_optimizer.py
def log_snapshot(self, graph: SequentialWorkFlowGraph, metrics: dict):
    """Log the snapshot of the workflow."""
    self._snapshot.append(
        {
            "index": len(self._snapshot),
            "graph": deepcopy(graph.get_graph_info()),
            "metrics": metrics,
        }
    )

restore_best_graph

restore_best_graph()

Restore the best graph from the snapshot and set it to self.graph.

Source code in evoagentx/optimizers/textgrad_optimizer.py
def restore_best_graph(self):
    """Restore the best graph from the snapshot and set it to `self.graph`."""
    if len(self._snapshot) == 0:
        logger.info("No snapshot found. No graph to restore.")
        return

    best_graph, best_metrics = self._select_graph_with_highest_score(return_metrics=True)
    self.graph = best_graph
    logger.info(f"Restored the best graph from snapshot with metrics {best_metrics}")