Skip to content

πŸ–‡οΈ Memory

evoagentx.memory

BaseMemory

BaseMemory(**kwargs)

Bases: BaseModule

Base class for memory implementations in the EvoAgentX framework.

BaseMemory provides core functionality for storing, retrieving, and filtering messages. It maintains a chronological list of messages while also providing indices for efficient retrieval by action or workflow goal.

Attributes:

Name Type Description
messages List[Message]

List of stored Message objects.

memory_id str

Unique identifier for this memory instance.

timestamp str

Creation timestamp of this memory instance.

capacity Optional[PositiveInt]

Maximum number of messages that can be stored, or None for unlimited.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        # Pre-process every declared field value before pydantic validation.
        # NOTE(review): falsy values (0, "", [], False) skip _process_data —
        # confirm that is intended and not an accidental `is not None` miss.
        for field_name in type(self).model_fields:
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except Exception as e:  # ValidationError is an Exception subclass; one clause suffices
        # Route the failure into the buffered exception callback when one is
        # registered; otherwise log a detailed message and re-raise.
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)

size property

size: int

Returns the current number of messages in memory.

Returns:

Name Type Description
int int

Number of messages currently stored.

init_module

init_module()

Initialize memory indices.

Creates default dictionaries for indexing messages by action and workflow goal.

Source code in evoagentx/memory/memory.py
def init_module(self):
    """Create the message lookup indices.

    Sets up two defaultdict-of-list indices: one keyed by action name,
    one keyed by workflow goal.
    """
    self._by_action, self._by_wf_goal = defaultdict(list), defaultdict(list)

clear

clear()

Clear all messages from memory.

Removes all messages and resets all indices.

Source code in evoagentx/memory/memory.py
def clear(self):
    """Empty the memory.

    Drops every stored message and wipes both lookup indices in place.
    """
    for store in (self.messages, self._by_action, self._by_wf_goal):
        store.clear()

remove_message

remove_message(message: Message)

Remove a single message from memory.

Removes the specified message from the main message list and all indices. If the message is not found in memory, no action is taken.

Parameters:

Name Type Description Default
message Message

The message to be removed. The message will be removed from self.messages, self._by_action, and self._by_wf_goal.

required
Source code in evoagentx/memory/memory.py
def remove_message(self, message: Message):
    """Remove a single message from memory.

    Removes the specified message from the main message list and all indices.
    If the message is not found in memory, no action is taken.

    Args:
        message: The message to be removed. The message will be removed from 
               self.messages, self._by_action, and self._by_wf_goal.
    """
    if not message:
        return
    if message not in self.messages:
        return
    safe_remove(self.messages, message)
    # BUGFIX: the condition was `if self._by_action and not message.action`,
    # which only touched the index for messages WITHOUT an action (and never
    # while the defaultdict was empty/falsy). A message is indexed under its
    # action / wf_goal exactly when that attribute is set, so that is the
    # condition for removal as well (mirrors the fix in add_message).
    if message.action:
        safe_remove(self._by_action[message.action], message)
    if message.wf_goal:
        safe_remove(self._by_wf_goal[message.wf_goal], message)

add_message

add_message(message: Message)

Store a single message in memory.

Adds the message to the main list and relevant indices if it's not already stored.

Parameters:

Name Type Description Default
message Message

the message to be stored.

required
Source code in evoagentx/memory/memory.py
def add_message(self, message: Message):
    """Store a single message in memory.

    Adds the message to the main list and relevant indices if it's not already stored.

    Args:
        message (Message): the message to be stored. 
    """
    if not message:
        return
    if message in self.messages:
        return
    self.messages.append(message)
    # BUGFIX: the condition was `if self._by_action and not message.action`,
    # which (a) never indexed anything while the defaultdict was empty (falsy),
    # and (b) would only ever index messages with NO action under a falsy key.
    # Index a message under its action / wf_goal exactly when the attribute is
    # set, so get_by_action / get_by_wf_goal can actually retrieve it.
    if message.action:
        self._by_action[message.action].append(message)
    if message.wf_goal:
        self._by_wf_goal[message.wf_goal].append(message)

add_messages

add_messages(messages: Union[Message, List[Message]], **kwargs)

store (a) message(s) to the memory.

Parameters:

Name Type Description Default
messages Union[Message, List[Message]]

the input messages can be a single message or a list of messages.

required
Source code in evoagentx/memory/memory.py
def add_messages(self, messages: Union[Message, List[Message]], **kwargs):
    """Store one or more messages in memory.

    Args:
        messages (Union[Message, List[Message]]): a single message or a list of
            messages; each one is delegated to add_message.
    """
    batch = messages if isinstance(messages, list) else [messages]
    for item in batch:
        self.add_message(item)

get

