跳转至

💾 存储模块

evoagentx.storages

StorageHandler

StorageHandler(**kwargs)

Bases: BaseModule

Implementation of a storage handler for managing various storage backends.

StorageHandler provides an abstraction for reading and writing data (e.g., memory, agents, workflows). It supports multiple storage types, including database, vector, and graph storage, initialized via factories.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        for field_name, _ in type(self).model_fields.items():
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
            # if field_value and isinstance(field_value, dict) and "class_name" in field_value:
            #     class_name = field_value.get("class_name")
            #     sub_cls = MODULE_REGISTRY.get_module(cls_name=class_name)
            #     kwargs[field_name] = sub_cls._create_instance(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except (ValidationError, Exception) as e:
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)

init_module

init_module()

Initialize all storage backends based on the provided configuration. Calls individual initialization methods for database, vector, and graph stores.

Source code in evoagentx/storages/base.py
def init_module(self):
    """
    Initialize all storage backends based on the provided configuration.
    Calls individual initialization methods for database, vector, and graph stores.
    """
    # Create the path
    if (self.storageConfig.path is not None) or (self.storageConfig.path != ":memory:") \
        or (not self.storageConfig.path):
        os.makedirs(os.path.dirname(self.storageConfig.path), exist_ok=True)

    self._init_db_store()
    self._init_vector_store()
    self._init_graph_store()

load

load(tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]

Load all data from the database storage.

Attributes:

Name Type Description
tables Optional[List[str]]

List of table names to load; if None, loads all tables.

Returns:

Type Description
Dict[str, Any]

Dict[str, Dict[str, str]]: A dictionary with table names as keys and lists of records as values. You should parse the values by yourself.

Source code in evoagentx/storages/base.py
def load(self, tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]:
    """
    Load all data from the database storage.

    Attributes:
        tables (Optional[List[str]]): List of table names to load; if None, loads all tables.

    Returns:
        Dict[str, Dict[str, str]]: A dictionary with table names as keys and lists of records as values. You should parse the values by yourself.
    """
    result = {}
    table_info = self.storageDB.col_info()

    if tables is None:
        tables_to_load = [t.value for t in TableType]
    else:
        tables_to_load = tables

    # Load data for each table
    for table_name in tables_to_load:
        table_data = []
        # Check if the table exists
        if any(t["table_name"] == table_name for t in table_info):
            cursor = self.storageDB.connection.cursor()
            cursor.execute(f"SELECT * FROM {table_name}")
            # Get column names from the columns dictionary
            columns = next(t["columns"].keys() for t in table_info if t["table_name"] == table_name)
            rows = cursor.fetchall()
            table_data = [dict(zip(columns, row)) for row in rows]
        result[table_name] = table_data

    return result

save

save(data: Dict[str, Any], *args, **kwargs)

Save all provided data to the database storage.

Attributes:

Name Type Description
data Dict[str, Any]

Dictionary with table names as keys and lists of records to save.

Raises:

Type Description
ValueError

If an unknown table name is provided.

Source code in evoagentx/storages/base.py
def save(self, data: Dict[str, Any], *args, **kwargs):
    """
    Save all provided data to the database storage.

    Attributes:
        data (Dict[str, Any]): Dictionary with table names as keys and lists of records to save.

    Raises:
        ValueError: If an unknown table name is provided.
    """
    for table_name, records in data.items():
        store_type = None
        # Map table name to store_type
        for st in TableType:
            if st.value == table_name:
                store_type = st
                break
        if store_type is None:
            raise ValueError(f"Unknown table: {table_name}")
        # Insert each record
        for record in records:
            self.storageDB.insert(metadata=record, store_type=store_type, table=table_name)

parse_result

parse_result(results: Dict[str, str], store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any]

Parse database results, converting JSON strings to Python objects where applicable.

Attributes:

Name Type Description
results Dict[str, str]

Raw database results with column names as keys.

store Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]

Pydantic model for validation.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Parsed results with JSON strings deserialized to Python objects.

Source code in evoagentx/storages/base.py
def parse_result(self, results: Dict[str, str], 
                 store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any]:
    """
    Parse database results, converting JSON strings to Python objects where applicable.

    Attributes:
        results (Dict[str, str]): Raw database results with column names as keys.
        store (Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]): Pydantic model for validation.

    Returns:
        Dict[str, Any]: Parsed results with JSON strings deserialized to Python objects.
    """
    for k, v in store.model_fields.items():
        if v.annotation not in [Optional[str], str]:
            try:
                results[k] = json.loads(results[k])
            except (json.JSONDecodeError, KeyError, TypeError):
                results[k] = results.get(k)
    return results

