跳转至

StorageHandler 文档

概述

StorageHandler 类是一个为管理多种存储后端而设计的关键组件,用于存储和检索各类数据,如代理配置、工作流、记忆条目和索引数据。它提供了一个统一的接口,与不同类型的存储系统交互,包括关系型数据库(如 SQLite)、向量数据库(如 FAISS)和图数据库(如 Neo4j)。该类利用Pydantic库进行配置验证,以及工厂模式初始化存储后端。

StorageHandlerRAGEngine 类紧密集成,支持检索增强生成(RAG)功能,通过管理索引文档、嵌入向量及其相关元数据的存储来实现。它抽象了与不同存储系统交互的复杂性,确保了长期记忆管理和 RAG 流水线等应用的数据操作无缝进行。

类结构

StorageHandler 类继承自 BaseModule,使用 Pydantic 的 Field 进行配置和类型验证。它支持三种存储后端: - 数据库存储storageDB):管理关系型数据库操作,如 SQLite,用于结构化数据存储。 - 向量存储vector_store):处理用于语义搜索的向量嵌入,支持 FAISS等。 - 图存储graph_store):管理基于图的数据,如 Neo4j,用于关系型或网络化数据结构。

关键属性

  • storageConfig: StoreConfig:所有存储后端的配置对象,在 storages_config.py 中定义,包含数据库、向量和图存储的设置。
  • storageDB: Optional[Union[DBStoreBase, Any]]:数据库存储后端的实例,通过 DBStoreFactory 初始化。
  • vector_store: Optional[Union[VectorStoreBase, Any]]:向量存储后端的实例,通过 VectorStoreFactory 初始化。
  • graph_store: Optional[Union[GraphStoreBase, Any]]:图存储后端的实例,通过 GraphStoreFactory 初始化。

依赖项

  • Pydantic:用于配置验证和类型检查。
  • 工厂模式DBStoreFactoryVectorStoreFactoryGraphStoreFactory 用于创建存储后端实例。
  • 配置storages_config.py 中的 StoreConfigDBConfigVectorStoreConfigGraphStoreConfig 用于定义存储设置。
  • 模式TableTypeAgentStoreWorkflowStoreMemoryStoreHistoryStoreIndexStore 用于数据验证和结构化。

关键方法

初始化

  • init_module(self)
  • 根据提供的 storageConfig 初始化所有存储后端。
  • 如果指定了存储路径,则创建存储目录,并通过调用各自的初始化方法来初始化数据库、向量和图存储。

  • _init_db_store(self)

  • 使用 DBStoreFactorystorageConfig 中的 dbConfig 初始化数据库存储后端。
  • 设置 storageDB 属性。

  • _init_vector_store(self)

  • 如果提供了 vectorConfig,则使用 VectorStoreFactory 初始化向量存储后端。
  • 设置 vector_store 属性。

  • _init_graph_store(self)

  • 如果提供了 graphConfig,则使用 GraphStoreFactory 初始化图存储后端。
  • 设置 graph_store 属性。

数据操作

  • load(self, tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]
  • 从数据库存储中加载指定表或 TableType 中定义的所有表的数据。
  • 返回一个字典,键为表名,值为记录列表。
  • 每条记录是一个将列名映射到值的字典,JSON 字段需要手动解析。

  • save(self, data: Dict[str, Any], *args, **kwargs)

  • 将数据保存到数据库存储中。
  • 接受一个字典,键为表名,值为要保存的记录列表。
  • 验证表名是否符合 TableType,并使用 storageDB.insert 插入记录。

  • parse_result(self, results: Dict[str, str], store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any]

  • 解析原始数据库结果,根据提供的 Pydantic 模型(store)将 JSON 字符串反序列化为 Python 对象。
  • 返回解析后的结果字典,适当处理非字符串字段。