get(n: int = None, **kwargs) -> List[Message]

Retrieve recent messages from memory.

Returns the most recent messages, up to the specified limit.

Parameters:

Name Type Description Default
n int

The maximum number of messages to return. If None, returns all messages.

None
**kwargs Any

Additional parameters (unused in base implementation).

{}

Returns:

Type Description
List[Message]

A list of Message objects, ordered from oldest to newest.

Raises:

Type Description
AssertionError

If n is negative.

Source code in evoagentx/memory/memory.py
def get(self, n: int=None, **kwargs) -> List[Message]:
    """Retrieve recent messages from memory.

    Returns the most recent messages, up to the specified limit.

    Args: 
        n: The maximum number of messages to return. If None, returns all messages.
        **kwargs (Any): Additional parameters (unused in base implementation).

    Returns:
        A list of Message objects, ordered from oldest to newest.

    Raises:
        AssertionError: If n is negative.
    """
    assert n is None or n >= 0, "n must be None or a non-negative int"
    if n is None:
        return self.messages
    # BUGFIX: `self.messages[-n:]` with n == 0 slices as [0:] and returned
    # ALL messages; guard explicitly so get(0) yields an empty list.
    return self.messages[-n:] if n > 0 else []

get_by_type

get_by_type(data: Dict[str, list], key: str, n: int = None, **kwargs) -> List[Message]

Retrieve a list of Message objects from a given data dictionary data based on a specified type key.

This function looks up the value associated with key in the data dictionary, which should be a list of messages. It then returns a subset of these messages according to the specified parameters. If n is provided, it limits the number of messages returned; otherwise, it may return the entire list. Additional keyword arguments (**kwargs) can be used to further filter or process the resulting messages.

Parameters:

Name Type Description Default
data Dict[str, list]

A dictionary where keys are type strings and values are lists of messages.

required
key str

The key in data identifying the specific list of messages to retrieve.

required
n int

The maximum number of messages to return. If not provided, all messages under the given key may be returned.

None
**kwargs Any

Additional parameters for filtering or processing the messages.

{}

Returns:

Type Description
List[Message]

List[Message]: A list of messages corresponding to the given key, possibly filtered or truncated according to n and other provided keyword arguments.

Source code in evoagentx/memory/memory.py
def get_by_type(self, data: Dict[str, list], key: str, n: int = None, **kwargs) -> List[Message]:
    """
    Retrieve the list of Message objects stored under `key` in the index dictionary `data`.

    Args:
        data (Dict[str, list]): A dictionary where keys are type strings and values are lists of messages.
        key (str): The key in `data` identifying the specific list of messages to retrieve.
        n (int, optional): The maximum number of messages to return (the most recent ones).
            If not provided, all messages under `key` are returned.
        **kwargs (Any): Additional parameters for filtering or processing the messages.

    Returns:
        List[Message]: The messages stored under `key`, truncated to the last `n` when given;
        an empty list when `data` is empty or `key` is absent.
    """
    if not data or key not in data:
        return []
    assert n is None or n >= 0, "n must be None or a non-negative int"
    bucket = data[key]
    if n is None:
        return bucket
    # BUGFIX: `bucket[-n:]` with n == 0 slices as [0:] and returned every
    # message; guard explicitly so n == 0 yields an empty list.
    return bucket[-n:] if n > 0 else []

get_by_action

get_by_action(actions: Union[str, List[str]], n: int = None, **kwargs) -> List[Message]

return messages triggered by actions in the memory.

Parameters:

Name Type Description Default
actions Union[str, List[str]]

A single action name or list of action names to filter by.

required
n int

Maximum number of messages to return per action. If None, returns all matching messages.

None
**kwargs Any

Additional parameters (unused in base implementation).

{}

Returns:

Type Description
List[Message]

A list of Message objects, sorted by timestamp.

Source code in evoagentx/memory/memory.py
def get_by_action(self, actions: Union[str, List[str]], n: int=None, **kwargs) -> List[Message]:
    """Return the messages in memory that were triggered by the given action(s).

    Args:
        actions: One action name, or a list of names, to look up.
        n: Per-action cap on returned messages; None means no cap.
        **kwargs (Any): Extra options forwarded to get_by_type (unused here).

    Returns:
        The matching Message objects, sorted by timestamp.
    """
    action_names = [actions] if isinstance(actions, str) else actions
    collected = []
    for name in action_names:
        collected += self.get_by_type(self._by_action, key=name, n=n, **kwargs)
    return Message.sort_by_timestamp(collected)

get_by_wf_goal

get_by_wf_goal(wf_goals: Union[str, List[str]], n: int = None, **kwargs) -> List[Message]

return messages related to wf_goals in the memory.

Parameters:

Name Type Description Default
wf_goals Union[str, List[str]]

