Source code for orka.agents.llm_agents

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
🤖 **LLM Agents** - Cloud-Powered Intelligent Processing
======================================================

This module contains specialized agents that leverage cloud LLMs (OpenAI GPT models)
for sophisticated natural language understanding and generation tasks.

**Core LLM Agent Types:**

🎨 **OpenAIAnswerBuilder**: The master craftsman of responses
- Synthesizes multiple data sources into coherent answers
- Perfect for final response generation in complex workflows
- Handles context-aware formatting and detailed explanations

🎯 **OpenAIClassificationAgent**: The intelligent router
- Classifies inputs into predefined categories with high precision
- Essential for workflow branching and content routing
- Supports complex multi-class classification scenarios

✅ **OpenAIBinaryAgent**: The precise decision maker
- Makes accurate true/false determinations
- Ideal for validation, filtering, and gate-keeping logic
- Optimized for clear yes/no decision points

**Advanced Features:**
- 🧠 **Reasoning Extraction**: Captures internal reasoning from <think> blocks
- 📊 **Cost Tracking**: Automatic token usage and cost calculation
- 🔧 **JSON Parsing**: Robust handling of structured LLM responses
- ⚡ **Error Recovery**: Graceful degradation for malformed responses
- 🎛️ **Flexible Prompting**: Jinja2 template support for dynamic prompts

**Real-world Applications:**
- Customer service with intelligent intent classification
- Content moderation with nuanced decision making
- Research synthesis combining multiple information sources
- Multi-step reasoning workflows with transparent logic
"""

import os
import re
from typing import Optional

from dotenv import load_dotenv
from openai import OpenAI

from .base_agent import LegacyBaseAgent as BaseAgent

# Load environment variables
load_dotenv()

# OpenAI configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("BASE_OPENAI_MODEL", "gpt-3.5-turbo")

# Check if we're running in test mode
PYTEST_RUNNING = os.getenv("PYTEST_RUNNING", "").lower() in ("true", "1", "yes")

# Validate OpenAI API key, except in test environments
if not PYTEST_RUNNING and not OPENAI_API_KEY:
    raise OSError("OPENAI_API_KEY environment variable is required")

# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY or "dummy_key_for_testing")

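# Example .env values consumed above (illustrative placeholders, not real credentials):
#
#   OPENAI_API_KEY=sk-...your-key...
#   BASE_OPENAI_MODEL=gpt-4o-mini
#   PYTEST_RUNNING=false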

def _extract_reasoning(text: str) -> tuple:
    """Extract reasoning from <think> blocks, returning (reasoning, cleaned_text)."""
    if "<think>" not in text or "</think>" not in text:
        return "", text

    think_pattern = r"<think>(.*?)</think>"
    think_match = re.search(think_pattern, text, re.DOTALL)
    if not think_match:
        return "", text

    reasoning = think_match.group(1).strip()
    cleaned_text = re.sub(think_pattern, "", text, flags=re.DOTALL).strip()
    return reasoning, cleaned_text

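# Illustrative split (assumed reasoning-model reply, not executed on import):
#
#   _extract_reasoning("<think>Check prior turns.</think>Yes, it is supported.")
#   # -> ("Check prior turns.", "Yes, it is supported.")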

def _extract_json_content(text) -> str:
    """Extract JSON content from various formats (code blocks, braces, etc.)."""
    # Try markdown code blocks first
    code_patterns = [r"```(?:json|markdown)?\s*(.*?)```", r"```\s*(.*?)```"]

    for pattern in code_patterns:
        matches = re.findall(pattern, text, re.DOTALL)
        for match in matches:
            match = match.strip()
            if match and match.startswith(("{", "[")):
                return match

    # Try to find JSON-like braces
    brace_match = re.search(r"\{.*\}", text, re.DOTALL)
    return brace_match.group(0) if brace_match else text

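# Illustrative extraction (assumed LLM reply, not executed on import):
#
#   _extract_json_content('Here it is:\n```json\n{"response": "ok"}\n```')
#   # -> '{"response": "ok"}'
#
# Without a code block, the first {...} span is returned; otherwise the text passes through unchanged.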

def _parse_json_safely(json_content) -> Optional[dict]:
    """Safely parse JSON with fallback for malformed content."""
    import json

    try:
        return json.loads(json_content)
    except json.JSONDecodeError:
        try:
            fixed_json = _fix_malformed_json(json_content)
            return json.loads(fixed_json)
        except Exception:
            return None


def _build_response_dict(parsed_json, fallback_text) -> dict:
    """Build standardized response dictionary from parsed JSON or fallback text."""
    if not parsed_json or not isinstance(parsed_json, dict):
        return {
            "response": fallback_text,
            "confidence": "0.3",
            "internal_reasoning": "Could not parse as JSON, using raw response",
        }

    # Handle perfect structure
    if all(key in parsed_json for key in ["response", "confidence", "internal_reasoning"]):
        return {
            "response": str(parsed_json["response"]),
            "confidence": str(parsed_json["confidence"]),
            "internal_reasoning": str(parsed_json["internal_reasoning"]),
        }

    # Handle task_description structure
    if "task_description" in parsed_json:
        task_desc = parsed_json["task_description"]
        if isinstance(task_desc, dict):
            return {
                "response": str(task_desc.get("response", "")),
                "confidence": str(task_desc.get("confidence", "0.0")),
                "internal_reasoning": str(task_desc.get("internal_reasoning", "")),
            }
        return {
            "response": str(task_desc),
            "confidence": "0.5",
            "internal_reasoning": "Extracted from task_description field",
        }

    # Extract any meaningful content
    return {
        "response": str(
            parsed_json.get(
                "response",
                parsed_json.get("answer", parsed_json.get("result", str(parsed_json))),
            ),
        ),
        "confidence": str(parsed_json.get("confidence", parsed_json.get("score", "0.5"))),
        "internal_reasoning": str(
            parsed_json.get(
                "internal_reasoning",
                parsed_json.get("reasoning", "Parsed from JSON response"),
            ),
        ),
    }

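# Illustrative normalization (assumed parsed payload, not executed on import):
#
#   _build_response_dict({"answer": "Paris", "score": 0.8}, fallback_text="Paris")
#   # -> {"response": "Paris", "confidence": "0.8", "internal_reasoning": "Parsed from JSON response"}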

def parse_llm_json_response(
    response_text,
    error_tracker=None,
    agent_id="unknown",
) -> dict:
    """
    Parse JSON response from LLM that may contain reasoning (<think> blocks)
    or be in various formats.

    This parser is specifically designed for local LLMs and reasoning models.
    It handles reasoning blocks, JSON in code blocks, and malformed JSON.

    Args:
        response_text (str): Raw response from LLM
        error_tracker: Optional error tracking object for silent degradations
        agent_id (str): Agent ID for error tracking

    Returns:
        dict: Parsed response with 'response', 'confidence', 'internal_reasoning' keys
    """
    try:
        if not response_text or not isinstance(response_text, str):
            return {
                "response": str(response_text) if response_text else "",
                "confidence": "0.0",
                "internal_reasoning": "Empty or invalid response",
            }

        cleaned_text = response_text.strip()

        # Extract reasoning and clean text
        reasoning_content, cleaned_text = _extract_reasoning(cleaned_text)

        # Extract JSON content
        json_content = _extract_json_content(cleaned_text)

        # Parse JSON safely
        parsed_json = _parse_json_safely(json_content)

        # Track silent degradation if JSON parsing failed
        if not parsed_json and error_tracker:
            error_tracker.record_silent_degradation(
                agent_id,
                "json_parsing_failure",
                f"Failed to parse JSON, falling back to raw text: {json_content[:100]}...",
            )

        # Build response dictionary
        result = _build_response_dict(parsed_json, cleaned_text)

        # Add reasoning if extracted from <think> blocks
        if reasoning_content:
            if not result.get("internal_reasoning"):
                result["internal_reasoning"] = f"Reasoning: {reasoning_content[:200]}..."
            else:
                current = result["internal_reasoning"]
                result["internal_reasoning"] = (
                    f"{current} | Reasoning: {reasoning_content[:200]}..."
                )

        return result

    except Exception as e:
        # Track silent degradation for parsing errors
        if error_tracker:
            error_tracker.record_silent_degradation(
                agent_id,
                "parser_exception",
                f"Parser exception: {e!s}",
            )
        return {
            "response": str(response_text).strip() if response_text else "[Parse error]",
            "confidence": "0.0",
            "internal_reasoning": f"Parser error: {e!s}",
        }

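# Illustrative end-to-end parse (assumed local-model reply, not executed on import):
#
#   raw = (
#       "<think>The user asks for a capital city.</think>\n"
#       '```json\n{"response": "Paris", "confidence": "0.9", '
#       '"internal_reasoning": "Geography fact"}\n```'
#   )
#   parse_llm_json_response(raw)
#   # -> response "Paris", confidence "0.9", with the <think> content appended to
#   #    internal_reasoning ("Geography fact | Reasoning: The user asks ...").
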
def _fix_malformed_json(json_str) -> str:
    """
    Attempt to fix common JSON formatting issues.

    Args:
        json_str (str): Potentially malformed JSON string

    Returns:
        str: Fixed JSON string
    """
    # Remove comments and extra whitespace
    json_str = re.sub(r"//.*?\n", "\n", json_str)
    json_str = re.sub(r"/\*.*?\*/", "", json_str, flags=re.DOTALL)

    # Fix missing commas between fields
    json_str = re.sub(r'"\s*\n\s*"', '",\n"', json_str)
    json_str = re.sub(r'}\s*\n\s*"', '},\n"', json_str)

    # Fix missing quotes around keys
    json_str = re.sub(r"(\w+):", r'"\1":', json_str)

    # Fix trailing commas
    json_str = re.sub(r",\s*}", "}", json_str)
    json_str = re.sub(r",\s*]", "]", json_str)

    return json_str


def _calculate_openai_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """
    Calculate approximate cost for OpenAI API usage.

    Args:
        model: OpenAI model name
        prompt_tokens: Number of input tokens
        completion_tokens: Number of output tokens

    Returns:
        Estimated cost in USD
    """
    # Pricing as of January 2025 (per 1K tokens)
    pricing = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-4o": {"input": 0.0025, "output": 0.01},  # Updated 2025 pricing
        "gpt-4o-2024-08-06": {"input": 0.0025, "output": 0.01},
        "gpt-4o-2024-11-20": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},  # Updated 2025 pricing
        "gpt-3.5-turbo-instruct": {"input": 0.0015, "output": 0.002},
        "o1": {"input": 0.015, "output": 0.06},
        "o1-preview": {"input": 0.015, "output": 0.06},
        "o1-mini": {"input": 0.003, "output": 0.012},
        "o3": {"input": 0.001, "output": 0.004},  # New 2025 model
        "o3-mini": {"input": 0.0011, "output": 0.0044},
        "o4-mini": {"input": 0.0011, "output": 0.0044},
        "gpt-4.1": {"input": 0.002, "output": 0.008},  # New 2025 model
        "gpt-4.1-mini": {"input": 0.0004, "output": 0.0016},
        "gpt-4.1-nano": {"input": 0.0001, "output": 0.0004},
    }

    # Default pricing for unknown models
    default_pricing = {"input": 0.01, "output": 0.03}

    # Get pricing for the model (with fallbacks for model variants)
    # Sort by length descending to match most specific model first
    model_pricing = None
    for known_model in sorted(pricing.keys(), key=len, reverse=True):
        if model.startswith(known_model):
            model_pricing = pricing[known_model]
            break
    if not model_pricing:
        model_pricing = default_pricing

    # Calculate cost
    input_cost = (prompt_tokens / 1000) * model_pricing["input"]
    output_cost = (completion_tokens / 1000) * model_pricing["output"]
    total_cost = round(input_cost + output_cost, 6)

    return total_cost


def _simple_json_parse(response_text) -> dict:
    """
    Simple JSON parser for OpenAI models (no reasoning support).

    Args:
        response_text (str): Raw response from OpenAI

    Returns:
        dict: Parsed JSON with 'response', 'confidence', 'internal_reasoning' keys
    """
    if not response_text or not isinstance(response_text, str):
        return {
            "response": str(response_text) if response_text else "",
            "confidence": "0.0",
            "internal_reasoning": "Empty or invalid response",
        }

    # Try to extract JSON from code blocks first
    import json

    # Look for ```json blocks
    if json_match := re.search(r"```json\s*(.*?)\s*```", response_text, re.DOTALL):
        json_text = json_match.group(1)
    elif json_match := re.search(r'\{[^{}]*"response"[^{}]*\}', response_text, re.DOTALL):
        json_text = json_match.group(0)
    else:
        # Fallback: treat entire response as answer
        return {
            "response": response_text.strip(),
            "confidence": "0.5",
            "internal_reasoning": "Could not parse JSON, using raw response",
        }

    try:
        parsed = json.loads(json_text)
        if isinstance(parsed, dict) and "response" in parsed:
            return {
                "response": str(parsed.get("response", "")),
                "confidence": str(parsed.get("confidence", "0.5")),
                "internal_reasoning": str(parsed.get("internal_reasoning", "")),
            }
    except json.JSONDecodeError:
        pass

    # Final fallback
    return {
        "response": response_text.strip(),
        "confidence": "0.5",
        "internal_reasoning": "JSON parsing failed, using raw response",
    }

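# Illustrative cost check for _calculate_openai_cost (example figures, not executed on
# import): with gpt-4o-mini priced at $0.00015 / 1K input tokens and $0.0006 / 1K output
# tokens, a call with 1,000 prompt tokens and 500 completion tokens costs
# (1000 / 1000) * 0.00015 + (500 / 1000) * 0.0006 = 0.00015 + 0.0003 = 0.00045 USD.
#
#   _calculate_openai_cost("gpt-4o-mini", 1000, 500)  # -> 0.00045
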
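# Illustrative parse (assumed model output, not executed on import):
#
#   _simple_json_parse('```json\n{"response": "42", "confidence": "0.9", "internal_reasoning": "Arithmetic"}\n```')
#   # -> {"response": "42", "confidence": "0.9", "internal_reasoning": "Arithmetic"}
#
# Anything that is not recognisable JSON falls back to the raw text with confidence "0.5".
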
class OpenAIAnswerBuilder(BaseAgent):
    """
    🎨 **The master craftsman of responses** - builds comprehensive answers from complex inputs.

    **What makes it special:**
    - **Multi-source Synthesis**: Combines search results, context, and knowledge seamlessly
    - **Context Awareness**: Understands conversation history and user intent
    - **Structured Output**: Generates well-formatted, coherent responses
    - **Template Power**: Uses Jinja2 for dynamic prompt construction
    - **Cost Optimization**: Tracks token usage and provides cost insights

    **Perfect for:**
    - Final answer generation in research workflows
    - Customer service response crafting
    - Content creation with multiple input sources
    - Detailed explanations combining technical and user-friendly language

    **Example Use Cases:**

    ```yaml
    # Comprehensive Q&A system
    - id: answer_builder
      type: openai-answer
      prompt: |
        Create a comprehensive answer using:
        - Search results: {{ previous_outputs.web_search }}
        - User context: {{ previous_outputs.user_profile }}
        - Classification: {{ previous_outputs.intent_classifier }}

        Provide a helpful, accurate response that addresses the user's specific needs.
    ```

    **Advanced Features:**
    - Automatic reasoning extraction from <think> blocks
    - Confidence scoring for answer quality assessment
    - JSON response parsing with fallback handling
    - Template variable resolution with rich context
    """

    def run(self, input_data) -> dict:
        """
        Generate an answer using OpenAI's GPT model.

        Args:
            input_data (dict): Input data containing:
                - prompt (str): The prompt to use (optional, defaults to agent's prompt)
                - model (str): The model to use (optional, defaults to OPENAI_MODEL)
                - temperature (float): Temperature for generation (optional, defaults to 0.7)
                - parse_json (bool): Whether to parse JSON response (defaults to True)
                - error_tracker: Optional error tracking object
                - agent_id (str): Agent ID for error tracking

        Returns:
            dict: Parsed JSON dict with keys: response, confidence, internal_reasoning, _metrics
        """
        # Extract parameters from input_data
        prompt = input_data.get("prompt", self.prompt)
        model = input_data.get("model", OPENAI_MODEL)
        temperature = float(input_data.get("temperature", 0.7))
        parse_json = input_data.get("parse_json", True)
        error_tracker = input_data.get("error_tracker")
        agent_id = input_data.get(
            "agent_id",
            self.agent_id if hasattr(self, "agent_id") else "unknown",
        )

        self_evaluation = """
        # CONSTRAINTS
        - Minimal confidence 0.9
        - Base the response exclusively on evidence and data.
        - Always return using this JSON schema:
        ```json
        {
            "response": "<task response>",
            "confidence": "<score from 0 to 1 about task performed>",
            "internal_reasoning": "<a short sentence explaining the internal reasoning that generated the response>"
        }
        ```
        """
        full_prompt = f"{prompt}\n\n{input_data}\n\n{self_evaluation}"

        # Make API call to OpenAI
        import time

        start_time = time.time()
        status_code = 200  # Default success

        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": full_prompt}],
                temperature=temperature,
            )
        except Exception as e:
            # Track API errors and status codes
            if error_tracker:
                # Extract status code if it's an HTTP error
                if hasattr(e, "status_code"):
                    status_code = e.status_code
                elif hasattr(e, "code"):
                    status_code = e.code
                else:
                    status_code = 500

                error_tracker.record_error(
                    "openai_api_error",
                    agent_id,
                    f"OpenAI API call failed: {e}",
                    e,
                    status_code=status_code,
                )
            raise

        # Calculate latency
        latency_ms = round((time.time() - start_time) * 1000, 2)

        # Extract usage and cost metrics
        usage = response.usage
        prompt_tokens = usage.prompt_tokens if usage else 0
        completion_tokens = usage.completion_tokens if usage else 0
        total_tokens = usage.total_tokens if usage else 0

        # Calculate cost (rough estimates for GPT models)
        cost_usd = _calculate_openai_cost(model, prompt_tokens, completion_tokens)

        # Extract and clean the response
        answer = response.choices[0].message.content.strip()

        # Create metrics object
        metrics = {
            "tokens": total_tokens,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "latency_ms": latency_ms,
            "cost_usd": cost_usd,
            "model": model,
            "status_code": status_code,
        }

        # Parse JSON if requested (simple parsing for OpenAI models)
        if parse_json:
            # Simple JSON extraction for OpenAI models (not reasoning models)
            parsed_response = _simple_json_parse(answer)

            # Track silent degradation if JSON parsing failed and fell back to raw text
            if (
                error_tracker
                and parsed_response.get("internal_reasoning")
                == "JSON parsing failed, using raw response"
            ):
                error_tracker.record_silent_degradation(
                    agent_id,
                    "openai_json_parsing_fallback",
                    f"OpenAI response was not valid JSON, using raw text: {answer[:100]}...",
                )
        else:
            # When JSON parsing is disabled, return raw response in expected format
            parsed_response = {
                "response": answer,
                "confidence": "0.5",
                "internal_reasoning": "Raw response without JSON parsing",
            }

        # Add metrics and formatted_prompt to the parsed response
        parsed_response["_metrics"] = metrics
        parsed_response["formatted_prompt"] = (
            prompt  # Store only the rendered prompt, not the full context
        )

        return parsed_response

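# Shape of the dict returned by OpenAIAnswerBuilder.run (values illustrative only):
#
#   {
#       "response": "Paris is the capital of France.",
#       "confidence": "0.95",
#       "internal_reasoning": "Well-established geography fact.",
#       "_metrics": {
#           "tokens": 180, "prompt_tokens": 150, "completion_tokens": 30,
#           "latency_ms": 420.15, "cost_usd": 0.00004,
#           "model": "gpt-4o-mini", "status_code": 200,
#       },
#       "formatted_prompt": "<the rendered prompt that was sent>",
#   }
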
class OpenAIBinaryAgent(OpenAIAnswerBuilder):
    """
    ✅ **The precise decision maker** - makes accurate true/false determinations.

    **Decision-making excellence:**
    - **High Precision**: Optimized for clear binary classifications
    - **Context Sensitive**: Considers full context for nuanced decisions
    - **Confidence Scoring**: Provides certainty metrics for decisions
    - **Fast Processing**: Streamlined for quick yes/no determinations

    **Essential for:**
    - Content moderation (toxic/safe, appropriate/inappropriate)
    - Workflow gating (proceed/stop, valid/invalid)
    - Quality assurance (pass/fail, correct/incorrect)
    - User intent validation (question/statement, urgent/routine)

    **Real-world scenarios:**

    ```yaml
    # Content safety check
    - id: safety_check
      type: openai-binary
      prompt: "Is this content safe for all audiences? {{ input }}"

    # Search requirement detection
    - id: needs_search
      type: openai-binary
      prompt: "Does this question require current information? {{ input }}"

    # Priority classification
    - id: is_urgent
      type: openai-binary
      prompt: "Is this request urgent based on content and context? {{ input }}"
    ```

    **Decision Quality:**
    - Leverages full GPT reasoning capabilities
    - Provides transparent decision rationale
    - Handles edge cases and ambiguous inputs gracefully
    """

    def run(self, input_data) -> bool:
        """
        Make a true/false decision using OpenAI's GPT model.

        Args:
            input_data (dict): Input data containing:
                - prompt (str): The prompt to use (optional, defaults to agent's prompt)
                - model (str): The model to use (optional, defaults to OPENAI_MODEL)
                - temperature (float): Temperature for generation (optional, defaults to 0.7)

        Returns:
            bool: True or False based on the model's response.
        """
        # Override the parent method to add constraints to the prompt
        # Ask the model to only return a "true" or "false" response
        constraints = "**CONSTRAINTS** ONLY and STRICTLY return a boolean 'true' or 'false' value."

        # Get the original prompt and add constraints
        original_prompt = input_data.get("prompt", self.prompt)
        enhanced_prompt = f"{original_prompt}\n\n{constraints}"

        # Create new input_data with enhanced prompt
        enhanced_input = input_data.copy()
        enhanced_input["prompt"] = enhanced_prompt

        # Store the agent-enhanced prompt with template variables resolved
        # We need to render the enhanced prompt with the input data to show the actual prompt sent
        try:
            from jinja2 import Template

            template = Template(enhanced_prompt)
            rendered_enhanced_prompt = template.render(input=input_data.get("input", ""))
            self._last_formatted_prompt = rendered_enhanced_prompt
        except Exception:
            # Fallback: simple replacement if Jinja2 fails
            self._last_formatted_prompt = enhanced_prompt.replace(
                "{{ input }}",
                str(input_data.get("input", "")),
            )

        # Get the answer using the enhanced prompt
        response_data = super().run(enhanced_input)

        # Extract answer and preserve metrics and LLM response details
        if isinstance(response_data, dict):
            answer = response_data.get("response", "")
            # Preserve metrics and LLM response details for bubbling up
            self._last_metrics = response_data.get("_metrics", {})
            self._last_response = response_data.get("response", "")
            self._last_confidence = response_data.get("confidence", "0.0")
            self._last_internal_reasoning = response_data.get("internal_reasoning", "")
        else:
            answer = str(response_data)
            self._last_metrics = {}
            self._last_response = answer
            self._last_confidence = "0.0"
            self._last_internal_reasoning = "Non-JSON response from LLM"

        # Convert to binary decision
        positive_indicators = ["yes", "true", "correct", "right", "affirmative"]
        for indicator in positive_indicators:
            if indicator in answer.lower():
                return True
        return False

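# Illustrative decisions (assumed model replies, not executed on import): the answer is
# scanned for the positive indicators "yes", "true", "correct", "right", "affirmative".
#
#   "True"                      -> True
#   "Yes, the content is safe"  -> True
#   "No, it needs review"       -> False   # no positive indicator present
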
class OpenAIClassificationAgent(OpenAIAnswerBuilder):
    """
    🎯 **The intelligent router** - classifies inputs into predefined categories with precision.

    **Classification superpowers:**
    - **Multi-class Intelligence**: Handles complex category systems with ease
    - **Context Awareness**: Uses conversation history for better classification
    - **Confidence Metrics**: Provides certainty scores for each classification
    - **Dynamic Categories**: Supports runtime category adjustment
    - **Fallback Handling**: Graceful degradation for unknown categories

    **Essential for:**
    - Intent detection in conversational AI
    - Content categorization and routing
    - Topic classification for knowledge systems
    - Sentiment and emotion analysis
    - Domain-specific classification tasks

    **Classification patterns:**

    ```yaml
    # Customer service routing
    - id: intent_classifier
      type: openai-classification
      options: [question, complaint, compliment, request, technical_issue]
      prompt: "Classify customer intent: {{ input }}"

    # Content categorization
    - id: topic_classifier
      type: openai-classification
      options: [technology, science, business, entertainment, sports]
      prompt: "What topic does this article discuss? {{ input }}"

    # Urgency assessment
    - id: priority_classifier
      type: openai-classification
      options: [low, medium, high, critical]
      prompt: "Assess priority level based on content and context: {{ input }}"
    ```

    **Advanced capabilities:**
    - Hierarchical classification support
    - Multi-label classification for complex content
    - Confidence thresholding for quality control
    - Custom category definitions with examples
    """

    def run(self, input_data) -> str:
        """
        Classify input using OpenAI's GPT model.

        Args:
            input_data (dict): Input data containing:
                - prompt (str): The prompt to use (optional, defaults to agent's prompt)
                - model (str): The model to use (optional, defaults to OPENAI_MODEL)
                - temperature (float): Temperature for generation (optional, defaults to 0.7)

        Returns:
            str: Category name based on the model's classification.
        """
        # Extract categories from params or use defaults
        categories = self.params.get("options", [])
        constraints = (
            "**CONSTRAINTS** ONLY return values from the given options. "
            "If none apply, return 'not-classified'."
        )

        # Get the base prompt
        base_prompt = input_data.get("prompt", self.prompt)

        # Create enhanced prompt with categories
        enhanced_prompt = f"{base_prompt} {constraints}\n Options:{categories}"

        # Create new input_data with enhanced prompt
        enhanced_input = input_data.copy()
        enhanced_input["prompt"] = enhanced_prompt

        # Store the agent-enhanced prompt with template variables resolved
        # We need to render the enhanced prompt with the input data to show the actual prompt sent
        try:
            from jinja2 import Template

            template = Template(enhanced_prompt)
            rendered_enhanced_prompt = template.render(input=input_data.get("input", ""))
            self._last_formatted_prompt = rendered_enhanced_prompt
        except Exception:
            # Fallback: simple replacement if Jinja2 fails
            self._last_formatted_prompt = enhanced_prompt.replace(
                "{{ input }}",
                str(input_data.get("input", "")),
            )

        # Use parent class to make the API call
        response_data = super().run(enhanced_input)

        # Extract answer and preserve metrics and LLM response details
        if isinstance(response_data, dict):
            answer = response_data.get("response", "")
            # Preserve metrics and LLM response details for bubbling up
            self._last_metrics = response_data.get("_metrics", {})
            self._last_response = response_data.get("response", "")
            self._last_confidence = response_data.get("confidence", "0.0")
            self._last_internal_reasoning = response_data.get("internal_reasoning", "")
        else:
            answer = str(response_data)
            self._last_metrics = {}
            self._last_response = answer
            self._last_confidence = "0.0"
            self._last_internal_reasoning = "Non-JSON response from LLM"

        return answer

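# Illustrative classification (assumed configuration and model reply, not executed on
# import): with options [question, complaint, compliment], the prompt sent to the model
# ends with the constraint text and "Options:['question', 'complaint', 'compliment']".
# The returned string is expected to be one of those options, or 'not-classified'
# when nothing matches.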