CortexFlowManager

The CortexFlowManager is the main entry point for the CortexFlow library. It manages the multi-tier memory system and provides methods for adding messages, generating responses, and managing knowledge.

class cortexflow.CortexFlowManager(config=None)

Bases: ContextProvider

Main manager class for the CortexFlow system. Coordinates the components for memory, knowledge, and external integrations. Implements the ContextProvider interface.

__init__(config=None)

Initialize the CortexFlowManager with the provided configuration.

Parameters:

config – Configuration for the system. If None, the default configuration is used.

add_knowledge(text: str, source: str = None, confidence: float = None) → list[int]

Store important knowledge in the knowledge store.

Parameters:
  • text – Text to remember

  • source – Optional source of the knowledge

  • confidence – Optional confidence value for the knowledge

Returns:

List of IDs for the stored knowledge

add_logical_rule(name: str, premise_patterns: list[dict[str, Any]], conclusion_pattern: dict[str, Any], confidence: float = 0.8) → bool

Add a logical rule to the inference engine.

Parameters:
  • name – Rule name

  • premise_patterns – List of premise patterns that must be satisfied

  • conclusion_pattern – The conclusion pattern to infer

  • confidence – Rule confidence (0.0-1.0)

Returns:

Success status

add_message(role: str, content: str, metadata: dict[str, Any] = None) → dict[str, Any]

Add a message to the conversation.

Parameters:
  • role – Message role (e.g., user, assistant, system)

  • content – Message content

  • metadata – Optional metadata for the message

Returns:

Message object that was added

add_probability_distribution(entity_id: int, relation_id: int, distribution_type: str, distribution_data: dict[str, Any]) → None

Add a probability distribution to represent uncertainty about a fact.

Parameters:
  • entity_id – Entity ID

  • relation_id – Relation ID

  • distribution_type – Type of distribution (discrete, gaussian, etc.)

  • distribution_data – Data representing the distribution

answer_why_question(query: str) → list[dict[str, Any]]

Answer a why-question using backward chaining logical reasoning.

Parameters:

query – The why question to answer

Returns:

Explanation steps for the answer
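Backward chaining starts from the goal and works back through rules whose conclusion matches it, recursing on each premise until it reaches known facts. The sketch below is a standalone illustration of that technique only, not CortexFlow's implementation: FACTS, RULES, and explain are hypothetical names, and the rule set is assumed to be acyclic.

```python
from typing import Optional

# Hypothetical knowledge base: known facts, plus rules mapping a
# conclusion to the premises that must all hold for it to be true.
FACTS = {"it_rained"}
RULES = {
    "grass_is_wet": ["it_rained"],
    "shoes_are_muddy": ["grass_is_wet", "walked_on_grass"],
}


def explain(goal: str, facts: set[str], rules: dict[str, list[str]]) -> Optional[list[str]]:
    """Return explanation steps proving `goal`, or None if it cannot be proven."""
    if goal in facts:
        return [f"{goal} is a known fact"]
    premises = rules.get(goal)
    if premises is None:
        return None  # no rule concludes this goal
    steps: list[str] = []
    for premise in premises:
        sub_steps = explain(premise, facts, rules)  # work backward from each premise
        if sub_steps is None:
            return None  # one unprovable premise defeats the rule
        steps.extend(sub_steps)
    steps.append(f"{goal} follows from {', '.join(premises)}")
    return steps


why_wet = explain("grass_is_wet", FACTS, RULES)
why_muddy = explain("shoes_are_muddy", FACTS, RULES)
print(why_wet)
print(why_muddy)
```

The second query fails because walked_on_grass is neither a fact nor the conclusion of any rule.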

cache_reasoning_pattern(pattern_key: str, pattern_result: Any) → bool

Cache a common reasoning pattern for reuse.

Parameters:
  • pattern_key – Unique identifier for the reasoning pattern

  • pattern_result – Result of the reasoning pattern

Returns:

True if successful, False otherwise

clear_context() → None

Clear all context data.

clear_memory() → None

Clear the conversation memory.

clear_performance_caches() → dict[str, Any]

Clear all performance optimization caches.

Returns:

Dictionary with cache clearing statistics

close() → None

Close and clean up resources.
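This reference does not say whether CortexFlowManager supports the with statement directly. One way to guarantee that close() runs even when an error occurs, without assuming context-manager support, is the standard library's contextlib.closing. In the sketch below, StubManager is a hypothetical stand-in that only mimics the close() method.

```python
from contextlib import closing


class StubManager:
    """Hypothetical stand-in exposing the same close() method as CortexFlowManager."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


stub = StubManager()
try:
    # closing() calls stub.close() when the block exits, even on an exception.
    with closing(stub) as manager:
        raise RuntimeError("simulated failure mid-conversation")
except RuntimeError:
    pass

print(stub.closed)
```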

close_session(session_id: str | None = None) → bool

Close a session (defaults to the current session).

create_hop_indexes(max_hops: int = None) → dict[str, Any]

Create indexes for multi-hop queries to speed up traversal.

Parameters:

max_hops – Maximum number of hops to index

Returns:

Indexing statistics

create_session(user_id: str, persona_id: str | None = None, metadata: dict[str, Any] | None = None)

Create a new session and make it active.

detect_contradictions(entity_id=None, relation_type=None, max_results=100) → list[dict[str, Any]]

Detect contradictions in the knowledge graph.

Parameters:
  • entity_id – Optional entity ID to check

  • relation_type – Optional relation type to check

  • max_results – Maximum number of results to return

Returns:

List of detected contradictions

property emotion_tracker

Return the EmotionTracker instance, or None.

property episodic_store

Access the episodic memory store (None if not enabled).

property event_bus

Access the event bus (None if not enabled).

generate_hypotheses(observation: str, max_hypotheses: int = None) → list[dict[str, Any]]

Generate hypotheses to explain an observation using abductive reasoning.

Parameters:
  • observation – The observation to explain

  • max_hypotheses – Maximum number of hypotheses to generate (default uses config)

Returns:

List of hypotheses that could explain the observation

generate_novel_implications(iterations: int = None) → list[dict[str, Any]]

Generate novel implications using forward chaining.

Parameters:

iterations – Number of forward chaining iterations (default uses config)

Returns:

List of newly inferred facts
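Forward chaining repeatedly applies rules to the known facts, adding each conclusion whose premises are all satisfied, and the number of iterations bounds how far inferences can build on one another. The sketch below illustrates the technique in isolation and is not CortexFlow's implementation: forward_chain and the string-based fact and rule format are hypothetical.

```python
def forward_chain(
    facts: set[str],
    rules: list[tuple[list[str], str]],
    iterations: int,
) -> list[str]:
    """Return the facts newly inferred within `iterations` passes over the rules."""
    known = set(facts)
    inferred: list[str] = []
    for _ in range(iterations):
        snapshot = frozenset(known)  # each pass only sees facts from earlier passes
        new: list[str] = []
        for premises, conclusion in rules:
            if conclusion not in known and all(p in snapshot for p in premises):
                known.add(conclusion)
                new.append(conclusion)
        if not new:
            break  # fixed point reached: nothing left to infer
        inferred.extend(new)
    return inferred


# Hypothetical rules as (premises, conclusion) pairs.
SYLLOGISM_RULES = [
    (["socrates_is_human"], "socrates_is_mortal"),
    (["socrates_is_mortal"], "socrates_will_die"),
]

one_pass = forward_chain({"socrates_is_human"}, SYLLOGISM_RULES, iterations=1)
two_passes = forward_chain({"socrates_is_human"}, SYLLOGISM_RULES, iterations=2)
print(one_pass)
print(two_passes)
```

With one iteration only the first rule fires. The second rule depends on its conclusion, so it needs a second pass.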

generate_response(prompt: str = None, model: str = None) → str

Generate a response using the conversation context.

Parameters:
  • prompt – Optional prompt to use instead of the conversation context

  • model – Model to use for generation

Returns:

Generated response

generate_response_stream(prompt: str = None, model: str = None) → Iterator[str]

Generate a streaming response using the conversation context.

Note: Chain of Agents (COA) processing is not supported in streaming mode. If COA is enabled and the query is complex, this method first runs COA synchronously and then streams the resulting text.

Parameters:
  • prompt – Optional prompt to use instead of the conversation context

  • model – Model to use for generation

Yields:

Chunks of the generated response
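Because the method yields string chunks, a caller would typically display each chunk as it arrives and join the chunks if the full text is needed afterwards. The sketch below shows that consumption pattern with a hypothetical stand-in generator, fake_response_stream, in place of a real manager. The chunk boundaries are invented for illustration.

```python
from collections.abc import Iterator


def fake_response_stream() -> Iterator[str]:
    """Hypothetical stand-in for manager.generate_response_stream()."""
    yield from ["Paris ", "is the capital ", "of France."]


def collect_stream(chunks: Iterator[str]) -> str:
    """Print chunks as they arrive and return the assembled text."""
    parts: list[str] = []
    for chunk in chunks:
        print(chunk, end="", flush=True)  # show partial output immediately
        parts.append(chunk)
    print()
    return "".join(parts)


full_text = collect_stream(fake_response_stream())
```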

get_belief_revision_history(entity_id: int = None, relation_id: int = None, limit: int = 10) → list[dict[str, Any]]

Get the revision history for beliefs about an entity or relation.

Parameters:
  • entity_id – Optional entity ID filter

  • relation_id – Optional relation ID filter

  • limit – Maximum number of revisions to return

Returns:

List of belief revisions

get_cache_stats() → dict[str, Any]

Get cache statistics including hit rates.

Returns:

Dictionary with cache statistics

get_context() → dict[str, Any]

Get the current context for model consumption.

get_conversation_context(max_tokens: int = None) → dict[str, Any]

Get the full conversation context for generating a response.

Parameters:

max_tokens – Maximum tokens for the context

Returns:

Context with messages and knowledge

get_current_session()

Return the currently active session context, or None.

get_dynamic_weighting_stats() → dict[str, Any]

Get statistics about the dynamic weighting engine.

Returns:

Dictionary with dynamic weighting statistics or None if not enabled

get_knowledge(query: str) → list[dict[str, Any]]

Get relevant knowledge for a query.

Parameters:

query – Query text

Returns:

List of relevant knowledge items

get_performance_stats() → dict[str, Any]

Get performance statistics from the optimizer.

Returns:

Dictionary with performance statistics

get_probability_distribution(entity_id: int, relation_id: int) → dict[str, Any] | None

Get the probability distribution for a fact.

Parameters:
  • entity_id – Entity ID

  • relation_id – Relation ID

Returns:

Probability distribution data or None if not found

get_recent_episodes(user_id: str | None = None, limit: int = 10) → list

Get recent episodes from episodic memory.

get_source_reliability(source_name: str) → float

Get the reliability score for a knowledge source.

Parameters:

source_name – Name of the source

Returns:

Reliability score (0.0-1.0)

get_stats() → dict[str, Any]

Get system-wide statistics.

Returns:

Dictionary with various statistics

get_temporal_facts(subject: str | None = None) → list

Get current temporal facts, optionally filtered by subject.

multi_hop_query(query: str) → dict[str, Any]

Perform multi-hop reasoning on a query.

Parameters:

query – The query text

Returns:

Dictionary with path, entities, score, and other reasoning results

optimize_path_query(start_entity: str, end_entity: str, max_hops: int = 3, relation_constraints: list[str] = None) → dict[str, Any]

Optimize a path query between entities using the query planning system.

Parameters:
  • start_entity – Starting entity

  • end_entity – Target entity

  • max_hops – Maximum path length

  • relation_constraints – Optional list of allowed relation types

Returns:

Optimized query plan
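To make the max_hops and relation_constraints parameters concrete, the sketch below runs a hop-limited breadth-first search over a toy graph, skipping edges whose relation type is not allowed. It illustrates only the general idea of a constrained path query. The find_path function and the EDGES triples are hypothetical, and CortexFlow's query planner may work quite differently.

```python
from collections import deque
from typing import Optional

# Hypothetical toy graph as directed (source, relation, target) triples.
EDGES = [
    ("Marie Curie", "born_in", "Warsaw"),
    ("Warsaw", "capital_of", "Poland"),
    ("Marie Curie", "worked_in", "Paris"),
    ("Paris", "capital_of", "France"),
]


def find_path(
    start: str,
    end: str,
    max_hops: int = 3,
    relation_constraints: Optional[list[str]] = None,
) -> Optional[list[tuple[str, str, str]]]:
    """Return the shortest list of edges from start to end, or None."""
    allowed = set(relation_constraints) if relation_constraints is not None else None
    queue = deque([(start, [])])
    visited = {start}
    while queue:
        node, path = queue.popleft()
        if node == end:
            return path
        if len(path) >= max_hops:
            continue  # hop budget exhausted on this branch
        for src, rel, dst in EDGES:
            if src != node or dst in visited:
                continue
            if allowed is not None and rel not in allowed:
                continue  # relation type not permitted
            visited.add(dst)
            queue.append((dst, path + [(src, rel, dst)]))
    return None


unconstrained = find_path("Marie Curie", "Poland")
too_short = find_path("Marie Curie", "Poland", max_hops=1)
filtered = find_path("Marie Curie", "Poland", relation_constraints=["worked_in", "capital_of"])
print(unconstrained, too_short, filtered)
```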

optimize_query(query: dict[str, Any]) → dict[str, Any]

Generate an optimized query plan for knowledge graph operations.

Parameters:

query – Dictionary with query parameters

Returns:

Optimized query plan

partition_graph(method: str = None, partition_count: int = None) → dict[str, Any]

Partition the knowledge graph for improved performance.

Parameters:
  • method – Partitioning method (louvain, spectral, modularity)

  • partition_count – Target number of partitions

Returns:

Partition statistics

property persona_manager

Return the PersonaManager instance, or None.

query(query_text: str) → dict[str, Any]

General query interface that routes to specialized query methods.

Parameters:

query_text – The query text

Returns:

Query result

reason_with_incomplete_information(query: dict[str, Any], available_knowledge: list[dict[str, Any]]) → dict[str, Any]

Reason with incomplete information to provide the best possible answers.

Parameters:
  • query – The query to answer

  • available_knowledge – Available knowledge to reason with

Returns:

Reasoning result with confidence and explanation

property relationship_tracker

Return the RelationshipTracker instance, or None.

remember_knowledge(text: str, source: str = None, confidence: float = None) → list[int]

Store important knowledge in the knowledge store.

DEPRECATED: Use add_knowledge() instead.

Parameters:
  • text – Text to remember

  • source – Optional source of the knowledge

  • confidence – Optional confidence value for the knowledge

Returns:

List of IDs for the stored knowledge

reset_dynamic_weighting() → None

Reset dynamic weighting to default values.

resolve_contradiction(contradiction: dict[str, Any], strategy: str = None) → dict[str, Any]

Resolve a contradiction using the specified strategy.

Parameters:
  • contradiction – Contradiction to resolve

  • strategy – Resolution strategy (auto, recency, confidence, reliability, or keep_both)

Returns:

Resolution result
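The strategy names suggest how a winner might be chosen between two conflicting facts. The sketch below is one plausible reading of three of them, not documented behavior. It assumes recency keeps the newer fact, confidence keeps the higher-confidence fact, and keep_both retains both. The resolve function and the fact fields (value, year, confidence) are hypothetical.

```python
from typing import Any


def resolve(fact_a: dict[str, Any], fact_b: dict[str, Any], strategy: str) -> list[dict[str, Any]]:
    """Return the fact(s) that survive under an assumed reading of `strategy`."""
    if strategy == "keep_both":
        return [fact_a, fact_b]
    if strategy == "recency":
        # Assumption: the more recently recorded fact wins.
        return [max((fact_a, fact_b), key=lambda f: f["year"])]
    if strategy == "confidence":
        # Assumption: the fact with the higher confidence score wins.
        return [max((fact_a, fact_b), key=lambda f: f["confidence"])]
    raise ValueError(f"unsupported strategy in this sketch: {strategy}")


older = {"value": "Pluto is a planet", "year": 1990, "confidence": 0.9}
newer = {"value": "Pluto is a dwarf planet", "year": 2006, "confidence": 0.7}

by_recency = resolve(older, newer, "recency")
by_confidence = resolve(older, newer, "confidence")
print(by_recency[0]["value"])
print(by_confidence[0]["value"])
```

The two strategies disagree on this input, which is why the choice of strategy matters when sources differ in both age and confidence.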

property safety_pipeline

Access the safety pipeline (None if not enabled).

search_episodes(query: str, max_results: int = 5) → list

Search episodic memory by text query.

switch_session(session_id: str)

Switch to an existing session.

property temporal_manager

Access the temporal fact manager (None if not enabled).

update_source_reliability(source_name: str, reliability_score: float, metadata: dict[str, Any] = None) → None

Update the reliability score for a knowledge source.

Parameters:
  • source_name – Name of the source

  • reliability_score – Reliability score (0.0-1.0)

  • metadata – Optional metadata about the source

property user_profile_manager

Return the UserProfileManager instance, or None.

Core Methods

CortexFlowManager.__init__(config=None)

Initialize the CortexFlowManager with the provided configuration.

Parameters:

config – Configuration for the system. If None, the default configuration is used.

CortexFlowManager.add_message(role: str, content: str, metadata: dict[str, Any] = None) → dict[str, Any]

Add a message to the conversation.

Parameters:
  • role – Message role (e.g., user, assistant, system)

  • content – Message content

  • metadata – Optional metadata for the message

Returns:

Message object that was added

CortexFlowManager.generate_response(prompt: str = None, model: str = None) → str

Generate a response using the conversation context.

Parameters:
  • prompt – Optional prompt to use instead of the conversation context

  • model – Model to use for generation

Returns:

Generated response

CortexFlowManager.remember_knowledge(text: str, source: str = None, confidence: float = None) → list[int]

Store important knowledge in the knowledge store.

DEPRECATED: Use add_knowledge() instead.

Parameters:
  • text – Text to remember

  • source – Optional source of the knowledge

  • confidence – Optional confidence value for the knowledge

Returns:

List of IDs for the stored knowledge

CortexFlowManager.close() → None

Close and clean up resources.

Example Usage

from cortexflow import CortexFlowManager, CortexFlowConfig, MemoryConfig, LLMConfig

# Create a configuration using nested config
config = CortexFlowConfig(
    memory=MemoryConfig(
        active_token_limit=2000,
        working_token_limit=4000,
        archive_token_limit=6000,
    ),
    llm=LLMConfig(default_model="llama3"),
)

# Initialize the manager
manager = CortexFlowManager(config)

# Add messages
manager.add_message("system", "You are a helpful assistant.")
manager.add_message("user", "What's the capital of France?")

# Generate a response
response = manager.generate_response()
print(f"Assistant: {response}")

# Close the manager when done
manager.close()