A single workflow goal or list of workflow goals to filter by.

required
n int

Maximum number of messages to return per workflow goal. If None, returns all matching messages.

None
**kwargs Any

Additional parameters (unused in base implementation).

{}

Returns:

Type Description
List[Message]

A list of Message objects, sorted by timestamp.

Source code in evoagentx/memory/memory.py
def get_by_wf_goal(self, wf_goals: Union[str, List[str]], n: int=None, **kwargs) -> List[Message]:
    """
    return messages related to `wf_goals` in the memory. 

    Args:
        wf_goals: A single workflow goal or list of workflow goals to filter by.
        n: Maximum number of messages to return per workflow goal. If None, returns all matching messages.
        **kwargs (Any): Additional parameters (unused in base implementation).

    Returns:
        A list of Message objects, sorted by timestamp.
    """
    if isinstance(wf_goals, str):
        wf_goals = [wf_goals]
    messages = []
    for wf_goal in wf_goals:
        # BUGFIX: was `messages.append(...)`, which nested each per-goal LIST
        # inside `messages` instead of flattening it (the sibling
        # get_by_action correctly uses extend), so sorting then operated on
        # lists rather than Message objects.
        messages.extend(self.get_by_type(self._by_wf_goal, key=wf_goal, n=n, **kwargs))
    messages = Message.sort_by_timestamp(messages)
    return messages

ShortTermMemory

ShortTermMemory(**kwargs)

Bases: BaseMemory

Short-term memory implementation.

This class extends BaseMemory to represent a temporary, short-term memory storage. In the current implementation, it inherits all functionality from BaseMemory without modifications, but it provides a semantic distinction for different memory usage patterns in the framework.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        # Pre-process every declared field value before pydantic validation.
        # NOTE(review): falsy values (0, "", [], False) skip _process_data —
        # confirm that is intended and not an accidental `is not None` miss.
        for field_name in type(self).model_fields:
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except Exception as e:  # ValidationError is an Exception subclass; one clause suffices
        # Route the failure into the buffered exception callback when one is
        # registered; otherwise log a detailed message and re-raise.
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)

LongTermMemory

LongTermMemory(**kwargs)

Bases: BaseMemory

Manages long-term storage and retrieval of memories, integrating with RAGEngine for indexing and StorageHandler for persistence.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        # Pre-process every declared field value before pydantic validation.
        # NOTE(review): falsy values (0, "", [], False) skip _process_data —
        # confirm that is intended and not an accidental `is not None` miss.
        for field_name in type(self).model_fields:
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except Exception as e:  # ValidationError is an Exception subclass; one clause suffices
        # Route the failure into the buffered exception callback when one is
        # registered; otherwise log a detailed message and re-raise.
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)

init_module

init_module()

Initialize the RAG engine and memory indices.

Source code in evoagentx/memory/long_term_memory.py
def init_module(self):
    """Initialize the base memory indices, the RAG engine, and the default corpus id.

    Builds a RAGEngine from `rag_config` / `storage_handler` only when one was
    not injected by the caller, and assigns a fresh UUID corpus id when none
    was configured.
    """
    super().init_module()
    if self.rag_engine is None:
        self.rag_engine = RAGEngine(config=self.rag_config, storage_handler=self.storage_handler)
    if self.default_corpus_id is None:
        self.default_corpus_id = str(uuid4())
    logger.info(f"Initialized LongTermMemory with corpus_id {self.default_corpus_id}")

add

add(messages: Union[Message, str, List[Union[Message, str]]]) -> List[str]

Store messages in memory and index them in RAGEngine, returning memory_ids.

