StorageHandler 文档¶
概述¶
StorageHandler
类是一个为管理多种存储后端而设计的关键组件,用于存储和检索各类数据,如代理配置、工作流、记忆条目和索引数据。它提供了一个统一的接口,与不同类型的存储系统交互,包括关系型数据库(如 SQLite)、向量数据库(如 FAISS)和图数据库(如 Neo4j)。该类利用Pydantic库进行配置验证,以及工厂模式初始化存储后端。
StorageHandler
与 RAGEngine
类紧密集成,支持检索增强生成(RAG)功能,通过管理索引文档、嵌入向量及其相关元数据的存储来实现。它抽象了与不同存储系统交互的复杂性,确保了长期记忆管理和 RAG 流水线等应用的数据操作无缝进行。
类结构¶
StorageHandler
类继承自 BaseModule
,使用 Pydantic 的 Field
进行配置和类型验证。它支持三种存储后端:
- 数据库存储(storageDB
):管理关系型数据库操作,如 SQLite,用于结构化数据存储。
- 向量存储(vector_store
):处理用于语义搜索的向量嵌入,支持 FAISS等。
- 图存储(graph_store
):管理基于图的数据,如 Neo4j,用于关系型或网络化数据结构。
关键属性¶
storageConfig: StoreConfig
:所有存储后端的配置对象,在storages_config.py
中定义,包含数据库、向量和图存储的设置。storageDB: Optional[Union[DBStoreBase, Any]]
:数据库存储后端的实例,通过DBStoreFactory
初始化。vector_store: Optional[Union[VectorStoreBase, Any]]
:向量存储后端的实例,通过VectorStoreFactory
初始化。graph_store: Optional[Union[GraphStoreBase, Any]]
:图存储后端的实例,通过GraphStoreFactory
初始化。
依赖项¶
- Pydantic:用于配置验证和类型检查。
- 工厂模式:
DBStoreFactory
、VectorStoreFactory
和GraphStoreFactory
用于创建存储后端实例。 - 配置:
storages_config.py
中的StoreConfig
、DBConfig
、VectorStoreConfig
和GraphStoreConfig
用于定义存储设置。 - 模式:
TableType
、AgentStore
、WorkflowStore
、MemoryStore
、HistoryStore
和IndexStore
用于数据验证和结构化。
关键方法¶
初始化¶
init_module(self)
- 根据提供的
storageConfig
初始化所有存储后端。 -
如果指定了存储路径,则创建存储目录,并通过调用各自的初始化方法来初始化数据库、向量和图存储。
-
_init_db_store(self)
- 使用
DBStoreFactory
和storageConfig
中的dbConfig
初始化数据库存储后端。 -
设置
storageDB
属性。 -
_init_vector_store(self)
- 如果提供了
vectorConfig
,则使用VectorStoreFactory
初始化向量存储后端。 -
设置
vector_store
属性。 -
_init_graph_store(self)
- 如果提供了
graphConfig
,则使用GraphStoreFactory
初始化图存储后端。 - 设置
graph_store
属性。
数据操作¶
load(self, tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]
- 从数据库存储中加载指定表或
TableType
中定义的所有表的数据。 - 返回一个字典,键为表名,值为记录列表。
-
每条记录是一个将列名映射到值的字典,JSON 字段需要手动解析。
-
save(self, data: Dict[str, Any], *args, **kwargs)
- 将数据保存到数据库存储中。
- 接受一个字典,键为表名,值为要保存的记录列表。
-
验证表名是否符合
TableType
,并使用storageDB.insert
插入记录。 -
parse_result(self, results: Dict[str, str], store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any]
- 解析原始数据库结果,根据提供的 Pydantic 模型(
store
)将 JSON 字符串反序列化为 Python 对象。 - 返回解析后的结果字典,适当处理非字符串字段。
实体特定操作¶
load_memory(self, memory_id: str, table: Optional[str]=None, **kwargs) -> Dict[str, Any]
- 用于加载单个长期记忆条目的占位方法,通过
memory_id
检索。 -
如果未指定表名,默认使用
memory
表。 -
save_memory(self, memory_data: Dict[str, Any], table: Optional[str]=None, **kwargs)
- 用于保存或更新单个记忆条目的占位方法。
-
如果未指定表名,默认使用
memory
表。 -
load_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]
- 通过
agent_name
从数据库加载单个代理的数据。 - 如果未指定表名,默认使用
agent
表。 - 使用
parse_result
和AgentStore
进行结果解析和验证。 -
如果未找到代理,返回
None
。 -
remove_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs)
- 从指定表(默认
agent
表)中删除指定agent_name
的代理。 -
如果代理不存在,抛出
ValueError
。 -
save_agent(self, agent_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs)
- 在数据库中保存或更新单个代理的数据。
- 要求
agent_data
包含name
字段。 -
使用
storageDB.update
更新现有记录,或使用storageDB.insert
插入新记录。 -
load_workflow(self, workflow_id: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]
- 通过
workflow_id
从数据库加载单个工作流的数据。 - 如果未指定表名,默认使用
workflow
表。 - 使用
parse_result
和WorkflowStore
进行结果解析和验证。 -
如果未找到工作流,返回
None
。 -
save_workflow(self, workflow_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs)
- 在数据库中保存或更新单个工作流的数据。
- 要求
workflow_data
包含name
字段。 -
使用
storageDB.update
更新现有记录,或使用storageDB.insert
插入新记录。 -
load_history(self, memory_id: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]
- 通过
memory_id
从数据库加载单个历史条目。 - 如果未指定表名,默认使用
history
表。 - 使用
parse_result
和HistoryStore
进行结果解析和验证。 -
如果未找到历史条目,返回
None
。 -
save_history(self, history_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs)
- 在数据库中保存或更新单个历史条目。
- 要求
history_data
包含memory_id
字段。 -
更新现有记录时保留
old_memory
,或插入新记录。 -
load_index(self, corpus_id: str, table: Optional[str]=None) -> Optional[Dict[str, Any]]
- 通过
corpus_id
从数据库加载索引数据。 - 使用
parse_result
和IndexStore
进行结果解析和验证。 -
如果未找到索引,返回
None
。 -
save_index(self, index_data: Dict[str, Any], table: Optional[str]=None)
- 在数据库中保存或更新索引数据。
- 要求
index_data
包含corpus_id
字段。 - 使用
storageDB.update
更新现有记录,或使用storageDB.insert
插入新记录。
与 RAGEngine 的集成¶
StorageHandler
与 RAGEngine
类紧密集成,支持 RAG 功能,主要用于:
- 初始化向量存储:RAGEngine
构造函数检查向量存储的维度是否与嵌入模型的维度一致,如不一致则重新初始化向量存储。
- 保存索引:RAGEngine
的 save
方法使用 StorageHandler.save_index
将索引数据(例如语料库分块和元数据)持久化到数据库,当未指定文件输出路径时。
- 加载索引:RAGEngine
的 load
方法使用 StorageHandler.load
和 StorageHandler.parse_result
从数据库记录中重建索引,确保与嵌入模型和维度的兼容性。
配置¶
StorageHandler
依赖 storages_config.py
中定义的 StoreConfig
类来配置其后端:
- DBConfig
:配置关系型数据库(如 SQLite),包含 db_name
、path
、ip
和 port
等设置。
- VectorStoreConfig
:配置向量数据库(如 FAISS、Qdrant),包含 vector_name
、dimensions
、index_type
、qdrant_url
和 qdrant_collection_name
等设置。
- GraphStoreConfig
:配置图数据库(如 Neo4j),包含 graph_name
、uri
、username
、password
和 database
等设置。
配置通过 Pydantic 进行验证,确保类型检查和默认值的稳健性。
使用示例¶
以下是如何初始化和使用 StorageHandler
的示例:
from evoagentx.storages.base import StorageHandler
from evoagentx.storages.storages_config import StoreConfig, DBConfig, VectorStoreConfig
# 定义配置
config = StoreConfig(
dbConfig=DBConfig(db_name="sqlite", path="data/storage.db"),
vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1536),
path="data/index_cache"
)
# 初始化 StorageHandler
storage_handler = StorageHandler(storageConfig=config)
storage_handler.init_module()
# 保存代理数据
agent_data = {"name": "agent1", "content": {"role": "analyst", "tasks": ["data analysis"]}}
storage_handler.save_agent(agent_data)
# 加载代理数据
agent = storage_handler.load_agent("agent1")
print(agent) # {'name': 'agent1', 'content': {'role': 'analyst', 'tasks': ['data analysis']}}
# 保存索引数据(在 RAGEngine 中使用)
index_data = {
"corpus_id": "corpus1",
"content": {"chunks": [{"chunk_id": "c1", "text": "样本文本", "metadata": {}}]},
"metadata": {"index_type": "VECTOR", "dimension": 1536}
}
storage_handler.save_index(index_data)
# 加载索引数据
index = storage_handler.load_index("corpus1")
print(index) # {'corpus_id': 'corpus1', 'content': {...}, 'metadata': {...}}
注意事项¶
load_memory
和save_memory
方法还未完全实现,后续将在与LongTermMemory一同实现。StorageHandler
假设数据库模式由DBStoreBase
及其工厂管理,确保与TableType
枚举的兼容性。- 与
RAGEngine
一起使用时,确保向量存储的维度与嵌入模型的维度一致,以避免重新初始化问题。 - 错误处理贯穿始终,通过
evoagentx.core.logging.logger
模块生成日志。
结论¶
StorageHandler
类提供了一个灵活且可扩展的接口,以统一的方式管理多种存储后端。其与 RAGEngine
的集成使其成为 RAG 流水线的关键组件,支持索引数据的有效存储和检索。通过利用工厂模式和 Pydantic 验证,它确保了稳健性和可扩展性,适用于需要复杂数据管理的应用。