load_memory

load_memory(memory_id: str, table: Optional[str] = None, **kwargs) -> Dict[str, Any]

Load a single long-term memory data.

Attributes:

Name Type Description
memory_id str

The ID of the long-term memory.

table Optional[str]

The table name; defaults to 'memory' if None.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The data that can be used to create a LongTermMemory instance.

Source code in evoagentx/storages/base.py
def load_memory(self, memory_id: str, table: Optional[str]=None, **kwargs) -> Dict[str, Any]:
    """
    Load a single long-term memory data.

    Attributes:
        memory_id (str): The ID of the long-term memory.
        table (Optional[str]): The table name; defaults to 'memory' if None.

    Returns:
        Dict[str, Any]: The data that can be used to create a LongTermMemory instance.
    """
    pass

save_memory

save_memory(memory_data: Dict[str, Any], table: Optional[str] = None, **kwargs)

Save or update a single memory.

Attributes:

Name Type Description
memory_data Dict[str, Any]

The long-term memory's data.

table Optional[str]

The table name; defaults to 'memory' if None.

Source code in evoagentx/storages/base.py
def save_memory(self, memory_data: Dict[str, Any], table: Optional[str]=None, **kwargs):
    """
    Save or update a single memory.

    Attributes:
        memory_data (Dict[str, Any]): The long-term memory's data.
        table (Optional[str]): The table name; defaults to 'memory' if None.

    """
    pass

load_agent

load_agent(agent_name: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]

Load a single agent's data.

Attributes:

Name Type Description
agent_name str

The unique name of the agent to retrieve.

table Optional[str]

The table name; defaults to 'agent' if None.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The data that can be used to create an Agent instance, or None if not found.

Source code in evoagentx/storages/base.py
def load_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]:
    """
    Load a single agent's data.

    Attributes:
        agent_name (str): The unique name of the agent to retrieve.
        table (Optional[str]): The table name; defaults to 'agent' if None.

    Returns:
        Dict[str, Any]: The data that can be used to create an Agent instance, or None if not found.
    """
    table = table or TableType.store_agent.value
    result = self.storageDB.get_by_id(agent_name, store_type="agent", table=table)
    # Parse the result to convert JSON strings to Python objects
    if result is not None:
        result = self.parse_result(result, AgentStore)
    return result

remove_agent

remove_agent(agent_name: str, table: Optional[str] = None, *args, **kwargs)

Remove an agent from storage if the agent exists.

Attributes:

Name Type Description
agent_name str

The name of the agent to be deleted.

table Optional[str]

The table name; defaults to 'agent' if None.

Raises:

Type Description
ValueError

If the agent does not exist in the specified table.

Source code in evoagentx/storages/base.py
def remove_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs):
    """
    Remove an agent from storage if the agent exists.

    Attributes:
        agent_name (str): The name of the agent to be deleted.
        table (Optional[str]): The table name; defaults to 'agent' if None.

    Raises:
        ValueError: If the agent does not exist in the specified table.
    """
    table = table or TableType.store_agent.value
    success = self.storageDB.delete(agent_name, store_type="agent", table=table)
    if not success:
        raise ValueError(f"Agent with name {agent_name} not found in table {table}")

save_agent

save_agent(agent_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs)

Save or update a single agent's data.

Attributes:

Name Type Description
agent_data Dict[str, Any]

The agent's data, must include 'name' and 'content' keys.

table Optional[str]

The table name; defaults to 'agent' if None.

Raises:

Type Description
ValueError

If 'name' field is missing or if Pydantic validation fails.

Source code in evoagentx/storages/base.py
def save_agent(self, agent_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs):
    """
    Save or update a single agent's data.

    Attributes:
        agent_data (Dict[str, Any]): The agent's data, must include 'name' and 'content' keys.
        table (Optional[str]): The table name; defaults to 'agent' if None.

    Raises:
        ValueError: If 'name' field is missing or if Pydantic validation fails.
    """
    table = table or TableType.store_agent.value
    agent_name = agent_data.get("name")
    if not agent_name:
        raise ValueError("Agent data must include a 'name' field")

    existing = self.storageDB.get_by_id(agent_name, store_type="agent", table=table)
    if existing:
        self.storageDB.update(agent_name, new_metadata=agent_data, store_type="agent", table=table)
    else:
        self.storageDB.insert(metadata=agent_data, store_type="agent", table=table)

