orka.memory.kafka_logger module

Kafka Memory Logger Implementation

This module contains the hybrid KafkaMemoryLogger implementation, which uses Kafka topics for event streaming and Redis for memory operations. The combination pairs Kafka’s durable event stream with Redis’s fast in-memory operations.

class orka.memory.kafka_logger.KafkaMemoryLogger(bootstrap_servers: str = 'localhost:9092', redis_url: str | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: Dict[str, Any] | None = None, enable_hnsw: bool = True, vector_params: Dict[str, Any] | None = None)

Bases: BaseMemoryLogger

A hybrid memory logger that uses Kafka for event streaming and Redis for memory operations.

This implementation combines:
  • Kafka topics for persistent event streaming and audit trails

  • Redis for fast memory operations (hset, hget, sadd, etc.) and fork/join coordination

This approach provides both the scalability of Kafka and the performance of Redis.

__init__(bootstrap_servers: str = 'localhost:9092', redis_url: str | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: Dict[str, Any] | None = None, enable_hnsw: bool = True, vector_params: Dict[str, Any] | None = None) → None

Initialize the hybrid Kafka + RedisStack memory logger.

Parameters:
  • bootstrap_servers – Kafka bootstrap servers. Defaults to “localhost:9092”.

  • redis_url – RedisStack connection URL. Defaults to the value of the REDIS_URL environment variable.

  • stream_key – Key for the memory stream. Defaults to “orka:memory”.

  • debug_keep_previous_outputs – If True, keeps previous_outputs in log files for debugging.

  • decay_config – Configuration for memory decay functionality.

  • enable_hnsw – Enable HNSW vector indexing in RedisStack backend.

  • vector_params – HNSW configuration parameters.
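As a minimal sketch of how the constructor arguments above might be assembled, the helper below resolves redis_url from the REDIS_URL environment variable when no explicit value is given. The helper name build_logger_kwargs and its env parameter are hypothetical and not part of orka. The documentation does not say what happens when REDIS_URL is unset, so the sketch simply leaves the value as None in that case.

```python
import os
from typing import Any, Dict, Mapping, Optional


def build_logger_kwargs(
    redis_url: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
    """Collect constructor arguments, resolving redis_url from REDIS_URL."""
    env = os.environ if env is None else env
    # Assumption: an explicit argument wins over the environment variable.
    resolved = redis_url if redis_url is not None else env.get("REDIS_URL")
    return {
        "bootstrap_servers": "localhost:9092",  # documented default
        "redis_url": resolved,
        "stream_key": "orka:memory",  # documented default
        "debug_keep_previous_outputs": False,  # documented default
        "enable_hnsw": True,  # documented default
    }


kwargs = build_logger_kwargs(env={"REDIS_URL": "redis://localhost:6380/0"})

# With Kafka and RedisStack reachable, the result could be passed on:
# from orka.memory.kafka_logger import KafkaMemoryLogger
# logger = KafkaMemoryLogger(**kwargs)
```

decay_config and vector_params are omitted because their keys are not listed in this reference.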

property redis: Redis

Return the Redis client, preferring the RedisStack client if available.

log(agent_id: str, event_type: str, payload: Dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: Dict[str, Any] | None = None, agent_decay_config: Dict[str, Any] | None = None) → None

Log an event to both Kafka (for streaming) and Redis (for memory operations).

This hybrid approach ensures events are durably stored in Kafka while also being available in Redis for fast memory operations and coordination.

tail(count: int = 10) → List[Dict[str, Any]]

Retrieve the most recent events from the in-memory buffer.
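The dual-write behavior of log and the buffer read of tail can be pictured with the sketch below. It is not the orka implementation: plain Python lists stand in for the Kafka topic and the Redis stream, the class name DualWriteSketch is hypothetical, and the event field layout is an assumption chosen for illustration.

```python
import json
from typing import Any, Dict, List, Optional


class DualWriteSketch:
    """Stand-in for the hybrid pattern; lists replace Kafka and Redis."""

    def __init__(self, stream_key: str = "orka:memory") -> None:
        self.stream_key = stream_key
        self.kafka_topic: List[bytes] = []  # stands in for a Kafka topic
        self.redis_stream: List[Dict[str, str]] = []  # stands in for a Redis stream
        self.memory: List[Dict[str, Any]] = []  # local in-memory buffer

    def log(
        self,
        agent_id: str,
        event_type: str,
        payload: Dict[str, Any],
        step: Optional[int] = None,
        run_id: Optional[str] = None,
    ) -> None:
        event = {
            "agent_id": agent_id,
            "event_type": event_type,
            "payload": payload,
            "step": step,
            "run_id": run_id,
        }
        self.memory.append(event)
        # Serialized copy for the event stream (durable audit trail).
        self.kafka_topic.append(json.dumps(event).encode("utf-8"))
        # Flat string fields for the fast store used by memory operations.
        self.redis_stream.append(
            {
                "agent_id": agent_id,
                "event_type": event_type,
                "payload": json.dumps(payload),
            }
        )

    def tail(self, count: int = 10) -> List[Dict[str, Any]]:
        # Guard count <= 0: a slice like memory[-0:] would return everything.
        return self.memory[-count:] if count > 0 else []


sketch = DualWriteSketch()
for i in range(3):
    sketch.log("agent_a", "step", {"n": i}, step=i)
recent = sketch.tail(2)
```

Every call to log lands in all three places, and tail reads only the local buffer, so it involves no network round trip in this sketch.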

hset(name: str, key: str, value: str | bytes | int | float) → int

Set a hash field using Redis.

hget(name: str, key: str) → str | None

Get a hash field using Redis.

hkeys(name: str) → List[str]

Get hash keys using Redis.

hdel(name: str, *keys: str) → int

Delete hash fields using Redis.

smembers(name: str) → List[str]

Get set members using Redis.

sadd(name: str, *values: str) → int

Add to set using Redis.

srem(name: str, *values: str) → int

Remove from set using Redis.

get(key: str) → str | None

Get a value using Redis.

set(key: str, value: str | bytes | int | float) → bool

Set a value using Redis.

delete(*keys: str) → int

Delete keys using Redis.
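The set methods above are the kind of primitive that fork/join coordination can be built on. The following sketch shows one possible pattern using an in-memory stand-in that mirrors the documented sadd, srem, and smembers signatures. The class InMemorySets, the helper mark_done, and the key name fork_group:demo are all hypothetical; how orka itself tracks fork groups is not described in this reference.

```python
from typing import Dict, List, Set


class InMemorySets:
    """Stand-in exposing sadd/srem/smembers with the documented signatures."""

    def __init__(self) -> None:
        self._sets: Dict[str, Set[str]] = {}

    def sadd(self, name: str, *values: str) -> int:
        target = self._sets.setdefault(name, set())
        before = len(target)
        target.update(values)
        return len(target) - before  # number of members actually added

    def srem(self, name: str, *values: str) -> int:
        target = self._sets.get(name, set())
        before = len(target)
        target.difference_update(values)
        return before - len(target)  # number of members actually removed

    def smembers(self, name: str) -> List[str]:
        # Redis sets are unordered; sorting here only keeps the sketch deterministic.
        return sorted(self._sets.get(name, set()))


def mark_done(store: InMemorySets, group_key: str, agent_id: str) -> bool:
    """Remove agent_id from the pending set; True means the join can proceed."""
    store.srem(group_key, agent_id)
    return not store.smembers(group_key)


store = InMemorySets()
group_key = "fork_group:demo"  # hypothetical key name
store.sadd(group_key, "agent_a", "agent_b")
first = mark_done(store, group_key, "agent_a")
second = mark_done(store, group_key, "agent_b")
```

Because the stand-in follows the documented method names and return types, the same mark_done helper could in principle be handed a KafkaMemoryLogger instance instead.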

search_memories(query: str, num_results: int = 10, trace_id: str | None = None, node_id: str | None = None, memory_type: str | None = None, min_importance: float | None = None, log_type: str = 'memory', namespace: str | None = None) → List[Dict[str, Any]]

Search memories using the RedisStack logger if available; otherwise return an empty list.
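The "delegate if available, otherwise return an empty list" behavior can be sketched as follows. The class names and the _backend attribute are hypothetical, and FakeBackend exists only to exercise the delegation path; the real logger's internals may differ.

```python
from typing import Any, Dict, List, Optional


class SearchDelegationSketch:
    """Forwards searches to an optional backend, else returns an empty list."""

    def __init__(self, backend: Optional[Any] = None) -> None:
        self._backend = backend  # hypothetical attribute name

    def search_memories(
        self, query: str, num_results: int = 10, **filters: Any
    ) -> List[Dict[str, Any]]:
        if self._backend is None:
            return []  # no search backend available
        return self._backend.search_memories(
            query, num_results=num_results, **filters
        )


class FakeBackend:
    """Test double standing in for a RedisStack-backed logger."""

    def search_memories(
        self, query: str, num_results: int = 10, **filters: Any
    ) -> List[Dict[str, Any]]:
        return [{"content": f"match for {query}"}][:num_results]


without_backend = SearchDelegationSketch().search_memories("kafka")
with_backend = SearchDelegationSketch(FakeBackend()).search_memories("kafka")
```

Callers therefore always receive a list and do not need to special-case a missing backend.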

log_memory(content: str, node_id: str, trace_id: str, metadata: Dict[str, Any] | None = None, importance_score: float = 1.0, memory_type: str = 'short_term', expiry_hours: float | None = None) → str

Log a memory entry using the RedisStack logger if available.

ensure_index() → bool

Ensure the memory index exists, using the RedisStack logger if available.

close() → None

Close both the Kafka producer and the Redis connection.

__del__()

Cleanup on object deletion.
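Python does not guarantee when __del__ runs, so calling close explicitly is usually the safer choice. One way to do that, sketched here with a hypothetical stand-in class, is contextlib.closing from the standard library, which calls close() on exit from the with block for any object that has such a method.

```python
from contextlib import closing


class ClosableSketch:
    """Hypothetical stand-in for any object exposing close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


with closing(ClosableSketch()) as logger:
    was_open_inside = not logger.closed  # resource is usable inside the block

# close() has been called by the time the with block exits,
# including when the block raises an exception.
```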

cleanup_expired_memories(dry_run: bool = False) → Dict[str, Any]

Clean up expired memory entries using a Redis-based approach.

This delegates to Redis for cleanup while also cleaning the in-memory buffer.
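To illustrate what a dry_run flag typically means for the in-memory buffer part of such a cleanup, here is a minimal sketch. The function name cleanup_buffer, the expires_at field, the now parameter, and the keys of the returned dictionary are all assumptions made for illustration; the actual return structure is not documented here.

```python
import time
from typing import Any, Dict, List, Optional


def cleanup_buffer(
    buffer: List[Dict[str, Any]],
    dry_run: bool = False,
    now: Optional[float] = None,
) -> Dict[str, Any]:
    """Drop expired entries from buffer, or only count them when dry_run is True."""
    now = time.time() if now is None else now

    def is_expired(entry: Dict[str, Any]) -> bool:
        expires_at = entry.get("expires_at")  # hypothetical field name
        return expires_at is not None and expires_at <= now

    expired_count = sum(1 for entry in buffer if is_expired(entry))
    if not dry_run:
        # Mutate in place so every holder of the list sees the cleanup.
        buffer[:] = [entry for entry in buffer if not is_expired(entry)]
    return {
        "dry_run": dry_run,
        "expired_found": expired_count,
        "deleted": 0 if dry_run else expired_count,
    }


buffer = [
    {"id": 1, "expires_at": 10.0},
    {"id": 2, "expires_at": None},  # never expires
    {"id": 3, "expires_at": 50.0},
]
preview = cleanup_buffer(buffer, dry_run=True, now=20.0)
size_after_preview = len(buffer)
result = cleanup_buffer(buffer, now=20.0)
```

Running with dry_run=True first gives a preview of how many entries would be removed without touching the buffer.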

get_memory_stats() → Dict[str, Any]

Get memory usage statistics from both the Redis backend and the local memory buffer.