# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning
import logging
from datetime import datetime, timedelta
from typing import Any, List
import numpy as np
from ..contracts import MemoryEntry
logger = logging.getLogger(__name__)
[docs]
class MemoryCompressor:
"""Compresses memory by summarizing older entries."""
def __init__(
self,
max_entries: int = 1000,
importance_threshold: float = 0.3,
time_window: timedelta = timedelta(days=7),
):
self.max_entries = max_entries
self.importance_threshold = importance_threshold
self.time_window = time_window
[docs]
def should_compress(self, entries: List[MemoryEntry]) -> bool:
"""Check if compression is needed."""
if len(entries) <= self.max_entries:
return False
# Check if mean importance is below threshold
importances = [entry["importance"] for entry in entries]
if np.mean(importances) < self.importance_threshold:
return True
return False
[docs]
async def compress(
self,
entries: List[MemoryEntry],
summarizer: Any, # LLM or summarization model
) -> List[MemoryEntry]:
"""Compress memory by summarizing older entries."""
if not self.should_compress(entries):
return entries
# Sort by timestamp
sorted_entries = sorted(entries, key=lambda x: x["timestamp"])
# Split into recent and old entries
cutoff_time = datetime.now() - self.time_window
recent_entries = [e for e in sorted_entries if e["timestamp"] > cutoff_time]
old_entries = [e for e in sorted_entries if e["timestamp"] <= cutoff_time]
if not old_entries:
return entries
# Create summary of old entries
try:
summary = await self._create_summary(old_entries, summarizer)
summary_entry = {
"content": summary,
"importance": 1.0, # High importance for summaries
"timestamp": datetime.now(),
"metadata": {"is_summary": True, "summarized_entries": len(old_entries)},
"is_summary": True,
}
# Return recent entries + summary
return recent_entries + [summary_entry]
except Exception as e:
logger.error(f"Error during memory compression: {e}")
return entries
async def _create_summary(self, entries: List[MemoryEntry], summarizer: Any) -> str:
"""Create a summary of multiple memory entries."""
# Combine all content
combined_content = "\n".join(entry["content"] for entry in entries)
# Use summarizer to create summary
if hasattr(summarizer, "summarize"):
return await summarizer.summarize(combined_content)
elif hasattr(summarizer, "generate"):
return await summarizer.generate(
f"Summarize the following text concisely:\n\n{combined_content}",
)
else:
raise ValueError("Summarizer must have summarize() or generate() method")