# Source code for orka.nodes.fork_node

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning
from .base_node import BaseNode


class ForkNode(BaseNode):
    """
    A node that splits the workflow into parallel branches.

    Each entry of ``targets`` is one branch: either a list of agents (a
    sequence) or a single agent. Depending on ``mode`` a list branch is
    enqueued all at once, or only its first agent is enqueued and the full
    sequence is handed to the orchestrator's fork manager for tracking.
    """

    def __init__(self, node_id, prompt=None, queue=None, memory_logger=None, **kwargs):
        """
        Initialize the fork node.

        Args:
            node_id (str): Unique identifier for the node.
            prompt (str, optional): Prompt or instruction for the node.
            queue (list, optional): Queue of agents or nodes to be processed.
            memory_logger: Logger used by ``run`` to persist the fork group
                (must provide ``hset`` and ``sadd``).
            **kwargs: Additional configuration parameters. ``targets`` (list
                of branches, default ``[]``) and ``mode`` (default
                ``"sequential"``) are read here; all of them are also
                forwarded to ``BaseNode.__init__``.
        """
        super().__init__(node_id=node_id, prompt=prompt, queue=queue, **kwargs)
        self.memory_logger = memory_logger
        self.targets = kwargs.get("targets", [])  # Store the fork branches
        self.config = kwargs  # Store config explicitly
        self.mode = kwargs.get("mode", "sequential")  # Default to sequential execution

    async def run(self, orchestrator, context):
        """
        Execute the fork operation by enqueueing every configured branch.

        Args:
            orchestrator: The orchestrator instance managing the workflow.
                This method uses ``orchestrator.enqueue_fork`` and
                ``orchestrator.fork_manager``.
            context: Context data for the fork operation (not read by this
                method).

        Returns:
            dict: ``{"status": "forked", "fork_group": <group id>}``.

        Raises:
            ValueError: If no targets are specified, if a branch is an empty
                list, or if no ``memory_logger`` was provided.
        """
        # Validate and iterate the SAME list. Prefer the attribute and fall
        # back to the raw config so the two can never disagree.
        targets = self.targets or self.config.get("targets", [])
        if not targets:
            raise ValueError(
                f"ForkNode '{self.node_id}' requires non-empty 'targets' list."
            )

        # Reject malformed configuration before any side effect happens, so a
        # failure never leaves a half-enqueued fork group behind.
        for branch in targets:
            if isinstance(branch, list) and not branch:
                raise ValueError(
                    f"ForkNode '{self.node_id}' has an empty branch in 'targets'."
                )
        if self.memory_logger is None:
            raise ValueError(
                f"ForkNode '{self.node_id}' requires a 'memory_logger' to store "
                "the fork group."
            )

        # Generate a unique ID for this fork group
        fork_group_id = orchestrator.fork_manager.generate_group_id(self.node_id)
        all_flat_agents = []  # Every agent of every branch, flattened

        for branch in targets:
            if isinstance(branch, list):
                if self.mode == "sequential":
                    # Only the first agent is queued now; the full sequence is
                    # registered with the fork manager.
                    orchestrator.enqueue_fork([branch[0]], fork_group_id)
                    orchestrator.fork_manager.track_branch_sequence(
                        fork_group_id, branch
                    )
                else:
                    # Any other mode is treated as parallel: queue all agents.
                    orchestrator.enqueue_fork(branch, fork_group_id)
                all_flat_agents.extend(branch)
            else:
                # Single agent, flat structure (fallback)
                orchestrator.enqueue_fork([branch], fork_group_id)
                all_flat_agents.append(branch)

        # Create the fork group with all agents
        orchestrator.fork_manager.create_group(fork_group_id, all_flat_agents)

        # Store the node -> group mapping and the group's agent list
        self.memory_logger.hset(
            f"fork_group_mapping:{self.node_id}", "group_id", fork_group_id
        )
        self.memory_logger.sadd(f"fork_group:{fork_group_id}", *all_flat_agents)

        return {"status": "forked", "fork_group": fork_group_id}