
OpenClaw Memory Management: Build AI Agents That Learn and Remember

Complete guide to OpenClaw's memory system for building AI agents that remember context, learn from interactions, and improve over time. Includes memory optimization and advanced patterns.

Memory is what separates intelligent AI agents from simple chatbots. OpenClaw's memory system allows your agents to remember conversations, learn from decisions, and build context that improves their performance over time.

This guide covers everything from basic memory configuration to advanced patterns for building agents that genuinely get smarter with experience.


Understanding OpenClaw Memory Architecture

OpenClaw implements a multi-layered memory system that mirrors how humans store and recall information:

Short-term Memory: Current conversation context and immediate working data
Long-term Memory: Persistent knowledge, preferences, and learned patterns
Episodic Memory: Specific events, interactions, and their outcomes
Semantic Memory: General facts, relationships, and procedural knowledge
Working Memory: Active processing space for complex operations

This architecture enables agents to maintain context across sessions, learn from experience, and make increasingly intelligent decisions.
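As a mental model, the five layers can be tagged with a simple enumeration (illustrative only; these identifiers are not OpenClaw's actual API):

```python
from enum import Enum

class MemoryLayer(Enum):
    """Illustrative tags for the five memory layers described above"""
    SHORT_TERM = "short_term"
    LONG_TERM = "long_term"
    EPISODIC = "episodic"
    SEMANTIC = "semantic"
    WORKING = "working"
```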


Memory Configuration Basics

Core Memory Settings

Configure memory in ~/.openclaw/config.yaml:

# Memory Configuration
memory:
  # Storage backend
  backend: "file"  # file, redis, postgresql, or mongodb
  storage_path: "~/.openclaw/memory"

  # Memory limits and optimization
  max_memory_mb: 1024  # 1GB memory limit
  cleanup_interval_hours: 24
  compression_enabled: true

  # Retention policies
  retention:
    short_term_hours: 24
    episodic_days: 30
    semantic_days: 365
    working_memory_minutes: 60

  # Learning settings
  learning:
    enabled: true
    confidence_threshold: 0.7
    pattern_detection: true
    preference_learning: true

  # Privacy and security
  privacy:
    encrypt_memories: true
    anonymize_pii: true
    retention_compliance: "gdpr"  # gdpr, ccpa, or custom
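The retention values in this file map directly onto durations. A minimal sketch of how such a policy block could be turned into timedelta objects (the helper name is illustrative, not part of OpenClaw):

```python
from datetime import timedelta

def parse_retention(retention: dict) -> dict:
    """Convert keys like 'episodic_days' into {'episodic': timedelta(days=30)}"""
    parsed = {}
    for key, value in retention.items():
        # Split the trailing unit suffix off the policy name
        name, _, unit = key.rpartition('_')
        parsed[name] = timedelta(**{unit: value})
    return parsed

policies = parse_retention({
    'short_term_hours': 24,
    'episodic_days': 30,
    'semantic_days': 365,
    'working_memory_minutes': 60,
})
```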

Memory Storage Backends

File-based Storage (Default):

memory:
  backend: file
  storage_path: ~/.openclaw/memory
  backup_enabled: true
  backup_interval_hours: 6

Redis Backend (Recommended for Production):

memory:
  backend: redis
  redis:
    host: localhost
    port: 6379
    db: 0
    password: your_redis_password
    ssl: true

PostgreSQL Backend (Enterprise):

memory:
  backend: postgresql
  postgresql:
    host: localhost
    port: 5432
    database: openclaw_memory
    username: openclaw
    password: secure_password
    ssl_mode: require

Types of Memory in OpenClaw

1. Conversational Memory

Tracks ongoing conversations and context across sessions.

Example Memory Structure:

from datetime import datetime

from openclaw.memory import ConversationMemory

class EmailAssistantMemory(ConversationMemory):
    """Memory specifically for email assistance"""

    def __init__(self, agent_id):
        super().__init__(agent_id, memory_type="conversation")
        self.email_context = {}
        self.user_preferences = {}

    async def store_email_interaction(self, email_data, user_action, outcome):
        """Store email handling patterns"""
        interaction = {
            'timestamp': datetime.now(),
            'email_type': email_data.get('type'),
            'sender_domain': self._extract_domain(email_data['from']),
            'user_action': user_action,  # 'archive', 'reply', 'forward', etc.
            'outcome': outcome,  # 'successful', 'needs_revision', etc.
            'response_time': email_data.get('response_time')
        }

        await self.store_memory(
            memory_key=f"email_interaction_{datetime.now().timestamp()}",
            content=interaction,
            tags=['email', 'interaction', email_data.get('type')]
        )

    def _extract_domain(self, email_address):
        """Helper: return the domain portion of an email address"""
        return email_address.split('@')[-1].lower()

    async def learn_email_patterns(self):
        """Analyze stored interactions to learn patterns"""
        interactions = await self.recall_memories(tags=['email', 'interaction'])

        # Analyze patterns
        patterns = {
            'priority_senders': self._identify_priority_senders(interactions),
            'action_patterns': self._analyze_action_patterns(interactions),
            'optimal_response_times': self._calculate_response_times(interactions)
        }

        # Store learned patterns
        await self.store_memory(
            memory_key="learned_email_patterns",
            content=patterns,
            memory_type="semantic"
        )

        return patterns

    def _identify_priority_senders(self, interactions):
        """Identify senders that typically require quick responses"""
        sender_stats = {}

        for interaction in interactions:
            sender = interaction['sender_domain']
            action = interaction['user_action']

            if sender not in sender_stats:
                sender_stats[sender] = {'quick_responses': 0, 'total': 0}

            sender_stats[sender]['total'] += 1
            if interaction.get('response_time', 0) < 3600:  # < 1 hour
                sender_stats[sender]['quick_responses'] += 1

        # Calculate priority scores
        priority_senders = []
        for sender, stats in sender_stats.items():
            if stats['total'] >= 3:  # Minimum interactions
                priority_score = stats['quick_responses'] / stats['total']
                if priority_score > 0.7:  # 70% quick response rate
                    priority_senders.append({
                        'sender': sender,
                        'priority_score': priority_score,
                        'sample_size': stats['total']
                    })

        return sorted(priority_senders, key=lambda x: x['priority_score'], reverse=True)
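The thresholds in _identify_priority_senders (at least three interactions, more than 70% answered within an hour) can be exercised on synthetic data. The function below is a standalone restatement for illustration, not part of OpenClaw:

```python
def score_senders(interactions, quick_seconds=3600, min_total=3, threshold=0.7):
    """Score senders by the share of their emails answered quickly"""
    stats = {}
    for it in interactions:
        s = stats.setdefault(it['sender'], {'quick': 0, 'total': 0})
        s['total'] += 1
        if it['response_time'] < quick_seconds:
            s['quick'] += 1
    # Keep only senders with enough samples and a high quick-response rate
    return {
        sender: s['quick'] / s['total']
        for sender, s in stats.items()
        if s['total'] >= min_total and s['quick'] / s['total'] > threshold
    }

interactions = (
    [{'sender': 'client.com', 'response_time': 600}] * 4
    + [{'sender': 'newsletter.io', 'response_time': 90000}] * 4
)
scores = score_senders(interactions)
```

Only client.com qualifies here: four of four replies were sent within an hour, while newsletter.io never was.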

2. Procedural Memory

Stores learned procedures and workflows.

from datetime import datetime

class ProcedureMemory:
    """Stores and recalls procedural knowledge"""

    async def store_procedure(self, procedure_name, steps, success_rate=None):
        """Store a workflow procedure"""
        procedure = {
            'name': procedure_name,
            'steps': steps,
            'success_rate': success_rate,
            'created_at': datetime.now(),
            'usage_count': 0,
            'last_used': None
        }

        await self.store_memory(
            memory_key=f"procedure_{procedure_name}",
            content=procedure,
            memory_type="procedural",
            tags=['procedure', 'workflow']
        )

    async def recall_procedure(self, procedure_name):
        """Recall and update procedure usage stats"""
        procedure = await self.get_memory(f"procedure_{procedure_name}")

        if procedure:
            # Update usage statistics
            procedure['usage_count'] += 1
            procedure['last_used'] = datetime.now()

            await self.store_memory(
                memory_key=f"procedure_{procedure_name}",
                content=procedure,
                memory_type="procedural"
            )

        return procedure

    async def optimize_procedure(self, procedure_name, execution_result):
        """Learn from procedure execution to optimize it"""
        procedure = await self.get_memory(f"procedure_{procedure_name}")

        if not procedure:
            return

        # Analyze execution result
        # Fold the new outcome (1 for success, 0 for failure) into the running rate,
        # so failures lower the score rather than being silently ignored
        outcome = 1 if execution_result.get('success') else 0
        procedure['success_rate'] = (
            (procedure.get('success_rate', 0.5) * procedure['usage_count'] + outcome) /
            (procedure['usage_count'] + 1)
        )

        if not execution_result.get('success'):
            # Identify failed steps and suggest improvements
            failed_step = execution_result.get('failed_step')
            if failed_step:
                # Store failure analysis
                await self.store_memory(
                    memory_key=f"procedure_failure_{procedure_name}_{datetime.now().timestamp()}",
                    content={
                        'procedure': procedure_name,
                        'failed_step': failed_step,
                        'error': execution_result.get('error'),
                        'suggested_improvement': execution_result.get('suggestion')
                    },
                    memory_type="episodic",
                    tags=['procedure', 'failure', 'learning']
                )

        await self.store_memory(
            memory_key=f"procedure_{procedure_name}",
            content=procedure,
            memory_type="procedural"
        )
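The success-rate bookkeeping in optimize_procedure is an incremental (running) average. Restated as a pure function for clarity (the name is illustrative):

```python
def update_success_rate(rate, count, succeeded):
    """Fold one new outcome into a running success rate over `count` prior runs"""
    outcome = 1 if succeeded else 0
    return (rate * count + outcome) / (count + 1)
```

For example, starting from 0.5 after one run, a success moves the rate to 0.75; starting from 1.0 after three runs, a failure also lands at 0.75.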

3. Contextual Memory

Maintains context about ongoing projects, relationships, and states.

from datetime import datetime

class ContextualMemory:
    """Manages contextual information about projects, people, and situations"""

    async def store_project_context(self, project_id, context_data):
        """Store project-specific context"""
        context = {
            'project_id': project_id,
            'stakeholders': context_data.get('stakeholders', []),
            'current_phase': context_data.get('phase'),
            'key_decisions': context_data.get('decisions', []),
            'blockers': context_data.get('blockers', []),
            'communication_patterns': context_data.get('communication', {}),
            'updated_at': datetime.now()
        }

        await self.store_memory(
            memory_key=f"project_context_{project_id}",
            content=context,
            memory_type="contextual",
            tags=['project', project_id, context_data.get('phase', '')]
        )

    async def get_project_context(self, project_id):
        """Retrieve comprehensive project context"""
        # Get base context
        context = await self.get_memory(f"project_context_{project_id}")

        if not context:
            return None

        # Enrich with related memories
        related_memories = await self.recall_memories(
            tags=[project_id],
            memory_type="episodic",
            limit=20
        )

        context['recent_activities'] = related_memories
        context['last_accessed'] = datetime.now()

        return context

    async def update_person_context(self, person_id, interaction_data):
        """Update context about a person based on new interactions"""
        existing_context = await self.get_memory(f"person_context_{person_id}") or {}

        # Merge new interaction data
        updated_context = {
            'person_id': person_id,
            'communication_style': self._analyze_communication_style(
                existing_context.get('interactions', []) + [interaction_data]
            ),
            'preferences': self._extract_preferences(interaction_data),
            'expertise_areas': self._identify_expertise(interaction_data),
            'availability_patterns': self._analyze_availability(interaction_data),
            'last_interaction': datetime.now(),
            'interaction_count': existing_context.get('interaction_count', 0) + 1
        }

        await self.store_memory(
            memory_key=f"person_context_{person_id}",
            content=updated_context,
            memory_type="contextual",
            tags=['person', person_id, 'relationship']
        )

Advanced Memory Patterns

1. Hierarchical Memory Organization

Organize memories in hierarchical structures for better recall and organization.

from datetime import datetime

class HierarchicalMemory:
    """Hierarchical memory organization for complex knowledge structures"""

    def __init__(self):
        self.memory_tree = {
            'root': {
                'projects': {},
                'people': {},
                'procedures': {},
                'knowledge': {}
            }
        }

    async def store_hierarchical_memory(self, path, content, metadata=None):
        """Store memory in hierarchical structure"""
        path_parts = path.split('.')
        current_node = self.memory_tree['root']

        # Navigate to parent node
        for part in path_parts[:-1]:
            if part not in current_node:
                current_node[part] = {}
            current_node = current_node[part]

        # Store memory at final location
        memory_item = {
            'content': content,
            'metadata': metadata or {},
            'created_at': datetime.now(),
            'path': path
        }

        current_node[path_parts[-1]] = memory_item

        # Also store in flat structure for search
        await self.store_memory(
            memory_key=f"hierarchical_{path}",
            content=memory_item,
            tags=path_parts + ['hierarchical']
        )

    async def recall_hierarchical_memory(self, path):
        """Recall memory from hierarchical path"""
        return await self.get_memory(f"hierarchical_{path}")

    async def get_memory_subtree(self, path):
        """Get entire subtree of memories"""
        path_parts = path.split('.') if path else []
        memories = await self.recall_memories(
            tags=path_parts + ['hierarchical'],
            limit=1000
        )

        # Build subtree from flat results
        subtree = {}
        for memory in memories:
            memory_path = memory['content']['path']
            if memory_path.startswith(path):
                relative_path = memory_path[len(path):].strip('.')
                if relative_path:
                    self._insert_into_subtree(subtree, relative_path, memory['content'])

        return subtree

    def _insert_into_subtree(self, tree, path, content):
        """Helper to insert memory into subtree structure"""
        parts = path.split('.')
        current = tree

        for part in parts[:-1]:
            if part not in current:
                current[part] = {}
            current = current[part]

        current[parts[-1]] = content
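The dotted-path navigation used throughout this class can be exercised in isolation; a minimal standalone sketch:

```python
def insert_path(tree, path, value):
    """Walk (and create) nested dicts along a dotted path, then set the leaf"""
    parts = path.split('.')
    node = tree
    for part in parts[:-1]:
        node = node.setdefault(part, {})
    node[parts[-1]] = value

tree = {}
insert_path(tree, 'projects.apollo.phase', 'launch')
```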

2. Associative Memory Networks

Build networks of related memories for intelligent recall.

from datetime import datetime

class AssociativeMemory:
    """Memory system that builds associations between related concepts"""

    def __init__(self):
        self.associations = {}  # memory_id -> [related_memory_ids]
        self.concept_vectors = {}  # memory_id -> embedding vector

    async def store_associated_memory(self, content, tags=None, related_to=None):
        """Store memory and build associations"""
        memory_id = f"memory_{datetime.now().timestamp()}"

        # Store base memory
        await self.store_memory(
            memory_key=memory_id,
            content=content,
            tags=tags or []
        )

        # Generate embedding for content
        embedding = await self._generate_embedding(content)
        self.concept_vectors[memory_id] = embedding

        # Build associations
        if related_to:
            # Explicit associations
            self._add_association(memory_id, related_to)
        else:
            # Find similar memories through embedding similarity
            similar_memories = await self._find_similar_memories(embedding, threshold=0.7)
            for similar_id, similarity in similar_memories:
                self._add_association(memory_id, similar_id)

        return memory_id

    async def recall_with_associations(self, query, max_depth=3):
        """Recall memories and their associations"""
        # Find initial matches
        query_embedding = await self._generate_embedding(query)
        initial_matches = await self._find_similar_memories(query_embedding)

        # Expand through associations
        expanded_results = {}
        for memory_id, relevance in initial_matches:
            expanded_results[memory_id] = {
                'memory': await self.get_memory(memory_id),
                'relevance': relevance,
                'depth': 0
            }

            # Follow associations
            await self._expand_associations(
                memory_id, expanded_results, current_depth=0, max_depth=max_depth
            )

        return expanded_results

    async def _expand_associations(self, memory_id, results, current_depth, max_depth):
        """Recursively expand through memory associations"""
        if current_depth >= max_depth:
            return

        associated_ids = self.associations.get(memory_id, [])

        for assoc_id in associated_ids:
            if assoc_id not in results:
                results[assoc_id] = {
                    'memory': await self.get_memory(assoc_id),
                    'relevance': 0.8 - (current_depth * 0.2),  # Decay relevance by depth
                    'depth': current_depth + 1
                }

                # Continue expansion
                await self._expand_associations(
                    assoc_id, results, current_depth + 1, max_depth
                )

    def _add_association(self, memory1_id, memory2_id):
        """Add bidirectional association between memories"""
        if memory1_id not in self.associations:
            self.associations[memory1_id] = []
        if memory2_id not in self.associations:
            self.associations[memory2_id] = []

        if memory2_id not in self.associations[memory1_id]:
            self.associations[memory1_id].append(memory2_id)
        if memory1_id not in self.associations[memory2_id]:
            self.associations[memory2_id].append(memory1_id)

    async def _generate_embedding(self, text):
        """Generate an embedding vector for text content"""
        # Uses the OpenAI embeddings API (openai>=1.0 client);
        # swap in a local model if preferred
        from openai import AsyncOpenAI

        client = AsyncOpenAI()
        response = await client.embeddings.create(
            model="text-embedding-ada-002",
            input=text
        )

        return response.data[0].embedding

    async def _find_similar_memories(self, query_embedding, threshold=0.6):
        """Find memories similar to query embedding"""
        similarities = []

        for memory_id, memory_embedding in self.concept_vectors.items():
            similarity = self._cosine_similarity(query_embedding, memory_embedding)
            if similarity >= threshold:
                similarities.append((memory_id, similarity))

        return sorted(similarities, key=lambda x: x[1], reverse=True)

    def _cosine_similarity(self, vec1, vec2):
        """Calculate cosine similarity between two vectors"""
        import numpy as np
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
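The cosine-similarity thresholding behind _find_similar_memories behaves as follows on toy vectors (a pure-Python version is shown to avoid the numpy dependency):

```python
import math

def cosine_similarity(vec1, vec2):
    """Dot product of the vectors divided by the product of their norms"""
    dot = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))
    return dot / (norm1 * norm2)
```

Vectors pointing the same way score 1.0; orthogonal vectors score 0.0 and fall below the 0.6 retrieval threshold.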

3. Temporal Memory Management

Manage memories based on time and relevance decay.

from datetime import datetime

class TemporalMemory:
    """Time-aware memory management with relevance decay"""

    def __init__(self):
        self.decay_functions = {
            'exponential': self._exponential_decay,
            'linear': self._linear_decay,
            'stepped': self._stepped_decay
        }

    async def store_temporal_memory(self, content, importance_score=0.5, decay_function='exponential'):
        """Store memory with temporal characteristics"""
        memory_item = {
            'content': content,
            'created_at': datetime.now(),
            'importance_score': importance_score,
            'decay_function': decay_function,
            'access_count': 0,
            'last_accessed': datetime.now()
        }

        memory_id = f"temporal_{datetime.now().timestamp()}"
        await self.store_memory(memory_key=memory_id, content=memory_item)

        return memory_id

    async def recall_temporal_memories(self, query, time_weight=0.3):
        """Recall memories considering temporal relevance"""
        all_memories = await self.recall_memories(limit=1000)
        current_time = datetime.now()

        scored_memories = []

        for memory in all_memories:
            memory_data = memory['content']
            created_at = memory_data.get('created_at')

            if not created_at:
                continue

            # Calculate temporal relevance
            age_seconds = (current_time - created_at).total_seconds()
            temporal_relevance = self._calculate_temporal_relevance(
                age_seconds,
                memory_data.get('importance_score', 0.5),
                memory_data.get('decay_function', 'exponential'),
                memory_data.get('access_count', 0)
            )

            # Calculate content relevance (simplified - in practice, use embeddings)
            content_relevance = self._calculate_content_relevance(
                query, memory_data['content']
            )

            # Combine scores
            total_score = (
                (1 - time_weight) * content_relevance +
                time_weight * temporal_relevance
            )

            scored_memories.append({
                'memory': memory,
                'score': total_score,
                'temporal_relevance': temporal_relevance,
                'content_relevance': content_relevance
            })

        # Sort by total score and update access patterns
        scored_memories.sort(key=lambda x: x['score'], reverse=True)

        # Update access count for returned memories
        for scored_memory in scored_memories[:10]:  # Top 10
            await self._update_access_pattern(scored_memory['memory'])

        return scored_memories

    def _calculate_temporal_relevance(self, age_seconds, importance, decay_function, access_count):
        """Calculate how relevant a memory is based on time"""
        decay_func = self.decay_functions.get(decay_function, self._exponential_decay)

        # Base decay
        base_relevance = decay_func(age_seconds, importance)

        # Boost for frequently accessed memories
        access_boost = min(0.3, access_count * 0.05)  # Max 30% boost

        return min(1.0, base_relevance + access_boost)

    def _exponential_decay(self, age_seconds, importance):
        """Exponential decay function"""
        import math
        half_life = 86400 * 7 * importance  # Week * importance as half-life
        return math.exp(-0.693 * age_seconds / half_life)

    def _linear_decay(self, age_seconds, importance):
        """Linear decay function"""
        max_age = 86400 * 30 * importance  # Month * importance
        return max(0, 1 - (age_seconds / max_age))

    def _stepped_decay(self, age_seconds, importance):
        """Stepped decay function"""
        age_days = age_seconds / 86400

        if age_days <= 1:
            return 1.0 * importance
        elif age_days <= 7:
            return 0.8 * importance
        elif age_days <= 30:
            return 0.5 * importance
        elif age_days <= 90:
            return 0.2 * importance
        else:
            return 0.1 * importance

    async def _update_access_pattern(self, memory):
        """Update memory access patterns"""
        memory_data = memory['content']
        memory_data['access_count'] += 1
        memory_data['last_accessed'] = datetime.now()

        await self.store_memory(
            memory_key=memory['key'],
            content=memory_data
        )
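To sanity-check the curves above: at importance 1.0 the exponential half-life is exactly one week, and the final ranking blends content and temporal relevance via the time_weight knob. A standalone restatement:

```python
import math

def exponential_decay(age_seconds, importance):
    """Half-life scales with importance: one week at importance 1.0"""
    half_life = 86400 * 7 * importance
    return math.exp(-0.693 * age_seconds / half_life)

def combined_score(content_relevance, temporal_relevance, time_weight=0.3):
    """Weighted blend used when ranking recalled memories"""
    return (1 - time_weight) * content_relevance + time_weight * temporal_relevance

week_old = exponential_decay(86400 * 7, importance=1.0)  # one half-life: ~0.5
```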

Memory Optimization and Maintenance

Automatic Memory Cleanup

from datetime import datetime, timedelta

class MemoryOptimizer:
    """Optimizes memory usage and maintains memory health"""

    async def run_optimization_cycle(self):
        """Run complete memory optimization"""
        optimization_results = {}

        # 1. Cleanup expired memories
        optimization_results['expired_cleanup'] = await self._cleanup_expired_memories()

        # 2. Compress old memories
        optimization_results['compression'] = await self._compress_old_memories()

        # 3. Merge duplicate memories
        optimization_results['deduplication'] = await self._deduplicate_memories()

        # 4. Rebuild search indices
        optimization_results['indexing'] = await self._rebuild_search_indices()

        # 5. Update memory statistics
        optimization_results['statistics'] = await self._update_memory_statistics()

        return optimization_results

    async def _cleanup_expired_memories(self):
        """Remove memories past their retention period"""
        current_time = datetime.now()
        cleanup_counts = {}

        # Get retention policies from config
        retention_policies = {
            'short_term': timedelta(hours=24),
            'working': timedelta(hours=1),
            'episodic': timedelta(days=30),
            'semantic': timedelta(days=365)
        }

        for memory_type, retention_period in retention_policies.items():
            cutoff_time = current_time - retention_period

            expired_memories = await self.get_memories_before(cutoff_time, memory_type)

            for memory in expired_memories:
                # Check if memory should be preserved (high importance, frequent access)
                if not await self._should_preserve_memory(memory):
                    await self.delete_memory(memory['key'])

            cleanup_counts[memory_type] = len(expired_memories)

        return cleanup_counts

    async def _compress_old_memories(self):
        """Compress memories older than threshold"""
        compress_threshold = datetime.now() - timedelta(days=7)
        old_memories = await self.get_memories_before(compress_threshold)

        compression_results = {'compressed': 0, 'space_saved': 0}

        for memory in old_memories:
            if memory.get('compressed'):
                continue  # Already compressed

            # Compress memory content
            original_size = len(str(memory['content']))
            compressed_content = await self._compress_content(memory['content'])
            compressed_size = len(str(compressed_content))

            # Update memory with compressed version
            memory['content'] = compressed_content
            memory['compressed'] = True
            memory['original_size'] = original_size

            await self.store_memory(memory['key'], memory)

            compression_results['compressed'] += 1
            compression_results['space_saved'] += original_size - compressed_size

        return compression_results

    async def _deduplicate_memories(self):
        """Find and merge duplicate or very similar memories"""
        all_memories = await self.recall_memories(limit=10000)
        duplicates_found = 0
        duplicates_merged = 0

        # Group memories by similarity
        similarity_groups = {}

        for i, memory1 in enumerate(all_memories):
            for j, memory2 in enumerate(all_memories[i+1:], i+1):
                similarity = await self._calculate_memory_similarity(memory1, memory2)

                if similarity > 0.85:  # 85% similarity threshold
                    group_key = f"{min(i,j)}_{max(i,j)}"
                    similarity_groups[group_key] = [memory1, memory2, similarity]

        # Merge similar memories, skipping any already consumed by an earlier merge
        merged_keys = set()
        for group_key, (memory1, memory2, similarity) in similarity_groups.items():
            if memory1['key'] in merged_keys or memory2['key'] in merged_keys:
                continue

            merged_memory = await self._merge_memories(memory1, memory2)

            # Delete the originals and store the merged version
            await self.delete_memory(memory1['key'])
            await self.delete_memory(memory2['key'])
            merged_keys.update((memory1['key'], memory2['key']))

            merged_key = f"merged_{datetime.now().timestamp()}"
            await self.store_memory(merged_key, merged_memory)

            duplicates_found += 2
            duplicates_merged += 1

        return {
            'duplicates_found': duplicates_found,
            'duplicates_merged': duplicates_merged,
            'memories_removed': duplicates_found - duplicates_merged
        }

    async def _should_preserve_memory(self, memory):
        """Determine if a memory should be preserved despite age"""
        # High importance score
        if memory.get('importance_score', 0) > 0.8:
            return True

        # High access count
        if memory.get('access_count', 0) > 10:
            return True

        # Recent access
        last_accessed = memory.get('last_accessed')
        if last_accessed and (datetime.now() - last_accessed).days < 7:
            return True

        # Contains important tags
        important_tags = ['critical', 'procedure', 'key_decision']
        memory_tags = memory.get('tags', [])
        if any(tag in memory_tags for tag in important_tags):
            return True

        return False
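The preservation logic combines four independent checks, any one of which saves a memory from cleanup. A standalone restatement with synthetic memories (field names follow the class above):

```python
from datetime import datetime, timedelta

def should_preserve(memory, now=None):
    """Keep a memory past its retention window if any signal marks it valuable"""
    now = now or datetime.now()
    if memory.get('importance_score', 0) > 0.8:
        return True
    if memory.get('access_count', 0) > 10:
        return True
    last = memory.get('last_accessed')
    if last and (now - last).days < 7:
        return True
    important = {'critical', 'procedure', 'key_decision'}
    return bool(important & set(memory.get('tags', [])))
```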

Memory Analytics and Insights

from datetime import datetime, timedelta

class MemoryAnalytics:
    """Analyze memory usage patterns and provide insights"""

    async def generate_memory_report(self):
        """Generate comprehensive memory analytics report"""
        report = {}

        # Basic statistics
        report['statistics'] = await self._get_memory_statistics()

        # Usage patterns
        report['usage_patterns'] = await self._analyze_usage_patterns()

        # Memory health
        report['health'] = await self._assess_memory_health()

        # Optimization suggestions
        report['suggestions'] = await self._generate_optimization_suggestions()

        return report

    async def _get_memory_statistics(self):
        """Get basic memory usage statistics"""
        all_memories = await self.recall_memories(limit=100000)

        stats = {
            'total_memories': len(all_memories),
            'memory_types': {},
            'size_distribution': {},
            'age_distribution': {},
            'access_patterns': {}
        }

        current_time = datetime.now()

        for memory in all_memories:
            # Memory type distribution
            memory_type = memory.get('memory_type', 'unknown')
            stats['memory_types'][memory_type] = stats['memory_types'].get(memory_type, 0) + 1

            # Size distribution
            content_size = len(str(memory.get('content', '')))
            size_bucket = self._get_size_bucket(content_size)
            stats['size_distribution'][size_bucket] = stats['size_distribution'].get(size_bucket, 0) + 1

            # Age distribution
            created_at = memory.get('created_at')
            if created_at:
                age_days = (current_time - created_at).days
                age_bucket = self._get_age_bucket(age_days)
                stats['age_distribution'][age_bucket] = stats['age_distribution'].get(age_bucket, 0) + 1

            # Access patterns
            access_count = memory.get('access_count', 0)
            access_bucket = self._get_access_bucket(access_count)
            stats['access_patterns'][access_bucket] = stats['access_patterns'].get(access_bucket, 0) + 1

        return stats

    async def _analyze_usage_patterns(self):
        """Analyze how memory is being used"""
        memories = await self.recall_memories(limit=10000)

        patterns = {
            'most_accessed_types': {},
            'recent_activity': {},
            'learning_indicators': {},
            'efficiency_metrics': {}
        }

        # Most accessed memory types
        type_access = {}
        for memory in memories:
            memory_type = memory.get('memory_type', 'unknown')
            access_count = memory.get('access_count', 0)
            type_access[memory_type] = type_access.get(memory_type, 0) + access_count

        patterns['most_accessed_types'] = sorted(
            type_access.items(),
            key=lambda x: x[1],
            reverse=True
        )

        # Recent activity (last 7 days)
        recent_cutoff = datetime.now() - timedelta(days=7)
        recent_memories = [m for m in memories
                          if m.get('created_at') and m['created_at'] > recent_cutoff]

        patterns['recent_activity'] = {
            'new_memories': len(recent_memories),
            'most_active_types': self._analyze_recent_types(recent_memories)
        }

        # Learning indicators
        patterns['learning_indicators'] = {
            'procedure_evolution': await self._analyze_procedure_evolution(),
            'preference_updates': await self._analyze_preference_updates(),
            'context_growth': await self._analyze_context_growth()
        }

        return patterns

    def _get_size_bucket(self, size_bytes):
        """Categorize memory size into buckets"""
        if size_bytes < 1024:  # < 1KB
            return 'small'
        elif size_bytes < 10240:  # < 10KB
            return 'medium'
        elif size_bytes < 102400:  # < 100KB
            return 'large'
        else:
            return 'xl'

    def _get_age_bucket(self, age_days):
        """Categorize memory age into buckets"""
        if age_days < 1:
            return 'today'
        elif age_days < 7:
            return 'this_week'
        elif age_days < 30:
            return 'this_month'
        elif age_days < 90:
            return 'this_quarter'
        else:
            return 'older'

    def _get_access_bucket(self, access_count):
        """Categorize access frequency into buckets"""
        if access_count == 0:
            return 'never'
        elif access_count < 5:
            return 'rarely'
        elif access_count < 20:
            return 'occasionally'
        elif access_count < 100:
            return 'frequently'
        else:
            return 'very_frequently'

Memory Security and Privacy

Privacy-Compliant Memory Management

class PrivacyCompliantMemory:
    """Memory management with privacy and compliance features"""

    def __init__(self, compliance_mode='gdpr'):
        self.compliance_mode = compliance_mode
        self.pii_detector = PIIDetector()
        self.anonymizer = MemoryAnonymizer()

    async def store_privacy_compliant_memory(self, content, tags=None, retention_policy=None):
        """Store memory with privacy compliance"""

        # Detect and handle PII
        pii_analysis = await self.pii_detector.analyze(content)

        if pii_analysis['contains_pii']:
            if self.compliance_mode in ['gdpr', 'ccpa']:
                # Anonymize PII
                anonymized_content = await self.anonymizer.anonymize(content, pii_analysis)

                # Store original PII mapping separately (encrypted)
                pii_mapping_id = await self._store_encrypted_pii_mapping(
                    content, anonymized_content, pii_analysis
                )

                content = anonymized_content
                tags = (tags or []) + ['anonymized', f'pii_mapping:{pii_mapping_id}']

        # Apply retention policy
        if not retention_policy:
            retention_policy = self._get_default_retention_policy(self.compliance_mode)

        memory_metadata = {
            'compliance_mode': self.compliance_mode,
            'retention_policy': retention_policy,
            'pii_detected': pii_analysis['contains_pii'],
            'anonymized': pii_analysis['contains_pii'],
            'created_at': datetime.now()
        }

        return await self.store_memory(
            content=content,
            tags=tags,
            metadata=memory_metadata
        )

    async def handle_data_deletion_request(self, subject_identifier):
        """Handle right-to-be-forgotten requests"""

        # Find all memories related to subject
        related_memories = await self._find_memories_by_subject(subject_identifier)

        deletion_results = {
            'memories_found': len(related_memories),
            'memories_deleted': 0,
            'errors': []
        }

        for memory in related_memories:
            try:
                # Delete memory
                await self.delete_memory(memory['key'])

                # Delete associated PII mapping if exists
                if 'pii_mapping:' in str(memory.get('tags', [])):
                    mapping_id = self._extract_pii_mapping_id(memory['tags'])
                    await self._delete_pii_mapping(mapping_id)

                deletion_results['memories_deleted'] += 1

            except Exception as e:
                deletion_results['errors'].append(str(e))

        # Log deletion for audit trail
        await self._log_deletion_request(subject_identifier, deletion_results)

        return deletion_results

    def _get_default_retention_policy(self, compliance_mode):
        """Get default retention policy based on compliance requirements"""
        policies = {
            'gdpr': {
                'default_retention_days': 1095,  # 3 years
                'sensitive_retention_days': 365,  # 1 year
                'automatic_deletion': True
            },
            'ccpa': {
                'default_retention_days': 1095,  # 3 years
                'sensitive_retention_days': 730,  # 2 years
                'automatic_deletion': True
            },
            'hipaa': {
                'default_retention_days': 2190,  # 6 years
                'sensitive_retention_days': 2190,
                'automatic_deletion': False  # Manual review required
            }
        }

        return policies.get(compliance_mode, policies['gdpr'])
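A retention policy only matters once it is applied. A minimal sketch of turning one of these policies into a concrete expiry date, using a hypothetical `compute_expiry` helper (not part of OpenClaw's API):

```python
from datetime import datetime, timedelta

def compute_expiry(created_at, policy, sensitive=False):
    """Hypothetical helper: derive a deletion date from a retention policy."""
    key = 'sensitive_retention_days' if sensitive else 'default_retention_days'
    return created_at + timedelta(days=policy[key])

gdpr_policy = {
    'default_retention_days': 1095,   # 3 years
    'sensitive_retention_days': 365,  # 1 year
    'automatic_deletion': True,
}

created = datetime(2024, 1, 1)
print(compute_expiry(created, gdpr_policy))                  # 2026-12-31 00:00:00
print(compute_expiry(created, gdpr_policy, sensitive=True))  # 2024-12-31 00:00:00
```

A background job could then compare each memory's expiry against the current time and delete (or flag for manual review, in the HIPAA case) anything past it.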

class PIIDetector:
    """Detect personally identifiable information in content"""

    def __init__(self):
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}-\d{3}-\d{4}\b|\b\(\d{3}\)\s*\d{3}-\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
        }

    async def analyze(self, content):
        """Analyze content for PII"""
        import re

        detected_pii = {}
        content_str = str(content)

        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, content_str)
            if matches:
                detected_pii[pii_type] = matches

        return {
            'contains_pii': bool(detected_pii),
            'detected_types': list(detected_pii.keys()),
            'detected_values': detected_pii
        }
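The detector's patterns can be tried directly. A quick synchronous sketch using the email and phone regexes from the class above on a sample string:

```python
import re

# Same email and phone patterns as PIIDetector above
patterns = {
    'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    'phone': r'\b\d{3}-\d{3}-\d{4}\b|\b\(\d{3}\)\s*\d{3}-\d{4}\b',
}

text = "Contact Jane at jane.doe@example.com or 555-123-4567."
detected = {name: re.findall(p, text)
            for name, p in patterns.items() if re.findall(p, text)}

print(detected)
# {'email': ['jane.doe@example.com'], 'phone': ['555-123-4567']}
```

Regex-based detection is a useful first pass, but it misses names, addresses, and context-dependent identifiers; production systems usually layer an NER model on top.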

Integration with OpenClaw Skills

Memory-Enhanced Skills

class MemoryEnhancedSkill(Skill):
    """Base class for skills that leverage memory"""

    def __init__(self, agent):
        super().__init__(agent)
        self.memory = agent.memory

    async def execute_with_memory(self, **kwargs):
        """Execute skill with memory integration"""

        # Recall relevant context
        context = await self.recall_relevant_context(kwargs)

        # Execute core skill logic with context
        result = await self.execute_core_logic(context, **kwargs)

        # Store execution result in memory
        await self.store_execution_memory(kwargs, result)

        # Learn from execution
        await self.learn_from_execution(kwargs, result, context)

        return result

    async def recall_relevant_context(self, execution_params):
        """Recall memories relevant to current execution"""

        # Build search query from execution parameters
        search_tags = self._extract_relevant_tags(execution_params)

        # Recall memories with associative expansion
        relevant_memories = await self.memory.recall_with_associations(
            query=str(execution_params),
            tags=search_tags,
            max_depth=2
        )

        # Build context object
        context = {
            'relevant_memories': relevant_memories,
            'execution_history': await self._get_execution_history(),
            'learned_patterns': await self._get_learned_patterns(),
            'user_preferences': await self._get_user_preferences()
        }

        return context

    async def store_execution_memory(self, params, result):
        """Store memory of skill execution"""

        execution_memory = {
            'skill_name': self.name,
            'parameters': params,
            'result': result,
            'success': result.get('status') == 'success',
            'execution_time': result.get('execution_time'),
            'context_used': bool(params.get('context'))
        }

        await self.memory.store_memory(
            memory_key=f"execution_{self.name}_{datetime.now().timestamp()}",
            content=execution_memory,
            memory_type='episodic',
            tags=[self.name, 'execution', result.get('status', 'unknown')]
        )

    async def learn_from_execution(self, params, result, context):
        """Learn patterns from execution results"""

        # Analyze execution success patterns
        if result.get('status') == 'success':
            await self._learn_success_patterns(params, result, context)
        else:
            await self._learn_failure_patterns(params, result, context)

        # Update user preference models
        await self._update_preference_models(params, result)

        # Improve procedural knowledge
        await self._improve_procedural_knowledge(params, result)

    async def _learn_success_patterns(self, params, result, context):
        """Learn from successful executions"""

        # Find similar successful executions
        similar_successes = await self.memory.recall_memories(
            tags=[self.name, 'execution', 'success'],
            limit=10
        )

        # Extract common patterns
        success_patterns = await self._extract_patterns(similar_successes + [result])

        # Store learned patterns
        await self.memory.store_memory(
            memory_key=f"success_patterns_{self.name}",
            content=success_patterns,
            memory_type='semantic',
            tags=[self.name, 'patterns', 'success']
        )

Troubleshooting Memory Issues

Common Memory Problems and Solutions

Problem: Memory Growth Too Large

# Solution: Implement memory cleanup scheduler
import logging

@scheduled_task(interval='daily')
async def cleanup_memory():
    optimizer = MemoryOptimizer(agent.memory)
    results = await optimizer.run_optimization_cycle()
    logging.info(f"Memory optimization results: {results}")
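`@scheduled_task` assumes a task scheduler in your stack. If none is available, the same daily cycle can be sketched with plain asyncio (`StubOptimizer` and the `max_cycles` cap are illustrative additions for testing, not OpenClaw APIs):

```python
import asyncio

async def cleanup_loop(optimizer, interval_seconds=86400, max_cycles=None):
    """Run the optimizer once per interval; max_cycles caps the loop for testing."""
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        results = await optimizer.run_optimization_cycle()
        print(f"Memory optimization results: {results}")
        cycles += 1
        await asyncio.sleep(interval_seconds)
    return cycles

class StubOptimizer:
    """Illustrative stand-in for MemoryOptimizer."""
    async def run_optimization_cycle(self):
        return {'memories_pruned': 0}

print(asyncio.run(cleanup_loop(StubOptimizer(), interval_seconds=0, max_cycles=1)))  # 1
```

In production you would launch `cleanup_loop` as a background task alongside the agent and omit `max_cycles`.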

Problem: Slow Memory Recall

# Solution: Add memory indices and caching
from cachetools import LRUCache  # or any LRU cache implementation

class IndexedMemory(BaseMemory):
    def __init__(self):
        super().__init__()
        self.tag_index = {}
        self.content_index = {}
        self.cache = LRUCache(maxsize=1000)

    async def recall_memories(self, tags=None, **kwargs):
        cache_key = f"recall_{hash(str(tags))}_{hash(str(kwargs))}"

        if cache_key in self.cache:
            return self.cache[cache_key]

        # Use indices for faster lookup
        if tags:
            candidate_memories = self._get_memories_by_tags(tags)
        else:
            candidate_memories = await super().recall_memories(**kwargs)

        self.cache[cache_key] = candidate_memories
        return candidate_memories
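The `LRUCache` above is typically supplied by a library such as cachetools. For illustration, a minimal sketch of the same eviction idea built on `collections.OrderedDict`:

```python
from collections import OrderedDict

class LRUCache:
    """Minimal LRU cache sketch; cachetools.LRUCache offers a similar interface."""
    def __init__(self, maxsize=1000):
        self.maxsize = maxsize
        self._data = OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __getitem__(self, key):
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict least recently used

cache = LRUCache(maxsize=2)
cache['a'] = 1
cache['b'] = 2
_ = cache['a']  # touch 'a' so it is most recent
cache['c'] = 3  # capacity exceeded: evicts 'b'
print('b' in cache, 'a' in cache)  # False True
```

Note that the cached recall results in `IndexedMemory` should also be invalidated when memories are stored or deleted, otherwise recalls can return stale data.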

Problem: Memory Corruption

# Solution: Implement memory validation and backup
class ValidatedMemory(BaseMemory):
    async def store_memory(self, memory_key, content, **kwargs):
        # Validate content before storage
        if not self._validate_memory_content(content):
            raise ValueError(f"Invalid memory content for key: {memory_key}")

        # Create backup before modification
        await self._create_backup(memory_key)

        # Store with checksum
        content_with_checksum = {
            'content': content,
            'checksum': self._calculate_checksum(content),
            'timestamp': datetime.now()
        }

        return await super().store_memory(memory_key, content_with_checksum, **kwargs)

    def _validate_memory_content(self, content):
        # Implement validation logic
        if not isinstance(content, (dict, list, str, int, float)):
            return False

        # Check for required fields, size limits, etc.
        return True
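`_calculate_checksum` is left abstract above. One reasonable implementation (an assumption, not necessarily OpenClaw's actual method) hashes a canonical JSON serialization so that dict key order does not affect the digest:

```python
import hashlib
import json

def calculate_checksum(content):
    """Checksum over a canonical JSON form; sort_keys makes it order-independent."""
    canonical = json.dumps(content, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()

a = calculate_checksum({'x': 1, 'y': 2})
b = calculate_checksum({'y': 2, 'x': 1})  # same content, different key order
print(a == b)  # True
```

On recall, recomputing the checksum and comparing it against the stored value detects silent corruption before the memory is used.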

Why OpenClaw Memory Matters

OpenClaw's memory system transforms simple automation into intelligent assistance:

Contextual Awareness: Agents understand the full context of your work and relationships
Continuous Learning: Every interaction improves future performance
Personalization: Agents adapt to your specific preferences and patterns
Institutional Knowledge: Team knowledge persists beyond individual conversations

For teams wanting intelligent AI automation without the complexity of memory management, consider MrDelegate — offering similar AI-powered capabilities with managed memory systems and automatic learning.

Start your free trial to experience AI agents that remember, learn, and improve with every interaction.


Advanced Memory Applications

With proper memory configuration, your OpenClaw agents become genuinely intelligent assistants that:

  • Remember your preferences and adapt their communication style
  • Learn from your decisions to make better suggestions over time
  • Build comprehensive context about your projects and relationships
  • Improve their skills through continuous feedback and experience
  • Maintain consistency across long-term interactions and projects

Memory isn't just storage — it's the foundation of AI intelligence that grows with your needs.
