orka.nodes.rag_node module

class orka.nodes.rag_node.RAGNode(node_id: str, registry: Registry, prompt: str = '', queue: str = 'default', timeout: float | None = 30.0, max_concurrency: int = 10, top_k: int = 5, score_threshold: float = 0.7)[source]

Bases: BaseNode

RAG Node Implementation

A specialized node that performs Retrieval-Augmented Generation (RAG) operations by combining semantic search with language model generation.

Core Functionality

RAG Process:

1. Query Processing: Extract and prepare the input query
2. Embedding Generation: Convert query to vector representation
3. Memory Search: Find relevant documents using semantic similarity
4. Context Formatting: Structure retrieved documents for LLM consumption
5. Answer Generation: Use LLM to generate response based on context

Integration Points:

- Memory Backend: Searches for relevant documents using vector similarity
- Embedder Service: Generates query embeddings for semantic search
- LLM Service: Generates final answers based on retrieved context
- Registry System: Accesses shared resources through dependency injection
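
The five steps above map directly onto these services. A minimal sketch of the flow (the embed/search/complete method names are assumptions for illustration, not OrKa's actual interfaces):

from typing import Any

async def rag_pipeline(query: str, memory: Any, embedder: Any, llm: Any,
                       top_k: int = 5, score_threshold: float = 0.7) -> dict:
    # 1. Query processing: normalize the raw input
    query = query.strip()
    if not query:
        raise ValueError("Query is required for RAG operation")

    # 2. Embedding generation: convert the query to a vector
    vector = await embedder.embed(query)

    # 3. Memory search: semantic similarity over stored documents
    hits = await memory.search(vector, limit=top_k, min_score=score_threshold)

    # 4. Context formatting: structure the hits for the prompt
    context = "\n\n".join(hit["content"] for hit in hits)

    # 5. Answer generation: ask the LLM, grounded in the retrieved context
    answer = await llm.complete(f"Context:\n{context}\n\nQuestion: {query}")
    return {"answer": answer, "sources": hits}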

Architecture Details

Node Configuration:

- top_k: Number of documents to retrieve (default: 5)
- score_threshold: Minimum similarity score for relevance (default: 0.7)
- timeout: Maximum execution time for the operation
- max_concurrency: Limit on parallel executions
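
These options mirror the constructor signature at the top of this page. A construction sketch (the registry setup is an assumption, shown as a hypothetical helper):

from orka.nodes.rag_node import RAGNode

registry = make_registry()  # hypothetical helper wiring memory/embedder/LLM services

node = RAGNode(
    node_id="rag_assistant",
    registry=registry,
    top_k=5,              # retrieve at most 5 documents
    score_threshold=0.7,  # ignore hits scoring below 0.7
    timeout=30.0,         # abort the operation after 30 seconds
    max_concurrency=10,   # cap on parallel executions
)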

Resource Management:

- Lazy initialization of expensive resources (memory, embedder, LLM)
- Registry-based dependency injection for shared services
- Automatic resource cleanup and lifecycle management
- Thread-safe execution for concurrent operations
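
A minimal illustration of the lazy-initialization pattern, assuming a registry with a get() lookup (not the actual OrKa implementation):

class LazyResources:
    """Resolve expensive services from the registry only on first use."""

    def __init__(self, registry):
        self._registry = registry
        self._embedder = None

    @property
    def embedder(self):
        if self._embedder is None:
            # First access pays the lookup cost; later accesses are free.
            self._embedder = self._registry.get("embedder")
        return self._embedder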

Error Handling:

- Graceful handling of missing or invalid queries
- Fallback responses when no relevant documents are found
- Structured error reporting with context preservation
- Automatic retry logic for transient failures
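
The guard-and-fallback behavior might look roughly like this (a sketch; the dict-shaped context is an assumption, and the response shapes follow the Response Format section below):

async def run_with_fallback(node, context: dict) -> dict:
    if not context.get("query"):
        # Missing query: fail fast with a structured error
        return {"result": None, "status": "error",
                "error": "Query is required for RAG operation",
                "metadata": {"node_id": node.node_id}}
    try:
        return await node.run(context)
    except TimeoutError:
        # Transient failure: a real implementation might retry before giving up
        return {"result": None, "status": "error",
                "error": "RAG operation timed out",
                "metadata": {"node_id": node.node_id}}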

Implementation Features

Search Capabilities:

- Vector similarity search using embeddings
- Configurable relevance thresholds
- Top-k result limiting for performance
- Metadata filtering and namespace support
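
Threshold filtering and top-k limiting can be expressed in a few lines. A sketch over in-memory vectors (a real backend would push this into the vector store):

import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def search(query_vec: list[float], docs: list[tuple[list[float], dict]],
           top_k: int = 5, score_threshold: float = 0.7) -> list[tuple[float, dict]]:
    # Score every document, drop anything under the threshold,
    # then keep only the k best matches.
    scored = [(cosine(query_vec, vec), payload) for vec, payload in docs]
    relevant = [item for item in scored if item[0] >= score_threshold]
    relevant.sort(key=lambda item: item[0], reverse=True)
    return relevant[:top_k]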

Context Management:

- Intelligent document formatting for LLM consumption
- Source attribution and reference tracking
- Context length optimization for model limits
- Structured output with sources and confidence scores
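
Source attribution and length capping might be handled like this (a sketch; the character budget stands in for real token counting, and the field names are assumptions):

def format_context(hits: list[dict], max_chars: int = 4000) -> str:
    """Number each source for attribution and stop before exceeding the budget."""
    parts, used = [], 0
    for i, hit in enumerate(hits, start=1):
        block = f"[{i}] (score={hit['score']:.2f})\n{hit['content']}"
        if used + len(block) > max_chars:
            break  # keep the prompt within the model's context limit
        parts.append(block)
        used += len(block)
    return "\n\n".join(parts)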

LLM Integration:

- Dynamic prompt construction with retrieved context
- Configurable model parameters and settings
- Response quality validation and filtering
- Token usage tracking and optimization
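
Dynamic prompt construction amounts to interpolating the formatted context. A hedged sketch (the exact wording is an assumption, not OrKa's prompt template):

def build_prompt(query: str, context: str) -> str:
    # Instructs the model to stay grounded in the retrieved context
    # and to cite sources by their [n] markers.
    return (
        "Answer the question using only the context below. "
        "Cite sources by their [n] markers.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\nAnswer:"
    )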

Usage Examples

Basic Configuration:

agents:
  - id: rag_assistant
    type: rag
    top_k: 5
    score_threshold: 0.7
    timeout: 30.0

Advanced Configuration:

agents:
  - id: specialized_rag
    type: rag
    top_k: 10
    score_threshold: 0.8
    max_concurrency: 5
    llm_config:
      model: "gpt-4"
      temperature: 0.1
      max_tokens: 500

Integration with Memory:

# The node automatically integrates with the memory system
# Memory backend provides semantic search capabilities
# Embedder service generates query vectors
# LLM service generates final responses
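
An end-to-end invocation might look like this (the registry helper and the dict-shaped Context are assumptions for illustration):

import asyncio
from orka.nodes.rag_node import RAGNode

async def main() -> None:
    registry = make_registry()  # hypothetical helper wiring memory/embedder/LLM
    node = RAGNode(node_id="rag_assistant", registry=registry)
    await node.initialize()  # resolve services before the first query
    response = await node.run({"query": "What is OrKa?"})  # dict-shaped Context (assumption)
    print(response["result"]["answer"])

asyncio.run(main())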

Response Format

Successful Response:

{
  "result": {
    "answer": "Generated response based on retrieved context",
    "sources": [
      {
        "content": "Source document content",
        "score": 0.85,
        "metadata": {...}
      }
    ]
  },
  "status": "success",
  "error": null,
  "metadata": {"node_id": "rag_assistant"}
}

Error Response:

{
  "result": null,
  "status": "error",
  "error": "Query is required for RAG operation",
  "metadata": {"node_id": "rag_assistant"}
}

No Results Response:

{
  "result": {
    "answer": "I couldn't find any relevant information to answer your question.",
    "sources": []
  },
  "status": "success",
  "error": null,
  "metadata": {"node_id": "rag_assistant"}
}
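
Callers can branch on the status field; a small handler sketch built on the shapes above:

def handle_response(resp: dict) -> str:
    if resp["status"] == "error":
        raise RuntimeError(resp["error"])
    result = resp["result"]
    if not result["sources"]:
        # "No results" responses still report success, with an empty source list
        return result["answer"]
    scores = ", ".join(f"{src['score']:.2f}" for src in result["sources"])
    return f"{result['answer']} (source scores: {scores})"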

Performance Considerations

Optimization Features:

- Lazy resource initialization to reduce startup time
- Configurable concurrency limits for resource management
- Efficient context formatting to minimize token usage
- Caching strategies for frequently accessed documents
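
Embedding caching is one place where optimization pays off quickly. A sketch using an LRU cache over query strings (whether OrKa caches this way is an assumption):

from functools import lru_cache
from typing import Callable

def make_cached_embedder(embed: Callable[[str], list[float]]):
    """Wrap a synchronous embed function with an LRU cache."""
    @lru_cache(maxsize=1024)
    def embed_cached(query: str) -> tuple[float, ...]:
        # Tuples are hashable and immutable, so cached vectors are safe to share.
        return tuple(embed(query))
    return embed_cached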

Scalability:

- Supports high-throughput query processing
- Memory-efficient document handling
- Parallel processing capabilities
- Resource pooling for external services
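
The max_concurrency limit can be enforced with an asyncio.Semaphore. A sketch of the pattern (not the actual implementation):

import asyncio

class ConcurrencyLimiter:
    """Cap the number of in-flight executions, mirroring max_concurrency."""

    def __init__(self, max_concurrency: int = 10):
        self._sem = asyncio.Semaphore(max_concurrency)

    async def run_limited(self, coro_fn, *args):
        # At most max_concurrency calls proceed at once; the rest
        # wait here without busy-polling.
        async with self._sem:
            return await coro_fn(*args)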

Monitoring:

- Execution timing and performance metrics
- Search quality and relevance tracking
- LLM usage and cost monitoring
- Error rate and pattern analysis
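
Execution timing can be captured with a small decorator; a sketch of one way to do it:

import functools
import logging
import time

def timed(fn):
    """Log the wall-clock duration of an async call."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await fn(*args, **kwargs)
        finally:
            logging.info("%s took %.3fs", fn.__name__,
                         time.perf_counter() - start)
    return wrapper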

async initialize() → None[source]

Initialize the node and its resources.

async run(context: Context) → dict[str, Any][source]

Run the RAG node with the given context.