Source code in evoagentx/memory/long_term_memory.py
def add(self, messages: Union[Message, str, List[Union[Message, str]]]) -> List[str]:
    """Store messages in memory and index them in RAGEngine, returning memory_ids.

    Strings are wrapped in Message objects and empty-content items are dropped.
    Messages whose content hash already exists in the memory table are not
    re-added; the existing memory_id is returned in their place.
    """
    if not isinstance(messages, list):
        messages = [messages]
    messages = [Message(content=msg) if isinstance(msg, str) else msg for msg in messages]
    messages = [msg for msg in messages if msg.content]  # Filter out empty messages

    if not messages:
        logger.warning("No valid messages to add")
        return []

    # Hash-based deduplication. PERF FIX: the memory table was previously
    # re-loaded from storage INSIDE the loop for every duplicate hit; load it
    # once and build a content_hash -> memory_id map up front (first record
    # wins, matching the previous `next(...)` lookup).
    records = self.storage_handler.load(tables=[self.memory_table]).get(self.memory_table, [])
    hash_to_id = {}
    for record in records:
        record_hash = record.get("content_hash")
        if record_hash is not None and record_hash not in hash_to_id:
            hash_to_id[record_hash] = record.get("memory_id")

    final_messages = []
    final_memory_ids = []
    final_chunks = []

    for msg in messages:
        content_hash = hashlib.sha256(str(msg.content).encode()).hexdigest()
        if content_hash in hash_to_id:
            logger.info(f"Duplicate message found (hash): {msg.content[:50]}...")
            existing_id = hash_to_id[content_hash]
            if existing_id:
                final_memory_ids.append(existing_id)
                continue
        final_messages.append(msg)
        memory_id = str(uuid4())
        final_memory_ids.append(memory_id)
        chunk = self._create_memory_chunk(msg, memory_id)
        chunk.metadata.content_hash = content_hash
        final_chunks.append(chunk)

    if not final_chunks:
        logger.info("No messages added after deduplication")
        return final_memory_ids

    # Add to in-memory messages
    for msg in final_messages:
        super().add_message(msg)

    # Index in RAGEngine
    corpus = Corpus(chunks=final_chunks, corpus_id=self.default_corpus_id)
    chunk_ids = self.rag_engine.add(index_type=self.rag_config.index.index_type, nodes=corpus, corpus_id=self.default_corpus_id)
    if not chunk_ids:
        logger.error("Failed to index memories")
        return final_memory_ids

    return final_memory_ids

get async

get(memory_ids: Union[str, List[str]], return_chunk: bool = True) -> List[Tuple[Union[Chunk, Message], str]]

Retrieve memories by memory_ids, returning (Message/Chunk, memory_id) tuples.

Source code in evoagentx/memory/long_term_memory.py
async def get(self, memory_ids: Union[str, List[str]], return_chunk: bool = True) -> List[Tuple[Union[Chunk, Message], str]]:
    """Fetch memories by id from the RAG index.

    Returns a list of (Chunk-or-Message, memory_id) tuples; missing ids are
    skipped. On any retrieval failure an empty list is returned.
    """
    ids = memory_ids if isinstance(memory_ids, list) else [memory_ids]

    if not ids:
        logger.warning("No memory_ids provided for get")
        return []

    try:
        chunks = await self.rag_engine.aget(
            corpus_id=self.default_corpus_id,
            index_type=self.rag_config.index.index_type,
            node_ids=ids
        )
        results = []
        for chunk in chunks:
            if not chunk:
                continue
            payload = chunk if return_chunk else self._chunk_to_message(chunk)
            results.append((payload, chunk.metadata.memory_id))
        logger.info(f"Retrieved {len(results)} memories for memory_ids: {ids}")
        return results
    except Exception as e:
        logger.error(f"Failed to get memories: {str(e)}")
        return []

delete

delete(memory_ids: Union[str, List[str]]) -> List[bool]

Delete memories by memory_ids, returning success status for each.

Source code in evoagentx/memory/long_term_memory.py
def delete(self, memory_ids: Union[str, List[str]]) -> List[bool]:
    """Delete memories by memory_ids, returning success status for each.

    The returned flags are positionally aligned with the (listified) input ids.
    """
    if not isinstance(memory_ids, list):
        memory_ids = [memory_ids]

    if not memory_ids:
        logger.warning("No memory_ids provided for deletion")
        return []

    successes = [False] * len(memory_ids)
    valid_memory_ids = []

    existing_chunks = asyncio.run(self.get(memory_ids, return_chunk=True))
    for chunk, mid in existing_chunks:
        if not chunk:
            continue
        valid_memory_ids.append(mid)
        super().remove_message(self._chunk_to_message(chunk))
        # BUGFIX: success was previously recorded at `enumerate` position of
        # the FILTERED get() results, which mismarks entries whenever some ids
        # are missing from the index; map back to the input position instead
        # (same scheme as update()).
        successes[memory_ids.index(mid)] = True

    if not valid_memory_ids:
        logger.info("No memories found for deletion")
        return successes

    # Remove from RAG index
    self.rag_engine.delete(
        corpus_id=self.default_corpus_id,
        index_type=self.rag_config.index.index_type,
        node_ids=valid_memory_ids
    )

    return successes

update

update(updates: Union[Tuple[str, Union[Message, str]], List[Tuple[str, Union[Message, str]]]]) -> List[bool]

Update memories with new content, returning success status for each.