load_workflow

load_workflow(workflow_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]

Load a single workflow's data.

Attributes:

Name Type Description
workflow_id str

The ID of the workflow.

table Optional[str]

The table name; defaults to 'workflow' if None.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The data that can be used to create a WorkFlow instance, or None if not found.

Source code in evoagentx/storages/base.py
def load_workflow(self, workflow_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]:
    """
    Load a single workflow's data.

    Attributes:
        workflow_id (str): The ID of the workflow.
        table (Optional[str]): The table name; defaults to 'workflow' if None.

    Returns:
        Dict[str, Any]: The data that can be used to create a WorkFlow instance, or None if not found.
    """
    table = table or TableType.store_workflow.value
    result = self.storageDB.get_by_id(workflow_id, store_type="workflow", table=table)
    # Parse the result to convert JSON strings to Python objects
    if result is not None:
        result = self.parse_result(result, WorkflowStore)
    return result

save_workflow

save_workflow(workflow_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs)

Save or update a workflow's data.

Attributes:

Name Type Description
workflow_data Dict[str, Any]

The workflow's data, must include 'name' field.

table Optional[str]

The table name; defaults to 'workflow' if None.

Raises:

Type Description
ValueError

If 'name' field is missing or if Pydantic validation fails.

Source code in evoagentx/storages/base.py
def save_workflow(self, workflow_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs):
    """
    Save or update a workflow's data.

    Attributes:
        workflow_data (Dict[str, Any]): The workflow's data, must include 'name' field.
        table (Optional[str]): The table name; defaults to 'workflow' if None.

    Raises:
        ValueError: If 'name' field is missing or if Pydantic validation fails.
    """
    table = table or TableType.store_workflow.value
    workflow_id = workflow_data.get("name")
    if not workflow_id:
        raise ValueError("Workflow data must include a 'name' field")
    # Check if workflow exists to decide between insert or update
    existing = self.storageDB.get_by_id(workflow_id, store_type="workflow", table=table)
    if existing:

        self.storageDB.update(workflow_id, new_metadata=workflow_data, store_type="workflow", table=table)
    else:
        self.storageDB.insert(metadata=workflow_data, store_type="workflow", table=table)

load_history

load_history(memory_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]

Load a single history entry.

Attributes:

Name Type Description
memory_id str

The ID of the memory associated with the history entry.

table Optional[str]

The table name; defaults to 'history' if None.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The history data, or None if not found.

Source code in evoagentx/storages/base.py
def load_history(self, memory_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]:
    """
    Load a single history entry.

    Attributes:
        memory_id (str): The ID of the memory associated with the history entry.
        table (Optional[str]): The table name; defaults to 'history' if None.

    Returns:
        Dict[str, Any]: The history data, or None if not found.
    """
    table = table or TableType.store_history.value
    result = self.storageDB.get_by_id(memory_id, store_type="history", table=table)
    # Parse the result to convert JSON strings to Python objects (if any)
    if result is not None:
        result = self.parse_result(result, HistoryStore)
    return result

save_history

save_history(history_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs)

Save or update a single history entry.

Attributes:

Name Type Description
history_data Dict[str, Any]

The history data, must include 'memory_id' field.

table Optional[str]

The table name; defaults to 'history' if None.

Raises:

Type Description
ValueError

If 'memory_id' field is missing or if Pydantic validation fails.

Source code in evoagentx/storages/base.py
def save_history(self, history_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs):
    """
    Save or update a single history entry.

    Attributes:
        history_data (Dict[str, Any]): The history data, must include 'memory_id' field.
        table (Optional[str]): The table name; defaults to 'history' if None.

    Raises:
        ValueError: If 'memory_id' field is missing or if Pydantic validation fails.
    """
    table = table or TableType.store_history.value
    memory_id = history_data.get("memory_id")
    if not memory_id:
        raise ValueError("History data must include a 'memory_id' field")
    # Check if history entry exists to decide between insert or update
    existing = self.storageDB.get_by_id(memory_id, store_type="history", table=table)
    if existing:
        # parse the history, then change the old_hisotry
        result = HistoryStore.model_validate(self.parse_result(existing, HistoryStore))
        history_data["old_memory"] = result.old_memory
        self.storageDB.update(memory_id, new_metadata=history_data, store_type="history", table=table)
    else:
        self.storageDB.insert(metadata=history_data, store_type="history", table=table)