CortexFlowManager
The CortexFlowManager is the main entry point for the CortexFlow library. It manages the multi-tier memory system and provides methods for adding messages, generating responses, and managing knowledge.
- class cortexflow.CortexFlowManager(config=None)[source]
Bases:
ContextProviderMain manager class for CortexFlow system. Coordinates between components for memory, knowledge, and external integrations. Implements the ContextProvider interface.
- __init__(config=None)[source]
Initialize the CortexFlowManager with provided configuration.
- Parameters:
config – Configuration for the system, if None, default config is used
- add_knowledge(text: str, source: str = None, confidence: float = None) list[int][source]
Store important knowledge in the knowledge store.
- Parameters:
text – Text to remember
source – Optional source of the knowledge
confidence – Optional confidence value for the knowledge
- Returns:
List of IDs for the stored knowledge
- add_logical_rule(name: str, premise_patterns: list[dict[str, Any]], conclusion_pattern: dict[str, Any], confidence: float = 0.8) bool[source]
Add a logical rule to the inference engine.
- Parameters:
name – Rule name
premise_patterns – List of premise patterns that must be satisfied
conclusion_pattern – The conclusion pattern to infer
confidence – Rule confidence (0.0-1.0)
- Returns:
Success status
- add_message(role: str, content: str, metadata: dict[str, Any] = None) dict[str, Any][source]
Add a message to the conversation.
- Parameters:
role – Message role (e.g., user, assistant, system)
content – Message content
metadata – Optional metadata for the message
- Returns:
Message object that was added
- add_probability_distribution(entity_id: int, relation_id: int, distribution_type: str, distribution_data: dict[str, Any]) None[source]
Add a probability distribution to represent uncertainty about a fact.
- Parameters:
entity_id – Entity ID
relation_id – Relation ID
distribution_type – Type of distribution (discrete, gaussian, etc.)
distribution_data – Data representing the distribution
- answer_why_question(query: str) list[dict[str, Any]][source]
Answer a why-question using backward chaining logical reasoning.
- Parameters:
query – The why question to answer
- Returns:
Explanation steps for the answer
- cache_reasoning_pattern(pattern_key: str, pattern_result: Any) bool[source]
Cache a common reasoning pattern for reuse.
- Parameters:
pattern_key – Unique identifier for the reasoning pattern
pattern_result – Result of the reasoning pattern
- Returns:
True if successful, False otherwise
- clear_performance_caches() dict[str, Any][source]
Clear all performance optimization caches.
- Returns:
Dictionary with cache clearing statistics
- create_hop_indexes(max_hops: int = None) dict[str, Any][source]
Create indexes for multi-hop queries to speed up traversal.
- Parameters:
max_hops – Maximum number of hops to index
- Returns:
Indexing statistics
- create_session(user_id: str, persona_id: str | None = None, metadata: dict[str, Any] | None = None)[source]
Create a new session and make it active.
- detect_contradictions(entity_id=None, relation_type=None, max_results=100) list[dict[str, Any]][source]
Detect contradictions in the knowledge graph.
- Parameters:
entity_id – Optional entity ID to check
relation_type – Optional relation type to check
max_results – Maximum number of results to return
- Returns:
List of detected contradictions
- property emotion_tracker
Return the EmotionTracker instance, or
None.
- property episodic_store
Access the episodic memory store (None if not enabled).
- property event_bus
Access the event bus (None if not enabled).
- generate_hypotheses(observation: str, max_hypotheses: int = None) list[dict[str, Any]][source]
Generate hypotheses to explain an observation using abductive reasoning.
- Parameters:
observation – The observation to explain
max_hypotheses – Maximum number of hypotheses to generate (default uses config)
- Returns:
List of hypotheses that could explain the observation
- generate_novel_implications(iterations: int = None) list[dict[str, Any]][source]
Generate novel implications using forward chaining.
- Parameters:
iterations – Number of forward chaining iterations (default uses config)
- Returns:
List of newly inferred facts
- generate_response(prompt: str = None, model: str = None) str[source]
Generate a response using the conversation context.
- Parameters:
prompt – Optional prompt to use instead of the conversation context
model – Model to use for generation
- Returns:
Generated response
- generate_response_stream(prompt: str = None, model: str = None) Iterator[str][source]
Generate a streaming response using the conversation context.
Note: Chain of Agents processing is not supported in streaming mode. If COA is enabled and the query is complex, this method will run COA synchronously first and then stream the resulting text.
- Parameters:
prompt – Optional prompt to use instead of the conversation context
model – Model to use for generation
- Yields:
Chunks of the generated response
- get_belief_revision_history(entity_id: int = None, relation_id: int = None, limit: int = 10) list[dict[str, Any]][source]
Get the revision history for beliefs about an entity or relation.
- Parameters:
entity_id – Optional entity ID filter
relation_id – Optional relation ID filter
limit – Maximum number of revisions to return
- Returns:
List of belief revisions
- get_cache_stats() dict[str, Any][source]
Get cache statistics including hit rates.
- Returns:
Dictionary with cache statistics
- get_conversation_context(max_tokens: int = None) dict[str, Any][source]
Get the full conversation context for generating a response.
- Parameters:
max_tokens – Maximum tokens for the context
- Returns:
Context with messages and knowledge
- get_dynamic_weighting_stats() dict[str, Any][source]
Get statistics about the dynamic weighting engine.
- Returns:
Dictionary with dynamic weighting statistics or None if not enabled
- get_knowledge(query: str) list[dict[str, Any]][source]
Get relevant knowledge for a query.
- Parameters:
query – Query text
- Returns:
List of relevant knowledge items
- get_performance_stats() dict[str, Any][source]
Get performance statistics from the optimizer.
- Returns:
Dictionary with performance statistics
- get_probability_distribution(entity_id: int, relation_id: int) dict[str, Any] | None[source]
Get the probability distribution for a fact.
- Parameters:
entity_id – Entity ID
relation_id – Relation ID
- Returns:
Probability distribution data or None if not found
- get_recent_episodes(user_id: str | None = None, limit: int = 10) list[source]
Get recent episodes from episodic memory.
- get_source_reliability(source_name: str) float[source]
Get the reliability score for a knowledge source.
- Parameters:
source_name – Name of the source
- Returns:
Reliability score (0.0-1.0)
- get_stats() dict[str, Any][source]
Get system-wide statistics.
- Returns:
Dictionary with various statistics
- get_temporal_facts(subject: str | None = None) list[source]
Get current temporal facts, optionally filtered by subject.
- multi_hop_query(query: str) dict[str, Any][source]
Perform multi-hop reasoning on a query.
- Parameters:
query – The query text
- Returns:
Dictionary with path, entities, score, and other reasoning results
- optimize_path_query(start_entity: str, end_entity: str, max_hops: int = 3, relation_constraints: list[str] = None) dict[str, Any][source]
Optimize a path query between entities using the query planning system.
- Parameters:
start_entity – Starting entity
end_entity – Target entity
max_hops – Maximum path length
relation_constraints – Optional list of allowed relation types
- Returns:
Optimized query plan
- optimize_query(query: dict[str, Any]) dict[str, Any][source]
Generate an optimized query plan for knowledge graph operations.
- Parameters:
query – Dictionary with query parameters
- Returns:
Optimized query plan
- partition_graph(method: str = None, partition_count: int = None) dict[str, Any][source]
Partition the knowledge graph for improved performance.
- Parameters:
method – Partitioning method (louvain, spectral, modularity)
partition_count – Target number of partitions
- Returns:
Partition statistics
- property persona_manager
Return the PersonaManager instance, or
None.
- query(query_text: str) dict[str, Any][source]
General query interface that routes to specialized query methods.
- Parameters:
query_text – The query text
- Returns:
Query result
- reason_with_incomplete_information(query: dict[str, Any], available_knowledge: list[dict[str, Any]]) dict[str, Any][source]
Reason with incomplete information to provide best possible answers.
- Parameters:
query – The query to answer
available_knowledge – Available knowledge to reason with
- Returns:
Reasoning result with confidence and explanation
- property relationship_tracker
Return the RelationshipTracker instance, or
None.
- remember_knowledge(text: str, source: str = None, confidence: float = None) list[int][source]
Store important knowledge in the knowledge store.
DEPRECATED: Use add_knowledge() instead.
- Parameters:
text – Text to remember
source – Optional source of the knowledge
confidence – Optional confidence value for the knowledge
- Returns:
List of IDs for the stored knowledge
- resolve_contradiction(contradiction: dict[str, Any], strategy: str = None) dict[str, Any][source]
Resolve a contradiction using the specified strategy.
- Parameters:
contradiction – Contradiction to resolve
strategy – Resolution strategy (auto, recency, confidence, reliability, or keep_both)
- Returns:
Resolution result
- property safety_pipeline
Access the safety pipeline (None if not enabled).
- search_episodes(query: str, max_results: int = 5) list[source]
Search episodic memory by text query.
- property temporal_manager
Access the temporal fact manager (None if not enabled).
- update_source_reliability(source_name: str, reliability_score: float, metadata: dict[str, Any] = None) None[source]
Update the reliability score for a knowledge source.
- Parameters:
source_name – Name of the source
reliability_score – Reliability score (0.0-1.0)
metadata – Optional metadata about the source
- property user_profile_manager
Return the UserProfileManager instance, or
None.
Core Methods
- CortexFlowManager.__init__(config=None)[source]
Initialize the CortexFlowManager with provided configuration.
- Parameters:
config – Configuration for the system, if None, default config is used
- CortexFlowManager.add_message(role: str, content: str, metadata: dict[str, Any] = None) dict[str, Any][source]
Add a message to the conversation.
- Parameters:
role – Message role (e.g., user, assistant, system)
content – Message content
metadata – Optional metadata for the message
- Returns:
Message object that was added
- CortexFlowManager.generate_response(prompt: str = None, model: str = None) str[source]
Generate a response using the conversation context.
- Parameters:
prompt – Optional prompt to use instead of the conversation context
model – Model to use for generation
- Returns:
Generated response
- CortexFlowManager.remember_knowledge(text: str, source: str = None, confidence: float = None) list[int][source]
Store important knowledge in the knowledge store.
DEPRECATED: Use add_knowledge() instead.
- Parameters:
text – Text to remember
source – Optional source of the knowledge
confidence – Optional confidence value for the knowledge
- Returns:
List of IDs for the stored knowledge
Example Usage
from cortexflow import CortexFlowManager, CortexFlowConfig, MemoryConfig, LLMConfig
# Create a configuration using nested config
config = CortexFlowConfig(
memory=MemoryConfig(
active_token_limit=2000,
working_token_limit=4000,
archive_token_limit=6000,
),
llm=LLMConfig(default_model="llama3"),
)
# Initialize the manager
manager = CortexFlowManager(config)
# Add messages
manager.add_message("system", "You are a helpful assistant.")
manager.add_message("user", "What's the capital of France?")
# Generate a response
response = manager.generate_response()
print(f"Assistant: {response}")
# Close the manager when done
manager.close()