StorageHandler 文档¶
概述¶
StorageHandler 类是一个为管理多种存储后端而设计的关键组件,用于存储和检索各类数据,如代理配置、工作流、记忆条目和索引数据。它提供了一个统一的接口,与不同类型的存储系统交互,包括关系型数据库(如 SQLite)、向量数据库(如 FAISS)和图数据库(如 Neo4j)。该类利用Pydantic库进行配置验证,以及工厂模式初始化存储后端。
StorageHandler 与 RAGEngine 类紧密集成,支持检索增强生成(RAG)功能,通过管理索引文档、嵌入向量及其相关元数据的存储来实现。它抽象了与不同存储系统交互的复杂性,确保了长期记忆管理和 RAG 流水线等应用的数据操作无缝进行。
类结构¶
StorageHandler 类继承自 BaseModule,使用 Pydantic 的 Field 进行配置和类型验证。它支持三种存储后端:
- 数据库存储(storageDB):管理关系型数据库操作,如 SQLite,用于结构化数据存储。
- 向量存储(vector_store):处理用于语义搜索的向量嵌入,支持 FAISS等。
- 图存储(graph_store):管理基于图的数据,如 Neo4j,用于关系型或网络化数据结构。
关键属性¶
storageConfig: StoreConfig:所有存储后端的配置对象,在storages_config.py中定义,包含数据库、向量和图存储的设置。storageDB: Optional[Union[DBStoreBase, Any]]:数据库存储后端的实例,通过DBStoreFactory初始化。vector_store: Optional[Union[VectorStoreBase, Any]]:向量存储后端的实例,通过VectorStoreFactory初始化。graph_store: Optional[Union[GraphStoreBase, Any]]:图存储后端的实例,通过GraphStoreFactory初始化。
依赖项¶
- Pydantic:用于配置验证和类型检查。
- 工厂模式:
DBStoreFactory、VectorStoreFactory和GraphStoreFactory用于创建存储后端实例。 - 配置:
storages_config.py中的StoreConfig、DBConfig、VectorStoreConfig和GraphStoreConfig用于定义存储设置。 - 模式:
TableType、AgentStore、WorkflowStore、MemoryStore、HistoryStore和IndexStore用于数据验证和结构化。
关键方法¶
初始化¶
init_module(self)- 根据提供的
storageConfig初始化所有存储后端。 -
如果指定了存储路径,则创建存储目录,并通过调用各自的初始化方法来初始化数据库、向量和图存储。
-
_init_db_store(self) - 使用
DBStoreFactory和storageConfig中的dbConfig初始化数据库存储后端。 -
设置
storageDB属性。 -
_init_vector_store(self) - 如果提供了
vectorConfig,则使用VectorStoreFactory初始化向量存储后端。 -
设置
vector_store属性。 -
_init_graph_store(self) - 如果提供了
graphConfig,则使用GraphStoreFactory初始化图存储后端。 - 设置
graph_store属性。
数据操作¶
load(self, tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]- 从数据库存储中加载指定表或
TableType中定义的所有表的数据。 - 返回一个字典,键为表名,值为记录列表。
-
每条记录是一个将列名映射到值的字典,JSON 字段需要手动解析。
-
save(self, data: Dict[str, Any], *args, **kwargs) - 将数据保存到数据库存储中。
- 接受一个字典,键为表名,值为要保存的记录列表。
-
验证表名是否符合
TableType,并使用storageDB.insert插入记录。 -
parse_result(self, results: Dict[str, str], store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any] - 解析原始数据库结果,根据提供的 Pydantic 模型(
store)将 JSON 字符串反序列化为 Python 对象。 - 返回解析后的结果字典,适当处理非字符串字段。
实体特定操作¶
load_memory(self, memory_id: str, table: Optional[str]=None, **kwargs) -> Dict[str, Any]- 用于加载单个长期记忆条目的占位方法,通过
memory_id检索。 -
如果未指定表名,默认使用
memory表。 -
save_memory(self, memory_data: Dict[str, Any], table: Optional[str]=None, **kwargs) - 用于保存或更新单个记忆条目的占位方法。
-
如果未指定表名,默认使用
memory表。 -
load_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any] - 通过
agent_name从数据库加载单个代理的数据。 - 如果未指定表名,默认使用
agent表。 - 使用
parse_result和AgentStore进行结果解析和验证。 -
如果未找到代理,返回
None。 -
remove_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs) - 从指定表(默认
agent表)中删除指定agent_name的代理。 -
如果代理不存在,抛出
ValueError。 -
save_agent(self, agent_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs) - 在数据库中保存或更新单个代理的数据。
- 要求
agent_data包含name字段。 -
使用
storageDB.update更新现有记录,或使用storageDB.insert插入新记录。 -
load_workflow(self, workflow_id: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any] - 通过
workflow_id从数据库加载单个工作流的数据。 - 如果未指定表名,默认使用
workflow表。 - 使用
parse_result和WorkflowStore进行结果解析和验证。 -
如果未找到工作流,返回
None。 -
save_workflow(self, workflow_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs) - 在数据库中保存或更新单个工作流的数据。
- 要求
workflow_data包含name字段。 -
使用
storageDB.update更新现有记录,或使用storageDB.insert插入新记录。 -
load_history(self, memory_id: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any] - 通过
memory_id从数据库加载单个历史条目。 - 如果未指定表名,默认使用
history表。 - 使用
parse_result和HistoryStore进行结果解析和验证。 -
如果未找到历史条目,返回
None。 -
save_history(self, history_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs) - 在数据库中保存或更新单个历史条目。
- 要求
history_data包含memory_id字段。 -
更新现有记录时保留
old_memory,或插入新记录。 -
load_index(self, corpus_id: str, table: Optional[str]=None) -> Optional[Dict[str, Any]] - 通过
corpus_id从数据库加载索引数据。 - 使用
parse_result和IndexStore进行结果解析和验证。 -
如果未找到索引,返回
None。 -
save_index(self, index_data: Dict[str, Any], table: Optional[str]=None) - 在数据库中保存或更新索引数据。
- 要求
index_data包含corpus_id字段。 - 使用
storageDB.update更新现有记录,或使用storageDB.insert插入新记录。
与 RAGEngine 的集成¶
StorageHandler 与 RAGEngine 类紧密集成,支持 RAG 功能,主要用于:
- 初始化向量存储:RAGEngine 构造函数检查向量存储的维度是否与嵌入模型的维度一致,如不一致则重新初始化向量存储。
- 保存索引:RAGEngine 的 save 方法使用 StorageHandler.save_index 将索引数据(例如语料库分块和元数据)持久化到数据库,当未指定文件输出路径时。
- 加载索引:RAGEngine 的 load 方法使用 StorageHandler.load 和 StorageHandler.parse_result 从数据库记录中重建索引,确保与嵌入模型和维度的兼容性。
配置¶
StorageHandler 依赖 storages_config.py 中定义的 StoreConfig 类来配置其后端:
- DBConfig:配置关系型数据库(如 SQLite),包含 db_name、path、ip 和 port 等设置。
- VectorStoreConfig:配置向量数据库(如 FAISS、Qdrant),包含 vector_name、dimensions、index_type、qdrant_url 和 qdrant_collection_name 等设置。
- GraphStoreConfig:配置图数据库(如 Neo4j),包含 graph_name、uri、username、password 和 database 等设置。
配置通过 Pydantic 进行验证,确保类型检查和默认值的稳健性。
使用示例¶
以下是如何初始化和使用 StorageHandler 的示例:
from evoagentx.storages.base import StorageHandler
from evoagentx.storages.storages_config import StoreConfig, DBConfig, VectorStoreConfig
# 定义配置
config = StoreConfig(
dbConfig=DBConfig(db_name="sqlite", path="data/storage.db"),
vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1536),
path="data/index_cache"
)
# 初始化 StorageHandler
storage_handler = StorageHandler(storageConfig=config)
storage_handler.init_module()
# 保存代理数据
agent_data = {"name": "agent1", "content": {"role": "analyst", "tasks": ["data analysis"]}}
storage_handler.save_agent(agent_data)
# 加载代理数据
agent = storage_handler.load_agent("agent1")
print(agent) # {'name': 'agent1', 'content': {'role': 'analyst', 'tasks': ['data analysis']}}
# 保存索引数据(在 RAGEngine 中使用)
index_data = {
"corpus_id": "corpus1",
"content": {"chunks": [{"chunk_id": "c1", "text": "样本文本", "metadata": {}}]},
"metadata": {"index_type": "VECTOR", "dimension": 1536}
}
storage_handler.save_index(index_data)
# 加载索引数据
index = storage_handler.load_index("corpus1")
print(index) # {'corpus_id': 'corpus1', 'content': {...}, 'metadata': {...}}
注意事项¶
load_memory和save_memory方法还未完全实现,后续将在与LongTermMemory一同实现。StorageHandler假设数据库模式由DBStoreBase及其工厂管理,确保与TableType枚举的兼容性。- 与
RAGEngine一起使用时,确保向量存储的维度与嵌入模型的维度一致,以避免重新初始化问题。 - 错误处理贯穿始终,通过
evoagentx.core.logging.logger模块生成日志。
结论¶
StorageHandler 类提供了一个灵活且可扩展的接口,以统一的方式管理多种存储后端。其与 RAGEngine 的集成使其成为 RAG 流水线的关键组件,支持索引数据的有效存储和检索。通过利用工厂模式和 Pydantic 验证,它确保了稳健性和可扩展性,适用于需要复杂数据管理的应用。