Memory is what separates intelligent AI agents from simple chatbots. OpenClaw's memory system allows your agents to remember conversations, learn from decisions, and build context that improves their performance over time.
This guide covers everything from basic memory configuration to advanced patterns for building agents that genuinely get smarter with experience.
Understanding OpenClaw Memory Architecture
OpenClaw implements a multi-layered memory system that mirrors how humans store and recall information:
- Short-term Memory: Current conversation context and immediate working data
- Long-term Memory: Persistent knowledge, preferences, and learned patterns
- Episodic Memory: Specific events, interactions, and their outcomes
- Semantic Memory: General facts, relationships, and procedural knowledge
- Working Memory: Active processing space for complex operations
This architecture enables agents to maintain context across sessions, learn from experience, and make increasingly intelligent decisions.
Memory Configuration Basics
Core Memory Settings
Configure memory in ~/.openclaw/config.yaml:
# Memory Configuration
memory:
  # Storage backend
  backend: "file" # file, redis, postgresql, or mongodb
  storage_path: "~/.openclaw/memory"
  # Memory limits and optimization
  max_memory_mb: 1024 # 1GB memory limit
  cleanup_interval_hours: 24
  compression_enabled: true
  # Retention policies
  retention:
    short_term_hours: 24
    episodic_days: 30
    semantic_days: 365
    working_memory_minutes: 60
  # Learning settings
  learning:
    enabled: true
    confidence_threshold: 0.7
    pattern_detection: true
    preference_learning: true
  # Privacy and security
  privacy:
    encrypt_memories: true
    anonymize_pii: true
    retention_compliance: "gdpr" # gdpr, ccpa, or custom
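As a rough illustration of how the retention settings above could be interpreted, the sketch below converts them into `timedelta` values and checks whether a memory has outlived its retention period. The `retention_to_timedeltas` and `is_expired` helpers are hypothetical and are not part of OpenClaw; only the key names and values come from the config above.

```python
from datetime import datetime, timedelta

# Retention values copied from the config above, as they might look after YAML parsing.
RETENTION = {
    "short_term_hours": 24,
    "episodic_days": 30,
    "semantic_days": 365,
    "working_memory_minutes": 60,
}

# Hypothetical convention: the key suffix names the time unit.
_UNITS = {"hours", "days", "minutes"}


def retention_to_timedeltas(retention):
    """Turn keys like 'episodic_days' into {'episodic': timedelta(days=30)}."""
    result = {}
    for key, value in retention.items():
        name, _, unit = key.rpartition("_")
        if unit not in _UNITS:
            raise ValueError(f"Unrecognized retention unit in key: {key}")
        result[name] = timedelta(**{unit: value})
    return result


def is_expired(created_at, memory_type, policies, now=None):
    """Return True when a memory is older than its type's retention period."""
    now = now or datetime.now()
    return now - created_at > policies[memory_type]


policies = retention_to_timedeltas(RETENTION)
```

With these values, an episodic memory created 61 days ago would be reported as expired, while a short-term memory created 12 hours ago would not.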
Memory Storage Backends
File-based Storage (Default):
memory:
  backend: file
  storage_path: ~/.openclaw/memory
  backup_enabled: true
  backup_interval_hours: 6
Redis Backend (Recommended for Production):
memory:
  backend: redis
  redis:
    host: localhost
    port: 6379
    db: 0
    password: your_redis_password
    ssl: true
PostgreSQL Backend (Enterprise):
memory:
  backend: postgresql
  postgresql:
    host: localhost
    port: 5432
    database: openclaw_memory
    username: openclaw
    password: secure_password
    ssl_mode: require
Types of Memory in OpenClaw
1. Conversational Memory
Tracks ongoing conversations and context across sessions.
Example Memory Structure:
from datetime import datetime

from openclaw.memory import ConversationMemory

class EmailAssistantMemory(ConversationMemory):
    """Memory specifically for email assistance"""
    def __init__(self, agent_id):
        super().__init__(agent_id, memory_type="conversation")
        self.email_context = {}
        self.user_preferences = {}

    async def store_email_interaction(self, email_data, user_action, outcome):
        """Store email handling patterns"""
        interaction = {
            'timestamp': datetime.now(),
            'email_type': email_data.get('type'),
            'sender_domain': self._extract_domain(email_data['from']),
            'user_action': user_action,  # 'archive', 'reply', 'forward', etc.
            'outcome': outcome,  # 'successful', 'needs_revision', etc.
            'response_time': email_data.get('response_time')
        }
        await self.store_memory(
            memory_key=f"email_interaction_{datetime.now().timestamp()}",
            content=interaction,
            tags=['email', 'interaction', email_data.get('type')]
        )

    async def learn_email_patterns(self):
        """Analyze stored interactions to learn patterns"""
        interactions = await self.recall_memories(tags=['email', 'interaction'])
        # Analyze patterns
        patterns = {
            'priority_senders': self._identify_priority_senders(interactions),
            'action_patterns': self._analyze_action_patterns(interactions),
            'optimal_response_times': self._calculate_response_times(interactions)
        }
        # Store learned patterns
        await self.store_memory(
            memory_key="learned_email_patterns",
            content=patterns,
            memory_type="semantic"
        )
        return patterns
    def _identify_priority_senders(self, interactions):
        """Identify senders that typically require quick responses"""
        sender_stats = {}
        for interaction in interactions:
            sender = interaction['sender_domain']
            if sender not in sender_stats:
                sender_stats[sender] = {'quick_responses': 0, 'total': 0}
            sender_stats[sender]['total'] += 1
            # A missing response time must not count as a quick response
            response_time = interaction.get('response_time')
            if response_time is not None and response_time < 3600:  # < 1 hour
                sender_stats[sender]['quick_responses'] += 1
        # Calculate priority scores
        priority_senders = []
        for sender, stats in sender_stats.items():
            if stats['total'] >= 3:  # Minimum interactions
                priority_score = stats['quick_responses'] / stats['total']
                if priority_score > 0.7:  # 70% quick response rate
                    priority_senders.append({
                        'sender': sender,
                        'priority_score': priority_score,
                        'sample_size': stats['total']
                    })
        return sorted(priority_senders, key=lambda x: x['priority_score'], reverse=True)
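To make the priority-sender scoring concrete, here is a standalone sketch of the same idea that runs without OpenClaw. The function name `score_senders`, the constants, and the sample data are illustrative assumptions; the thresholds (one hour, three interactions, 70%) are the ones used in the method above.

```python
from collections import defaultdict

QUICK_SECONDS = 3600   # responses under one hour count as "quick"
MIN_INTERACTIONS = 3   # ignore senders with too little history
PRIORITY_CUTOFF = 0.7  # share of quick responses needed to be a priority sender


def score_senders(interactions):
    """Return (sender, quick-response share) pairs above the cutoff, highest first."""
    stats = defaultdict(lambda: {"quick": 0, "total": 0})
    for item in interactions:
        entry = stats[item["sender_domain"]]
        entry["total"] += 1
        response_time = item.get("response_time")
        # A missing response time is not evidence of a quick reply.
        if response_time is not None and response_time < QUICK_SECONDS:
            entry["quick"] += 1

    scored = [
        (sender, s["quick"] / s["total"])
        for sender, s in stats.items()
        if s["total"] >= MIN_INTERACTIONS
    ]
    return sorted(
        [pair for pair in scored if pair[1] > PRIORITY_CUTOFF],
        key=lambda pair: pair[1],
        reverse=True,
    )


# Hypothetical history: one responsive relationship, one newsletter, one new contact.
sample = (
    [{"sender_domain": "client.example", "response_time": 600}] * 4
    + [{"sender_domain": "news.example", "response_time": 90000}] * 3
    + [{"sender_domain": "new.example", "response_time": 60}] * 2
)
print(score_senders(sample))  # [('client.example', 1.0)]
```

The new contact is excluded despite fast replies because two interactions fall below the minimum sample size, which keeps one-off exchanges from being promoted too early.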
2. Procedural Memory
Stores learned procedures and workflows.
class ProcedureMemory:
    """Stores and recalls procedural knowledge"""
    async def store_procedure(self, procedure_name, steps, success_rate=None):
        """Store a workflow procedure"""
        procedure = {
            'name': procedure_name,
            'steps': steps,
            'success_rate': success_rate,
            'created_at': datetime.now(),
            'usage_count': 0,
            'last_used': None
        }
        await self.store_memory(
            memory_key=f"procedure_{procedure_name}",
            content=procedure,
            memory_type="procedural",
            tags=['procedure', 'workflow']
        )

    async def recall_procedure(self, procedure_name):
        """Recall and update procedure usage stats"""
        procedure = await self.get_memory(f"procedure_{procedure_name}")
        if procedure:
            # Update usage statistics
            procedure['usage_count'] += 1
            procedure['last_used'] = datetime.now()
            await self.store_memory(
                memory_key=f"procedure_{procedure_name}",
                content=procedure,
                memory_type="procedural"
            )
        return procedure
    async def optimize_procedure(self, procedure_name, execution_result):
        """Learn from procedure execution to optimize it"""
        procedure = await self.get_memory(f"procedure_{procedure_name}")
        if not procedure:
            return
        # Fold this outcome into the running success rate. success_rate is
        # stored as None until a first value is supplied, so fall back to 0.5.
        prior_rate = procedure.get('success_rate')
        if prior_rate is None:
            prior_rate = 0.5
        succeeded = bool(execution_result.get('success'))
        # Failures count as 0 so the rate can fall as well as rise
        procedure['success_rate'] = (
            (prior_rate * procedure['usage_count'] + (1 if succeeded else 0)) /
            (procedure['usage_count'] + 1)
        )
        if not succeeded:
            # Identify failed steps and suggest improvements
            failed_step = execution_result.get('failed_step')
            if failed_step:
                # Store failure analysis
                await self.store_memory(
                    memory_key=f"procedure_failure_{procedure_name}_{datetime.now().timestamp()}",
                    content={
                        'procedure': procedure_name,
                        'failed_step': failed_step,
                        'error': execution_result.get('error'),
                        'suggested_improvement': execution_result.get('suggestion')
                    },
                    memory_type="episodic",
                    tags=['procedure', 'failure', 'learning']
                )
        await self.store_memory(
            memory_key=f"procedure_{procedure_name}",
            content=procedure,
            memory_type="procedural"
        )
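The success-rate update in `optimize_procedure` is a running average. A minimal standalone sketch of that arithmetic follows; `update_success_rate` is a hypothetical helper name, and counting a failure as 0 (so the rate can decrease) is an assumption about the intended behaviour rather than something the OpenClaw API guarantees.

```python
def update_success_rate(prior_rate, prior_runs, succeeded):
    """Fold one more outcome into a running success rate.

    prior_rate may be None when no rate has been recorded yet; 0.5 is then
    used as a neutral starting point, matching the default in the text above.
    """
    if prior_runs < 0:
        raise ValueError("prior_runs must be non-negative")
    if prior_rate is None:
        prior_rate = 0.5
    outcome = 1.0 if succeeded else 0.0
    return (prior_rate * prior_runs + outcome) / (prior_runs + 1)


# Four earlier runs at a 50% rate, then one success and one failure.
after_success = update_success_rate(0.5, 4, True)             # (0.5 * 4 + 1) / 5
after_failure = update_success_rate(after_success, 5, False)  # (0.6 * 5 + 0) / 6
```

Here the rate rises to 0.6 after the success and falls back to 0.5 after the failure.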
3. Contextual Memory
Maintains context about ongoing projects, relationships, and states.
class ContextualMemory:
"""Manages contextual information about projects, people, and situations"""
async def store_project_context(self, project_id, context_data):
"""Store project-specific context"""
context = {
'project_id': project_id,
'stakeholders': context_data.get('stakeholders', []),
'current_phase': context_data.get('phase'),
'key_decisions': context_data.get('decisions', []),
'blockers': context_data.get('blockers', []),
'communication_patterns': context_data.get('communication', {}),
'updated_at': datetime.now()
}
await self.store_memory(
memory_key=f"project_context_{project_id}",
content=context,
memory_type="contextual",
tags=['project', project_id, context_data.get('phase', '')]
)
async def get_project_context(self, project_id):
"""Retrieve comprehensive project context"""
# Get base context
context = await self.get_memory(f"project_context_{project_id}")
if not context:
return None
# Enrich with related memories
related_memories = await self.recall_memories(
tags=[project_id],
memory_type="episodic",
limit=20
)
context['recent_activities'] = related_memories
context['last_accessed'] = datetime.now()
return context
async def update_person_context(self, person_id, interaction_data):
"""Update context about a person based on new interactions"""
existing_context = await self.get_memory(f"person_context_{person_id}") or {}
# Merge new interaction data
updated_context = {
'person_id': person_id,
'communication_style': self._analyze_communication_style(
existing_context.get('interactions', []) + [interaction_data]
),
'preferences': self._extract_preferences(interaction_data),
'expertise_areas': self._identify_expertise(interaction_data),
'availability_patterns': self._analyze_availability(interaction_data),
'last_interaction': datetime.now(),
'interaction_count': existing_context.get('interaction_count', 0) + 1
}
await self.store_memory(
memory_key=f"person_context_{person_id}",
content=updated_context,
memory_type="contextual",
tags=['person', person_id, 'relationship']
)
Advanced Memory Patterns
1. Hierarchical Memory Organization
Organize memories in a tree addressed by dotted paths (for example, projects.<name>.decisions) so that related items can be stored and recalled together.
class HierarchicalMemory:
"""Hierarchical memory organization for complex knowledge structures"""
def __init__(self):
self.memory_tree = {
'root': {
'projects': {},
'people': {},
'procedures': {},
'knowledge': {}
}
}
async def store_hierarchical_memory(self, path, content, metadata=None):
"""Store memory in hierarchical structure"""
path_parts = path.split('.')
current_node = self.memory_tree['root']
# Navigate to parent node
for part in path_parts[:-1]:
if part not in current_node:
current_node[part] = {}
current_node = current_node[part]
# Store memory at final location
memory_item = {
'content': content,
'metadata': metadata or {},
'created_at': datetime.now(),
'path': path
}
current_node[path_parts[-1]] = memory_item
# Also store in flat structure for search
await self.store_memory(
memory_key=f"hierarchical_{path}",
content=memory_item,
tags=path_parts + ['hierarchical']
)
async def recall_hierarchical_memory(self, path):
"""Recall memory from hierarchical path"""
return await self.get_memory(f"hierarchical_{path}")
    async def get_memory_subtree(self, path):
        """Get entire subtree of memories"""
        path_parts = path.split('.') if path else []
        memories = await self.recall_memories(
            tags=path_parts + ['hierarchical'],
            limit=1000
        )
        # Match on whole path segments so "projects.a" does not pick up "projects.ab"
        prefix = f"{path}." if path else ""
        # Build subtree from flat results
        subtree = {}
        for memory in memories:
            memory_path = memory['content']['path']
            if memory_path.startswith(prefix):
                relative_path = memory_path[len(prefix):]
                if relative_path:
                    self._insert_into_subtree(subtree, relative_path, memory['content'])
        return subtree
def _insert_into_subtree(self, tree, path, content):
"""Helper to insert memory into subtree structure"""
parts = path.split('.')
current = tree
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = content
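The dotted-path idea can be tried out with a plain dictionary, independent of any storage backend. The sketch below is illustrative only: `insert_path` and `get_subtree` are hypothetical helpers, and the project names are made up.

```python
def insert_path(tree, path, content):
    """Store content at a dotted path, creating intermediate nodes as needed."""
    parts = path.split(".")
    node = tree
    for part in parts[:-1]:
        node = node.setdefault(part, {})
    node[parts[-1]] = content


def get_subtree(tree, path):
    """Return the node stored at a dotted path, or None if any segment is missing."""
    node = tree
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


tree = {}
insert_path(tree, "projects.apollo.decisions", ["use Redis"])
insert_path(tree, "projects.apollo.blockers", [])
insert_path(tree, "projects.apollo2.decisions", ["use PostgreSQL"])

print(get_subtree(tree, "projects.apollo"))
# {'decisions': ['use Redis'], 'blockers': []}
```

Because lookup walks the path one segment at a time, `projects.apollo` never picks up entries stored under `projects.apollo2`, which is a risk with plain string-prefix matching.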
2. Associative Memory Networks
Build networks of related memories for intelligent recall.
class AssociativeMemory:
"""Memory system that builds associations between related concepts"""
def __init__(self):
self.associations = {} # memory_id -> [related_memory_ids]
self.concept_vectors = {} # memory_id -> embedding vector
async def store_associated_memory(self, content, tags=None, related_to=None):
"""Store memory and build associations"""
memory_id = f"memory_{datetime.now().timestamp()}"
# Store base memory
await self.store_memory(
memory_key=memory_id,
content=content,
tags=tags or []
)
# Generate embedding for content
embedding = await self._generate_embedding(content)
self.concept_vectors[memory_id] = embedding
# Build associations
if related_to:
# Explicit associations
self._add_association(memory_id, related_to)
else:
# Find similar memories through embedding similarity
similar_memories = await self._find_similar_memories(embedding, threshold=0.7)
for similar_id, similarity in similar_memories:
self._add_association(memory_id, similar_id)
return memory_id
async def recall_with_associations(self, query, max_depth=3):
"""Recall memories and their associations"""
# Find initial matches
query_embedding = await self._generate_embedding(query)
initial_matches = await self._find_similar_memories(query_embedding)
# Expand through associations
expanded_results = {}
for memory_id, relevance in initial_matches:
expanded_results[memory_id] = {
'memory': await self.get_memory(memory_id),
'relevance': relevance,
'depth': 0
}
# Follow associations
await self._expand_associations(
memory_id, expanded_results, current_depth=0, max_depth=max_depth
)
return expanded_results
async def _expand_associations(self, memory_id, results, current_depth, max_depth):
"""Recursively expand through memory associations"""
if current_depth >= max_depth:
return
associated_ids = self.associations.get(memory_id, [])
for assoc_id in associated_ids:
if assoc_id not in results:
results[assoc_id] = {
'memory': await self.get_memory(assoc_id),
'relevance': 0.8 - (current_depth * 0.2), # Decay relevance by depth
'depth': current_depth + 1
}
# Continue expansion
await self._expand_associations(
assoc_id, results, current_depth + 1, max_depth
)
def _add_association(self, memory1_id, memory2_id):
"""Add bidirectional association between memories"""
if memory1_id not in self.associations:
self.associations[memory1_id] = []
if memory2_id not in self.associations:
self.associations[memory2_id] = []
if memory2_id not in self.associations[memory1_id]:
self.associations[memory1_id].append(memory2_id)
if memory1_id not in self.associations[memory2_id]:
self.associations[memory2_id].append(memory1_id)
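    async def _generate_embedding(self, text):
        """Generate embedding vector for text content"""
        # Use OpenAI embeddings or a local model. This uses the openai>=1.0
        # client; the older openai.Embedding.acreate call was removed in 1.0.
        from openai import AsyncOpenAI
        client = AsyncOpenAI()  # Reads OPENAI_API_KEY from the environment
        response = await client.embeddings.create(
            model="text-embedding-ada-002",
            input=text
        )
        return response.data[0].embedding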
async def _find_similar_memories(self, query_embedding, threshold=0.6):
"""Find memories similar to query embedding"""
similarities = []
for memory_id, memory_embedding in self.concept_vectors.items():
similarity = self._cosine_similarity(query_embedding, memory_embedding)
if similarity >= threshold:
similarities.append((memory_id, similarity))
return sorted(similarities, key=lambda x: x[1], reverse=True)
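    def _cosine_similarity(self, vec1, vec2):
        """Calculate cosine similarity between two vectors"""
        import numpy as np
        norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        if norm_product == 0:
            return 0.0  # Similarity is undefined for a zero vector
        return float(np.dot(vec1, vec2) / norm_product)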
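The two building blocks of associative recall, similarity scoring and bounded expansion through associations, can be sketched with the standard library alone. This is not the class above: `expand` deliberately uses a breadth-first walk instead of recursion, so each memory is recorded at its shallowest depth, and the association graph is invented for the example.

```python
import math
from collections import deque


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either has zero length."""
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return sum(x * y for x, y in zip(a, b)) / (norm_a * norm_b)


def expand(associations, start_ids, max_depth=3):
    """Breadth-first walk that records the shallowest depth for each memory id."""
    depths = {memory_id: 0 for memory_id in start_ids}
    queue = deque(start_ids)
    while queue:
        current = queue.popleft()
        if depths[current] >= max_depth:
            continue  # Do not follow associations beyond the depth limit
        for neighbor in associations.get(current, []):
            if neighbor not in depths:
                depths[neighbor] = depths[current] + 1
                queue.append(neighbor)
    return depths


# Hypothetical bidirectional chain: a - b - c - d
associations = {"a": ["b"], "b": ["a", "c"], "c": ["b", "d"], "d": ["c"]}
print(expand(associations, ["a"], max_depth=2))  # {'a': 0, 'b': 1, 'c': 2}
```

A depth-first walk marks a memory the first time it is reached, which may be along a longer route; breadth-first order avoids that when depth feeds into a relevance score.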
3. Temporal Memory Management
Manage memories based on time and relevance decay.
class TemporalMemory:
"""Time-aware memory management with relevance decay"""
def __init__(self):
self.decay_functions = {
'exponential': self._exponential_decay,
'linear': self._linear_decay,
'stepped': self._stepped_decay
}
async def store_temporal_memory(self, content, importance_score=0.5, decay_function='exponential'):
"""Store memory with temporal characteristics"""
memory_item = {
'content': content,
'created_at': datetime.now(),
'importance_score': importance_score,
'decay_function': decay_function,
'access_count': 0,
'last_accessed': datetime.now()
}
memory_id = f"temporal_{datetime.now().timestamp()}"
await self.store_memory(memory_key=memory_id, content=memory_item)
return memory_id
async def recall_temporal_memories(self, query, time_weight=0.3):
"""Recall memories considering temporal relevance"""
all_memories = await self.recall_memories(limit=1000)
current_time = datetime.now()
scored_memories = []
for memory in all_memories:
memory_data = memory['content']
created_at = memory_data.get('created_at')
if not created_at:
continue
# Calculate temporal relevance
age_seconds = (current_time - created_at).total_seconds()
temporal_relevance = self._calculate_temporal_relevance(
age_seconds,
memory_data.get('importance_score', 0.5),
memory_data.get('decay_function', 'exponential'),
memory_data.get('access_count', 0)
)
# Calculate content relevance (simplified - in practice, use embeddings)
content_relevance = self._calculate_content_relevance(
query, memory_data['content']
)
# Combine scores
total_score = (
(1 - time_weight) * content_relevance +
time_weight * temporal_relevance
)
scored_memories.append({
'memory': memory,
'score': total_score,
'temporal_relevance': temporal_relevance,
'content_relevance': content_relevance
})
# Sort by total score and update access patterns
scored_memories.sort(key=lambda x: x['score'], reverse=True)
# Update access count for returned memories
for scored_memory in scored_memories[:10]: # Top 10
await self._update_access_pattern(scored_memory['memory'])
return scored_memories
def _calculate_temporal_relevance(self, age_seconds, importance, decay_function, access_count):
"""Calculate how relevant a memory is based on time"""
decay_func = self.decay_functions.get(decay_function, self._exponential_decay)
# Base decay
base_relevance = decay_func(age_seconds, importance)
# Boost for frequently accessed memories
access_boost = min(0.3, access_count * 0.05) # Max 30% boost
return min(1.0, base_relevance + access_boost)
    def _exponential_decay(self, age_seconds, importance):
        """Exponential decay function"""
        import math
        if importance <= 0:
            return 0.0  # Avoid dividing by a zero half-life
        half_life = 86400 * 7 * importance  # Week * importance as half-life
        return math.exp(-math.log(2) * age_seconds / half_life)

    def _linear_decay(self, age_seconds, importance):
        """Linear decay function"""
        if importance <= 0:
            return 0.0  # Avoid dividing by a zero maximum age
        max_age = 86400 * 30 * importance  # Month * importance
        return max(0, 1 - (age_seconds / max_age))
def _stepped_decay(self, age_seconds, importance):
"""Stepped decay function"""
age_days = age_seconds / 86400
if age_days <= 1:
return 1.0 * importance
elif age_days <= 7:
return 0.8 * importance
elif age_days <= 30:
return 0.5 * importance
elif age_days <= 90:
return 0.2 * importance
else:
return 0.1 * importance
async def _update_access_pattern(self, memory):
"""Update memory access patterns"""
memory_data = memory['content']
memory_data['access_count'] += 1
memory_data['last_accessed'] = datetime.now()
await self.store_memory(
memory_key=memory['key'],
content=memory_data
)
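A short worked example may help with the decay arithmetic. With exponential decay, relevance is exp(-ln 2 * age / half_life), so it is exactly halved after one half-life, and the half-life here is one week scaled by importance. The function names below are hypothetical; the constants and the 0.3 time weight are taken from the class above.

```python
import math

WEEK_SECONDS = 86400 * 7


def exponential_relevance(age_seconds, importance):
    """Relevance halves every (one week * importance); zero importance yields 0."""
    if importance <= 0:
        return 0.0
    half_life = WEEK_SECONDS * importance
    return math.exp(-math.log(2) * age_seconds / half_life)


def combined_score(content_relevance, temporal_relevance, time_weight=0.3):
    """Weighted blend used for ranking: mostly content match, partly recency."""
    return (1 - time_weight) * content_relevance + time_weight * temporal_relevance


# A memory with importance 0.5 has a half-life of 3.5 days.
fresh = exponential_relevance(0, 0.5)
after_half_life = exponential_relevance(WEEK_SECONDS * 0.5, 0.5)
score = combined_score(content_relevance=0.8, temporal_relevance=after_half_life)
```

In this example `fresh` is 1.0, `after_half_life` is about 0.5, and the blended score is about 0.7 * 0.8 + 0.3 * 0.5 = 0.71.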
Memory Optimization and Maintenance
Automatic Memory Cleanup
from datetime import datetime, timedelta

class MemoryOptimizer:
"""Optimizes memory usage and maintains memory health"""
async def run_optimization_cycle(self):
"""Run complete memory optimization"""
optimization_results = {}
# 1. Cleanup expired memories
optimization_results['expired_cleanup'] = await self._cleanup_expired_memories()
# 2. Compress old memories
optimization_results['compression'] = await self._compress_old_memories()
# 3. Merge duplicate memories
optimization_results['deduplication'] = await self._deduplicate_memories()
# 4. Rebuild search indices
optimization_results['indexing'] = await self._rebuild_search_indices()
# 5. Update memory statistics
optimization_results['statistics'] = await self._update_memory_statistics()
return optimization_results
    async def _cleanup_expired_memories(self):
        """Remove memories past their retention period"""
        current_time = datetime.now()
        cleanup_counts = {}
        # Retention periods, hardcoded here to match the config defaults
        retention_policies = {
            'short_term': timedelta(hours=24),
            'working': timedelta(hours=1),
            'episodic': timedelta(days=30),
            'semantic': timedelta(days=365)
        }
        for memory_type, retention_period in retention_policies.items():
            cutoff_time = current_time - retention_period
            expired_memories = await self.get_memories_before(cutoff_time, memory_type)
            deleted = 0
            for memory in expired_memories:
                # Check if memory should be preserved (high importance, frequent access)
                if not await self._should_preserve_memory(memory):
                    await self.delete_memory(memory['key'])
                    deleted += 1
            # Count only memories actually deleted, not every expired candidate
            cleanup_counts[memory_type] = deleted
        return cleanup_counts
async def _compress_old_memories(self):
"""Compress memories older than threshold"""
compress_threshold = datetime.now() - timedelta(days=7)
old_memories = await self.get_memories_before(compress_threshold)
compression_results = {'compressed': 0, 'space_saved': 0}
for memory in old_memories:
if memory.get('compressed'):
continue # Already compressed
# Compress memory content
original_size = len(str(memory['content']))
compressed_content = await self._compress_content(memory['content'])
compressed_size = len(str(compressed_content))
# Update memory with compressed version
memory['content'] = compressed_content
memory['compressed'] = True
memory['original_size'] = original_size
await self.store_memory(memory['key'], memory)
compression_results['compressed'] += 1
compression_results['space_saved'] += original_size - compressed_size
return compression_results
    async def _deduplicate_memories(self):
        """Find and merge duplicate or very similar memories"""
        all_memories = await self.recall_memories(limit=10000)
        duplicates_found = 0
        duplicates_merged = 0
        # Find similar pairs (pairwise comparison is O(n^2) in the number of memories)
        similar_pairs = []
        for i, memory1 in enumerate(all_memories):
            for memory2 in all_memories[i + 1:]:
                similarity = await self._calculate_memory_similarity(memory1, memory2)
                if similarity > 0.85:  # 85% similarity threshold
                    similar_pairs.append((memory1, memory2))
        # Merge similar memories, skipping any memory already merged in this pass
        merged_keys = set()
        for memory1, memory2 in similar_pairs:
            if memory1['key'] in merged_keys or memory2['key'] in merged_keys:
                continue
            merged_memory = await self._merge_memories(memory1, memory2)
            # Delete original memories and store merged version
            await self.delete_memory(memory1['key'])
            await self.delete_memory(memory2['key'])
            merged_keys.update((memory1['key'], memory2['key']))
            merged_key = f"merged_{datetime.now().timestamp()}"
            await self.store_memory(merged_key, merged_memory)
            duplicates_found += 2
            duplicates_merged += 1
        return {
            'duplicates_found': duplicates_found,
            'duplicates_merged': duplicates_merged,
            'space_saved': duplicates_found - duplicates_merged
        }
async def _should_preserve_memory(self, memory):
"""Determine if a memory should be preserved despite age"""
# High importance score
if memory.get('importance_score', 0) > 0.8:
return True
# High access count
if memory.get('access_count', 0) > 10:
return True
# Recent access
last_accessed = memory.get('last_accessed')
if last_accessed and (datetime.now() - last_accessed).days < 7:
return True
# Contains important tags
important_tags = ['critical', 'procedure', 'key_decision']
memory_tags = memory.get('tags', [])
if any(tag in memory_tags for tag in important_tags):
return True
return False
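The deduplication step in this class depends on a similarity function that the article does not define. As one possible stand-in for text content, the sketch below uses the standard library's `difflib.SequenceMatcher` and places each item in at most one group, so nothing is merged or deleted twice. The function name and sample notes are illustrative assumptions, and embeddings would likely be a better fit for longer content.

```python
from difflib import SequenceMatcher


def group_near_duplicates(texts, threshold=0.85):
    """Group indices of texts whose similarity ratio meets the threshold.

    Each text joins the first group whose representative it resembles,
    so no item ends up in two groups.
    """
    groups = []  # Each group is a list of indices; the first is the representative
    for index, text in enumerate(texts):
        for group in groups:
            representative = texts[group[0]]
            if SequenceMatcher(None, representative, text).ratio() >= threshold:
                group.append(index)
                break
        else:
            # No existing group was close enough, so start a new one
            groups.append([index])
    return groups


notes = [
    "Client prefers email updates on Fridays",
    "Client prefers email updates on Friday",
    "Deploy checklist for the billing service",
]
print(group_near_duplicates(notes))  # [[0, 1], [2]]
```

Groups with more than one index are merge candidates; singletons are left alone.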
Memory Analytics and Insights
class MemoryAnalytics:
"""Analyze memory usage patterns and provide insights"""
async def generate_memory_report(self):
"""Generate comprehensive memory analytics report"""
report = {}
# Basic statistics
report['statistics'] = await self._get_memory_statistics()
# Usage patterns
report['usage_patterns'] = await self._analyze_usage_patterns()
# Memory health
report['health'] = await self._assess_memory_health()
# Optimization suggestions
report['suggestions'] = await self._generate_optimization_suggestions()
return report
async def _get_memory_statistics(self):
"""Get basic memory usage statistics"""
all_memories = await self.recall_memories(limit=100000)
stats = {
'total_memories': len(all_memories),
'memory_types': {},
'size_distribution': {},
'age_distribution': {},
'access_patterns': {}
}
current_time = datetime.now()
for memory in all_memories:
# Memory type distribution
memory_type = memory.get('memory_type', 'unknown')
stats['memory_types'][memory_type] = stats['memory_types'].get(memory_type, 0) + 1
# Size distribution
content_size = len(str(memory.get('content', '')))
size_bucket = self._get_size_bucket(content_size)
stats['size_distribution'][size_bucket] = stats['size_distribution'].get(size_bucket, 0) + 1
# Age distribution
created_at = memory.get('created_at')
if created_at:
age_days = (current_time - created_at).days
age_bucket = self._get_age_bucket(age_days)
stats['age_distribution'][age_bucket] = stats['age_distribution'].get(age_bucket, 0) + 1
# Access patterns
access_count = memory.get('access_count', 0)
access_bucket = self._get_access_bucket(access_count)
stats['access_patterns'][access_bucket] = stats['access_patterns'].get(access_bucket, 0) + 1
return stats
async def _analyze_usage_patterns(self):
"""Analyze how memory is being used"""
memories = await self.recall_memories(limit=10000)
patterns = {
'most_accessed_types': {},
'recent_activity': {},
'learning_indicators': {},
'efficiency_metrics': {}
}
# Most accessed memory types
type_access = {}
for memory in memories:
memory_type = memory.get('memory_type', 'unknown')
access_count = memory.get('access_count', 0)
type_access[memory_type] = type_access.get(memory_type, 0) + access_count
patterns['most_accessed_types'] = sorted(
type_access.items(),
key=lambda x: x[1],
reverse=True
)
# Recent activity (last 7 days)
recent_cutoff = datetime.now() - timedelta(days=7)
recent_memories = [m for m in memories
if m.get('created_at') and m['created_at'] > recent_cutoff]
patterns['recent_activity'] = {
'new_memories': len(recent_memories),
'most_active_types': self._analyze_recent_types(recent_memories)
}
# Learning indicators
patterns['learning_indicators'] = {
'procedure_evolution': await self._analyze_procedure_evolution(),
'preference_updates': await self._analyze_preference_updates(),
'context_growth': await self._analyze_context_growth()
}
return patterns
def _get_size_bucket(self, size_bytes):
"""Categorize memory size into buckets"""
if size_bytes < 1024: # < 1KB
return 'small'
elif size_bytes < 10240: # < 10KB
return 'medium'
elif size_bytes < 102400: # < 100KB
return 'large'
else:
return 'xl'
def _get_age_bucket(self, age_days):
"""Categorize memory age into buckets"""
if age_days < 1:
return 'today'
elif age_days < 7:
return 'this_week'
elif age_days < 30:
return 'this_month'
elif age_days < 90:
return 'this_quarter'
else:
return 'older'
def _get_access_bucket(self, access_count):
"""Categorize access frequency into buckets"""
if access_count == 0:
return 'never'
elif access_count < 5:
return 'rarely'
elif access_count < 20:
return 'occasionally'
elif access_count < 100:
return 'frequently'
else:
return 'very_frequently'
Memory Security and Privacy
Privacy-Compliant Memory Management
class PrivacyCompliantMemory:
"""Memory management with privacy and compliance features"""
def __init__(self, compliance_mode='gdpr'):
self.compliance_mode = compliance_mode
self.pii_detector = PIIDetector()
self.anonymizer = MemoryAnonymizer()
    async def store_privacy_compliant_memory(self, content, tags=None, retention_policy=None):
        """Store memory with privacy compliance"""
        # Detect and handle PII
        pii_analysis = await self.pii_detector.analyze(content)
        anonymized = False
        if pii_analysis['contains_pii']:
            if self.compliance_mode in ['gdpr', 'ccpa']:
                # Anonymize PII
                anonymized_content = await self.anonymizer.anonymize(content, pii_analysis)
                # Store original PII mapping separately (encrypted)
                pii_mapping_id = await self._store_encrypted_pii_mapping(
                    content, anonymized_content, pii_analysis
                )
                content = anonymized_content
                anonymized = True
                tags = (tags or []) + ['anonymized', f'pii_mapping:{pii_mapping_id}']
        # Apply retention policy
        if not retention_policy:
            retention_policy = self._get_default_retention_policy(self.compliance_mode)
        memory_metadata = {
            'compliance_mode': self.compliance_mode,
            'retention_policy': retention_policy,
            'pii_detected': pii_analysis['contains_pii'],
            # True only if anonymization actually ran for this compliance mode
            'anonymized': anonymized,
            'created_at': datetime.now()
        }
        return await self.store_memory(
            content=content,
            tags=tags,
            metadata=memory_metadata
        )
async def handle_data_deletion_request(self, subject_identifier):
"""Handle right-to-be-forgotten requests"""
# Find all memories related to subject
related_memories = await self._find_memories_by_subject(subject_identifier)
deletion_results = {
'memories_found': len(related_memories),
'memories_deleted': 0,
'errors': []
}
for memory in related_memories:
try:
# Delete memory
await self.delete_memory(memory['key'])
# Delete associated PII mapping if exists
if 'pii_mapping:' in str(memory.get('tags', [])):
mapping_id = self._extract_pii_mapping_id(memory['tags'])
await self._delete_pii_mapping(mapping_id)
deletion_results['memories_deleted'] += 1
except Exception as e:
deletion_results['errors'].append(str(e))
# Log deletion for audit trail
await self._log_deletion_request(subject_identifier, deletion_results)
return deletion_results
def _get_default_retention_policy(self, compliance_mode):
"""Get default retention policy based on compliance requirements"""
policies = {
'gdpr': {
'default_retention_days': 1095, # 3 years
'sensitive_retention_days': 365, # 1 year
'automatic_deletion': True
},
'ccpa': {
'default_retention_days': 1095, # 3 years
'sensitive_retention_days': 730, # 2 years
'automatic_deletion': True
},
'hipaa': {
'default_retention_days': 2190, # 6 years
'sensitive_retention_days': 2190,
'automatic_deletion': False # Manual review required
}
}
return policies.get(compliance_mode, policies['gdpr'])
class PIIDetector:
    """Detect personally identifiable information in content"""
    def __init__(self):
        self.pii_patterns = {
            # [A-Za-z], not [A-Z|a-z]: inside a character class, | is a literal pipe
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            # No \b before "(": a word boundary there would require a preceding word character
            'phone': r'\b\d{3}-\d{3}-\d{4}\b|(?<!\w)\(\d{3}\)\s*\d{3}-\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
        }
async def analyze(self, content):
"""Analyze content for PII"""
import re
detected_pii = {}
content_str = str(content)
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, content_str)
if matches:
detected_pii[pii_type] = matches
return {
'contains_pii': bool(detected_pii),
'detected_types': list(detected_pii.keys()),
'detected_values': detected_pii
}
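To show detection and anonymization working together, here is a small self-contained sketch that replaces matches with typed placeholders. The `redact` function and the placeholder format are assumptions made for illustration; they are not the `MemoryAnonymizer` referenced above, and the patterns cover only a few US-style formats.

```python
import re

# Illustrative patterns only; real PII detection needs far broader coverage.
PII_PATTERNS = {
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "phone": re.compile(r"(?<!\w)(?:\(\d{3}\)\s*|\d{3}-)\d{3}-\d{4}\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def redact(text):
    """Replace each match with a typed placeholder and report what was found."""
    found = {}
    for pii_type, pattern in PII_PATTERNS.items():
        matches = pattern.findall(text)
        if matches:
            found[pii_type] = matches
            text = pattern.sub(f"[{pii_type.upper()}]", text)
    return text, found


clean, found = redact("Reach Ana at ana@example.com or (555) 123-4567.")
print(clean)  # Reach Ana at [EMAIL] or [PHONE].
```

The `found` mapping holds the original values, so it would need the same protection as the encrypted PII mapping described above.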
Integration with OpenClaw Skills
Memory-Enhanced Skills
class MemoryEnhancedSkill(Skill):
"""Base class for skills that leverage memory"""
def __init__(self, agent):
super().__init__(agent)
self.memory = agent.memory
async def execute_with_memory(self, **kwargs):
"""Execute skill with memory integration"""
# Recall relevant context
context = await self.recall_relevant_context(kwargs)
# Execute core skill logic with context
result = await self.execute_core_logic(context, **kwargs)
# Store execution result in memory
await self.store_execution_memory(kwargs, result)
# Learn from execution
await self.learn_from_execution(kwargs, result, context)
return result
async def recall_relevant_context(self, execution_params):
"""Recall memories relevant to current execution"""
# Build search query from execution parameters
search_tags = self._extract_relevant_tags(execution_params)
# Recall memories with associative expansion
relevant_memories = await self.memory.recall_with_associations(
query=str(execution_params),
tags=search_tags,
max_depth=2
)
# Build context object
context = {
'relevant_memories': relevant_memories,
'execution_history': await self._get_execution_history(),
'learned_patterns': await self._get_learned_patterns(),
'user_preferences': await self._get_user_preferences()
}
return context
async def store_execution_memory(self, params, result):
"""Store memory of skill execution"""
execution_memory = {
'skill_name': self.name,
'parameters': params,
'result': result,
'success': result.get('status') == 'success',
'execution_time': result.get('execution_time'),
'context_used': bool(params.get('context'))
}
await self.memory.store_memory(
memory_key=f"execution_{self.name}_{datetime.now().timestamp()}",
content=execution_memory,
memory_type='episodic',
tags=[self.name, 'execution', result.get('status', 'unknown')]
)
async def learn_from_execution(self, params, result, context):
"""Learn patterns from execution results"""
# Analyze execution success patterns
if result.get('status') == 'success':
await self._learn_success_patterns(params, result, context)
else:
await self._learn_failure_patterns(params, result, context)
# Update user preference models
await self._update_preference_models(params, result)
# Improve procedural knowledge
await self._improve_procedural_knowledge(params, result)
async def _learn_success_patterns(self, params, result, context):
"""Learn from successful executions"""
# Find similar successful executions
similar_successes = await self.memory.recall_memories(
tags=[self.name, 'execution', 'success'],
limit=10
)
# Extract common patterns
success_patterns = await self._extract_patterns(similar_successes + [result])
# Store learned patterns
await self.memory.store_memory(
memory_key=f"success_patterns_{self.name}",
content=success_patterns,
memory_type='semantic',
tags=[self.name, 'patterns', 'success']
)
Troubleshooting Memory Issues
Common Memory Problems and Solutions
Problem: Memory Growth Too Large
# Solution: Implement memory cleanup scheduler
@scheduled_task(interval='daily')
async def cleanup_memory():
    optimizer = MemoryOptimizer(agent.memory)
    results = await optimizer.run_optimization_cycle()
    logging.info(f"Memory optimization results: {results}")
Problem: Slow Memory Recall
# Solution: Add memory indices and caching
class IndexedMemory(BaseMemory):
    def __init__(self):
        super().__init__()
        self.tag_index = {}
        self.content_index = {}
        self.cache = LRUCache(maxsize=1000)

    async def recall_memories(self, tags=None, **kwargs):
        cache_key = f"recall_{hash(str(tags))}_{hash(str(kwargs))}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        # Use indices for faster lookup
        if tags:
            candidate_memories = self._get_memories_by_tags(tags)
        else:
            candidate_memories = await super().recall_memories(**kwargs)
        # Cached results go stale after a write, so clear self.cache whenever a memory is stored
        self.cache[cache_key] = candidate_memories
        return candidate_memories
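One way the tag index and cache could fit together is sketched below as a standalone class. It assumes that a tag query means "memories carrying every listed tag", which may differ from how OpenClaw matches tags, and `TagIndex` itself is a hypothetical name. The cache is cleared on every write so recall never returns stale results.

```python
from collections import defaultdict


class TagIndex:
    """Inverted index from tag to memory keys, with a cache cleared on writes."""

    def __init__(self):
        self._by_tag = defaultdict(set)
        self._cache = {}

    def add(self, key, tags):
        """Register a memory key under each of its tags."""
        for tag in tags:
            self._by_tag[tag].add(key)
        # Any write can change query results, so drop cached lookups.
        self._cache.clear()

    def lookup(self, tags):
        """Return the keys that carry every requested tag."""
        cache_key = frozenset(tags)
        if cache_key not in self._cache:
            key_sets = [self._by_tag.get(tag, set()) for tag in cache_key]
            self._cache[cache_key] = set.intersection(*key_sets) if key_sets else set()
        # Return a copy so callers cannot mutate the cached result
        return set(self._cache[cache_key])


index = TagIndex()
index.add("m1", ["email", "interaction"])
index.add("m2", ["email"])
first = index.lookup(["email", "interaction"])
index.add("m3", ["email", "interaction"])
second = index.lookup(["email", "interaction"])
```

After the third write, the second lookup includes `m3` because the cached answer was discarded. Clearing the whole cache is the simplest safe policy; finer-grained invalidation is possible but harder to get right.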
Problem: Memory Corruption
# Solution: Implement memory validation and backup
class ValidatedMemory(BaseMemory):
    async def store_memory(self, memory_key, content, **kwargs):
        # Validate content before storage
        if not self._validate_memory_content(content):
            raise ValueError(f"Invalid memory content for key: {memory_key}")
        # Create backup before modification
        await self._create_backup(memory_key)
        # Store with checksum
        content_with_checksum = {
            'content': content,
            'checksum': self._calculate_checksum(content),
            'timestamp': datetime.now()
        }
        return await super().store_memory(memory_key, content_with_checksum, **kwargs)

    def _validate_memory_content(self, content):
        # Implement validation logic
        if not isinstance(content, (dict, list, str, int, float)):
            return False
        # Check for required fields, size limits, etc.
        return True
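The checksum helper is left undefined above. A minimal sketch of one way to write it, assuming the content is JSON-serializable apart from values such as datetimes, is to hash a canonical JSON encoding with SHA-256. The names `calculate_checksum`, `wrap`, and `verify` are illustrative.

```python
import hashlib
import json


def calculate_checksum(content):
    """SHA-256 over a canonical JSON encoding, so dict key order does not matter."""
    canonical = json.dumps(content, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def wrap(content):
    """Pair content with its checksum at write time."""
    return {"content": content, "checksum": calculate_checksum(content)}


def verify(record):
    """Recompute the checksum at read time and compare it with the stored one."""
    return calculate_checksum(record["content"]) == record["checksum"]


record = wrap({"sender": "client.example", "action": "reply"})
intact = verify(record)

record["content"]["action"] = "archive"  # Simulate corruption after storage
corrupted = not verify(record)
```

Sorting keys before hashing means two dictionaries with the same entries in a different order produce the same checksum, which avoids false corruption alarms after a round trip through storage.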
Why OpenClaw Memory Matters
OpenClaw's memory system transforms simple automation into intelligent assistance:
- Contextual Awareness: Agents understand the full context of your work and relationships
- Continuous Learning: Every interaction improves future performance
- Personalization: Agents adapt to your specific preferences and patterns
- Institutional Knowledge: Team knowledge persists beyond individual conversations
For teams that want intelligent AI automation without managing memory themselves, consider MrDelegate, which offers similar AI-powered capabilities with managed memory systems and automatic learning.
Start your free trial to experience AI agents that remember, learn, and improve with every interaction.
Advanced Memory Applications
With proper memory configuration, your OpenClaw agents become genuinely intelligent assistants that:
- Remember your preferences and adapt their communication style
- Learn from your decisions to make better suggestions over time
- Build comprehensive context about your projects and relationships
- Improve their skills through continuous feedback and experience
- Maintain consistency across long-term interactions and projects
Memory isn't just storage; it is the foundation that lets an agent's usefulness grow with your needs.