# StorageHandler Documentation

## Overview

The `StorageHandler` class is a critical component designed to manage multiple storage backends for storing and retrieving various types of data, such as agent configurations, workflows, memory entries, and index data. It provides a unified interface for interacting with different storage systems, including relational databases (e.g., SQLite), vector databases (e.g., FAISS), and graph databases (e.g., Neo4j). The class leverages the Pydantic library for configuration validation and uses factory patterns to initialize storage backends.

The `StorageHandler` is tightly integrated with the `RAGEngine` class to support retrieval-augmented generation (RAG) by managing the storage of indexed documents, embeddings, and associated metadata. It abstracts the complexity of interacting with different storage systems, ensuring seamless data operations for applications like long-term memory management and RAG pipelines.
## Class Structure

The `StorageHandler` class inherits from `BaseModule` and uses Pydantic's `Field` for configuration and type validation. It supports three types of storage backends:

- **Database Storage** (`storageDB`): Manages relational database operations, such as SQLite, for structured data storage.
- **Vector Storage** (`vector_store`): Handles vector embeddings for semantic search, supporting providers like FAISS.
- **Graph Storage** (`graph_store`): Manages graph-based data, such as Neo4j, for relational or networked data structures.
### Key Attributes

- `storageConfig: StoreConfig`: Configuration object for all storage backends, defined in `storages_config.py`. It includes settings for the database, vector, and graph stores.
- `storageDB: Optional[Union[DBStoreBase, Any]]`: Instance of the database storage backend, initialized via `DBStoreFactory`.
- `vector_store: Optional[Union[VectorStoreBase, Any]]`: Instance of the vector storage backend, initialized via `VectorStoreFactory`.
- `graph_store: Optional[Union[GraphStoreBase, Any]]`: Instance of the graph storage backend, initialized via `GraphStoreFactory`.

A minimal sketch of these declarations follows.
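The sketch below shows how these attributes fit together as a Pydantic model. It uses stub stand-ins for `StoreConfig` and the backend base classes so it runs on its own; in evoagentx the real class inherits from `BaseModule` and imports these types from the storages package.

```python
# Minimal, self-contained sketch of the StorageHandler attribute layout.
# The stub classes below stand in for the real evoagentx types.
from typing import Any, Optional, Union
from pydantic import BaseModel, Field

class StoreConfig(BaseModel):      # stub; the real class lives in storages_config.py
    path: Optional[str] = None

class DBStoreBase: ...             # stubs for the backend base classes
class VectorStoreBase: ...
class GraphStoreBase: ...

class StorageHandler(BaseModel):   # the real class inherits from BaseModule
    model_config = {"arbitrary_types_allowed": True}

    storageConfig: StoreConfig
    storageDB: Optional[Union[DBStoreBase, Any]] = Field(default=None)
    vector_store: Optional[Union[VectorStoreBase, Any]] = Field(default=None)
    graph_store: Optional[Union[GraphStoreBase, Any]] = Field(default=None)
```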
### Dependencies

- **Pydantic**: For configuration validation and type checking.
- **Factory Patterns**: `DBStoreFactory`, `VectorStoreFactory`, and `GraphStoreFactory` for creating storage backend instances.
- **Configuration**: `StoreConfig`, `DBConfig`, `VectorStoreConfig`, and `GraphStoreConfig` from `storages_config.py` for defining storage settings.
- **Schema**: `TableType`, `AgentStore`, `WorkflowStore`, `MemoryStore`, `HistoryStore`, and `IndexStore` for data validation and structure (a rough sketch follows).
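As a rough illustration only, two of the schema helpers might resemble the following. The `TableType` members and `AgentStore` fields shown here are inferred from the default table names and examples on this page, not the verified evoagentx definitions.

```python
# Hypothetical sketch of two schema helpers; inferred from this page.
from enum import Enum
from typing import Any, Dict
from pydantic import BaseModel

class TableType(str, Enum):
    # Values mirror the default table names used by the load_*/save_* helpers.
    agent = "agent"
    workflow = "workflow"
    memory = "memory"
    history = "history"

class AgentStore(BaseModel):
    # 'name' is the required key for save_agent; 'content' holds the agent
    # definition, stored as a JSON string and deserialized by parse_result.
    name: str
    content: Dict[str, Any]
```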
## Key Methods

### Initialization

`init_module(self)`

- Initializes all storage backends based on the provided `storageConfig`.
- Creates the storage directory if specified, then initializes the database, vector, and graph stores by calling their respective initialization methods (sketched after this list).

`_init_db_store(self)`

- Initializes the database storage backend using `DBStoreFactory` with the `dbConfig` from `storageConfig`.
- Sets the `storageDB` attribute.

`_init_vector_store(self)`

- Initializes the vector storage backend using `VectorStoreFactory` if `vectorConfig` is provided.
- Sets the `vector_store` attribute.

`_init_graph_store(self)`

- Initializes the graph storage backend using `GraphStoreFactory` if `graphConfig` is provided.
- Sets the `graph_store` attribute.
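A condensed sketch of this flow, assuming the attribute and config names above; the body is illustrative, not the verified implementation:

```python
# Hedged sketch of init_module; mirrors the steps described above.
import os

def init_module(self):
    # Create the storage directory if one is configured.
    if self.storageConfig.path:
        os.makedirs(self.storageConfig.path, exist_ok=True)
    # The database store is always initialized; vector and graph stores are
    # optional and only created when their configs are present.
    self._init_db_store()
    if self.storageConfig.vectorConfig is not None:
        self._init_vector_store()
    if self.storageConfig.graphConfig is not None:
        self._init_graph_store()
```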
### Data Operations

`load(self, tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]`

- Loads data from the database storage for the specified tables, or for all tables defined in `TableType`.
- Returns a dictionary with table names as keys and lists of records as values.
- Each record is a dictionary mapping column names to values; JSON fields require manual parsing.

`save(self, data: Dict[str, Any], *args, **kwargs)`

- Saves data to the database storage.
- Takes a dictionary with table names as keys and lists of records as values.
- Validates table names against `TableType` and inserts records using `storageDB.insert`.

`parse_result(self, results: Dict[str, str], store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any]`

- Parses raw database results, deserializing JSON strings into Python objects based on the provided Pydantic model (`store`).
- Returns a dictionary with the parsed results, handling non-string fields appropriately.
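The snippet below sketches a round trip through these helpers. It assumes an initialized `storage_handler` (see the Usage Example) and passes the `AgentStore` model to `parse_result`; exact record shapes may differ.

```python
# Bulk load, normalize one table's rows, then save in the same layout.
records = storage_handler.load(tables=["agent"])
for row in records.get("agent", []):
    # Rows arrive as column -> value dicts with JSON columns still serialized;
    # parse_result deserializes them against the AgentStore model.
    parsed = storage_handler.parse_result(row, AgentStore)
    print(parsed["name"])

# save expects the same table -> list-of-records layout that load returns.
storage_handler.save({"agent": [{"name": "agent2", "content": "{}"}]})
```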
### Entity-Specific Operations

`load_memory(self, memory_id: str, table: Optional[str] = None, **kwargs) -> Dict[str, Any]`

- Placeholder method for loading a single long-term memory entry by `memory_id`.
- Defaults to the `memory` table if no table is specified.

`save_memory(self, memory_data: Dict[str, Any], table: Optional[str] = None, **kwargs)`

- Placeholder method for saving or updating a single memory entry.
- Defaults to the `memory` table if no table is specified.
`load_agent(self, agent_name: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]`

- Loads a single agent's data by `agent_name` from the database.
- Defaults to the `agent` table if no table is specified.
- Parses the result using `parse_result` with `AgentStore` for validation.
- Returns `None` if the agent is not found.

`remove_agent(self, agent_name: str, table: Optional[str] = None, *args, **kwargs)`

- Deletes an agent by `agent_name` from the specified table (defaults to `agent`).
- Raises a `ValueError` if the agent does not exist.

`save_agent(self, agent_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs)`

- Saves or updates an agent's data in the database.
- Requires `agent_data` to include a `name` field.
- Updates existing records or inserts new ones using `storageDB.update` or `storageDB.insert`, following the upsert pattern sketched below.
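The save helpers share an update-or-insert pattern. A hedged sketch for `save_agent` follows; the `storageDB.update`/`storageDB.insert` signatures are assumptions based on the names used on this page.

```python
# Illustrative upsert logic shared by save_agent, save_workflow, save_history,
# and save_index; not the verified implementation.
import json

def save_agent(self, agent_data, table=None, *args, **kwargs):
    table = table or "agent"
    name = agent_data.get("name")
    if not name:
        raise ValueError("agent_data must include a 'name' field")
    # Serialize nested content so the relational backend stores it as JSON text.
    record = {"name": name, "content": json.dumps(agent_data.get("content", {}))}
    if self.load_agent(name, table=table):
        self.storageDB.update(table, record)   # assumed signature
    else:
        self.storageDB.insert(table, record)   # assumed signature
```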
`load_workflow(self, workflow_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]`

- Loads a single workflow's data by `workflow_id` from the database.
- Defaults to the `workflow` table if no table is specified.
- Parses the result using `parse_result` with `WorkflowStore` for validation.
- Returns `None` if the workflow is not found.

`save_workflow(self, workflow_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs)`

- Saves or updates a workflow's data in the database.
- Requires `workflow_data` to include a `name` field.
- Updates existing records or inserts new ones using `storageDB.update` or `storageDB.insert`.
`load_history(self, memory_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]`

- Loads a single history entry by `memory_id` from the database.
- Defaults to the `history` table if no table is specified.
- Parses the result using `parse_result` with `HistoryStore` for validation.
- Returns `None` if the history entry is not found.

`save_history(self, history_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs)`

- Saves or updates a single history entry in the database.
- Requires `history_data` to include a `memory_id` field.
- Updates existing records while preserving `old_memory`, or inserts new ones (see the sketch below).
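One subtlety worth illustrating: on update, `save_history` keeps the stored `old_memory` value. A hedged sketch, with assumed `storageDB` signatures:

```python
# Illustrative save_history that preserves old_memory across updates.
def save_history(self, history_data, table=None, *args, **kwargs):
    table = table or "history"
    memory_id = history_data.get("memory_id")
    if not memory_id:
        raise ValueError("history_data must include a 'memory_id' field")
    existing = self.load_history(memory_id, table=table)
    if existing:
        # Keep the previously stored old_memory unless the caller supplies one.
        history_data.setdefault("old_memory", existing.get("old_memory"))
        self.storageDB.update(table, history_data)  # assumed signature
    else:
        self.storageDB.insert(table, history_data)  # assumed signature
```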
`load_index(self, corpus_id: str, table: Optional[str] = None) -> Optional[Dict[str, Any]]`

- Loads index data by `corpus_id` from the database.
- Parses the result using `parse_result` with `IndexStore` for validation.
- Returns `None` if the index is not found.

`save_index(self, index_data: Dict[str, Any], table: Optional[str] = None)`

- Saves or updates index data in the database.
- Requires `index_data` to include a `corpus_id` field.
- Updates existing records or inserts new ones using `storageDB.update` or `storageDB.insert`.
Integration with RAGEngine¶
The StorageHandler
is tightly integrated with the RAGEngine
class to support RAG functionality. It is used to:
- Initialize Vector Storage: The RAGEngine
constructor checks the vector store's dimensions against the embedding model's dimensions and reinitializes the vector store if necessary.
- Save Indices: The save
method in RAGEngine
uses StorageHandler.save_index
to persist index data (e.g., corpus chunks and metadata) to the database when no file output path is specified.
- Load Indices: The load
method in RAGEngine
uses StorageHandler.load
and StorageHandler.parse_result
to reconstruct indices from database records, ensuring compatibility with embedding models and dimensions.
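A hedged sketch of the dimension check: the `dimensions` and `vectorConfig` names follow this page, while the helper itself is hypothetical, not the actual `RAGEngine` code.

```python
# Hypothetical helper mirroring the RAGEngine constructor's dimension check.
def ensure_vector_store(storage_handler, embed_dims: int) -> None:
    store = storage_handler.vector_store
    if store is not None and getattr(store, "dimensions", embed_dims) != embed_dims:
        # A width mismatch would break similarity search, so rebuild the store
        # with the embedding model's dimensionality.
        storage_handler.storageConfig.vectorConfig.dimensions = embed_dims
        storage_handler._init_vector_store()
```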
## Configuration

The `StorageHandler` relies on the `StoreConfig` class (defined in `storages_config.py`) to configure its backends:

- `DBConfig`: Configures relational databases (e.g., SQLite) with settings like `db_name`, `path`, `ip`, and `port`.
- `VectorStoreConfig`: Configures vector databases (e.g., FAISS, Qdrant) with settings like `vector_name`, `dimensions`, `index_type`, `qdrant_url`, and `qdrant_collection_name`.
- `GraphStoreConfig`: Configures graph databases (e.g., Neo4j) with settings like `graph_name`, `uri`, `username`, `password`, and `database`.

The configuration is validated using Pydantic, ensuring robust type checking and default values.
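For reference, here is a config that exercises all three backends, using only the field names listed above; the URI and credentials are placeholders.

```python
from evoagentx.storages.storages_config import (
    StoreConfig, DBConfig, VectorStoreConfig, GraphStoreConfig,
)

config = StoreConfig(
    dbConfig=DBConfig(db_name="sqlite", path="data/storage.db"),
    vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1536),
    graphConfig=GraphStoreConfig(
        graph_name="neo4j",
        uri="bolt://localhost:7687",  # placeholder connection URI
        username="neo4j",             # placeholder credentials
        password="password",
        database="neo4j",
    ),
    path="data/index_cache",
)
```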
## Usage Example

Below is an example of how to initialize and use `StorageHandler`:

```python
from evoagentx.storages.base import StorageHandler
from evoagentx.storages.storages_config import StoreConfig, DBConfig, VectorStoreConfig

# Define configuration
config = StoreConfig(
    dbConfig=DBConfig(db_name="sqlite", path="data/storage.db"),
    vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1536),
    path="data/index_cache"
)

# Initialize StorageHandler
storage_handler = StorageHandler(storageConfig=config)
storage_handler.init_module()

# Save agent data
agent_data = {"name": "agent1", "content": {"role": "analyst", "tasks": ["data analysis"]}}
storage_handler.save_agent(agent_data)

# Load agent data
agent = storage_handler.load_agent("agent1")
print(agent)  # {'name': 'agent1', 'content': {'role': 'analyst', 'tasks': ['data analysis']}}

# Save index data (used in RAGEngine)
index_data = {
    "corpus_id": "corpus1",
    "content": {"chunks": [{"chunk_id": "c1", "text": "Sample text", "metadata": {}}]},
    "metadata": {"index_type": "VECTOR", "dimension": 1536}
}
storage_handler.save_index(index_data)

# Load index data
index = storage_handler.load_index("corpus1")
print(index)  # {'corpus_id': 'corpus1', 'content': {...}, 'metadata': {...}}
```
Notes¶
- The
load_memory
andsave_memory
methods are not yet fully implemented and will be developed alongsideLongTermMemory
. - The
StorageHandler
assumes the database schema is managed byDBStoreBase
and its factory, ensuring compatibility withTableType
enums. - When used with
RAGEngine
, ensure the vector store's dimensions match the embedding model's dimensions to avoid reinitialization issues. - Error handling is implemented throughout, with logs generated via the
evoagentx.core.logging.logger
module.
## Conclusion

The `StorageHandler` class provides a flexible, extensible interface for managing multiple storage backends in a unified manner. Its integration with `RAGEngine` makes it a key component of RAG pipelines, enabling efficient storage and retrieval of indexed data. By combining factory patterns with Pydantic validation, it offers robustness and scalability for applications that require complex data management.