Source code in evoagentx/memory/long_term_memory.py
def update(self, updates: Union[Tuple[str, Union[Message, str]], List[Tuple[str, Union[Message, str]]]]) -> List[bool]:
    """Update memories with new content, returning success status for each.

    Args:
        updates: One (memory_id, new_content) pair or a list of them; string
            content is wrapped in a Message object.

    Returns:
        List[bool]: One flag per input pair; True where the memory was updated.
    """
    if not isinstance(updates, list):
        updates = [updates]
    # Normalize content to Message objects, then drop pairs with empty content.
    updates = [(mid, Message(content=msg) if isinstance(msg, str) else msg) for mid, msg in updates]
    updates_dict = {mid: msg for mid, msg in updates if msg.content}
    # NOTE(review): duplicate memory_ids collapse here (last one wins) while
    # `successes` below is sized by the raw `updates` list — confirm callers
    # never pass duplicate ids.

    if not updates_dict:
        logger.warning("No valid updates provided")
        return []

    memory_ids = list(updates_dict.keys())
    # Blocking fetch of the current messages for the requested ids.
    existing_memories = asyncio.run(self.get(memory_ids, return_chunk=False))
    existing_dict = {mid: msg for msg, mid in existing_memories}

    successes = [False] * len(updates)
    final_updates = []
    final_memory_ids = []

    for mid, msg in updates_dict.items():
        if mid not in existing_dict:
            logger.warning(f"No memory found with memory_id {mid}")
            continue
        final_updates.append((mid, msg))
        final_memory_ids.append(mid)
        # Mark success at the position of `mid` in the deduplicated id list.
        successes[memory_ids.index(mid)] = True
        # Drop the outdated in-memory message before re-adding the new one.
        super().remove_message(existing_dict[mid])

    if not final_updates:
        logger.info("No memories updated")
        return successes

    chunks = [self._create_memory_chunk(msg, mid) for mid, msg in final_updates]
    for msg in [msg for _, msg in final_updates]:
        super().add_message(msg)

    # Re-index the refreshed chunks under the default corpus.
    corpus = Corpus(chunks=chunks, corpus_id=self.default_corpus_id)
    chunk_ids = self.rag_engine.add(index_type=self.rag_config.index.index_type, nodes=corpus, corpus_id=self.default_corpus_id)
    if not chunk_ids:
        logger.error(f"Failed to update memories in RAG index: {final_memory_ids}")
        return [False] * len(updates)

    return successes

search_async async

search_async(query: Union[str, Query], n: Optional[int] = None, metadata_filters: Optional[Dict] = None, return_chunk=False) -> List[Tuple[Message, str]]

Retrieve messages from RAG index asynchronously based on a query, returning messages and memory_ids.

Source code in evoagentx/memory/long_term_memory.py
async def search_async(self, query: Union[str, Query], n: Optional[int] = None,
                      metadata_filters: Optional[Dict] = None, return_chunk=False) -> List[Tuple[Message, str]]:
    """Retrieve messages from RAG index asynchronously based on a query, returning messages and memory_ids."""
    if isinstance(query, str):
        query_obj = Query(
            query_str=query,
            top_k=n or self.rag_config.retrieval.top_k,
            metadata_filters=metadata_filters or {}
        )
    else:
        # NOTE(review): this mutates the caller's Query object (top_k and
        # metadata_filters) — confirm callers do not rely on it being unchanged.
        query_obj = query
        query_obj.top_k = n or self.rag_config.retrieval.top_k
        if metadata_filters:
            query_obj.metadata_filters = {**query_obj.metadata_filters, **metadata_filters} if query_obj.metadata_filters else metadata_filters

    try:
        result: RagResult = await self.rag_engine.query_async(query_obj, corpus_id=self.default_corpus_id)
        if return_chunk:
            pairs = [(chunk, chunk.metadata.memory_id) for chunk in result.corpus.chunks]
        else:
            pairs = [(self._chunk_to_message(chunk), chunk.metadata.memory_id) for chunk in result.corpus.chunks]
        logger.info(f"Retrieved {len(pairs)} memories for query: {query_obj.query_str}")
        # BUGFIX: the return_chunk=True path previously returned early and
        # skipped both the [:n] truncation and the log line that the message
        # path applied; both paths now behave consistently.
        return pairs[:n] if n else pairs
    except Exception as e:
        logger.error(f"Failed to search memories: {str(e)}")
        return []

search

search(query: Union[str, Query], n: Optional[int] = None, metadata_filters: Optional[Dict] = None) -> List[Tuple[Message, str]]

Synchronous wrapper for searching memories.

Source code in evoagentx/memory/long_term_memory.py
def search(self, query: Union[str, Query], n: Optional[int] = None,
           metadata_filters: Optional[Dict] = None) -> List[Tuple[Message, str]]:
    """Blocking convenience wrapper around search_async."""
    coro = self.search_async(query, n, metadata_filters)
    return asyncio.run(coro)

clear

clear() -> None

Clear all messages and indices.

Source code in evoagentx/memory/long_term_memory.py
def clear(self) -> None:
    """Remove every stored message, reset the indices, and clear the RAG corpus."""
    corpus_id = self.default_corpus_id
    super().clear()
    self.rag_engine.clear(corpus_id=corpus_id)
    logger.info(f"Cleared LongTermMemory with corpus_id {corpus_id}")

