Source code for orka.startup.orchestrator

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Service Orchestrator
===================

This module handles the main orchestration of OrKa services including startup,
monitoring, and shutdown coordination.
"""

import asyncio
import logging
import subprocess
import sys
from typing import Dict

from .backend import start_backend
from .cleanup import cleanup_services
from .config import get_memory_backend
from .infrastructure.health import (
    display_error,
    display_service_endpoints,
    display_shutdown_complete,
    display_shutdown_message,
    display_startup_success,
    monitor_backend_process,
    wait_for_services,
)
from .infrastructure.kafka import start_kafka_docker
from .infrastructure.redis import start_native_redis

logger = logging.getLogger(__name__)


[docs] def start_infrastructure(backend: str) -> Dict[str, subprocess.Popen]: """ Start the infrastructure services natively. Redis will be started as a native process on port 6380. Kafka services will still use Docker when needed. Args: backend: The backend type ('redis', 'redisstack', 'kafka', or 'dual') Returns: Dict[str, subprocess.Popen]: Dictionary of started processes Raises: RuntimeError: If Redis Stack is not available or fails to start subprocess.CalledProcessError: If Kafka Docker services fail to start """ processes = {} print(f"Starting {backend.upper()} backend...") # Always start Redis natively for all backends (except when explicitly using Docker) if backend in ["redis", "redisstack", "kafka", "dual"]: redis_proc = start_native_redis(6380) if redis_proc is not None: processes["redis"] = redis_proc # If redis_proc is None, Redis is running via Docker and managed by Docker daemon # Start Kafka services via Docker only when needed if backend in ["kafka", "dual"]: start_kafka_docker() return processes
[docs] async def main() -> None: """ Main entry point for starting and managing OrKa services. This asynchronous function: 1. Determines which backend to use (Redis, Kafka, or dual) 2. Starts the appropriate infrastructure services (Redis natively, Kafka via Docker) 3. Waits for services to be ready 4. Launches the OrKa backend server 5. Monitors the backend process to ensure it's running 6. Handles graceful shutdown on keyboard interrupt The function runs until interrupted (e.g., via Ctrl+C), at which point it cleans up all started processes and containers. """ # Determine backend type backend = get_memory_backend() # Display startup information display_service_endpoints(backend) # Track all processes for cleanup processes = {} backend_proc = None try: # Start infrastructure processes = start_infrastructure(backend) # Wait for services to be ready wait_for_services(backend) # Start Orka backend backend_proc = start_backend(backend) processes["backend"] = backend_proc display_startup_success() # Monitor processes await monitor_backend_process(backend_proc) except KeyboardInterrupt: display_shutdown_message() except Exception as e: display_error(e) finally: # Always cleanup processes cleanup_services(backend, processes) display_shutdown_complete()
[docs] def run_startup() -> None: """ Run the startup process with proper error handling. This function serves as the main entry point and handles keyboard interrupts and unexpected errors gracefully. """ try: asyncio.run(main()) except KeyboardInterrupt: # Handle any remaining KeyboardInterrupt that might bubble up print("\n🛑 Shutdown complete.") except Exception as e: logger.error(f"Unexpected error: {e}") sys.exit(1)