# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-reasoning

import logging
from datetime import datetime, timedelta
from typing import Any, List

import numpy as np

from ..contracts import MemoryEntry

logger = logging.getLogger(__name__)

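
# NOTE: ``MemoryEntry`` (imported from ``..contracts``) is treated here as a
# plain mapping: this module only reads and writes the keys ``content``,
# ``importance``, ``timestamp``, ``metadata``, ``is_summary``, and
# ``category``.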
class MemoryCompressor:
    """
    Compresses memory by summarizing older entries.

    When the store exceeds ``max_entries`` and the mean importance of its
    entries falls below ``importance_threshold``, entries older than
    ``time_window`` are collapsed into a single summary entry produced by a
    summarizer (an LLM or other summarization model).

    Args:
        max_entries: Maximum number of entries to keep before compression
            is considered.
        importance_threshold: Mean importance below which compression is
            allowed to run.
        time_window: Entries older than this window are eligible for
            summarization; newer entries are kept verbatim.
    """

def __init__(
self,
max_entries: int = 1000,
importance_threshold: float = 0.3,
time_window: timedelta = timedelta(days=7),
):
self.max_entries = max_entries
self.importance_threshold = importance_threshold
self.time_window = time_window
def should_compress(self, entries: List[MemoryEntry]) -> bool:
"""Check if compression is needed."""
if len(entries) <= self.max_entries:
return False
# Check if mean importance is below threshold
importances = [entry.get("importance", 0.0) for entry in entries]
mean_importance = np.mean(importances) if importances else 0.0
# Compress only if both conditions are met:
# 1. Too many entries
# 2. Low average importance
return bool(len(entries) > self.max_entries and mean_importance < self.importance_threshold)
async def compress(
self,
entries: List[MemoryEntry],
summarizer: Any, # LLM or summarization model
) -> List[MemoryEntry]:
"""Compress memory by summarizing older entries."""
if not entries:
return entries
if not self.should_compress(entries):
return entries
# Sort by timestamp
sorted_entries = sorted(entries, key=lambda x: x.get("timestamp", datetime.min))
# Split into recent and old entries
cutoff_time = datetime.now() - self.time_window
recent_entries = [
e for e in sorted_entries if e.get("timestamp", datetime.min) > cutoff_time
]
old_entries = [e for e in sorted_entries if e.get("timestamp", datetime.min) <= cutoff_time]
# If no old entries, try to compress based on max entries
if not old_entries and len(entries) > self.max_entries:
split_point = len(entries) - self.max_entries
old_entries = sorted_entries[:split_point]
recent_entries = sorted_entries[split_point:]
elif not old_entries:
# No old entries and not over max entries, return as is
return entries
# Check summarizer type first
if not (hasattr(summarizer, "summarize") or hasattr(summarizer, "generate")):
msg = "Summarizer must have summarize() or generate() method"
logger.error(f"Error during memory compression: {msg}")
raise ValueError(msg)
# Create summary of old entries
try:
summary = await self._create_summary(old_entries, summarizer)
summary_entry: MemoryEntry = {
"content": summary,
"importance": 1.0, # High importance for summaries
"timestamp": datetime.now(),
"metadata": {"is_summary": True, "summarized_entries": len(old_entries)},
"is_summary": True,
"category": "summary", # Add a category for summaries
}
# Return recent entries + summary
return recent_entries + [summary_entry]
except Exception as e:
logger.error(f"Error during memory compression: {e}")
return entries

    async def _create_summary(self, entries: List[MemoryEntry], summarizer: Any) -> str:
"""Create a summary of multiple memory entries."""
# Combine all content
combined_content = "\n".join(entry.get("content", "") for entry in entries)
        # Validate the summarizer interface up front so the try block below
        # only has to deal with failures of the summarization call itself
if not (hasattr(summarizer, "summarize") or hasattr(summarizer, "generate")):
msg = "Summarizer must have summarize() or generate() method"
logger.error(f"Error during memory compression: {msg}")
raise ValueError(msg)
# Use summarizer to create summary
try:
if hasattr(summarizer, "summarize"):
result = await summarizer.summarize(combined_content)
return str(result)
else: # Must have generate() method
result = await summarizer.generate(
f"Summarize the following text concisely:\n\n{combined_content}",
)
return str(result)
except Exception as e:
logger.error(f"Error during memory compression: {e}")
raise
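

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the public API). ``_EchoSummarizer``
# is a hypothetical stand-in for a real LLM client; any object exposing an
# async ``summarize(text)`` or ``generate(prompt)`` method works with
# ``MemoryCompressor.compress``.
if __name__ == "__main__":
    import asyncio

    class _EchoSummarizer:
        """Toy summarizer that simply truncates the combined text."""

        async def summarize(self, text: str) -> str:
            return text[:200]

    async def _demo() -> None:
        now = datetime.now()
        # 2000 low-importance entries, most of them older than the 7-day window
        entries: List[MemoryEntry] = [
            {
                "content": f"event {i}",
                "importance": 0.1,
                "timestamp": now - timedelta(days=30) if i < 1500 else now,
                "metadata": {},
            }
            for i in range(2000)
        ]
        compressor = MemoryCompressor(max_entries=1000, importance_threshold=0.3)
        compressed = await compressor.compress(entries, _EchoSummarizer())
        # Expect the 500 recent entries plus one summary entry
        print(f"{len(entries)} entries compressed to {len(compressed)}")

    asyncio.run(_demo())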