save

save(save_path: Optional[str] = None) -> None

Save all indices and memory data to database.

Source code in evoagentx/memory/long_term_memory.py
def save(self, save_path: Optional[str] = None) -> None:
    """Persist the RAG indices and memory records via the engine.

    Args:
        save_path: Optional output location; defaults to the engine's database.
    """
    engine = self.rag_engine
    engine.save(output_path=save_path, corpus_id=self.default_corpus_id, table=self.memory_table)

load

load(save_path: Optional[str] = None) -> List[str]

Load memory data from database and reconstruct indices, returning memory_ids.

Source code in evoagentx/memory/long_term_memory.py
def load(self, save_path: Optional[str] = None) -> List[str]:
    """Rebuild indices from persisted data via the engine.

    Args:
        save_path: Optional source location; defaults to the engine's database.

    Returns:
        The memory_ids of the loaded records.
    """
    engine = self.rag_engine
    return engine.load(source=save_path, corpus_id=self.default_corpus_id, table=self.memory_table)

MemoryManager

MemoryManager(**kwargs)

Bases: BaseModule

The Memory Manager organizes and manages LongTermMemory data at a higher level. It retrieves data, processes it with optional LLM-based action inference, and stores new or updated data. It creates Message objects for agent use, combining user prompts with memory context.

Attributes:

Name Type Description
memory LongTermMemory

The LongTermMemory instance for storing and retrieving messages.

llm Optional[BaseLLM]

LLM for deciding memory operations.

use_llm_management bool

Toggle LLM-based memory management.

Source code in evoagentx/core/module.py
def __init__(self, **kwargs):
    """
    Initializes a BaseModule instance.

    Args:
        **kwargs (Any): Keyword arguments used to initialize the instance

    Raises:
        ValidationError: When parameter validation fails
        Exception: When other errors occur during initialization
    """

    try:
        # Pre-process every declared field value before pydantic validation.
        # NOTE(review): falsy values (0, "", [], False) skip _process_data —
        # confirm that is intended and not an accidental `is not None` miss.
        for field_name in type(self).model_fields:
            field_value = kwargs.get(field_name, None)
            if field_value:
                kwargs[field_name] = self._process_data(field_value)
        super().__init__(**kwargs) 
        self.init_module()
    except Exception as e:  # ValidationError is an Exception subclass; one clause suffices
        # Route the failure into the buffered exception callback when one is
        # registered; otherwise log a detailed message and re-raise.
        exception_handler = callback_manager.get_callback("exception_buffer")
        if exception_handler is None:
            error_message = get_base_module_init_error_message(
                cls=self.__class__, 
                data=kwargs, 
                errors=e
            )
            logger.error(error_message)
            raise
        else:
            exception_handler.add(e)

handle_memory async

handle_memory(action: str, user_prompt: Optional[Union[str, Message, Query]] = None, data: Optional[Union[Message, str, List[Union[Message, str]], Dict, List[Tuple[str, Union[Message, str]]]]] = None, top_k: Optional[int] = None, metadata_filters: Optional[Dict] = None) -> Union[List[str], List[Tuple[Message, str]], List[bool], Message, None]

Handle memory operations based on the specified action, with optional LLM inference.

Parameters:

Name Type Description Default
action str

The memory operation ("add", "search", "get", "update", "delete", "clear", "save", "load", "create_message").

required
user_prompt Optional[Union[str, Message, Query]]

The user prompt or query to process with memory data.

None
data Optional

Input data for the operation (e.g., messages, memory IDs, updates).

None
top_k Optional[int]

Number of results to retrieve for search operations.

None
metadata_filters Optional[Dict]

Filters for memory retrieval.

None

Returns:

Type Description
Union[List[str], List[Tuple[Message, str]], List[bool], Message, None]

Union[List[str], List[Tuple[Message, str]], List[bool], Message, None]: Result of the operation.

