
🧑‍⚖️ Evaluators

evoagentx.evaluators

Evaluator

Evaluator(llm: BaseLLM, num_workers: int = 1, agent_manager: Optional[AgentManager] = None, collate_func: Optional[Callable] = None, output_postprocess_func: Optional[Callable] = None, verbose: Optional[bool] = None, **kwargs)

A class for evaluating the performance of a workflow.

Initialize the Evaluator.

Parameters:

llm (BaseLLM, required): The LLM to use for evaluation.
num_workers (int, default 1): The number of parallel workers to use for evaluation.
agent_manager (AgentManager, optional): The agent manager used to construct the workflow. Only used when the workflow graph is a WorkFlowGraph.
collate_func (Callable, optional): A function to collate the benchmark data. It receives a single example from the benchmark, and its output (which should be a dictionary) serves as the inputs to the execute function of a WorkFlow (or ActionGraph) instance. Note that the keys in the collated output must match the inputs of the workflow. The default is a lambda that returns the example itself (a usage sketch follows this parameter list).
output_postprocess_func (Callable, optional): A function to postprocess the output of the workflow. It receives the output of a WorkFlow instance (str) or an ActionGraph instance (dict), and its result is passed to the evaluate function of the benchmark. The default is a lambda that returns the output itself.
verbose (bool, optional): Whether to print the evaluation progress.
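
A minimal construction sketch, assuming the evoagentx.models and evoagentx.agents import paths, the gpt-4o-mini model name, and the example field names used below; none of these are prescribed by this API.

from evoagentx.models import OpenAILLMConfig, OpenAILLM   # assumed import path
from evoagentx.agents import AgentManager                 # assumed import path
from evoagentx.evaluators import Evaluator

def collate(example: dict) -> dict:
    # Map a raw benchmark example onto the workflow's input names.
    # The "question"/"problem" field names are placeholders for illustration.
    return {"problem": example["question"]}

def postprocess(output: str) -> str:
    # Keep only the final line of the workflow output before scoring.
    return output.strip().split("\n")[-1]

llm = OpenAILLM(config=OpenAILLMConfig(model="gpt-4o-mini", openai_key="<OPENAI_API_KEY>"))
evaluator = Evaluator(
    llm=llm,
    num_workers=4,                  # evaluate 4 examples in parallel
    agent_manager=AgentManager(),   # needed when evaluating a WorkFlowGraph with update_agents=True
    collate_func=collate,
    output_postprocess_func=postprocess,
    verbose=True,
)
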
Source code in evoagentx/evaluators/evaluator.py
def __init__(
    self, 
    llm: BaseLLM,
    num_workers: int = 1, 
    agent_manager: Optional[AgentManager] = None,
    collate_func: Optional[Callable] = None, 
    output_postprocess_func: Optional[Callable] = None, 
    verbose: Optional[bool] = None, 
    **kwargs
):
    """
    Initialize the Evaluator.

    Args:
        llm (BaseLLM): The LLM to use for evaluation.
        num_workers (int): The number of parallel workers to use for evaluation. Default is 1. 
        agent_manager (AgentManager, optional): The agent manager used to construct the workflow. Only used when the workflow graph is a WorkFlowGraph.
        collate_func (Callable, optional): A function to collate the benchmark data. 
            It receives a single example from the benchmark and the output (which should be a dictionary) will serve as inputs  
            to the `execute` function of a WorkFlow (or ActionGraph) instance. 
            Note that the keys in the collated output should match the inputs of the workflow.
            The default is a lambda function that returns the example itself. 
        output_postprocess_func (Callable, optional): A function to postprocess the output of the workflow. 
            It receives the output of a WorkFlow instance (str) or an ActionGraph instance (dict) as input 
            and the output will be passed to the `evaluate` function of the benchmark. 
            The default is a lambda function that returns the output itself.
        verbose (bool, optional): Whether to print the evaluation progress.
    """
    self.llm = llm
    self.num_workers = num_workers
    self.agent_manager = agent_manager
    self._thread_agent_managers = {}
    self.collate_func = collate_func or (lambda x: x)
    self.output_postprocess_func = output_postprocess_func or (lambda x: x)
    self.verbose = verbose
    # {example_id: {"prediction": Any, "label": Any, "metrics": dict, "trajectory" (WorkFlowGraph only): List[Message]}}
    self._evaluation_records = {}
    self.kwargs = kwargs

evaluate

evaluate(graph: Union[WorkFlowGraph, ActionGraph], benchmark: Benchmark, eval_mode: str = 'test', indices: Optional[List[int]] = None, sample_k: Optional[int] = None, seed: Optional[int] = None, verbose: Optional[bool] = None, update_agents: Optional[bool] = False, **kwargs) -> dict

Evaluate the performance of the workflow on the benchmark.

Parameters:

graph (WorkFlowGraph or ActionGraph, required): The workflow to evaluate.
benchmark (Benchmark, required): The benchmark to evaluate the workflow on.
eval_mode (str, default "test"): Which split of the benchmark to evaluate the workflow on. Choices: ["test", "dev", "train"].
indices (List[int], optional): The indices of the examples to evaluate the workflow on.
sample_k (int, optional): The number of examples to evaluate the workflow on. If provided, a random sample of size sample_k is used.
seed (int, optional): The random seed used when drawing the sample_k-sized sample.
verbose (bool, optional): Whether to print the evaluation progress. If not provided, self.verbose is used.
update_agents (bool, default False): Whether to update the agents in the agent manager; requires agent_manager to be set. Only used when the workflow graph is a WorkFlowGraph.

Returns:

dict: The average metrics of the workflow evaluation.
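
A hedged usage sketch follows; graph is a WorkFlowGraph built elsewhere, and the HotPotQA benchmark class, its import path, and its metric names are assumptions used only for illustration.

from evoagentx.benchmark import HotPotQA   # assumed import path

benchmark = HotPotQA()
metrics = evaluator.evaluate(
    graph=graph,            # a WorkFlowGraph (or ActionGraph) built elsewhere
    benchmark=benchmark,
    eval_mode="dev",        # score on the dev split
    sample_k=10,            # draw a random sample of 10 examples
    seed=42,                # make the sample reproducible
    update_agents=True,     # re-create agents from the graph via the agent_manager
)
print(metrics)              # averaged metrics, e.g. {"f1": ..., "em": ...} (metric names depend on the benchmark)
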

Source code in evoagentx/evaluators/evaluator.py
def evaluate(
    self, 
    graph: Union[WorkFlowGraph, ActionGraph],
    benchmark: Benchmark, 
    eval_mode: str = "test", 
    indices: Optional[List[int]] = None, 
    sample_k: Optional[int] = None, 
    seed: Optional[int] = None, 
    verbose: Optional[bool] = None,
    update_agents: Optional[bool] = False,
    **kwargs
) -> dict:
    """
    Evaluate the performance of the workflow on the benchmark.

    Args:
        graph (WorkFlowGraph or ActionGraph): The workflow to evaluate.
        benchmark (Benchmark): The benchmark to evaluate the workflow on.
        eval_mode (str): Which split of the benchmark to evaluate the workflow on. Choices: ["test", "dev", "train"].
        indices (List[int], optional): The indices of the examples to evaluate the workflow on.
        sample_k (int, optional): The number of examples to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.
        verbose (bool, optional): Whether to print the evaluation progress. If not provided, the `self.verbose` will be used.
        update_agents (bool, optional): Whether to update the agents in the agent manager. Only used when the workflow graph is a WorkFlowGraph.
    Returns:
        dict: The average metrics of the workflow evaluation.
    """
    # clear the evaluation records
    self._evaluation_records.clear()

    # update the agents in the agent manager
    if isinstance(graph, WorkFlowGraph) and update_agents:
        if self.agent_manager is None:
            raise ValueError(f"`agent_manager` is not provided in {type(self).__name__}. Please provide an agent manager when evaluating a WorkFlowGraph.")
        self.agent_manager.update_agents_from_workflow(workflow_graph=graph, llm_config=self.llm.config, **kwargs)

    data = self._get_eval_data(benchmark=benchmark, eval_mode=eval_mode, indices=indices, sample_k=sample_k, seed=seed)
    results = self._evaluate_graph(graph=graph, data=data, benchmark=benchmark, verbose=verbose, **kwargs)
    return results

get_example_evaluation_record

get_example_evaluation_record(benchmark: Benchmark, example: Any) -> Optional[dict]

Get the evaluation record for a given example.

Source code in evoagentx/evaluators/evaluator.py
def get_example_evaluation_record(self, benchmark: Benchmark, example: Any) -> Optional[dict]:
    """
    Get the evaluation record for a given example.
    """
    example_id = benchmark.get_id(example=example)
    return self._evaluation_records.get(example_id, None)

get_evaluation_record_by_id

get_evaluation_record_by_id(benchmark: Benchmark, example_id: str, eval_mode: str = 'test') -> Optional[dict]

Get the evaluation record for a given example id.

Source code in evoagentx/evaluators/evaluator.py
def get_evaluation_record_by_id(self, benchmark: Benchmark, example_id: str, eval_mode: str = "test") -> Optional[dict]:
    """
    Get the evaluation record for a given example id.
    """
    example = benchmark.get_example_by_id(example_id=example_id, mode=eval_mode)
    return self.get_example_evaluation_record(benchmark=benchmark, example=example)

get_all_evaluation_records

get_all_evaluation_records() -> dict

Get all the evaluation records.

Source code in evoagentx/evaluators/evaluator.py
def get_all_evaluation_records(self) -> dict:
    """
    Get all the evaluation records.
    """
    return self._evaluation_records.copy()
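
Once evaluate (or async_evaluate) has run, the three accessors above expose the per-example records. A brief sketch; benchmark.get_test_data() is an assumed accessor used only to obtain an example object.

records = evaluator.get_all_evaluation_records()
for example_id, record in records.items():
    # Each record holds "prediction", "label", "metrics" (and, for
    # WorkFlowGraph evaluations, a "trajectory" of messages).
    print(example_id, record["metrics"])

example = benchmark.get_test_data()[0]   # assumed accessor; any benchmark example works
record_by_example = evaluator.get_example_evaluation_record(benchmark=benchmark, example=example)
record_by_id = evaluator.get_evaluation_record_by_id(
    benchmark=benchmark, example_id=benchmark.get_id(example=example), eval_mode="test"
)
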

async_evaluate async

async_evaluate(graph: Union[WorkFlowGraph, ActionGraph], benchmark: Benchmark, eval_mode: str = 'test', indices: Optional[List[int]] = None, sample_k: Optional[int] = None, seed: Optional[int] = None, verbose: Optional[bool] = None, **kwargs) -> dict

Asynchronously evaluate the performance of the workflow on the benchmark.

Parameters:

graph (WorkFlowGraph or ActionGraph, required): The workflow to evaluate.
benchmark (Benchmark, required): The benchmark to evaluate the workflow on.
eval_mode (str, default "test"): Which split of the benchmark to evaluate the workflow on. Choices: ["test", "dev", "train"].
indices (List[int], optional): The indices of the examples to evaluate the workflow on.
sample_k (int, optional): The number of examples to evaluate the workflow on. If provided, a random sample of size sample_k is used.
seed (int, optional): The random seed used when drawing the sample_k-sized sample.
verbose (bool, optional): Whether to print the evaluation progress. If not provided, self.verbose is used.

Returns:

dict: The average metrics of the workflow evaluation.

Source code in evoagentx/evaluators/evaluator.py
async def async_evaluate(
    self, 
    graph: Union[WorkFlowGraph, ActionGraph],
    benchmark: Benchmark, 
    eval_mode: str = "test", 
    indices: Optional[List[int]] = None, 
    sample_k: Optional[int] = None, 
    seed: Optional[int] = None, 
    verbose: Optional[bool] = None,
    **kwargs
) -> dict:
    """
    Asynchronously evaluate the performance of the workflow on the benchmark.

    Args:
        graph (WorkFlowGraph or ActionGraph): The workflow to evaluate.
        benchmark (Benchmark): The benchmark to evaluate the workflow on.
        eval_mode (str): Which split of the benchmark to evaluate the workflow on. Choices: ["test", "dev", "train"].
        indices (List[int], optional): The indices of the examples to evaluate the workflow on.
        sample_k (int, optional): The number of examples to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.
        verbose (bool, optional): Whether to print the evaluation progress. If not provided, the `self.verbose` will be used.

    Returns:
        dict: The average metrics of the workflow evaluation.
    """
    # clear the evaluation records
    self._evaluation_records.clear()
    data = self._get_eval_data(benchmark=benchmark, eval_mode=eval_mode, indices=indices, sample_k=sample_k, seed=seed)

    if not data:
        logger.warning("No data to evaluate. Return an empty dictionary.")
        return {}

    verbose = verbose if verbose is not None else self.verbose

    # Create a semaphore to limit concurrent executions
    sem = asyncio.Semaphore(self.num_workers)

    async def process_with_semaphore(example):
        async with sem:
            try:
                return await self._async_evaluate_single_example(
                    graph=graph, 
                    example=example, 
                    benchmark=benchmark, 
                    **kwargs
                )
            except Exception as e:
                logger.warning(f"Async evaluation failed for example with semaphore: {str(e)}")
                return None

    # Create tasks for concurrent execution with semaphore
    tasks = [process_with_semaphore(example) for example in data]

    # Execute all tasks with progress bar if verbose
    if verbose:
        results = await tqdm_asyncio.gather(
            *tasks,
            desc=f"Evaluating {benchmark.name}",
            total=len(data)
        )
    else:
        results = await asyncio.gather(*tasks)

    return self._calculate_average_score(results)
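
A hedged sketch of driving the asynchronous variant: it takes the same arguments as evaluate (minus update_agents) and runs up to num_workers examples concurrently. Here evaluator, graph, and benchmark are assumed to be the objects from the earlier sketches.

import asyncio

async def main():
    metrics = await evaluator.async_evaluate(
        graph=graph,
        benchmark=benchmark,
        eval_mode="test",
        sample_k=20,
        seed=0,
    )
    print(metrics)

asyncio.run(main())
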