# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-reasoning
import ast
import asyncio
import json
import logging
import os
import re
import tempfile
from datetime import datetime
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Protocol,
TypedDict,
TypeGuard,
TypeVar,
Union,
cast,
)
import numpy as np
import yaml
from jinja2 import Template
from redis import Redis
from redis.client import Redis as RedisType
from ..memory.redisstack_logger import RedisStackMemoryLogger
from ..utils.embedder import get_embedder
from .base_node import BaseNode
T = TypeVar("T")
logger = logging.getLogger(__name__)
logger.info(f"DEBUG: Loading loop_node.py from {__file__}")
class InsightCategory(TypedDict):
insights: str
improvements: str
mistakes: str
class FloatConvertible(Protocol):
def __float__(self) -> float: ...
CategoryType = Literal["insights", "improvements", "mistakes"]
MetadataKey = Literal["loop_number", "score", "timestamp", "insights", "improvements", "mistakes"]

# Past-loop records carry user-defined field names (set via past_loops_metadata),
# so they are typed as a loose mapping rather than a fixed TypedDict.
PastLoopMetadata = Dict[str, Any]
class LoopNode(BaseNode):
"""
A specialized node that executes an internal workflow repeatedly until a condition is met.
The LoopNode enables iterative improvement workflows by running a sub-workflow multiple
times, learning from each iteration, and stopping when either a quality threshold is met
or a maximum number of iterations is reached.
Key Features:
- Iterative execution with quality thresholds
- Cognitive insight extraction from each iteration
- Learning from past iterations
- Automatic loop termination based on scores or max iterations
- Metadata tracking across iterations
Attributes:
max_loops (int): Maximum number of iterations allowed
score_threshold (float): Quality score required to stop iteration
score_extraction_pattern (str): Regex pattern to extract quality scores
cognitive_extraction (dict): Configuration for extracting insights
past_loops_metadata (dict): Template for tracking iteration data
internal_workflow (dict): The workflow to execute in each iteration
Example:
.. code-block:: yaml
- id: improvement_loop
type: loop
max_loops: 5
score_threshold: 0.85
score_extraction_pattern: "QUALITY_SCORE:\\s*([0-9.]+)"
cognitive_extraction:
enabled: true
extract_patterns:
insights: ["(?:provides?|shows?)\\s+(.+?)(?:\\n|$)"]
improvements: ["(?:lacks?|needs?)\\s+(.+?)(?:\\n|$)"]
past_loops_metadata:
iteration: "{{ loop_number }}"
score: "{{ score }}"
insights: "{{ insights }}"
internal_workflow:
orchestrator:
id: improvement-cycle
agents: [analyzer, scorer]
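
    A minimal usage sketch (the workflow dict below is an illustrative
    placeholder, not a tested configuration; ``run`` must be awaited from
    async code):

    .. code-block:: python

        node = LoopNode(
            node_id="improvement_loop",
            max_loops=5,
            score_threshold=0.85,
            internal_workflow={
                "orchestrator": {
                    "id": "improvement-cycle",
                    "agents": ["analyzer", "scorer"],
                },
                "agents": [],  # agent definitions omitted in this sketch
            },
        )
        result = await node.run({"input": "draft text", "previous_outputs": {}})
        print(result["loops_completed"], result["final_score"], result["threshold_met"])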
"""
def __init__(
self,
node_id: str,
prompt: Optional[str] = None,
queue: Optional[List[Any]] = None,
memory_logger: Optional[RedisStackMemoryLogger] = None,
**kwargs: Any,
) -> None:
"""
Initialize the loop node.
Args:
node_id (str): Unique identifier for the node.
prompt (Optional[str]): Prompt or instruction for the node.
queue (Optional[List[Any]]): Queue of agents or nodes to be processed.
memory_logger (Optional[RedisStackMemoryLogger]): The RedisStackMemoryLogger instance.
**kwargs: Additional configuration parameters:
- max_loops (int): Maximum number of loop iterations (default: 5)
                - score_threshold (float): Quality score at which iteration stops (default: 0.8)
- high_priority_agents (List[str]): Agent names to check first for scores (default: ["agreement_moderator", "quality_moderator", "score_moderator"])
- score_extraction_config (dict): Complete score extraction configuration with strategies
- score_extraction_pattern (str): Regex pattern to extract score from results (deprecated, use score_extraction_config)
- score_extraction_key (str): Direct key to look for score in result dict (deprecated, use score_extraction_config)
- internal_workflow (dict): Complete workflow configuration to execute in loop
- past_loops_metadata (dict): Template for past_loops object structure
- cognitive_extraction (dict): Configuration for extracting valuable cognitive data
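
        An illustrative ``score_extraction_config`` (the strategy types mirror
        those handled in ``_extract_score``; ``quality_score`` is a hypothetical
        key name used only for this example):

        .. code-block:: python

            score_extraction_config = {
                "strategies": [
                    {"type": "direct_key", "key": "quality_score"},
                    {"type": "pattern", "patterns": [r"SCORE:\s*(\d+\.?\d*)"]},
                    {"type": "agent_key", "agents": ["score_moderator"], "key": "response"},
                ]
            }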
"""
super().__init__(node_id, prompt, queue, **kwargs)
# Ensure memory_logger is of correct type
if memory_logger is not None and not isinstance(memory_logger, RedisStackMemoryLogger):
logger.warning(f"Expected RedisStackMemoryLogger but got {type(memory_logger)}") # type: ignore [unreachable]
            # typing.cast is a runtime no-op and cannot raise; it only narrows
            # the type for static checkers.
            memory_logger = cast(RedisStackMemoryLogger, memory_logger)
self.memory_logger = memory_logger
# Configuration with type hints
self.max_loops: int = kwargs.get("max_loops", 5)
self.score_threshold: float = kwargs.get("score_threshold", 0.8)
# High-priority agents for score extraction (configurable)
self.high_priority_agents: List[str] = kwargs.get(
"high_priority_agents", ["agreement_moderator", "quality_moderator", "score_moderator"]
)
# Debug: Log the received configuration
if "score_extraction_config" in kwargs:
logger.debug(f"LoopNode {node_id}: Received custom score_extraction_config from YAML")
custom_config = kwargs["score_extraction_config"]
if "strategies" in custom_config:
logger.debug(
f"LoopNode {node_id}: Found {len(custom_config['strategies'])} strategies"
)
for i, strategy in enumerate(custom_config["strategies"]):
if strategy.get("type") == "pattern" and "patterns" in strategy:
logger.debug(
f"LoopNode {node_id}: Strategy {i+1} has {len(strategy['patterns'])} patterns"
)
logger.debug(
f"LoopNode {node_id}: First pattern: {strategy['patterns'][0] if strategy['patterns'] else 'None'}"
)
else:
logger.debug(f"LoopNode {node_id}: No custom score_extraction_config, using defaults")
self.score_extraction_config: Dict[str, List[Dict[str, Union[str, List[str]]]]] = (
kwargs.get(
"score_extraction_config",
{
"strategies": [
{
"type": "pattern",
"patterns": [
r"score:\s*(\d+\.?\d*)",
r"rating:\s*(\d+\.?\d*)",
r"confidence:\s*(\d+\.?\d*)",
r"agreement:\s*(\d+\.?\d*)",
r"consensus:\s*(\d+\.?\d*)",
r"AGREEMENT:\s*(\d+\.?\d*)",
r"SCORE:\s*(\d+\.?\d*)",
r"Score:\s*(\d+\.?\d*)",
r"Agreement:\s*(\d+\.?\d*)",
r"(\d+\.?\d*)/10",
r"(\d+\.?\d*)%",
r"(\d+\.?\d*)\s*out\s*of\s*10",
r"(\d+\.?\d*)\s*points?",
r"0\.[6-9][0-9]?", # Pattern for high agreement scores
r"([0-9])\.[0-9]+", # Any decimal number
],
}
]
},
)
)
# Debug: Log which configuration is actually being used
if "strategies" in self.score_extraction_config:
strategy_count = len(self.score_extraction_config["strategies"])
logger.debug(f"LoopNode {node_id}: Using {strategy_count} extraction strategies")
for i, strategy in enumerate(self.score_extraction_config["strategies"]):
if strategy.get("type") == "pattern" and "patterns" in strategy:
pattern_count = len(strategy["patterns"])
first_pattern = strategy["patterns"][0] if strategy["patterns"] else "None"
logger.debug(
f"LoopNode {node_id}: Strategy {i+1} (pattern): {pattern_count} patterns, first: {first_pattern}"
)
# Backward compatibility - convert old format to new format
if "score_extraction_pattern" in kwargs or "score_extraction_key" in kwargs:
logger.warning(
"score_extraction_pattern and score_extraction_key are deprecated. Use score_extraction_config instead.",
)
# Convert old format to new format
old_strategies = []
if "score_extraction_key" in kwargs:
old_strategies.append(
{
"type": "direct_key",
"key": kwargs["score_extraction_key"],
},
)
if "score_extraction_pattern" in kwargs:
old_strategies.append(
{
"type": "pattern",
"patterns": [kwargs["score_extraction_pattern"]],
},
)
if old_strategies:
self.score_extraction_config = {"strategies": old_strategies}
# Internal workflow configuration
self.internal_workflow = kwargs.get("internal_workflow", {})
# Past loops metadata structure (user-defined)
default_metadata_fields: Dict[MetadataKey, str] = {
"loop_number": "{{ loop_number }}",
"score": "{{ score }}",
"timestamp": "{{ timestamp }}",
"insights": "{{ insights }}",
"improvements": "{{ improvements }}",
"mistakes": "{{ mistakes }}",
}
# Load user-defined past_loops_metadata from YAML configuration
user_metadata = kwargs.get("past_loops_metadata", {})
if user_metadata:
logger.info(
f"Loading custom past_loops_metadata with {len(user_metadata)} fields: {list(user_metadata.keys())}"
)
self.past_loops_metadata: Dict[MetadataKey, str] = user_metadata
else:
logger.debug("Using default past_loops_metadata structure")
self.past_loops_metadata = default_metadata_fields
# Cognitive extraction configuration
self.cognitive_extraction: Dict[str, Any] = kwargs.get(
"cognitive_extraction",
{
"enabled": True,
"max_length_per_category": 300,
"extract_patterns": {
"insights": [],
"improvements": [],
"mistakes": [],
},
"agent_priorities": {},
},
)
async def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Execute the loop node with threshold checking."""
original_input = payload.get("input")
original_previous_outputs = payload.get("previous_outputs", {})
# Create a working copy of previous_outputs to avoid circular references
loop_previous_outputs = original_previous_outputs.copy()
# Initialize past_loops in our working copy
past_loops: List[PastLoopMetadata] = []
current_loop = 0
loop_result: Optional[Dict[str, Any]] = None
score = 0.0
while current_loop < self.max_loops:
current_loop += 1
logger.info(f"Loop {current_loop}/{self.max_loops} starting")
# Update the working copy with current past_loops for this iteration
loop_previous_outputs["past_loops"] = past_loops
# Clear any Redis cache that might cause response duplication
await self._clear_loop_cache(current_loop)
# Execute internal workflow
loop_result = await self._execute_internal_workflow(
original_input,
loop_previous_outputs,
)
if loop_result is None:
logger.error("Internal workflow execution failed")
break
# Extract score
score = await self._extract_score(loop_result)
# Create past_loop object using metadata template
past_loop_obj = self._create_past_loop_object(
current_loop,
score,
loop_result,
original_input,
)
# Add to our local past_loops array
past_loops.append(past_loop_obj)
# Store loop result in Redis if memory_logger is available
if self.memory_logger is not None:
try:
# Store individual loop result
loop_key = f"loop_result:{self.node_id}:{current_loop}"
self._store_in_redis(loop_key, loop_result)
logger.debug(f"- Stored loop result: {loop_key}")
# Store past loops array
past_loops_key = f"past_loops:{self.node_id}"
self._store_in_redis(past_loops_key, past_loops)
logger.debug(f"- Stored past loops: {past_loops_key}")
# Store in Redis hash for tracking
group_key = f"loop_results:{self.node_id}"
self._store_in_redis_hash(
group_key,
str(current_loop),
{
"result": loop_result,
"score": score,
"past_loop": past_loop_obj,
},
)
logger.debug(f"- Stored result in group for loop {current_loop}")
except Exception as e:
logger.error(f"Failed to store loop result in Redis: {e}")
# Check threshold
if score >= self.score_threshold:
logger.info(f"Threshold met: {score} >= {self.score_threshold}")
# Return final result with clean past_loops array and safe result
final_result = {
"input": original_input,
"result": self._create_safe_result(loop_result),
"loops_completed": current_loop,
"final_score": score,
"threshold_met": True,
"past_loops": past_loops,
}
# Store final result in Redis
if self.memory_logger is not None:
try:
final_key = f"final_result:{self.node_id}"
self._store_in_redis(final_key, final_result)
logger.debug(f"- Stored final result: {final_key}")
except Exception as e:
logger.error(f"Failed to store final result in Redis: {e}")
return final_result
logger.info(f"Threshold not met: {score} < {self.score_threshold}, continuing...")
# Max loops reached without meeting threshold
if loop_result is None:
loop_result = {}
logger.info(f"Max loops reached: {self.max_loops}")
final_result = {
"input": original_input,
"result": self._create_safe_result(loop_result),
"loops_completed": current_loop,
"final_score": score,
"threshold_met": False,
"past_loops": past_loops,
}
# Store final result in Redis
if self.memory_logger is not None:
try:
final_key = f"final_result:{self.node_id}"
self._store_in_redis(final_key, final_result)
logger.debug(f"- Stored final result: {final_key}")
except Exception as e:
logger.error(f"Failed to store final result in Redis: {e}")
return final_result
async def _execute_internal_workflow(
self, original_input: Any, previous_outputs: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Execute the internal workflow configuration."""
from ..orchestrator import Orchestrator
# Get the original workflow configuration
original_workflow = self.internal_workflow.copy()
# Ensure we have the basic structure
if "orchestrator" not in original_workflow:
original_workflow["orchestrator"] = {}
# Update the orchestrator configuration while preserving agents
orchestrator_config = original_workflow["orchestrator"]
orchestrator_config.update(
{
"id": orchestrator_config.get("id", "internal-workflow"),
"strategy": orchestrator_config.get("strategy", "sequential"),
"memory": {
"config": {
"redis_url": os.getenv("REDIS_URL", "redis://localhost:6380/0"),
"backend": "redisstack",
"enable_hnsw": True,
"vector_params": {
"M": 16,
"ef_construction": 200,
"ef_runtime": 10,
},
}
},
}
)
# Create temporary workflow file
with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f:
yaml.dump(original_workflow, f)
temp_file = f.name
try:
# Create orchestrator for internal workflow
orchestrator = Orchestrator(temp_file)
# Use parent's memory logger to maintain consistency
if self.memory_logger is not None:
orchestrator.memory = self.memory_logger
                orchestrator.fork_manager.redis = self.memory_logger.redis
# Create a safe version of previous_outputs to prevent circular references
# BUT preserve important loop context for agents to see previous results
safe_previous_outputs = self._create_safe_result_with_context(previous_outputs)
# Calculate current loop number from past_loops length
current_loop_number = len(previous_outputs.get("past_loops", [])) + 1
# Prepare input with past_loops context AND loop_number
# Build dynamic past_loops_metadata using user-defined fields
dynamic_metadata = {}
past_loops_data = cast(List[PastLoopMetadata], previous_outputs.get("past_loops", []))
for field_name in self.past_loops_metadata.keys():
if field_name in ["insights", "improvements", "mistakes"]:
# Extract from cognitive extraction
dynamic_metadata[field_name] = self._extract_metadata_field(
field_name, past_loops_data
)
else:
# For other fields, extract from past loop metadata if available
if past_loops_data:
last_loop = past_loops_data[-1]
value = last_loop.get(field_name, f"No {field_name} available")
dynamic_metadata[field_name] = str(value)
else:
dynamic_metadata[field_name] = f"No {field_name} available"
workflow_input = {
"input": original_input,
"previous_outputs": safe_previous_outputs,
"loop_number": current_loop_number,
"past_loops_metadata": dynamic_metadata,
}
# Execute workflow with return_logs=True to get full logs for processing
agent_sequence = [agent.get("id") for agent in self.internal_workflow.get("agents", [])]
logger.debug(
f"About to execute internal workflow with {len(agent_sequence)} agents defined"
)
logger.debug(f"Full agent sequence: {agent_sequence}")
try:
logs = await orchestrator.run(workflow_input, return_logs=True)
logger.debug(f"Internal workflow execution completed with {len(logs)} log entries")
# Debug which agents actually executed
executed_sequence = []
for log_entry in logs:
if (
isinstance(log_entry, dict)
and log_entry.get("agent_id")
and log_entry.get("agent_id") not in executed_sequence
):
executed_sequence.append(log_entry.get("agent_id"))
logger.debug(f"Actual execution sequence: {executed_sequence}")
logger.debug(
f"Expected vs actual count: {len(agent_sequence)} expected, {len(executed_sequence)} executed"
)
# Identify missing agents
missing_agents = [
agent for agent in agent_sequence if agent not in executed_sequence
]
if missing_agents:
logger.error(f"CRITICAL: Missing agents from execution: {missing_agents}")
else:
logger.info("All agents executed successfully")
except Exception as e:
logger.error(f"CRITICAL: Internal workflow execution failed with exception: {e}")
logger.error(f"CRITICAL: Exception type: {type(e)}")
raise
            # Extract actual agent responses from logs, covering all agent execution types
agents_results: Dict[str, Any] = {}
executed_agents = []
# Track extraction statistics for debugging
extraction_stats: Dict[str, Any] = {
"total_log_entries": len(logs),
"agent_entries": 0,
"successful_extractions": 0,
"extraction_methods": {},
}
for log_entry in logs:
if isinstance(log_entry, dict) and log_entry.get("event_type") == "MetaReport":
continue # Skip meta report
if isinstance(log_entry, dict):
agent_id = log_entry.get("agent_id")
if agent_id:
executed_agents.append(agent_id)
extraction_stats["agent_entries"] += 1
                        # Try multiple extraction strategies to cover different agent execution types
result_found = False
extraction_method = None
# Strategy 1: Standard payload.result (for most agents)
if not result_found and "payload" in log_entry:
payload = log_entry["payload"]
if "result" in payload:
agents_results[agent_id] = payload["result"]
result_found = True
extraction_method = "payload.result"
# Strategy 2: Check if the log entry itself contains result data
if not result_found and "result" in log_entry:
agents_results[agent_id] = log_entry["result"]
result_found = True
extraction_method = "direct_result"
# Strategy 3: Extract from structured log content (for embedded results)
if not result_found:
# Parse the full log entry content for embedded data structures
log_content = str(log_entry)
# Look for JSON-like structures containing our agent
if f'"{agent_id}":' in log_content and '"response":' in log_content:
try:
# Try to extract the JSON structure
# Look for the pattern: "agent_id": {"response": "...", ...}
pattern = f'"{re.escape(agent_id)}":\\s*\\{{[^}}]+\\}}'
match = re.search(pattern, log_content)
if match:
agent_data_str = "{" + match.group(0) + "}"
# Clean up the string to make it valid JSON
agent_data_str = agent_data_str.replace(
f'"{agent_id}":', f'"{agent_id}":'
)
try:
agent_data = json.loads(agent_data_str)
if agent_id in agent_data:
agents_results[agent_id] = agent_data[agent_id]
result_found = True
extraction_method = "embedded_json"
except json.JSONDecodeError:
pass
except Exception as e:
logger.debug(
f"Failed to parse embedded JSON for {agent_id}: {e}"
)
# Strategy 4: For LocalLLM agents, check for response/content patterns in log entry
if not result_found and isinstance(log_entry, dict):
potential_response = None
# Check common response patterns
if "response" in log_entry:
potential_response = {"response": log_entry["response"]}
elif "content" in log_entry:
potential_response = {"response": log_entry["content"]}
elif "output" in log_entry:
potential_response = {"response": log_entry["output"]}
# Check nested payload structures
elif "payload" in log_entry and isinstance(log_entry["payload"], dict):
payload = log_entry["payload"]
if "response" in payload:
potential_response = {"response": payload["response"]}
elif "content" in payload:
potential_response = {"response": payload["content"]}
elif "output" in payload:
potential_response = {"response": payload["output"]}
if potential_response:
agents_results[agent_id] = potential_response
result_found = True
extraction_method = "response_pattern"
# Strategy 5: Search for agent data in the entire log structure
if not result_found:
# Convert entire log entry to string and search for response patterns
full_content = str(log_entry)
# Look for common score patterns that indicate this agent has useful data
score_indicators = [
"AGREEMENT_SCORE:",
"SCORE:",
"score:",
"Score:",
"RATING:",
"rating:",
]
for indicator in score_indicators:
if indicator in full_content and agent_id in full_content:
# Create a basic response structure with the content
agents_results[agent_id] = {"response": full_content}
result_found = True
extraction_method = "content_search"
break
# Track extraction statistics
if result_found:
extraction_stats["successful_extractions"] += 1
extraction_stats["extraction_methods"][extraction_method] = (
extraction_stats["extraction_methods"].get(extraction_method, 0) + 1
)
logger.debug(
f"✅ Extracted result for '{agent_id}' via {extraction_method}"
)
else:
logger.debug(
f"❌ No result found for '{agent_id}' - Available keys: {list(log_entry.keys())}"
)
# Log a sample of the log entry for debugging
sample_content = (
str(log_entry)[:500] + "..."
if len(str(log_entry)) > 500
else str(log_entry)
)
logger.debug(f" Sample content: {sample_content}")
logger.debug(f"Agents that actually executed: {executed_agents}")
logger.debug(f"Agents with results: {list(agents_results.keys())}")
logger.info(f"Extraction statistics: {extraction_stats}")
# Generic debugging: Check for any agents that might contain scores
score_patterns = ["AGREEMENT_SCORE", "SCORE:", "score:", "Score:"]
potential_scoring_agents = []
for agent_id, agent_result in agents_results.items():
if isinstance(agent_result, dict) and "response" in agent_result:
response_text = str(agent_result["response"])
for pattern in score_patterns:
if pattern in response_text:
potential_scoring_agents.append(agent_id)
logger.info(
f"Found potential scoring agent '{agent_id}' with pattern '{pattern}'"
)
logger.info(f"{agent_id} response: {response_text[:200]}...")
break
if potential_scoring_agents:
logger.info(f"Potential scoring agents found: {potential_scoring_agents}")
else:
logger.warning("No agents found with score patterns!")
logger.warning(f"All executed agents: {executed_agents}")
logger.warning(
f"Expected agents: {[agent.get('id') for agent in self.internal_workflow.get('agents', [])]}"
)
# Show sample responses to understand format
for agent_id, agent_result in list(agents_results.items())[
:3
]: # Show first 3 agents
if isinstance(agent_result, dict) and "response" in agent_result:
logger.warning(
f"Sample response from '{agent_id}': {str(agent_result['response'])[:100]}..."
)
# Store agent results in Redis
for agent_id, result in agents_results.items():
# Store agent result in Redis
result_key = f"agent_result:{agent_id}:{current_loop_number}"
self._store_in_redis(result_key, result)
# Store in Redis hash for tracking
group_key = f"agent_results:{self.node_id}:{current_loop_number}"
self._store_in_redis_hash(group_key, agent_id, result)
# Store all results for this loop
loop_results_key = f"loop_agents:{self.node_id}:{current_loop_number}"
self._store_in_redis(loop_results_key, agents_results)
# Store in Redis hash for tracking
group_key = f"loop_agents:{self.node_id}"
self._store_in_redis_hash(group_key, str(current_loop_number), agents_results)
return agents_results
except Exception as e:
logger.error(f"Failed to execute internal workflow: {e}")
return None
finally:
try:
os.unlink(temp_file)
except Exception as e:
logger.warning(f"Failed to delete temporary workflow file: {e}")
def _is_valid_value(self, value: Any) -> TypeGuard[Union[str, int, float]]:
"""Check if a value can be converted to float."""
try:
if isinstance(value, (int, float)):
return True
if isinstance(value, str) and value.strip():
float(value)
return True
return False
except (ValueError, TypeError):
return False
async def _extract_score(self, result: Dict[str, Any]) -> float:
"""Extract score from result using configured extraction strategies."""
if not result:
return 0.0
# DEBUG: Log what data we're trying to extract from
logger.debug(f"Score extraction called with {len(result)} agents")
for agent_id, agent_result in result.items():
if isinstance(agent_result, dict) and "response" in agent_result:
response_text = str(agent_result["response"])
response_preview = (
response_text[:100] + "..." if len(response_text) > 100 else response_text
)
logger.debug(f"Agent '{agent_id}' response preview: {response_preview}")
strategies = self.score_extraction_config.get("strategies", [])
# PRIORITY 1: Check for high-priority agents first (configurable)
for priority_agent in self.high_priority_agents:
if priority_agent in result:
agent_result = result[priority_agent]
if isinstance(agent_result, dict) and "response" in agent_result:
response_text = str(agent_result["response"])
logger.info(
f"🔍 Checking high-priority agent '{priority_agent}': {response_text[:100]}..."
)
# Collect all patterns from the configuration
all_patterns = []
for strategy in strategies:
if strategy.get("type") == "pattern" and "patterns" in strategy:
patterns = strategy["patterns"]
if isinstance(patterns, list):
all_patterns.extend(patterns)
# If no patterns in config, use basic fallback patterns
if not all_patterns:
all_patterns = [
r"AGREEMENT_SCORE:\s*([0-9.]+)",
r"Agreement Score:\s*([0-9.]+)",
r"AGREEMENT_SCORE\s*([0-9.]+)",
r"Score:\s*([0-9.]+)",
r"SCORE:\s*([0-9.]+)",
]
score_patterns = all_patterns
for score_pattern in score_patterns:
match = re.search(score_pattern, response_text)
if match and match.group(1):
try:
score = float(match.group(1))
logger.info(
f"✅ Found score {score} from high-priority agent '{priority_agent}' using pattern: {score_pattern}"
)
return score
except (ValueError, TypeError):
continue
logger.warning(
f"❌ High-priority agent '{priority_agent}' found but no score extracted from: {response_text}"
)
# PRIORITY 2: Use configured extraction strategies
for strategy in strategies:
if not isinstance(strategy, dict):
continue # type: ignore [unreachable]
strategy_type = strategy.get("type")
if strategy_type == "direct_key":
key = str(strategy.get("key", ""))
if key in result:
value = result[key]
if self._is_valid_value(value):
logger.info(f"✅ Found score {value} via direct_key strategy")
return float(value) # Now type-safe due to TypeGuard
elif strategy_type == "pattern":
patterns = strategy.get("patterns", [])
if not isinstance(patterns, list):
continue
for pattern in patterns:
if not isinstance(pattern, str):
continue # type: ignore [unreachable]
logger.debug(f"🔍 Trying pattern: {pattern}")
# Look deeper into agent result structures
for agent_id, agent_result in result.items():
# Check direct string values
if isinstance(agent_result, str):
match = re.search(pattern, agent_result)
if match and match.group(1):
try:
score = float(match.group(1))
logger.info(
f"✅ Found score {score} in {agent_id} (direct string) using pattern: {pattern}"
)
return score
except (ValueError, TypeError):
continue
# Check nested response fields in agent dictionaries
elif isinstance(agent_result, dict):
for key in ["response", "result", "output", "data"]:
if key in agent_result and isinstance(agent_result[key], str):
text_content = agent_result[key]
logger.debug(
f"🔍 Searching in {agent_id}.{key}: {repr(text_content[:200])}"
)
match = re.search(pattern, text_content)
if match and match.group(1):
logger.debug(f"✅ Matched text: '{text_content[:200]}'")
try:
score = float(match.group(1))
logger.info(
f"✅ Found score {score} in {agent_id}.{key} using pattern: {pattern}"
)
return score
except (ValueError, TypeError):
continue
else:
if (
"agreement" in agent_id.lower()
or "AGREEMENT_SCORE" in text_content
):
logger.debug(
f"❌ No match for pattern '{pattern}' in {agent_id}.{key}: '{text_content[:100]}'"
)
elif strategy_type == "agent_key":
agents = strategy.get("agents", [])
key = str(strategy.get("key", "response"))
logger.debug(f"🔍 Trying agent_key strategy for agents: {agents}, key: {key}")
for agent_name in agents:
if agent_name in result:
logger.debug(f"🔍 Found agent '{agent_name}' in results")
agent_result = result[agent_name]
if isinstance(agent_result, dict) and key in agent_result:
response_text = str(agent_result[key])
logger.debug(f"🔍 Agent '{agent_name}' {key}: '{response_text[:100]}'")
# Use configured patterns for agent_key strategy
agent_score_patterns = []
                            # Use a distinct loop variable so the outer `strategy` is not shadowed
                            for pattern_strategy in strategies:
                                if pattern_strategy.get("type") == "pattern" and "patterns" in pattern_strategy:
                                    patterns = pattern_strategy["patterns"]
                                    if isinstance(patterns, list):
                                        agent_score_patterns.extend(patterns)
# If no patterns in config, use basic fallback patterns
if not agent_score_patterns:
agent_score_patterns = [
r"AGREEMENT_SCORE:\s*([0-9.]+)",
r"Agreement Score:\s*([0-9.]+)",
r"SCORE:\s*([0-9.]+)",
r"Score:\s*([0-9.]+)",
]
for score_pattern in agent_score_patterns:
score_match = re.search(score_pattern, response_text)
if score_match:
try:
score = float(score_match.group(1))
logger.info(
f"✅ Found score {score} in agent_key strategy from {agent_name} using pattern: {score_pattern}"
)
return score
except (ValueError, TypeError):
continue
else:
logger.debug(
f"🔍 Agent '{agent_name}' not found in results. Available agents: {list(result.keys())}"
)
# PRIORITY 3: Fallback to embedding computation ONLY if no explicit scores found
# AND this appears to be a cognitive debate scenario without explicit moderators
agent_ids = list(result.keys())
# Check if we have any explicit score agents that might have failed
has_score_agents = any(agent_id in result for agent_id in self.high_priority_agents)
if has_score_agents:
logger.warning(
"❌ Score agents present but no scores extracted. NOT using embedding fallback to avoid overriding explicit scores."
)
return 0.0
# Only use embedding fallback for pure cognitive debates without score moderators
cognitive_agents = [
aid
for aid in agent_ids
if any(
word in aid.lower()
for word in ["progressive", "conservative", "realist", "purist"]
# Note: "agreement" excluded to avoid confusion with agreement_moderator
)
]
if len(cognitive_agents) >= 2:
logger.info(
f"Detected cognitive debate with agents: {cognitive_agents} (no score moderators found)"
)
logger.info("Using embedding-based agreement computation as final fallback")
try:
# Run agreement computation directly since we're already in async context
agreement_score = await self._compute_agreement_score(result)
logger.info(f"✅ Computed fallback agreement score: {agreement_score}")
return agreement_score
except Exception as e:
logger.error(f"Failed to compute agreement score: {e}")
return 0.0
logger.warning("❌ No valid score extraction method succeeded")
return 0.0
def _extract_direct_key(self, result: dict[str, Any], key: str) -> float | None:
"""Extract score from direct key in result."""
if key in result:
try:
return float(result[key])
except (ValueError, TypeError):
pass
return None
def _extract_agent_key(
self, result: dict[str, Any], agents: list[str], key: str
) -> float | None:
"""Extract score from specific agent results."""
for agent_id, agent_result in result.items():
# Check if this agent matches our priority list
if agents and not any(agent_name in agent_id.lower() for agent_name in agents):
continue
            # Handle nested result structures (result.response, result.result, etc.)
possible_values = []
# Direct key access
if isinstance(agent_result, dict) and key in agent_result:
possible_values.append(agent_result[key])
# Nested access - look in result.response, result.result, etc.
if isinstance(agent_result, dict):
for nested_key in ["response", "result", "output", "data"]:
if nested_key in agent_result:
nested_value = agent_result[nested_key]
# If nested value is a dict, look for our key directly
if isinstance(nested_value, dict) and key in nested_value:
possible_values.append(nested_value[key])
                        # Parse string dictionaries from LLM responses
elif isinstance(nested_value, str):
# Try to parse as JSON first
try:
parsed = json.loads(nested_value)
if isinstance(parsed, dict) and key in parsed:
possible_values.append(parsed[key])
except json.JSONDecodeError:
pass
# Try to parse as Python dictionary string
try:
parsed = ast.literal_eval(nested_value)
if isinstance(parsed, dict) and key in parsed:
possible_values.append(parsed[key])
except (ValueError, SyntaxError):
pass
pattern = rf"['\"]?{re.escape(key)}['\"]?\s*:\s*([0-9.]+)"
match = re.search(pattern, nested_value)
if match:
possible_values.append(match.group(1))
# Try to convert any found values to float
for value in possible_values:
try:
return float(value)
except (ValueError, TypeError):
continue
return None
async def _compute_agreement_score(self, result: dict[str, Any]) -> float:
"""
Compute agreement score between agent responses using embeddings and cosine similarity.
This function replaces the text-based agreement_finder agent with proper
embedding-based similarity calculation.
Args:
result: Dictionary containing agent responses
Returns:
float: Agreement score between 0.0 and 1.0
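
        The returned value is the mean pairwise cosine similarity over all
        ordered pairs of response embeddings, excluding self-similarity:

        .. math::

            \text{agreement} = \frac{\sum_{i,j} S_{ij} - \operatorname{tr}(S)}{n\,(n - 1)}

        where :math:`S` is the cosine-similarity matrix and :math:`n` is the
        number of agents with valid embeddings.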
"""
try:
# Extract responses from all agents
agent_responses: List[Dict[str, Any]] = []
for agent_id, agent_result in result.items():
if isinstance(agent_result, dict):
# Look for response content in common fields
response_text = None
for field in ["response", "result", "output", "content", "answer"]:
if field in agent_result and agent_result[field]:
response_text = str(agent_result[field])
break
if response_text:
agent_responses.append(
{"agent_id": agent_id, "response": response_text, "embedding": None}
)
elif isinstance(agent_result, str) and agent_result.strip():
# Handle direct string responses
agent_responses.append(
{"agent_id": agent_id, "response": agent_result, "embedding": None}
)
# Need at least 2 responses to compute agreement
if len(agent_responses) < 2:
logger.warning(
f"Only {len(agent_responses)} agent responses found, need at least 2 for agreement"
)
return 0.0
# Generate embeddings for each response
embedder = get_embedder()
for agent_data in agent_responses:
try:
response_text = agent_data["response"]
if response_text and isinstance(response_text, str):
embedding = await embedder.encode(response_text)
agent_data["embedding"] = (
np.array(embedding) if embedding is not None else None
)
else:
agent_data["embedding"] = None
except Exception as e:
logger.warning(
f"Failed to generate embedding for {agent_data['agent_id']}: {e}"
)
agent_data["embedding"] = None
# Filter out responses without valid embeddings
valid_embeddings = []
valid_agents = []
for agent_data in agent_responses:
if agent_data["embedding"] is not None and len(agent_data["embedding"]) > 0:
valid_embeddings.append(agent_data["embedding"])
valid_agents.append(agent_data["agent_id"])
if len(valid_embeddings) < 2:
logger.warning(f"Only {len(valid_embeddings)} valid embeddings, returning 0.0")
return 0.0
# Compute pairwise cosine similarities
from sklearn.metrics.pairwise import ( # type: ignore[import-untyped]
cosine_similarity,
)
embeddings_matrix = np.array(valid_embeddings)
similarity_matrix = cosine_similarity(embeddings_matrix)
# Calculate mean similarity (excluding diagonal)
n = len(similarity_matrix)
if n < 2:
return 0.0
# Sum all similarities except diagonal, then normalize
total_similarity = np.sum(similarity_matrix) - np.trace(similarity_matrix)
            max_pairs = n * (n - 1)  # Ordered pairs, excluding self-similarity
if max_pairs == 0:
return 0.0
mean_agreement = total_similarity / max_pairs
# Ensure score is between 0 and 1
agreement_score = max(0.0, min(1.0, float(mean_agreement)))
logger.info(
f"Computed agreement score: {agreement_score:.3f} from {len(valid_agents)} agents: {valid_agents}"
)
return agreement_score
except Exception as e:
logger.error(f"Error computing agreement score: {e}")
return 0.0
def _extract_nested_path(self, result: dict[str, Any], path: str) -> float | None:
"""Extract score from nested path (e.g., 'result.score')."""
if not path:
return None
path_parts = path.split(".")
current = result
for part in path_parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
if self._is_valid_value(current):
return float(current)
return None
def _extract_pattern(self, result: dict[str, Any], patterns: list[str]) -> float | None:
"""Extract score using regex patterns."""
result_text = str(result)
for pattern in patterns:
try:
match = re.search(pattern, result_text)
if match:
try:
return float(match.group(1))
except (ValueError, IndexError):
continue
except re.error:
# Skip invalid regex patterns
continue
return None
def _extract_secondary_metric(
self, result: dict[str, Any], metric_key: str, default: Any = 0.0
) -> Any:
"""
Extract secondary metrics (like REASONING_QUALITY, CONVERGENCE_TREND) from agent responses.
Args:
result: The workflow result to extract metric from
metric_key: The key to look for (e.g., "REASONING_QUALITY", "CONVERGENCE_TREND")
default: Default value if metric not found
Returns:
The extracted metric value or default
"""
if not isinstance(result, dict):
logger.warning(f"Result is not a dict, cannot extract {metric_key}: {type(result)}") # type: ignore [unreachable]
return default
# Try different extraction strategies
for agent_id, agent_result in result.items():
if not isinstance(agent_result, dict):
continue
# Look in nested structures
for nested_key in ["response", "result", "output", "data"]:
if nested_key not in agent_result:
continue
nested_value = agent_result[nested_key]
# If nested value is a dict, look for our key directly
if isinstance(nested_value, dict) and metric_key in nested_value:
return nested_value[metric_key]
# Parse string dictionaries from LLM responses
elif isinstance(nested_value, str):
# Try to parse as JSON first
try:
parsed = json.loads(nested_value)
if isinstance(parsed, dict) and metric_key in parsed:
return parsed[metric_key]
except json.JSONDecodeError:
pass
# Try to parse as Python dictionary string
try:
parsed = ast.literal_eval(nested_value)
if isinstance(parsed, dict) and metric_key in parsed:
return parsed[metric_key]
except (ValueError, SyntaxError):
pass
# Try regex pattern matching on the string
pattern = (
rf"['\"]?{re.escape(metric_key)}['\"]?\s*:\s*['\"]?([^'\",$\}}]+)['\"]?"
)
match = re.search(pattern, nested_value)
if match:
value = match.group(1).strip()
# For numeric values, try to convert to float
if (
metric_key in ["REASONING_QUALITY", "AGREEMENT_SCORE"]
and value.replace(".", "").isdigit()
):
try:
return float(value)
except ValueError:
pass
return value
# Fallback: return default
logger.debug(
f"Secondary metric '{metric_key}' not found in result, using default: {default}",
)
return default
def _extract_cognitive_insights(
self, result: Dict[str, Any], max_length: int = 300
) -> InsightCategory:
"""Extract cognitive insights from result using configured patterns."""
if not self.cognitive_extraction.get("enabled", True):
return InsightCategory(insights="", improvements="", mistakes="")
extract_patterns = cast(
Dict[str, List[str]], self.cognitive_extraction.get("extract_patterns", {})
)
agent_priorities = cast(
Dict[str, List[str]], self.cognitive_extraction.get("agent_priorities", {})
)
max_length = self.cognitive_extraction.get("max_length_per_category", max_length)
extracted: Dict[CategoryType, List[str]] = {
"insights": [],
"improvements": [],
"mistakes": [],
}
if not isinstance(result, dict):
return InsightCategory(insights="", improvements="", mistakes="") # type: ignore [unreachable]
        # Extract insights from all agent responses, not only prioritized ones
for agent_id, agent_result in result.items():
if not isinstance(agent_result, (str, dict)):
continue
            # Gather text from the proper structure: LLM agents put content in the response field
texts_to_analyze = []
if isinstance(agent_result, str):
texts_to_analyze.append(agent_result)
elif isinstance(agent_result, dict):
# Look for response content in common fields
for field in ["response", "result", "output", "data"]:
if field in agent_result and isinstance(agent_result[field], str):
texts_to_analyze.append(agent_result[field])
# Fallback: convert entire dict to string
if not texts_to_analyze:
texts_to_analyze.append(str(agent_result))
# Apply extraction patterns to all found text content
for text in texts_to_analyze:
if not text or len(text) < 20: # Skip very short content
continue
                # Apply the patterns for every category to every agent
for category in ["insights", "improvements", "mistakes"]:
cat_key = cast(CategoryType, category)
patterns = extract_patterns.get(category, [])
if not isinstance(patterns, list):
continue # type: ignore[unreachable]
for pattern in patterns:
if not isinstance(pattern, str):
continue # type: ignore[unreachable]
try:
matches = re.finditer(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
if len(match.groups()) > 0:
insight = match.group(1).strip()
if insight and len(insight) > 10: # Minimum length threshold
# Clean up the insight
insight = re.sub(
r"\s+", " ", insight
) # Normalize whitespace
if len(insight) <= 200: # Reasonable length limit
extracted[cat_key].append(insight)
logger.debug(
f"✅ Extracted {category} from {agent_id}: {insight[:50]}..."
)
except re.error as e:
logger.warning(f"Invalid regex pattern '{pattern}': {e}")
continue
# Process each category
result_insights = []
result_improvements = []
result_mistakes = []
for category, items in extracted.items():
if not items:
continue
# Remove duplicates while preserving order
unique_items = []
seen: set[str] = set()
for item in items:
if item.lower() not in seen:
unique_items.append(item)
seen.add(item.lower())
# Join and truncate
combined = " | ".join(unique_items)
if len(combined) > max_length:
combined = combined[:max_length] + "..."
if category == "insights":
result_insights.append(combined)
elif category == "improvements":
result_improvements.append(combined)
elif category == "mistakes":
result_mistakes.append(combined)
return InsightCategory(
insights=" | ".join(result_insights),
improvements=" | ".join(result_improvements),
mistakes=" | ".join(result_mistakes),
)
def _create_past_loop_object(
self, loop_number: int, score: float, result: Dict[str, Any], original_input: Any
) -> PastLoopMetadata:
"""Create past_loop object using metadata template with cognitive insights."""
# Extract cognitive insights from the result
cognitive_insights = self._extract_cognitive_insights(result)
# Extract secondary metrics from agent responses
reasoning_quality = self._extract_secondary_metric(result, "REASONING_QUALITY")
convergence_trend = self._extract_secondary_metric(
result,
"CONVERGENCE_TREND",
default="STABLE",
)
# Create a safe version of the result for fallback
safe_result = self._create_safe_result(result)
# Ensure input is also safe and truncated
safe_input = str(original_input)
if len(safe_input) > 200:
safe_input = safe_input[:200] + "...<truncated>"
# Complete template context for Jinja2 rendering
template_context = {
"loop_number": loop_number,
"score": score,
"reasoning_quality": reasoning_quality,
"convergence_trend": convergence_trend,
"timestamp": datetime.now().isoformat(),
"result": safe_result,
"input": safe_input,
"insights": cognitive_insights.get("insights", ""),
"improvements": cognitive_insights.get("improvements", ""),
"mistakes": cognitive_insights.get("mistakes", ""),
"previous_outputs": safe_result,
}
        # Add helper functions to the template context for metadata rendering
try:
# Create a payload-like structure for helper functions
helper_payload = {
"input": safe_input,
"previous_outputs": safe_result,
"loop_number": loop_number,
}
# Add helper functions using the same approach as execution engine
from orka.orchestrator.prompt_rendering import PromptRenderer
renderer = PromptRenderer()
helper_functions = renderer._get_template_helper_functions(helper_payload)
template_context.update(helper_functions)
logger.debug(
f"- Added {len(helper_functions)} helper functions to LoopNode template context"
)
except Exception as e:
logger.warning(f"Failed to add helper functions to LoopNode template context: {e}")
# Continue without helper functions - fallback gracefully
        # Build the past loop object from the user-defined metadata templates
past_loop_obj: PastLoopMetadata = {}
# Render each metadata field using the user-defined templates
for field_name, template_str in self.past_loops_metadata.items():
try:
template = Template(template_str)
rendered_value = template.render(template_context)
past_loop_obj[field_name] = rendered_value
logger.debug(f"- Rendered metadata field '{field_name}': {rendered_value[:50]}...")
except Exception as e:
logger.warning(
f"Failed to render metadata field '{field_name}' with template '{template_str}': {e}"
)
# Fallback to simple value
if field_name == "loop_number":
past_loop_obj[field_name] = loop_number
elif field_name == "score":
past_loop_obj[field_name] = score
elif field_name == "timestamp":
past_loop_obj[field_name] = datetime.now().isoformat()
elif field_name in cognitive_insights:
past_loop_obj[field_name] = cognitive_insights[field_name]
else:
past_loop_obj[field_name] = f"Error rendering {field_name}" # type: ignore[unreachable]
# Always ensure we have the basic required fields for compatibility
past_loop_obj.setdefault("loop_number", loop_number)
past_loop_obj.setdefault("score", score)
past_loop_obj.setdefault("timestamp", datetime.now().isoformat())
past_loop_obj.setdefault("result", safe_result)
return past_loop_obj
def _create_safe_result(self, result: Any) -> Any:
"""Create a safe, serializable version of the result that avoids circular references."""
def _make_safe(obj: Any, seen: Optional[set[int]] = None) -> Any:
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return "<circular_reference>"
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
seen.add(obj_id)
try:
if isinstance(obj, list):
return [_make_safe(item, seen.copy()) for item in obj]
if isinstance(obj, dict):
return {
str(key): _make_safe(value, seen.copy())
for key, value in obj.items()
if key not in ("previous_outputs", "payload")
}
return str(obj)[:1000] + "..." if len(str(obj)) > 1000 else str(obj)
finally:
seen.discard(obj_id)
return _make_safe(result)
def _create_safe_result_with_context(self, result: Any) -> Any:
"""
Create a safe, serializable version of the result that preserves important loop context.
This version preserves agent responses and past_loops data needed for context
in subsequent loop iterations, unlike _create_safe_result which truncates everything.
"""
def _make_safe_with_context(
obj: Any, seen: Optional[set[int]] = None, depth: int = 0
) -> Any:
if seen is None:
seen = set()
# Prevent infinite depth
if depth > 10:
return "<max_depth_reached>"
obj_id = id(obj)
if obj_id in seen:
return "<circular_reference>"
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
seen.add(obj_id)
try:
if isinstance(obj, list):
return [_make_safe_with_context(item, seen.copy(), depth + 1) for item in obj]
if isinstance(obj, dict):
safe_dict = {}
for key, value in obj.items():
key_str = str(key)
# Always preserve past_loops for context
if key_str == "past_loops":
safe_dict[key_str] = _make_safe_with_context(
value, seen.copy(), depth + 1
)
# Preserve agent responses for cognitive debate context
elif any(
agent_type in key_str.lower()
for agent_type in [
"progressive",
"conservative",
"realist",
"purist",
"agreement",
]
):
if isinstance(value, dict):
# For agent dictionaries, preserve response but limit size
agent_dict = {}
for agent_key, agent_value in value.items():
if agent_key == "response":
# Preserve full response for context but limit to reasonable size
response_str = str(agent_value)
if len(response_str) > 2000:
agent_dict[agent_key] = (
response_str[:2000] + "...<truncated_for_safety>"
)
else:
agent_dict[agent_key] = response_str
elif agent_key in ["confidence", "internal_reasoning"]:
# Preserve other important fields
agent_dict[agent_key] = (
str(agent_value)[:500]
if len(str(agent_value)) > 500
else agent_value
)
# Skip large metadata like _metrics, formatted_prompt
safe_dict[key_str] = agent_dict
else:
safe_dict[key_str] = (
str(value)[:1000] if len(str(value)) > 1000 else value
)
# Skip problematic circular references but preserve simple values
elif key_str not in ("previous_outputs", "payload"):
if isinstance(value, (str, int, float, bool, type(None))):
safe_dict[key_str] = value
elif isinstance(value, (dict, list)):
safe_dict[key_str] = _make_safe_with_context(
value, seen.copy(), depth + 1
)
else:
# Convert complex objects to strings with size limit
str_value = str(value)
if len(str_value) > 500:
safe_dict[key_str] = str_value[:500] + "...<truncated>"
else:
safe_dict[key_str] = str_value
return safe_dict
# Convert other objects to strings with size limit
str_obj = str(obj)
return str_obj[:1000] + "..." if len(str_obj) > 1000 else str_obj
finally:
seen.discard(obj_id)
return _make_safe_with_context(result)
async def _clear_loop_cache(self, loop_number: int) -> None:
"""
Clear Redis cache that might cause response duplication between loop iterations.
This ensures that agents in subsequent loops don't reuse cached responses
from previous iterations.
"""
if self.memory_logger is None:
return
try:
# Clear loop-specific caches
cache_patterns = [
f"loop_cache:{self.node_id}:{loop_number}",
f"loop_cache:{self.node_id}:*",
f"agent_cache:{self.node_id}:{loop_number}:*",
f"response_cache:{self.node_id}:{loop_number}:*",
]
for pattern in cache_patterns:
try:
# Use SCAN to find keys matching pattern
cursor = 0
while True:
cursor, keys = self.memory_logger.redis.scan(
cursor, match=pattern, count=100
)
if keys:
self.memory_logger.redis.delete(*keys)
logger.debug(f"Cleared {len(keys)} cache keys matching {pattern}")
if cursor == 0:
break
except Exception as e:
logger.warning(f"Failed to clear cache pattern {pattern}: {e}")
except Exception as e:
logger.warning(f"Failed to clear loop cache for loop {loop_number}: {e}")
def _extract_metadata_field(
self, field: MetadataKey, past_loops: List[PastLoopMetadata], max_entries: int = 5
) -> str:
"""Extract metadata field from past loops."""
values = []
for loop in reversed(past_loops[-max_entries:]):
if field in loop and loop[field]:
values.append(str(loop[field]))
return " | ".join(values)
def _store_in_redis(self, key: str, value: Any) -> None:
"""Safely store a value in Redis."""
if self.memory_logger is not None:
try:
self.memory_logger.set(key, json.dumps(value))
logger.debug(f"- Stored in Redis: {key}")
except Exception as e:
logger.error(f"Failed to store in Redis: {e}")
def _store_in_redis_hash(self, hash_key: str, field: str, value: Any) -> None:
"""Safely store a value in a Redis hash."""
if self.memory_logger is not None:
try:
self.memory_logger.hset(hash_key, field, json.dumps(value))
logger.debug(f"- Stored in Redis hash: {hash_key}[{field}]")
except Exception as e:
logger.error(f"Failed to store in Redis hash: {e}")