Source code in evoagentx/memory/memory_manager.py
async def handle_memory(
    self,
    action: str,
    user_prompt: Optional[Union[str, Message, Query]] = None,
    data: Optional[Union[Message, str, List[Union[Message, str]], Dict, List[Tuple[str, Union[Message, str]]]]] = None,
    top_k: Optional[int] = None,
    metadata_filters: Optional[Dict] = None
) -> Union[List[str], List[Tuple[Message, str]], List[bool], Message, None]:
    """
    Handle memory operations based on the specified action, with optional LLM inference.

    When ``self.use_llm_management`` is enabled and an LLM is configured, the
    "add", "update", and "delete" actions first ask the LLM (via
    ``_prompt_llm_for_memory_operation``) to approve each item; only approved
    items are applied to the underlying memory store.

    Args:
        action (str): The memory operation ("add", "search", "get", "update",
            "delete", "clear", "save", "load", "create_message").
        user_prompt (Optional[Union[str, Message, Query]]): The user prompt or
            query to process with memory data (used by "search" and
            "create_message").
        data (Optional[Union[Message, str, List[Union[Message, str]], Dict, List[Tuple[str, Union[Message, str]]]]]):
            Input data for the operation (e.g., messages, memory IDs, updates).
        top_k (Optional[int]): Number of results to retrieve for search operations.
        metadata_filters (Optional[Dict]): Filters for memory retrieval.

    Returns:
        Union[List[str], List[Tuple[Message, str]], List[bool], Message, None]:
            Result of the operation; the concrete type depends on ``action``
            (e.g., new memory IDs for "add", (Message, id) pairs for "search",
            per-item success flags for "update"/"delete", a Message for
            "create_message", None for "clear"/"save").

    Raises:
        ValueError: If ``action`` is not one of the supported operations.
    """
    if action not in ["add", "search", "get", "update", "delete", "clear", "save", "load", "create_message"]:
        logger.error(f"Invalid action: {action}")
        raise ValueError(f"Invalid action: {action}")

    if action == "add":
        if not data:
            logger.warning("No data provided for add operation")
            return []
        if not isinstance(data, list):
            data = [data]
        # Normalize every item to a Message: plain strings become fresh user
        # REQUEST messages; existing Messages keep their metadata (a missing
        # message_id is backfilled with a new UUID).
        messages = [
            Message(
                content=msg if isinstance(msg, str) else msg.content,
                msg_type=MessageType.REQUEST if isinstance(msg, str) else msg.msg_type,
                timestamp=datetime.now().isoformat() if isinstance(msg, str) else msg.timestamp,
                agent="user" if isinstance(msg, str) else msg.agent,
                message_id=str(uuid4()) if isinstance(msg, str) or not msg.message_id else msg.message_id
            ) for msg in data
        ]
        input_data = [
            {
                "action": "add",
                "memory_id": str(uuid4()),
                "message": msg.to_dict()
            } for msg in messages
        ]
        if self.use_llm_management and self.llm:
            llm_decisions = await self._prompt_llm_for_memory_operation(input_data)
            # Keep only the messages the LLM approved for insertion.
            # (The original code also collected each decision's "memory_id"
            # into a list that was never used; that dead code is removed —
            # self.memory.add assigns the stored IDs itself.)
            final_messages = []
            for decision, msg in zip(llm_decisions, messages):
                if decision.get("action") != "add":
                    logger.info(f"LLM rejected adding memory: {decision}")
                    continue
                final_messages.append(msg)
            return self.memory.add(final_messages) if final_messages else []
        return self.memory.add(messages)

    elif action == "search":
        if not user_prompt:
            logger.warning("No user_prompt provided for search operation")
            return []
        if isinstance(user_prompt, Message):
            user_prompt = user_prompt.content
        return await self.memory.search_async(user_prompt, top_k, metadata_filters)

    elif action == "get":
        if not data:
            logger.warning("No memory IDs provided for get operation")
            return []
        # NOTE(review): unlike "delete", `data` is passed through without being
        # wrapped in a list — confirm self.memory.get accepts a bare ID.
        return await self.memory.get(data, return_chunk=False)

    elif action == "update":
        if not data:
            logger.warning("No updates provided for update operation")
            return []
        # Each update is a (memory_id, message) pair; string payloads become
        # fresh user REQUEST messages. The timestamp is always refreshed so the
        # stored record reflects when the update happened.
        updates = [
            (mid, Message(
                content=msg if isinstance(msg, str) else msg.content,
                msg_type=MessageType.REQUEST if isinstance(msg, str) else msg.msg_type,
                timestamp=datetime.now().isoformat(),
                agent="user" if isinstance(msg, str) else msg.agent,
                message_id=str(uuid4()) if isinstance(msg, str) or not msg.message_id else msg.message_id
            )) for mid, msg in (data if isinstance(data, list) else [data])
        ]
        input_data = [
            {
                "action": "update",
                "memory_id": mid,
                "message": msg.to_dict()
            } for mid, msg in updates
        ]
        if self.use_llm_management and self.llm:
            # Give the LLM the current contents so it can judge each update.
            existing_memories = await self.memory.get([mid for mid, _ in updates])
            llm_decisions = await self._prompt_llm_for_memory_operation(input_data, relevant_data=existing_memories)
            final_updates = []
            for decision, (mid, msg) in zip(llm_decisions, updates):
                if decision.get("action") != "update":
                    logger.info(f"LLM rejected updating memory {mid}: {decision}")
                    continue
                final_updates.append((mid, msg))
            # All-rejected: report failure for every requested update.
            return self.memory.update(final_updates) if final_updates else [False] * len(updates)
        return self.memory.update(updates)

    elif action == "delete":
        if not data:
            logger.warning("No memory IDs provided for delete operation")
            return []
        memory_ids = data if isinstance(data, list) else [data]
        if self.use_llm_management and self.llm:
            input_data = [{"action": "delete", "memory_id": mid} for mid in memory_ids]
            existing_memories = await self.memory.get(memory_ids)
            llm_decisions = await self._prompt_llm_for_memory_operation(input_data, relevant_data=existing_memories)
            valid_memory_ids = [decision.get("memory_id") for decision in llm_decisions if decision.get("action") == "delete"]
            return self.memory.delete(valid_memory_ids) if valid_memory_ids else [False] * len(memory_ids)
        return self.memory.delete(memory_ids)

    elif action == "clear":
        self.memory.clear()
        return None

    elif action == "save":
        self.memory.save(data)
        return None

    elif action == "load":
        return self.memory.load(data)

    elif action == "create_message":
        if not user_prompt:
            logger.warning("No user_prompt provided for create_message operation")
            return None
        if isinstance(user_prompt, Query):
            user_prompt = user_prompt.query_str
        elif isinstance(user_prompt, Message):
            user_prompt = user_prompt.content
        # Retrieve relevant memories and fold them into the prompt as context,
        # tracking which memory IDs contributed.
        memories = await self.memory.search_async(user_prompt, top_k, metadata_filters)
        context = "\n".join([msg.content for msg, _ in memories])
        memory_ids = [mid for _, mid in memories]
        combined_content = f"User Prompt: {user_prompt}\nContext: {context}" if context else user_prompt
        return Message(
            content=combined_content,
            msg_type=MessageType.REQUEST,
            timestamp=datetime.now().isoformat(),
            agent="user",
            memory_ids=memory_ids
        )