实体特定操作

  • load_memory(self, memory_id: str, table: Optional[str]=None, **kwargs) -> Dict[str, Any]
  • 用于加载单个长期记忆条目的占位方法,通过 memory_id 检索。
  • 如果未指定表名,默认使用 memory 表。

  • save_memory(self, memory_data: Dict[str, Any], table: Optional[str]=None, **kwargs)

  • 用于保存或更新单个记忆条目的占位方法。
  • 如果未指定表名,默认使用 memory 表。

  • load_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]

  • 通过 agent_name 从数据库加载单个代理的数据。
  • 如果未指定表名,默认使用 agent 表。
  • 使用 parse_resultAgentStore 进行结果解析和验证。
  • 如果未找到代理,返回 None

  • remove_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs)

  • 从指定表(默认 agent 表)中删除指定 agent_name 的代理。
  • 如果代理不存在,抛出 ValueError

  • save_agent(self, agent_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs)

  • 在数据库中保存或更新单个代理的数据。
  • 要求 agent_data 包含 name 字段。
  • 使用 storageDB.update 更新现有记录,或使用 storageDB.insert 插入新记录。

  • load_workflow(self, workflow_id: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]

  • 通过 workflow_id 从数据库加载单个工作流的数据。
  • 如果未指定表名,默认使用 workflow 表。
  • 使用 parse_resultWorkflowStore 进行结果解析和验证。
  • 如果未找到工作流,返回 None

  • save_workflow(self, workflow_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs)

  • 在数据库中保存或更新单个工作流的数据。
  • 要求 workflow_data 包含 name 字段。
  • 使用 storageDB.update 更新现有记录,或使用 storageDB.insert 插入新记录。

  • load_history(self, memory_id: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]

  • 通过 memory_id 从数据库加载单个历史条目。
  • 如果未指定表名,默认使用 history 表。
  • 使用 parse_resultHistoryStore 进行结果解析和验证。
  • 如果未找到历史条目,返回 None

  • save_history(self, history_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs)

  • 在数据库中保存或更新单个历史条目。
  • 要求 history_data 包含 memory_id 字段。
  • 更新现有记录时保留 old_memory,或插入新记录。

  • load_index(self, corpus_id: str, table: Optional[str]=None) -> Optional[Dict[str, Any]]

  • 通过 corpus_id 从数据库加载索引数据。
  • 使用 parse_resultIndexStore 进行结果解析和验证。
  • 如果未找到索引,返回 None

  • save_index(self, index_data: Dict[str, Any], table: Optional[str]=None)

  • 在数据库中保存或更新索引数据。
  • 要求 index_data 包含 corpus_id 字段。
  • 使用 storageDB.update 更新现有记录,或使用 storageDB.insert 插入新记录。

与 RAGEngine 的集成

StorageHandlerRAGEngine 类紧密集成,支持 RAG 功能,主要用于: - 初始化向量存储RAGEngine 构造函数检查向量存储的维度是否与嵌入模型的维度一致,如不一致则重新初始化向量存储。 - 保存索引RAGEnginesave 方法使用 StorageHandler.save_index 将索引数据(例如语料库分块和元数据)持久化到数据库,当未指定文件输出路径时。 - 加载索引RAGEngineload 方法使用 StorageHandler.loadStorageHandler.parse_result 从数据库记录中重建索引,确保与嵌入模型和维度的兼容性。

配置

StorageHandler 依赖 storages_config.py 中定义的 StoreConfig 类来配置其后端: - DBConfig:配置关系型数据库(如 SQLite),包含 db_namepathipport 等设置。 - VectorStoreConfig:配置向量数据库(如 FAISS、Qdrant),包含 vector_namedimensionsindex_typeqdrant_urlqdrant_collection_name 等设置。 - GraphStoreConfig:配置图数据库(如 Neo4j),包含 graph_nameuriusernamepassworddatabase 等设置。

配置通过 Pydantic 进行验证,确保类型检查和默认值的稳健性。

使用示例

以下是如何初始化和使用 StorageHandler 的示例:

from evoagentx.storages.base import StorageHandler
from evoagentx.storages.storages_config import StoreConfig, DBConfig, VectorStoreConfig

# 定义配置
config = StoreConfig(
    dbConfig=DBConfig(db_name="sqlite", path="data/storage.db"),
    vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1536),
    path="data/index_cache"
)

# 初始化 StorageHandler
storage_handler = StorageHandler(storageConfig=config)
storage_handler.init_module()

# 保存代理数据
agent_data = {"name": "agent1", "content": {"role": "analyst", "tasks": ["data analysis"]}}
storage_handler.save_agent(agent_data)

# 加载代理数据
agent = storage_handler.load_agent("agent1")
print(agent)  # {'name': 'agent1', 'content': {'role': 'analyst', 'tasks': ['data analysis']}}

# 保存索引数据(在 RAGEngine 中使用)
index_data = {
    "corpus_id": "corpus1",
    "content": {"chunks": [{"chunk_id": "c1", "text": "样本文本", "metadata": {}}]},
    "metadata": {"index_type": "VECTOR", "dimension": 1536}
}
storage_handler.save_index(index_data)

# 加载索引数据
index = storage_handler.load_index("corpus1")
print(index)  # {'corpus_id': 'corpus1', 'content': {...}, 'metadata': {...}}

注意事项

  • load_memorysave_memory 方法还未完全实现,后续将在与LongTermMemory一同实现。
  • StorageHandler 假设数据库模式由 DBStoreBase 及其工厂管理,确保与 TableType 枚举的兼容性。
  • RAGEngine 一起使用时,确保向量存储的维度与嵌入模型的维度一致,以避免重新初始化问题。
  • 错误处理贯穿始终,通过 evoagentx.core.logging.logger 模块生成日志。

结论

StorageHandler 类提供了一个灵活且可扩展的接口,以统一的方式管理多种存储后端。其与 RAGEngine 的集成使其成为 RAG 流水线的关键组件,支持索引数据的有效存储和检索。通过利用工厂模式和 Pydantic 验证,它确保了稳健性和可扩展性,适用于需要复杂数据管理的应用。