API Reference

This document provides a complete API reference for the MASFactory framework, including detailed descriptions of all core classes, methods, and interfaces.

Usage Guide

  • Click the left navigation bar to quickly locate the corresponding module
  • Each class includes detailed constructor parameter descriptions and usage examples
  • Method parameters and return values have complete type annotations
  • Use Ctrl + F to quickly search for specific APIs

Version Information

This document corresponds to MASFactory v1.0.0

Core Modules

The core modules contain the basic building blocks of the MASFactory framework, essential for constructing any workflow.

Node Class

Base Node Class

Node is the abstract base class for all computing units in MASFactory, providing basic functionality for node variable management, message passing, and execution control.

python
class Node(ABC):
    def __init__(self,
                name: str,
                pull_keys: dict[str,dict|str] | None = None,
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Node name, used to identify this node in logs |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables (None inherits all; {} inherits none) |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Initial node variables of the node |

Important Properties

| Property | Type | Description |
|---|---|---|
| name | str | Node name (read-only) |
| in_edges | list[Edge] | List of all incoming edges (read-only) |
| out_edges | list[Edge] | List of all outgoing edges (read-only) |
| input_keys | dict[str,dict\|str] | Merged result of all incoming edge keys (read-only) |
| output_keys | dict[str,dict\|str] | Merged result of all outgoing edge keys (read-only) |
| is_ready | bool | Whether the node is ready for execution (read-only) |
| gate | Gate | Open/closed state of the node (read-only) |

Core Methods

execute()
python
def execute(self, outer_env: dict[str,object] | None = None) -> None

Execute the complete process of the node.

Execution Steps:

  1. Update node variables (pull from the outer environment)
  2. Aggregate input messages from all incoming edges
  3. Call _forward method to process input
  4. Distribute output to all outgoing edges
  5. Update node variables (push back to the outer environment)

Parameters:

  • outer_env: Node variables of the external environment
_forward() [Abstract Method]
python
@abstractmethod
def _forward(self, input: dict[str,object]) -> dict[str,object]

Core computation logic of the node, must be implemented by subclasses.

Parameters:

  • input: Dictionary payload aggregated from incoming edges

Returns:

  • dict[str,object]: Dictionary payload to be dispatched to outgoing edges

Node Variable Processing Rules

  • pull_keys is None: Inherit all node variables from outer node
  • pull_keys is not None: Extract according to specified fields from outer node variables
  • pull_keys is empty dict: Do not inherit any outer node variables

Edge Class

Edge Connection Class

Edge connects two Nodes and is responsible for flow control and message passing.

python
class Edge:
    def __init__(self,
                sender: Node,
                receiver: Node,
                keys: dict[str,dict|str] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| sender | Node | - | Node that sends messages |
| receiver | Node | - | Node that receives messages |
| keys | dict[str,dict\|str] \| None | None | Message field mapping; defaults to {"message": ""} |

Important Properties

| Property | Type | Description |
|---|---|---|
| keys | dict[str,dict\|str] | Key description mapping of the edge (read-only) |
| is_congested | bool | Whether the edge is congested (has unreceived messages) (read-only) |
| gate | Gate | Open/closed state of the edge (read-only) |

Core Methods

send_message()
python
def send_message(self, message: dict[str,object]) -> None

Send a message to the edge; the message waits there until the receiving node retrieves it.

Parameters:

  • message: Message dictionary to send

Exceptions:

  • RuntimeError: If the edge is already congested
  • KeyError: If any required key in edge.keys is missing in message
receive_message()
python
def receive_message(self) -> dict[str,object]

Receive message from the edge and clear congestion status.

Returns:

  • dict[str,object]: Received message dictionary

Exceptions:

  • RuntimeError: If the edge is not congested
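The send/receive contract above amounts to a one-slot mailbox. A minimal sketch of that behavior (not the library implementation; the `Mailbox` name is illustrative):

```python
# One message slot per edge: RuntimeError on double-send or empty receive.
class Mailbox:
    def __init__(self):
        self._message = None

    @property
    def is_congested(self) -> bool:
        return self._message is not None

    def send_message(self, message: dict) -> None:
        if self.is_congested:
            raise RuntimeError("edge is congested")
        self._message = message

    def receive_message(self) -> dict:
        if not self.is_congested:
            raise RuntimeError("edge is not congested")
        message, self._message = self._message, None  # clear congestion
        return message
```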

Message Class

Message Base Class

Message is the abstract base class for all messages, providing a unified interface for message content.

python
class Message(ABC):
    def __init__(self)

Core Methods

__add__()
python
def __add__(self, other: Message) -> Message

Concatenate two message objects.

Parameters:

  • other: Another message to concatenate

Returns:

  • Message: Concatenated message
content()
python
def content(self) -> str

Get the string representation of the message.

Returns:

  • str: Message content
dict()
python
def dict(self) -> dict

Get the internal dictionary object of the message.

Returns:

  • dict: Dictionary representation of the message

Note

Modifying the returned dictionary object will affect the message content.


JsonMessage Class

JSON Format Message

JsonMessage is a JSON format message implementation, inheriting from the Message class.

python
class JsonMessage(Message):
    def __init__(self, content: dict)

Constructor Parameters

| Parameter | Type | Description |
|---|---|---|
| content | dict | Content of the message, represented as a Python dictionary |

Special Methods

__add__()
python
def __add__(self, other: Message) -> JsonMessage

Merge the content of two messages.

Merge Rules:

  • If there are duplicate keys, values in other will override values in the current message
  • Return type remains JsonMessage

Example:

python
msg1 = JsonMessage({"x": 1, "z": 4})
msg2 = JsonMessage({"x": 2, "y": 3})
result = msg1 + msg2
# Result: {"x": 2, "y": 3, "z": 4}

MessageFormatter Class

Message Formatter Base Class

MessageFormatter is the abstract base class for all message formatters, implementing the singleton pattern.

python
class MessageFormatter(ABC):
    def __init__(self)

Core Methods

format() [Abstract Method]
python
@abstractmethod
def format(self, message: str, key_description: dict) -> Message

Format raw message string into a specific message object.

Parameters:

  • message: Raw message string
  • key_description: Dictionary describing the message structure

Returns:

  • Message: Formatted message object

Exceptions:

  • NotImplementedError: Subclasses must implement this method

JsonMessageFormatter Class

JSON Message Formatter

JsonMessageFormatter is a JSON format message formatter that converts JSON strings to JsonMessage objects.

python
class JsonMessageFormatter(MessageFormatter):
    def __init__(self)

Core Methods

format()
python
def format(self, message: str, key_description: dict) -> JsonMessage

Parse JSON string and validate its structure meets requirements.

Parameters:

  • message: JSON string message
  • key_description: Dictionary describing the message structure

Returns:

  • JsonMessage: JsonMessage object containing validated data

Exceptions:

  • KeyError: If required keys are missing
  • json.JSONDecodeError: If message is not valid JSON format

Features:

  • Supports extracting JSON content from ```json code blocks
  • Only extracts fields present in key_description
  • Validates missing required fields
format_to_json()
python
def format_to_json(self, message: str) -> dict

Convert message string to JSON dictionary.

Parameters:

  • message: Raw string, supports text containing ```json code blocks

Returns:

  • dict: Parsed JSON object

Exceptions:

  • json.JSONDecodeError: Thrown when string is not valid JSON
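The code-block extraction described above can be sketched as follows (an assumed parsing strategy shown as a standalone function, not the formatter's exact code):

```python
import json
import re

# Pull the payload out of a ```json fenced block if present,
# otherwise parse the whole string as JSON.
def format_to_json(message: str) -> dict:
    match = re.search(r"```json\s*(.*?)\s*```", message, re.DOTALL)
    payload = match.group(1) if match else message
    return json.loads(payload)  # raises json.JSONDecodeError on invalid JSON
```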

Component Modules

Agent Class

Agent Node

Agent is the basic computational unit in the graph, encapsulating large language models, instructions, tools, and memory modules.

python
class Agent(Node):
    def __init__(
        self,
        name: str,
        instructions: str | list[str],
        *,
        model: Model,
        formatters: list[MessageFormatter] | MessageFormatter | None = None,
        max_retries: int | None = 3,
        retry_delay: int | None = 1,
        retry_backoff: int | None = 2,
        prompt_template: str | list[str] | None = None,
        tools: list[Callable] | None = None,
        memories: list[Memory] | None = None,
        retrievers: list[Retrieval] | None = None,
        pull_keys: dict[str, dict|str] | None = {},
        push_keys: dict[str, dict|str] | None = {},
        model_settings: dict | None = None,
        role_name: str | None = None,
        attributes: dict[str, object] | None = None,
        hide_unused_fields: bool = False,
    )

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Unique identifier of the Agent in the graph |
| instructions | str \| list[str] | - | Agent instructions defining its behavior and tasks |
| model | Model | - | Model adapter used to drive the Agent (required, keyword-only) |
| formatters | MessageFormatter \| list[MessageFormatter] \| None | None | Input/output formatters (single = shared; two = [in, out]) |
| max_retries | int \| None | 3 | Maximum retries for model calls |
| retry_delay | int \| None | 1 | Base delay multiplier for exponential backoff retries |
| retry_backoff | int \| None | 2 | Exponential backoff base |
| prompt_template | str \| list[str] \| None | None | Prompt template |
| tools | list[Callable] \| None | None | List of tool functions |
| memories | list[Memory] \| None | None | List of memory adapters |
| retrievers | list[Retrieval] \| None | None | Retrieval adapters (RAG/MCP, etc.) |
| pull_keys | dict[str,dict\|str] \| None | {} | Fields to pull from outer node variables ({} inherits none) |
| push_keys | dict[str,dict\|str] \| None | {} | Fields to push back to outer node variables |
| model_settings | dict \| None | None | Additional parameters passed to the model |
| role_name | str \| None | None | Role name of the Agent |
| attributes | dict[str,object] \| None | None | Initial local attributes for the agent |
| hide_unused_fields | bool | False | Whether to omit unused fields when formatting prompts |

Supported model_settings Parameters

| Parameter | Type | Range | Description |
|---|---|---|---|
| temperature | float | [0.0, 2.0] | Temperature parameter controlling output randomness |
| max_tokens | int | - | Maximum number of output tokens |
| top_p | float | [0.0, 1.0] | Nucleus sampling parameter |
| stop | list[str] | - | List of tokens to stop generation |
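The retry parameters (`max_retries`, `retry_delay`, `retry_backoff`) suggest an exponential backoff schedule. A plausible sketch of that schedule (an assumption about the semantics, not confirmed by the source):

```python
# Assumed backoff: the delay before attempt n is retry_delay * retry_backoff**n.
def retry_delays(max_retries: int, retry_delay: int, retry_backoff: int) -> list[int]:
    return [retry_delay * retry_backoff ** attempt
            for attempt in range(max_retries)]
```

With the Agent defaults (3, 1, 2) this would yield delays of 1, 2, and 4 seconds.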

Usage Example

python
agent = Agent(
    name="writer",
    model=model,
    instructions="Write concise JSON answers",
    tools=[web_search],
    memories=[conversation_memory],
    pull_keys={"topic": "Current topic"},
    push_keys={"last_answer": "Latest answer"}
)

Tool Call Handling

When tools are provided, the model may return tool-call responses. The Agent automatically invokes the requested tools, feeds the results back into the conversation, and queries the model again until a final content response is returned.
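That loop can be sketched with a stub model (the response shape follows the Model "Return Format" section below; the string response types and `run_with_tools` itself are illustrative, not framework API):

```python
# Simplified stand-ins for ModelResponseType.CONTENT / TOOL_CALL.
CONTENT, TOOL_CALL = "content", "tool_call"

def run_with_tools(model, tools, messages):
    registry = {fn.__name__: fn for fn in tools}
    while True:
        response = model(messages)
        if response["type"] == CONTENT:
            return response["content"]  # final answer
        for call in response["content"]:  # tool-call response
            result = registry[call["name"]](**call["arguments"])
            # Backfill the tool result, then ask the model again.
            messages.append({"role": "tool", "content": str(result)})
```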


BaseGraph Class

Base Graph Class

BaseGraph is the foundation for all graph types, providing basic functionality for node management and edge connections.

python
class BaseGraph(Node):
    def __init__(self, 
                name: str, 
                pull_keys: dict[str,dict|str] | None = None, 
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None,
                build_func: Callable | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Name of the graph, used to identify this graph |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Initial node variables of the graph |
| build_func | Callable \| None | None | Optional build callback executed before child build() (signature: (graph: BaseGraph) -> None) |

Core Methods

create_node()
python
def create_node(self, cls: type[Node] | NodeTemplate, *args, **kwargs) -> Node

Create a new node in the graph.

Parameters:

  • cls: Type of node to create, must be a subclass of Node
  • *args: Positional arguments passed to node constructor
  • **kwargs: Keyword arguments passed to node constructor

Returns:

  • Node: Created node instance

Exceptions:

  • TypeError: If cls is not a Node subclass or NodeTemplate
  • ValueError: If the node name is invalid/duplicated, or if the type is restricted (RootGraph / SingleAgent)

Restrictions:

  • Cannot create RootGraph type nodes
  • Cannot create SingleAgent type nodes
create_edge()
python
def create_edge(self,
               sender: Node,
               receiver: Node,
               keys: dict[str, dict|str] | None = None) -> Edge

Create an edge between two nodes.

Parameters:

  • sender: Node that sends messages
  • receiver: Node that receives messages
  • keys: Key dictionary defining message field mapping

Returns:

  • Edge: Created edge instance

Exceptions:

  • ValueError: If nodes are not in the graph, or creating edge would form cycles or duplicate edges

Safety Checks:

  • Loop detection
  • Duplicate edge detection
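The loop-detection check can be sketched as a reachability test (an assumed strategy; the framework's actual check may differ, and `would_form_cycle` is illustrative):

```python
# Adding sender -> receiver creates a cycle iff sender is already
# reachable from receiver in the existing edge set.
def would_form_cycle(edges: list[tuple[str, str]], sender: str, receiver: str) -> bool:
    adjacency: dict[str, list[str]] = {}
    for s, r in edges:
        adjacency.setdefault(s, []).append(r)
    stack, seen = [receiver], set()
    while stack:  # DFS from receiver
        node = stack.pop()
        if node == sender:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(adjacency.get(node, []))
    return False
```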
build()
python
def build(self) -> None

Build the graph and all its child nodes.

check_built()
python
def check_built(self) -> bool

Check if the graph is built.

Returns:

  • bool: Returns True if the graph and all its child nodes are built

LogicSwitch Class

Logic Branch Node

LogicSwitch is a node that routes input to different output edges based on conditions, similar to switch statements in programming languages.

python
class LogicSwitch(Node):
    def __init__(self, 
                name: str, 
                pull_keys: dict[str,dict|str] | None = None, 
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None,
                routes: dict[str, Callable[[dict, dict[str,object]], bool]] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Name of the node |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Initial local attributes for this node |
| routes | dict[str, Callable] \| None | None | Optional declarative routes: {receiver_node_name: predicate} (compiled during build()) |

Core Methods

condition_binding()
python
def condition_binding(self, 
                     condition: Callable[[dict, dict[str,object]], bool], 
                     out_edge: Edge) -> None

Bind an output edge with a condition callback function.

Parameters:

  • condition: Function that receives the aggregated input message (dict) and node attributes (dict) and returns a boolean
  • out_edge: Output edge to associate with the condition

Exceptions:

  • ValueError: If the edge is already bound
  • ValueError: If out_edge is not in the node's output edges

Usage Example

python
# Create logic switch
switch = graph.create_node(LogicSwitch, "content_router")

# Create two target nodes
positive_handler = graph.create_node(Agent, 
    name="positive_handler",
    model=model,
    instructions="Handle positive content"
)

negative_handler = graph.create_node(Agent,
    name="negative_handler", 
    model=model,
    instructions="Handle negative content"
)

# Create output edges
e1 = graph.create_edge(switch, positive_handler, {"content": "Content"})
e2 = graph.create_edge(switch, negative_handler, {"content": "Content"})

# Bind conditions
switch.condition_binding(
    lambda message, attrs: "positive" in str(message.get("content", "")).lower(), 
    e1
)
switch.condition_binding(
    lambda message, attrs: "negative" in str(message.get("content", "")).lower(), 
    e2
)

Routing Logic

  • When LogicSwitch executes, it evaluates each condition
  • Messages are sent to all edges where conditions are true
  • Supports multi-path output, one input can be sent to multiple output edges simultaneously
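The multi-path dispatch described above can be sketched as follows (illustrative only; `route` is not a framework function):

```python
# Every edge whose predicate returns True receives the message.
def route(message: dict, attributes: dict, bindings: list) -> list[str]:
    # bindings: list of (condition, edge_name) pairs, as set up
    # by condition_binding()
    return [edge for condition, edge in bindings
            if condition(message, attributes)]
```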

Loop Class

Loop Graph Structure

Loop is a special graph structure that implements loop logic, allowing repeated execution of subgraphs until termination conditions are met.

python
class Loop(BaseGraph):
    def __init__(self,
                name: str,
                max_iterations: int = 10,
                model: Model | None = None,
                terminate_condition_prompt: str | None = None,
                terminate_condition_function: Callable | None = None,
                pull_keys: dict[str,dict|str] | None = None,
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None,
                initial_messages: dict[str,object] | None = None,
                edges: list[tuple[str,str] | tuple[str,str,dict[str,dict|str]]] | None = None,
                nodes: list[tuple] | None = None,
                build_func: Callable | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Name of the loop |
| max_iterations | int | 10 | Maximum number of loop iterations |
| model | Model \| None | None | LLM adapter for evaluating termination conditions |
| terminate_condition_prompt | str \| None | None | Prompt for LLM to evaluate termination conditions |
| terminate_condition_function | Callable \| None | None | Termination predicate (takes precedence over terminate_condition_prompt) |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Initial attributes of the loop graph |
| initial_messages | dict[str,object] \| None | None | Initial messages injected into the internal control flow |
| edges | list[tuple] \| None | None | Declarative edge list (sender, receiver[, keys]) |
| nodes | list[tuple] \| None | None | Declarative node list (e.g. (name, NodeTemplate) or higher-order structures) |
| build_func | Callable \| None | None | Optional build callback (signature: (graph: BaseGraph) -> None) |

Internal Structure

Loop contains special control nodes internally:

  • Controller: Controls maximum loop count and evaluates termination conditions at the beginning of each loop
  • TerminateNode: Used to exit the loop during execution (equivalent to break statement)

Special Methods

edge_from_controller()
python
def edge_from_controller(self, 
                        receiver: Node, 
                        keys: dict[str, dict|str] | None = None) -> Edge

Create an edge from the internal Controller to a specified node.

edge_to_controller()
python
def edge_to_controller(self,
                      sender: Node,
                      keys: dict[str, dict|str] | None = None) -> Edge

Create an edge from a specified node to the internal Controller.

edge_to_terminate_node()
python
def edge_to_terminate_node(self,
                          sender: Node,
                          keys: dict[str, dict|str] | None = None) -> Edge

Create an edge from a specified node to TerminateNode for early loop exit.

Usage Example

python
# Create loop graph
loop = graph.create_node(Loop,
    name="data_processing_loop",
    max_iterations=5,
    model=model,
    terminate_condition_prompt="Check if expected results have been achieved"
)

# Create processing node within loop
processor = loop.create_node(Agent,
    name="processor",
    model=model,
    instructions="Process data and check if continuation is needed"
)

# Establish loop connections
loop.edge_from_controller(processor, {"data": "Data to process"})
loop.edge_to_controller(processor, {"result": "Processing result"})

Loop Connection Rules

  1. Nodes within Loop must connect to internal controller through edge_from_controller and edge_to_controller
  2. Nodes not connected to controller will not participate in loop execution
  3. edge_to_terminate_node is optional, used for early loop exit
  4. Loop ends when maximum iterations are reached or termination conditions are met
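The control flow implied by rule 4 can be sketched as follows (an illustrative simplification; `run_loop` and its dict-shaped state are assumptions, not the Controller's actual code):

```python
# Stop when the termination predicate is met or max_iterations is reached.
def run_loop(body, terminate_condition_function, max_iterations: int, state: dict) -> dict:
    for _ in range(max_iterations):
        if terminate_condition_function(state):  # checked at loop start
            break
        state = body(state)  # one pass through the subgraph
    return state
```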

Graph Class

Standard Graph Implementation

Graph is the standard implementation of base graph, providing entry and exit nodes, supporting construction of complex node networks.

python
class Graph(BaseGraph):
    def __init__(self, name: str, 
                pull_keys: dict[str,dict|str] | None = None, 
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None,
                edges: list[tuple[str,str] | tuple[str,str,dict[str,dict|str]]] | None = None,
                nodes: list[tuple] | None = None,
                build_func: Callable | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Name of the graph, used for log identification |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Default node variables |
| edges | list[tuple] \| None | None | Declarative edge list (sender, receiver[, keys]) |
| nodes | list[tuple] \| None | None | Declarative node list (e.g. (name, NodeTemplate) or higher-order structures) |
| build_func | Callable \| None | None | Optional build callback (signature: (graph: BaseGraph) -> None) |

Core Methods

edge_from_entry(receiver, keys)

Create an edge from entry node to specified node.

edge_to_exit(sender, keys)

Create an edge from specified node to exit node.

Features

  • Entry/Exit Nodes: Automatically creates EntryNode and ExitNode
  • Polling Execution: Executes through polling ready nodes until exit is ready
  • Flexible Connections: Supports arbitrarily complex node connection patterns

Removed Component

AutoGraph has been removed from current MASFactory versions.


RootGraph Class

Root Graph Implementation

RootGraph is the outermost graph that can be directly instantiated and invoked by users.

python
class RootGraph(Graph):
    def __init__(self,
                name: str,
                attributes: dict[str,object] | None = None,
                edges: list[tuple[str,str] | tuple[str,str,dict[str,dict|str]]] | None = None,
                nodes: list[tuple[str, NodeTemplate]] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Name of the graph |
| attributes | dict[str,object] \| None | None | Initial node variables of the graph |
| edges | list[tuple] \| None | None | Declarative edge list (sender, receiver[, keys]) |
| nodes | list[tuple[str, NodeTemplate]] \| None | None | Declarative node list [(name, NodeTemplate), ...] |

Core Methods

invoke(input, attributes=None)

Start executing RootGraph.

  • input (dict): System input, needs to align with incoming edge keys
  • attributes (dict | None): Runtime attributes merged into graph attributes before execution
  • Returns: tuple[dict, dict] - (output_dict, attributes_dict)

Usage Example

python
graph = RootGraph("demo")
# ... create nodes/edges ...
graph.build()
out, attrs = graph.invoke({"question": "hi"})

SingleAgent Class

Single Agent

SingleAgent is a simplified, standalone Agent for executing single tasks; it can be used independently of any Graph.

python
class SingleAgent(Agent):
    def __init__(self,
                name: str,
                model: Model,
                instructions: str | list[str],
                prompt_template: str | list[str] | None = None,
                max_retries: int = 3,
                retry_delay: int = 1,
                retry_backoff: int = 2,
                tools: list[Callable] = None,
                memories: list[Memory] | None = None,
                retrievers: list[Retrieval] | None = None,
                model_settings: dict | None = None,
                role_name: str = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Node name |
| model | Model | - | Model adapter |
| instructions | str \| list[str] | - | Agent instructions (system prompt) |
| prompt_template | str \| list[str] \| None | None | Prompt template (user prompt) |
| max_retries | int | 3 | Maximum retries for model calls |
| retry_delay | int | 1 | Base delay multiplier for exponential backoff retries |
| retry_backoff | int | 2 | Exponential backoff base |
| tools | list[Callable] \| None | None | Available tools list |
| memories | list[Memory] \| None | None | Memory modules list |
| retrievers | list[Retrieval] \| None | None | Retrieval adapters (RAG/MCP, etc.) |
| model_settings | dict \| None | None | Model invocation parameters |
| role_name | str \| None | None | Role name |

Features

  • Independent Use: Can be used independently of graph structure
  • Simplified Interface: Provides simpler invoke method
  • Complete Functionality: Supports full functionality including tool calls, memory management

DynamicAgent Class

Dynamic Agent

DynamicAgent can dynamically adjust instructions based on input, supporting runtime behavior configuration.

python
class DynamicAgent(Agent):
    def __init__(self,
                name: str,
                model: Model,
                default_instructions: str = "",
                tools: list[Callable] = None,
                formatters: list[MessageFormatter] | MessageFormatter = None,
                max_retries: int = 3,
                retry_delay: int = 1,
                retry_backoff: int = 2,
                pull_keys: dict[str,dict|str] | None = {},
                push_keys: dict[str,dict|str] | None = {},
                instruction_key: str = "instructions",
                role_name: str = None,
                prompt_template: str = None,
                model_settings: dict | None = None,
                memories: list[Memory] = None,
                retrievers: list[Retrieval] = None,
                attributes: dict[str,object] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Node name |
| model | Model | - | Model adapter |
| default_instructions | str | "" | Default instructions |
| instruction_key | str | "instructions" | Key name for dynamic instructions |
| formatters | MessageFormatter \| list[MessageFormatter] \| None | None | Input/output formatters (same semantics as Agent) |
| max_retries | int | 3 | Maximum retries for model calls |
| retry_delay | int | 1 | Base delay multiplier for exponential backoff retries |
| retry_backoff | int | 2 | Exponential backoff base |
| pull_keys | dict[str,dict\|str] \| None | {} | Fields to pull from outer node variables ({} inherits none) |
| push_keys | dict[str,dict\|str] \| None | {} | Fields to push back to outer node variables |
| memories | list[Memory] \| None | None | Memory adapters |
| retrievers | list[Retrieval] \| None | None | Retrieval adapters (RAG/MCP, etc.) |
| attributes | dict[str,object] \| None | None | Initial local attributes for the agent |

Features

  • Dynamic Instructions: Can update instructions at runtime through incoming edge messages
  • Flexible Configuration: Supports custom instruction key names
  • Complete Functionality: Inherits all functionality from Agent
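How the dynamic instructions are presumably selected can be sketched as follows (an assumption about the semantics of `instruction_key` and `default_instructions`; `effective_instructions` is illustrative, not framework API):

```python
# Take instructions from the incoming message under instruction_key,
# falling back to default_instructions when absent or empty.
def effective_instructions(message: dict, default_instructions: str,
                           instruction_key: str = "instructions") -> str:
    return message.get(instruction_key) or default_instructions
```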

AgentSwitch Class

Agent Router

AgentSwitch is an LLM-based switch node: bind each out edge with a natural-language condition; the model evaluates the input and routes to matched edges.

python
class AgentSwitch(BaseSwitch[str]):
    def __init__(self,
                name: str,
                model: Model,
                pull_keys: dict[str,dict|str] | None = None,
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None,
                routes: dict[str,str] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Agent name |
| model | Model | - | LLM adapter for evaluating conditions |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Initial local attributes for this node |
| routes | dict[str,str] \| None | None | Optional declarative routes: {receiver_node_name: condition_text} (compiled during build()) |

Core Methods

condition_binding(condition, edge)

Bind condition description for output edge.

  • condition (str): Condition description text
  • edge (Edge): Output edge to bind

Usage Example

python
sw = AgentSwitch("router", model)
e1 = graph.create_edge(sw, agent1, {"x": "Solution A"})
e2 = graph.create_edge(sw, agent2, {"x": "Solution B"})
sw.condition_binding("Answer contains keyword yes", e1)
sw.condition_binding("Answer contains keyword no", e2)

CustomNode Class

Custom Node

CustomNode allows users to implement custom computation logic through callback functions, an important way to extend MASFactory functionality.

python
class CustomNode(Node):
    def __init__(self,
                name: str,
                forward: Callable[..., dict[str,object]] | None = None,
                memories: list[Memory] | None = None,
                tools: list[Callable] | None = None,
                retrievers: list[Retrieval] | None = None,
                pull_keys: dict[str,dict|str] | None = None,
                push_keys: dict[str,dict|str] | None = None,
                attributes: dict[str,object] | None = None)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Name of the node |
| forward | Callable \| None | None | Custom forward function |
| memories | list[Memory] \| None | None | Available memory for current node |
| tools | list[Callable] \| None | None | Available tools for current node |
| retrievers | list[Retrieval] \| None | None | Retrieval adapters available to this node |
| pull_keys | dict[str,dict\|str] \| None | None | Fields to pull from outer node variables |
| push_keys | dict[str,dict\|str] \| None | None | Fields to push back to outer node variables |
| attributes | dict[str,object] \| None | None | Initial local attributes for this node |

Forward Callback Function

The core of CustomNode is the forward callback function that defines the node's computation logic. The callback function supports multiple parameter combinations:

python
# 1 parameter: input data only
def simple_forward(input_data):
    return {"result": f"Processed: {input_data}"}

# 2 parameters: input data + node variables
def forward_with_attributes(input_data, attributes):
    count = attributes.get("count", 0) + 1
    attributes["count"] = count
    return {"result": f"Processing #{count}: {input_data}"}

# 3 parameters: input data + node variables + memory
def forward_with_memory(input_data, attributes, memories):
    if memories:
        memories[0].insert("last_input", str(input_data))
    return {"result": f"Processed with memory: {input_data}"}

# 4 parameters: input data + node variables + memory + tools
def forward_with_tools(input_data, attributes, memories, tools):
    # Can call tools
    return {"result": f"Processed with tools: {input_data}"}

# 5 parameters: input data + node variables + memory + tools + retrievers
def forward_with_retrievers(input_data, attributes, memories, tools, retrievers):
    return {"result": f"Processed with retrievers: {input_data}"}

# 6 parameters: input data + node variables + memory + tools + retrievers + node object
def forward_full(input_data, attributes, memories, tools, retrievers, node):
    return {"result": f"Node {node.name} processed: {input_data}"}

Core Methods

set_forward()
python
def set_forward(self, forward_callback: Callable) -> None

Dynamically set custom forward function.

Parameters:

  • forward_callback: Callback function with same parameter structure as forward in constructor

Usage Example

python
def custom_processor(input_data, attributes, memories, tools, retrievers, node):
    """
    Custom processing function example
    """
    # Implement custom logic
    result = perform_custom_logic(input_data)
    
    # Can access and modify node variables
    attributes["processing_count"] = attributes.get("processing_count", 0) + 1
    
    # Can use memory and tools
    if memories:
        memories[0].insert("last_input", str(input_data))
    
    return {"result": result}

# Create custom node
custom_node = graph.create_node(CustomNode,
    name="custom_processor",
    forward=custom_processor,
    memories=[history_memory],
    tools=[search_tool]
)

# Or dynamically set callback
custom_node = graph.create_node(CustomNode, name="dynamic_node")
custom_node.set_forward(custom_processor)

Callback Function Parameters

  • If no forward function is provided, the node will pass input directly to output
  • The number of parameters the callback declares determines which arguments are passed to it
  • Supports callback functions with 1-6 parameters
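The arity-based dispatch described above can be sketched with `inspect.signature` (an assumed mechanism; `call_forward` is illustrative, not the node's actual code):

```python
import inspect

# Pass only as many arguments as the callback declares, in the fixed
# order: input, attributes, memories, tools, retrievers, node.
def call_forward(forward, input_data, attributes=None, memories=None,
                 tools=None, retrievers=None, node=None):
    arity = len(inspect.signature(forward).parameters)
    args = (input_data, attributes, memories, tools, retrievers, node)
    return forward(*args[:arity])
```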

Model

Model Class

Model Adapter Base Class

Model is the abstract base class for unified interface to interact with various large language models.

python
class Model(ABC):
    def __init__(self,
                model_name: str | None = None,
                invoke_settings: dict | None = None,
                *args, **kwargs)

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| model_name | str \| None | None | Model name |
| invoke_settings | dict \| None | None | Default invocation settings |

Important Properties

| Property | Type | Description |
|---|---|---|
| model_name | str | Name of the model (read-only) |
| description | str | Description of the model (read-only) |

Core Methods

invoke() [Abstract Method]
python
@abstractmethod
def invoke(self,
          messages: list[dict],
          tools: list[dict] | None,
          settings: dict | None = None,
          **kwargs) -> dict

Invoke large language model and get response.

Parameters:

  • messages: List containing conversation history
  • tools: Optional tools list
  • settings: Model-specific parameters
  • **kwargs: Other parameters

Returns:

  • dict: Dictionary containing response type and content

Return Format:

python
# Content response
{"type": ModelResponseType.CONTENT, "content": "..."}

# Tool call response
{"type": ModelResponseType.TOOL_CALL, "content": [
    {"id": str|None, "name": str, "arguments": dict}, ...
]}
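For illustration, a minimal sketch of dispatching on this return format (a hypothetical helper, not part of the framework). It normalizes enum members to their literal values so it also works with plain strings:

```python
def handle_response(response: dict) -> str:
    # Enum members carry their literal in .value; plain strings pass through
    rtype = response["type"]
    value = getattr(rtype, "value", rtype)
    if value == "content":
        # Plain text content
        return response["content"]
    if value == "tool_call":
        # One or more tool call requests
        calls = response["content"]
        return "; ".join(f'{c["name"]}({c["arguments"]})' for c in calls)
    raise ValueError(f"Unknown response type: {rtype!r}")
```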

OpenAIModel Class

OpenAI Model Adapter

OpenAIModel implements the model adapter for interacting with the OpenAI API.

python
class OpenAIModel(Model):
    def __init__(self,
                model_name: str,
                api_key: str,
                base_url: str | None = None,
                invoke_settings: dict | None = None,
                **kwargs)

Constructor Parameters

  • model_name (str, required): OpenAI model name (e.g., "gpt-4o-mini")
  • api_key (str, required): OpenAI API key
  • base_url (str | None, default None): API base URL
  • invoke_settings (dict | None, default None): Default invocation settings

Common usage

It is common to read credentials/model name from environment variables and pass them explicitly:

python
import os
from masfactory import OpenAIModel

model = OpenAIModel(
    model_name=os.getenv("OPENAI_MODEL_NAME", "gpt-4o-mini"),
    api_key=os.getenv("OPENAI_API_KEY", ""),
    base_url=os.getenv("OPENAI_BASE_URL") or os.getenv("BASE_URL") or None,
)

Supported Settings Parameters

  • temperature (float, range [0.0, 2.0]): Controls output randomness
  • max_tokens (int): Maximum number of tokens to generate
  • top_p (float, range [0.0, 1.0]): Nucleus sampling parameter
  • stop (list[str]): List of stop sequences
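Putting the settings above together, a hedged sketch of a per-call configuration (the actual `invoke` call is shown commented out, since it requires live credentials; values are illustrative):

```python
# Per-call settings override the invoke_settings defaults configured
# on the model; parameter names follow the table above.
settings = {
    "temperature": 0.2,    # low randomness for more deterministic answers
    "max_tokens": 256,     # cap the generated length
    "top_p": 0.9,          # nucleus sampling cutoff
    "stop": ["\nUser:"],   # stop before echoing the next turn
}

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize MASFactory in one sentence."},
]

# response = model.invoke(messages, tools=None, settings=settings)
```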

AnthropicModel Class

Anthropic Model Adapter

AnthropicModel implements the model adapter for interacting with the Anthropic Claude API.

python
class AnthropicModel(Model):
    def __init__(self,
                model_name: str,
                api_key: str,
                base_url: str | None = None,
                invoke_settings: dict | None = None,
                **kwargs)

Constructor Parameters

  • model_name (str, required): Anthropic model name (e.g., "claude-3-opus-20240229")
  • api_key (str, required): Anthropic API key
  • base_url (str | None, default None): API base URL (optional)
  • invoke_settings (dict | None, default None): Default invocation settings

Supported Models

  • claude-3-opus-20240229
  • claude-3-sonnet-20240229
  • claude-3-haiku-20240307

GeminiModel Class

Google Gemini Model Adapter

GeminiModel implements the model adapter for interacting with the Google Gemini API.

python
class GeminiModel(Model):
    def __init__(self,
                model_name: str,
                api_key: str,
                base_url: str | None = None,
                invoke_settings: dict | None = None,
                **kwargs)

Constructor Parameters

  • model_name (str, required): Gemini model name (e.g., "gemini-pro")
  • api_key (str, required): Google AI API key
  • base_url (str | None, default None): API base URL (optional)
  • invoke_settings (dict | None, default None): Default invocation settings

Supported Models

  • gemini-pro
  • gemini-pro-vision
  • gemini-1.5-pro

Memory System

Memory Class (ContextBlock injection)

Memory = a writable context source

In the current MASFactory API, memories do not expose the legacy query(...) -> str interface.
Instead, a Memory acts as a context source (ContextProvider) and returns structured ContextBlocks via get_blocks(...), which Agents inject into the user payload as a CONTEXT field during Observe.

python
class Memory(ContextProvider, ABC):
    def __init__(self, context_label: str, *, passive: bool = True, active: bool = False)
    def insert(self, key: str, value: str)
    def update(self, key: str, value: str)
    def delete(self, key: str, index: int = -1)
    def reset(self)
    def get_blocks(self, query: ContextQuery, *, top_k: int = 8) -> list[ContextBlock]

Key semantics

  • context_label: source label (used in rendering and debugging)
  • passive=True: auto-inject into CONTEXT
  • active=True: exposed as tools for on-demand retrieval (retrieve_context)

For details, see: /guide/context_adapters.


HistoryMemory Class (chat history)

HistoryMemory

HistoryMemory stores chat history and injects it as chat-style messages (between system and user).
It does not emit ContextBlocks (get_blocks(...) always returns empty).

python
class HistoryMemory(Memory, HistoryProvider):
    def __init__(self, top_k: int = 10, memory_size: int = 1000, context_label: str = "CONVERSATION_HISTORY")
    def insert(self, role: str, response: str)
    def get_messages(self, query: ContextQuery | None = None, *, top_k: int = -1) -> list[dict]

top_k convention

  • top_k=-1: use the instance default configured in __init__
  • top_k=0: return as many messages as possible (bounded by memory_size)
  • any other negative top_k: return an empty list

Example

python
from masfactory import HistoryMemory

memory = HistoryMemory(top_k=10, memory_size=50)
memory.insert("user", "Hello, tell me about MASFactory")
memory.insert("assistant", "Sure.")

print(memory.get_messages(top_k=2))

VectorMemory Class (semantic memory)

VectorMemory

VectorMemory ranks stored items by embedding cosine similarity and injects them into CONTEXT as ContextBlocks.

python
class VectorMemory(Memory):
    def __init__(
        self,
        embedding_function: Callable[[str], np.ndarray],
        top_k: int = 10,
        query_threshold: float = 0.8,
        memory_size: int = 20,
        context_label: str = "SEMANTIC_KNOWLEDGE",
        *,
        passive: bool = True,
        active: bool = False,
    )

Constructor Parameters

  • embedding_function (Callable[[str], np.ndarray], required): Function that maps text to an embedding vector
  • top_k (int, default 10): Default number of blocks to return when injecting context
  • query_threshold (float, default 0.8): Similarity threshold
  • memory_size (int, default 20): Maximum number of stored items
  • context_label (str, default "SEMANTIC_KNOWLEDGE"): Context source label

Notes

  • get_blocks(...) uses ContextQuery.query_text as the retrieval query (best-effort extracted by Agents).
  • Returned blocks include a similarity score for debugging.
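To make the embedding_function parameter concrete, here is a toy hashed bag-of-words implementation plus the cosine-similarity ranking VectorMemory is described as using. This is purely illustrative; a real deployment would call an actual embedding model.

```python
import numpy as np

# Toy embedding_function for illustration: a hashed bag-of-words vector,
# L2-normalized so that a dot product equals cosine similarity.
def embed(text: str, dim: int = 64) -> np.ndarray:
    vec = np.zeros(dim)
    for token in text.lower().split():
        vec[hash(token) % dim] += 1.0
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

# Cosine similarity between two normalized vectors
def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b))

# memory = VectorMemory(embedding_function=embed, top_k=5, query_threshold=0.8)
```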

Legacy note

If you see older docs mentioning KeyValueMemory / SummaryMemory / StorageVectorMemory: those types are not part of the current API surface.


Enumeration Types

ModelResponseType

Model Response Type

Defines enumeration for large language model response types.

python
class ModelResponseType(Enum):
    CONTENT = "content"      # Plain text content
    TOOL_CALL = "tool_call"  # Tool call request

Enumeration Values

  • TOOL_CALL ("tool_call"): The model's response is one or more tool call requests
  • CONTENT ("content"): The model's response is plain text content

Gate

Gate State

Defines open/closed state for nodes and edges.

python
class Gate(Enum):
    CLOSED = "CLOSED"  # Closed state
    OPEN = "OPEN"      # Open state

Tool System

ToolAdapter Class

Tool Adapter

ToolAdapter manages a set of callable tool functions and converts them to the JSON Schema format required by LLM function calling.

python
class ToolAdapter:
    def __init__(self, tools: list[Callable])

Constructor Parameters

  • tools (list[Callable]): List of callable functions managed as tools

Important Properties

details
python
@property
def details(self) -> dict

Generate detailed information for all registered tools in JSON Schema format.

Returns:

  • dict: Descriptions of all registered tools; each description includes "name", "description", and "parameters"

Features:

  • Automatic introspection of function signatures and docstrings
  • Supports type mapping for Optional/Union/List/Dict etc.
  • Builds descriptions compliant with LLM function call specifications

Core Methods

call()
python
def call(self, name: str, arguments: dict) -> str

Call tool by name and arguments.

Parameters:

  • name: Name of the tool to call (function name)
  • arguments: Parameter dictionary passed to tool function

Returns:

  • str: Return value after tool function execution

Tool Function Specifications

Tool functions need to follow these specifications to ensure correct JSON Schema generation:

python
def web_search(query: str, max_results: int = 5) -> str:
    """
    Search for information on the web
    
    Args:
        query (str): Search keywords
        max_results (int): Maximum number of results, default is 5
        
    Returns:
        str: Text description of search results
    """
    # Implement search logic
    results = perform_web_search(query, max_results)
    return format_search_results(results)

def calculate_statistics(numbers: list[float]) -> dict:
    """
    Calculate statistical information for a list of numbers
    
    Args:
        numbers (list[float]): List of numbers
        
    Returns:
        dict: Statistical information including mean, median, max, min, and standard deviation
    """
    import statistics
    return {
        "mean": statistics.mean(numbers),
        "median": statistics.median(numbers),
        "max": max(numbers),
        "min": min(numbers),
        # stdev requires at least two data points
        "std_dev": statistics.stdev(numbers) if len(numbers) > 1 else 0.0,
    }

Usage Example

python
# Define tool functions
tools = [web_search, calculate_statistics]

# Create tool adapter
tool_adapter = ToolAdapter(tools)

# Get tool details (JSON Schema format)
tool_details = tool_adapter.details

# Manually call tool
result = tool_adapter.call("web_search", {
    "query": "artificial intelligence", 
    "max_results": 3
})

# Use tools in Agent
agent = graph.create_node(Agent,
    name="tool_agent",
    model=model,
    instructions="You are an assistant with multiple tool capabilities",
    tools=tools
)

Supported Type Mapping

  • str → {"type": "string"}
  • int → {"type": "integer"}
  • float → {"type": "number"}
  • bool → {"type": "boolean"}
  • list[T] → {"type": "array", "items": <T mapping>}
  • dict → {"type": "object"}
  • Optional[T] → handled via the Union mapping
  • Union[T1, T2, ...] → {"anyOf": [<T1 mapping>, <T2 mapping>, ...]}
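The mapping above can be derived mechanically from type annotations. The following is a simplified sketch using standard typing introspection, not MASFactory's actual implementation:

```python
import typing

# Direct mappings for primitive annotations
_PRIMITIVES = {
    str: {"type": "string"},
    int: {"type": "integer"},
    float: {"type": "number"},
    bool: {"type": "boolean"},
    dict: {"type": "object"},
}

def to_schema(tp) -> dict:
    # NoneType appears inside Optional[T] (i.e., Union[T, None])
    if tp is type(None):
        return {"type": "null"}
    origin = typing.get_origin(tp)
    if origin is list:
        (item,) = typing.get_args(tp)
        return {"type": "array", "items": to_schema(item)}
    if origin is typing.Union:
        return {"anyOf": [to_schema(arg) for arg in typing.get_args(tp)]}
    # Fall back to a generic object for unrecognized annotations
    return _PRIMITIVES.get(tp, {"type": "object"})
```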

Tool Function Best Practices

  1. Complete Type Annotations: Ensure all parameters and return values have type annotations
  2. Detailed Docstrings: Provide clear function descriptions and parameter explanations
  3. Error Handling: Add appropriate error handling in tool functions
  4. Consistent Return Format: Maintain consistency in tool function return formats

Retrieval Module (RAG / Retrieval)

Retrieval Class (ContextBlock injection)

Retrieval = a read-only context source

In the current MASFactory API, retrievers (RAG) return structured ContextBlocks via get_blocks(...). Agents inject selected blocks into the user payload as a CONTEXT field during Observe.

python
class Retrieval(ContextProvider, ABC):
    def __init__(self, context_label: str, *, passive: bool = True, active: bool = False)
    def get_blocks(self, query: ContextQuery, *, top_k: int = 8) -> list[ContextBlock]

top_k convention (built-ins)

  • top_k=0: return as many as possible
  • top_k<0: return empty

For more on passive vs active retrieval (tools), see: /guide/context_adapters.


VectorRetriever Class

Vector Retrieval Implementation

VectorRetriever retrieves relevant documents based on vector embeddings and similarity search.

python
class VectorRetriever(Retrieval):
    def __init__(
        self,
        documents: dict[str, str],
        embedding_function: Callable[[str], np.ndarray],
        *,
        similarity_threshold: float = 0.7,
        context_label: str = "VECTOR_RETRIEVER",
        passive: bool = True,
        active: bool = False,
    )

Constructor Parameters

  • documents (dict[str, str], required): Mapping from document ID to document content
  • embedding_function (Callable[[str], np.ndarray], required): Text → embedding function
  • similarity_threshold (float, default 0.7): Similarity threshold
  • context_label (str, default "VECTOR_RETRIEVER"): Context source label

Features

  • Vector Embeddings: Pre-compute vector embeddings for all documents
  • Cosine Similarity: Use cosine similarity to calculate relevance between query and documents
  • Efficient Retrieval: Fast retrieval based on vector similarity

FileSystemRetriever Class

File System Retrieval Implementation

FileSystemRetriever loads documents from file system and supports vector retrieval with caching capabilities.

python
class FileSystemRetriever(Retrieval):
    def __init__(
        self,
        docs_dir: str | Path,
        embedding_function: Callable[[str], np.ndarray],
        *,
        file_extension: str = ".txt",
        similarity_threshold: float = 0.7,
        cache_path: str | Path | None = None,
        context_label: str = "FILESYSTEM_RETRIEVER",
        passive: bool = True,
        active: bool = False,
    )

Constructor Parameters

  • docs_dir (str | Path, required): Document directory path
  • embedding_function (Callable[[str], np.ndarray], required): Text → embedding function
  • file_extension (str, default ".txt"): File extension to load
  • similarity_threshold (float, default 0.7): Similarity threshold
  • cache_path (str | Path | None, default None): Embedding cache file path
  • context_label (str, default "FILESYSTEM_RETRIEVER"): Context source label

Features

  • File System Scanning: Automatically scan document files in specified directory
  • Caching Mechanism: Support persistent caching of embedding vectors
  • Flexible Configuration: Support various file extensions and directory structures

SimpleKeywordRetriever Class

Keyword Retrieval Implementation

SimpleKeywordRetriever uses keyword matching for document retrieval, suitable for simple scenarios.

python
class SimpleKeywordRetriever(Retrieval):
    def __init__(
        self,
        documents: dict[str, str],
        *,
        context_label: str = "KEYWORD_RETRIEVER",
        passive: bool = True,
        active: bool = False,
    )

Constructor Parameters

  • documents (dict[str, str], required): Mapping from document ID to document content
  • context_label (str, default "KEYWORD_RETRIEVER"): Context source label

Features

  • Keyword Matching: Calculate relevance based on simple word frequency statistics
  • Lightweight Implementation: No need for vector embeddings, low computational overhead
  • Quick Deployment: Suitable for small document sets or prototype development
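The word-frequency scoring described above can be sketched in a few lines (function names are illustrative, not the retriever's internal API): count how often each query keyword occurs in a document, then rank documents by total overlap.

```python
# Score a document by how often the query's keywords appear in it
def keyword_score(query: str, document: str) -> int:
    doc_words = document.lower().split()
    return sum(doc_words.count(word) for word in query.lower().split())

# Rank document IDs by descending keyword overlap with the query
def rank_documents(query: str, documents: dict[str, str]) -> list[str]:
    return sorted(documents, key=lambda doc_id: keyword_score(query, documents[doc_id]), reverse=True)
```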

MCP (external context sources)

MCP Class

MCP = integrate external context via a callable

MCP is a lightweight ContextProvider. You provide a callable that returns items, and MASFactory maps them into ContextBlocks and injects them into CONTEXT.

python
class MCP(ContextProvider):
    def __init__(
        self,
        *,
        name: str = "MCP",
        call: Callable[[ContextQuery, int], Iterable[dict[str, Any]]],
        text_key: str = "text",
        uri_key: str = "uri",
        chunk_id_key: str = "chunk_id",
        score_key: str = "score",
        title_key: str = "title",
        metadata_key: str = "metadata",
        dedupe_key_key: str = "dedupe_key",
        passive: bool = True,
        active: bool = False,
    )

Example (Observe-only injection)

python
from masfactory import Agent
from masfactory.adapters.context.types import ContextQuery
from masfactory.adapters.mcp import MCP

def call(query: ContextQuery, top_k: int):
    return [{"text": f"[MCP] {query.query_text}", "uri": "mcp://demo"}]

mcp_provider = MCP(name="DemoMCP", call=call, passive=True, active=False)

agent = Agent(
    name="demo",
    model=object(),  # placeholder model; observe() does not invoke it
    instructions="You are a concise assistant.",
    prompt_template="{query}",
    retrievers=[mcp_provider],
)

_, user_prompt, _ = agent.observe({"query": "What is MCP?"})
print(user_prompt)