create_conversation_message async

create_conversation_message(user_prompt: Union[str, Message], conversation_id: str, top_k: Optional[int] = None, metadata_filters: Optional[Dict] = None) -> Message

Create a Message combining user prompt with conversation history and relevant memories.

Parameters:

Name Type Description Default
user_prompt Union[str, Message]

The user's input prompt or message.

required
conversation_id str

ID of the conversation thread.

required
top_k Optional[int]

Number of results to retrieve.

None
metadata_filters Optional[Dict]

Filters for memory retrieval.

None

Returns:

Name Type Description
Message Message

A new Message object combining the user prompt with the retrieved conversation history.

Source code in evoagentx/memory/memory_manager.py
async def create_conversation_message(
    self,
    user_prompt: Union[str, Message],
    conversation_id: str,
    top_k: Optional[int] = None,
    metadata_filters: Optional[Dict] = None
) -> Message:
    """
    Create a Message combining user prompt with conversation history and relevant memories.

    Args:
        user_prompt (Union[str, Message]): The user's input prompt or message.
        conversation_id (str): ID of the conversation thread; used as the
            ``corpus_id`` filter when retrieving history.
        top_k (Optional[int]): Number of results to retrieve (defaults to 10).
        metadata_filters (Optional[Dict]): Additional filters for memory retrieval,
            merged on top of the conversation-ID filter.

    Returns:
        Message: A new Message object with user prompt, history, and memory context.
    """
    # Capture the originating message ID BEFORE unwrapping the prompt to its
    # text content. The original code performed this isinstance check after
    # the reassignment below, so the Message branch was unreachable and a
    # random UUID was always used.
    source_message_id = user_prompt.message_id if isinstance(user_prompt, Message) else str(uuid4())
    if isinstance(user_prompt, Message):
        user_prompt = user_prompt.content

    # Retrieve conversation history scoped to this conversation thread.
    history_filter = {"corpus_id": conversation_id}
    if metadata_filters:
        history_filter.update(metadata_filters)
    history_results = await self.memory.search_async(
        query=user_prompt, n=top_k or 10, metadata_filters=history_filter
    )
    history = "\n".join([f"{msg.content}" for msg, _ in history_results])

    # Combine prompt and retrieved history into a single request message.
    combined_content = (
        f"User Prompt: \n{user_prompt}\n"
        f"Conversation History: \n\n{history or 'No history available'}\n"
    )
    # NOTE(review): `memory_ids` carries the source message's ID (or a fresh
    # UUID), whereas handle_memory's "create_message" stores the IDs of the
    # retrieved memories — confirm which semantics callers expect.
    return Message(
        content=combined_content,
        msg_type=MessageType.REQUEST,
        timestamp=datetime.now().isoformat(),
        agent="user",
        memory_ids=source_message_id
    )