orka.memory.kafka_logger module
Kafka Memory Logger Implementation
This module contains the hybrid KafkaMemoryLogger implementation, which uses Kafka topics for event streaming and Redis for memory operations. This provides the best of both worlds: Kafka’s event streaming capabilities combined with Redis’s fast memory operations.
- class orka.memory.kafka_logger.KafkaMemoryLogger(bootstrap_servers: str = 'localhost:9092', redis_url: str | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: Dict[str, Any] | None = None, enable_hnsw: bool = True, vector_params: Dict[str, Any] | None = None)[source]
Bases: BaseMemoryLogger
A hybrid memory logger that uses Kafka for event streaming and Redis for memory operations.
This implementation combines:
- Kafka topics for persistent event streaming and audit trails
- Redis for fast memory operations (hset, hget, sadd, etc.) and fork/join coordination
This approach provides both the scalability of Kafka and the performance of Redis.
- __init__(bootstrap_servers: str = 'localhost:9092', redis_url: str | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: Dict[str, Any] | None = None, enable_hnsw: bool = True, vector_params: Dict[str, Any] | None = None) None [source]
Initialize the hybrid Kafka + RedisStack memory logger.
- Parameters:
bootstrap_servers – Kafka bootstrap servers. Defaults to “localhost:9092”.
redis_url – RedisStack connection URL. Defaults to environment variable REDIS_URL.
stream_key – Key for the memory stream. Defaults to “orka:memory”.
debug_keep_previous_outputs – If True, keeps previous_outputs in log files for debugging.
decay_config – Configuration for memory decay functionality.
enable_hnsw – Enable HNSW vector indexing in RedisStack backend.
vector_params – HNSW configuration parameters.
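A minimal construction sketch; the broker address, Redis URL, and stream key below are placeholders for a local setup, not required values:

    from orka.memory.kafka_logger import KafkaMemoryLogger

    # Hypothetical local setup: Kafka broker and RedisStack instance on localhost.
    memory_logger = KafkaMemoryLogger(
        bootstrap_servers="localhost:9092",
        redis_url="redis://localhost:6379/0",
        stream_key="orka:memory",
        enable_hnsw=True,
    )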
- property redis: Redis
Return the Redis client, preferring the RedisStack client if available.
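Because this exposes the underlying client, standard Redis commands can be issued directly. A sketch, assuming the memory_logger instance from the constructor example and using ordinary redis-py calls (the key name is illustrative):

    # Coordinate a hypothetical fork group via plain Redis set operations.
    client = memory_logger.redis
    client.sadd("orka:fork_group:example", "agent_a", "agent_b")
    members = client.smembers("orka:fork_group:example")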
- log(agent_id: str, event_type: str, payload: Dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: Dict[str, Any] | None = None, agent_decay_config: Dict[str, Any] | None = None) None [source]
Log an event to both Kafka (for streaming) and Redis (for memory operations).
This hybrid approach ensures events are durably stored in Kafka while also being available in Redis for fast memory operations and coordination.
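A hedged usage sketch; the agent id, event type, and payload below are illustrative values, not a prescribed schema:

    memory_logger.log(
        agent_id="router_1",
        event_type="agent.completed",          # illustrative event type
        payload={"result": "ok", "tokens": 128},
        step=3,
        run_id="run-2024-001",
        fork_group=None,
        parent=None,
    )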
- hset(name: str, key: str, value: str | bytes | int | float) int [source]
Set a hash field using Redis.
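A brief sketch (key and field names are illustrative); the integer return value presumably follows redis-py semantics, i.e. the number of fields newly added:

    added = memory_logger.hset("orka:run:run-2024-001", "status", "in_progress")
    # added == 1 on the first write, 0 if the field already existed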
- search_memories(query: str, num_results: int = 10, trace_id: str | None = None, node_id: str | None = None, memory_type: str | None = None, min_importance: float | None = None, log_type: str = 'memory', namespace: str | None = None) List[Dict[str, Any]] [source]
Search memories using the RedisStack logger if available; otherwise return an empty list.
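A sketch of a filtered search, assuming a RedisStack backend is configured; the identifiers are illustrative, and the exact keys of the returned dictionaries depend on the RedisStack logger:

    results = memory_logger.search_memories(
        query="customer asked about the refund policy",
        num_results=5,
        trace_id="trace-42",        # illustrative identifiers
        memory_type="short_term",
        min_importance=0.5,
        log_type="memory",
        namespace="support",
    )
    for entry in results:
        print(entry)  # each entry is a dict describing one matching memory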
- log_memory(content: str, node_id: str, trace_id: str, metadata: Dict[str, Any] | None = None, importance_score: float = 1.0, memory_type: str = 'short_term', expiry_hours: float | None = None) str [source]
Log a memory entry using the RedisStack logger if available.
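A hedged sketch of writing a memory entry; the content and metadata are illustrative, and the returned string is treated as an opaque memory identifier:

    memory_id = memory_logger.log_memory(
        content="User prefers email follow-ups over phone calls.",
        node_id="memory_writer_1",
        trace_id="trace-42",
        metadata={"source": "conversation"},   # illustrative metadata
        importance_score=0.8,
        memory_type="short_term",
        expiry_hours=24.0,
    )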