orka.nodes.loop_node module

class orka.nodes.loop_node.PastLoopMetadata[source]

Bases: TypedDict

loop_number: int
score: float
timestamp: str
insights: str
improvements: str
mistakes: str
result: Dict[str, Any]
class orka.nodes.loop_node.InsightCategory[source]

Bases: TypedDict

insights: str
improvements: str
mistakes: str
class orka.nodes.loop_node.FloatConvertible(*args, **kwargs)[source]

Bases: Protocol

class orka.nodes.loop_node.LoopNode(node_id: str, prompt: str | None = None, queue: List[Any] | None = None, memory_logger: RedisStackMemoryLogger | None = None, **kwargs: Any)[source]

Bases: BaseNode

A specialized node that executes an internal workflow repeatedly until a condition is met.

The LoopNode enables iterative improvement workflows by running a sub-workflow multiple times, learning from each iteration, and stopping when either a quality threshold is met or a maximum number of iterations is reached.

Key Features:
  • Iterative execution with quality thresholds

  • Cognitive insight extraction from each iteration

  • Learning from past iterations

  • Automatic loop termination based on scores or max iterations

  • Metadata tracking across iterations

max_loops

Maximum number of iterations allowed

Type:

int

score_threshold

Quality score required to stop iteration

Type:

float

score_extraction_pattern

Regex pattern to extract quality scores

Type:

str

cognitive_extraction

Configuration for extracting insights

Type:

dict

past_loops_metadata

Template for tracking iteration data

Type:

dict

internal_workflow

The workflow to execute in each iteration

Type:

dict

Example:

- id: improvement_loop
  type: loop
  max_loops: 5
  score_threshold: 0.85
  score_extraction_pattern: "QUALITY_SCORE:\s*([0-9.]+)"
  cognitive_extraction:
    enabled: true
    extract_patterns:
      insights: ["(?:provides?|shows?)\s+(.+?)(?:\n|$)"]
      improvements: ["(?:lacks?|needs?)\s+(.+?)(?:\n|$)"]
  past_loops_metadata:
    iteration: "{{ loop_number }}"
    score: "{{ score }}"
    insights: "{{ insights }}"
  internal_workflow:
    orchestrator:
      id: improvement-cycle
      agents: [analyzer, scorer]
__init__(node_id: str, prompt: str | None = None, queue: List[Any] | None = None, memory_logger: RedisStackMemoryLogger | None = None, **kwargs: Any) None[source]

Initialize the loop node.

Parameters:
  • node_id (str) – Unique identifier for the node.

  • prompt (Optional[str]) – Prompt or instruction for the node.

  • queue (Optional[List[Any]]) – Queue of agents or nodes to be processed.

  • memory_logger (Optional[RedisStackMemoryLogger]) – The RedisStackMemoryLogger instance.

  • **kwargs – Additional configuration parameters: - max_loops (int): Maximum number of loop iterations (default: 5) - score_threshold (float): Score threshold to meet before continuing (default: 0.8) - high_priority_agents (List[str]): Agent names to check first for scores (default: [“agreement_moderator”, “quality_moderator”, “score_moderator”]) - score_extraction_config (dict): Complete score extraction configuration with strategies - score_extraction_pattern (str): Regex pattern to extract score from results (deprecated, use score_extraction_config) - score_extraction_key (str): Direct key to look for score in result dict (deprecated, use score_extraction_config) - internal_workflow (dict): Complete workflow configuration to execute in loop - past_loops_metadata (dict): Template for past_loops object structure - cognitive_extraction (dict): Configuration for extracting valuable cognitive data

async run(payload: Dict[str, Any]) Dict[str, Any][source]

Execute the loop node with threshold checking.