Adding Graph Flows

This guide walks through creating a new graph flow from scratch. All flows follow the same pattern: define nodes, wire them together, register the flow.

Quick Start

A flow needs 3 things:

  1. Nodes — classes that do the work (each extends a base node type)
  2. FlowDefinition — declares the nodes and entry point
  3. Registration — one line in main.py to import the flow

Step 1: Create the Flow File

Create a new file in services/ai-agent/src/autocom_agent/graphs/flows/:

# graphs/flows/order_quality_check.py

from autocom_agent.graphs.engine import FlowDefinition, register_flow
from autocom_agent.graphs.nodes.base import (
    AgentNode, BaseNode, GraphContext, HumanInputNode,
    NodeResult, ToolNode,
)

Step 2: Define Your Nodes

ToolNode — Call a Tool

Use when you need data from Laravel (order lookup, customer data, etc.):

class FetchOrder(ToolNode):
    """Load order details for quality check."""

    def get_tool_calls(self, ctx: GraphContext) -> list[tuple[str, dict]]:
        # Return list of (tool_name, arguments)
        return [("orders.get", {"id": ctx.input_data["order_id"]})]

    def process_results(self, ctx: GraphContext, results: list[dict]) -> NodeResult:
        order = results[0]["result"] if results else {}
        return NodeResult(
            output={"order": order},
            next_node="CheckQuality",  # Name of the next node
        )

Key points:

  • get_tool_calls() returns tool names that match module bus endpoints
  • process_results() receives the tool results and decides the next node
  • next_node is a string matching a node name in the FlowDefinition

AgentNode — AI Reasoning

Use when you need the LLM to analyze, classify, or decide:

class CheckQuality(AgentNode):
    """AI checks order data quality."""

    def build_prompt(self, ctx: GraphContext) -> str:
        # Access previous node outputs via ctx.state
        order = ctx.state["FetchOrder"]["order"]

        return (
            f"Check this order for data quality issues:\n"
            f"- Order: {order.get('order_number')}\n"
            f"- Customer: {order.get('customer_id')}\n"
            f"- Items: {len(order.get('items', []))}\n"
            f"- Total: {order.get('total')}\n\n"
            f"Respond with JSON: "
            f'{{"quality": "good|issues|critical", "issues": ["list"]}}'
        )

    def process_output(self, ctx: GraphContext, agent_output: str) -> NodeResult:
        import json
        try:
            result = json.loads(agent_output.strip())
        except json.JSONDecodeError:
            result = {"quality": "issues", "issues": ["Could not parse"]}

        return NodeResult(
            output=result,
            next_node="RouteByQuality",
        )

Key points:

  • build_prompt() constructs the prompt from accumulated graph state
  • Previous nodes' outputs are at ctx.state["NodeName"]
  • process_output() parses the LLM's text response
  • The agent is automatically created with the tenant's LLM config and tools

BaseNode — Pure Logic

Use for routing decisions without AI or tools:

class RouteByQuality(BaseNode):
    """Route based on quality assessment."""

    async def run(self, ctx: GraphContext) -> NodeResult:
        quality = ctx.state["CheckQuality"].get("quality", "issues")

        if quality == "good":
            return NodeResult(output={"action": "pass"}, next_node=None)  # End flow
        elif quality == "critical":
            return NodeResult(output={"action": "review"}, next_node="ManualReview")
        else:
            return NodeResult(output={"action": "fix"}, next_node="AutoFix")

Key points:

  • next_node=None ends the flow
  • Use this for if/else routing without AI calls (fast, free)

HumanInputNode — Pause for Input

Use when a human needs to review/approve:

class ManualReview(HumanInputNode):
    """Pause for human review of critical quality issues."""

    def get_input_schema(self, ctx: GraphContext) -> dict:
        # JSON Schema for what input the human should provide
        return {
            "type": "object",
            "properties": {
                "decision": {
                    "type": "string",
                    "enum": ["fix", "accept", "cancel"],
                },
                "notes": {"type": "string"},
            },
            "required": ["decision"],
        }

    def get_prompt(self, ctx: GraphContext) -> str:
        # Human-readable description shown in the UI
        issues = ctx.state["CheckQuality"].get("issues", [])
        return f"Critical quality issues found:\n" + "\n".join(f"- {i}" for i in issues)

    def process_input(self, ctx: GraphContext, human_input: dict) -> NodeResult:
        # Called when the human responds (graph resumes)
        decision = human_input["decision"]
        if decision == "cancel":
            return NodeResult(output={"cancelled": True}, next_node=None)
        return NodeResult(
            output={"reviewed": True, "decision": decision},
            next_node="AutoFix",
        )

Key points:

  • First call raises HumanInputRequired (graph pauses, state persisted)
  • Second call (after resume) receives ctx.human_input and processes it
  • get_input_schema() returns JSON Schema — the frontend renders a form from this
  • get_prompt() provides context for the reviewer

Step 3: Register the Flow

Wire the nodes together:

register_flow(FlowDefinition(
    name="order_quality_check",
    description="Check order data quality with AI and optional human review",
    entry_node="FetchOrder",
    nodes={
        "FetchOrder": FetchOrder(),
        "CheckQuality": CheckQuality(),
        "RouteByQuality": RouteByQuality(),
        "ManualReview": ManualReview(),
        "AutoFix": AutoFix(),  # Your implementation
    },
))

Step 4: Import in main.py

# services/ai-agent/src/autocom_agent/main.py

import autocom_agent.graphs.flows.ndr_resolution      # noqa: F401
import autocom_agent.graphs.flows.fraud_review         # noqa: F401
import autocom_agent.graphs.flows.refund_approval      # noqa: F401
import autocom_agent.graphs.flows.order_quality_check   # noqa: F401  # ← new

Rebuild the Docker image and the flow is available.

Step 5: Use It

POST /api/v1/ai/agent/graph/run
{
  "flow_type": "order_quality_check",
  "input_data": {"order_id": "uuid"}
}

How State Flows

Every node's output is stored at ctx.state[node_name]:

# Node execution order: FetchOrder → CheckQuality → RouteByQuality

# Inside CheckQuality:
ctx.state["FetchOrder"]  # ← output from previous node
# {"order": {"id": "...", "total": 2500, ...}}

# Inside RouteByQuality:
ctx.state["FetchOrder"]    # ← still available
ctx.state["CheckQuality"]  # ← just completed
# {"quality": "critical", "issues": ["Missing address"]}

Each node can read all previous nodes' outputs. State accumulates as the graph executes.

Available Input Data

Nodes access the original flow input via ctx.input_data:

order_id = ctx.input_data["order_id"]

This is the input_data from the POST /graph/run request — it never changes during execution.

Error Handling

If a node raises an exception, the graph engine:

  1. Marks the run as failed with the error message
  2. Persists partial results (completed nodes before the failure)
  3. Returns the error in the response

Nodes should handle expected errors gracefully and use NodeResult to route to error-handling nodes when possible.

Testing Flows

# tests/test_quality_flow.py
from autocom_agent.graphs.flows.order_quality_check import FetchOrder, CheckQuality
from autocom_agent.graphs.nodes.base import GraphContext

async def test_fetch_order_routes_to_check():
    ctx = GraphContext(
        tenant_context=mock_context,
        bridge=mock_bridge,
        input_data={"order_id": "test-123"},
    )
    mock_bridge.mock_tool("orders.get", {"id": "test-123", "total": 100})

    node = FetchOrder()
    result = await node.run(ctx)

    assert result.next_node == "CheckQuality"
    assert "order" in result.output