# Adding Graph Flows
This guide walks through creating a new graph flow from scratch. All flows follow the same pattern: define nodes, wire them together, register the flow.
## Quick Start
A flow needs three things:

- Nodes — classes that do the work (each extends a base node type)
- FlowDefinition — declares the nodes and entry point
- Registration — one line in `main.py` to import the flow
## Step 1: Create the Flow File
Create a new file in `services/ai-agent/src/autocom_agent/graphs/flows/`:
```python
# graphs/flows/order_quality_check.py
from autocom_agent.graphs.engine import FlowDefinition, register_flow
from autocom_agent.graphs.nodes.base import (
    AgentNode, BaseNode, GraphContext, HumanInputNode,
    NodeResult, ToolNode,
)
```
## Step 2: Define Your Nodes
### ToolNode — Call a Tool
Use when you need data from Laravel (order lookup, customer data, etc.):
```python
class FetchOrder(ToolNode):
    """Load order details for quality check."""

    def get_tool_calls(self, ctx: GraphContext) -> list[tuple[str, dict]]:
        # Return a list of (tool_name, arguments) pairs
        return [("orders.get", {"id": ctx.input_data["order_id"]})]

    def process_results(self, ctx: GraphContext, results: list[dict]) -> NodeResult:
        order = results[0]["result"] if results else {}
        return NodeResult(
            output={"order": order},
            next_node="CheckQuality",  # Name of the next node
        )
```
Key points:

- `get_tool_calls()` returns tool names that match module bus endpoints
- `process_results()` receives the tool results and decides the next node
- `next_node` is a string matching a node name in the FlowDefinition
### AgentNode — AI Reasoning
Use when you need the LLM to analyze, classify, or decide:
```python
class CheckQuality(AgentNode):
    """AI checks order data quality."""

    def build_prompt(self, ctx: GraphContext) -> str:
        # Access previous node outputs via ctx.state
        order = ctx.state["FetchOrder"]["order"]
        return (
            f"Check this order for data quality issues:\n"
            f"- Order: {order.get('order_number')}\n"
            f"- Customer: {order.get('customer_id')}\n"
            f"- Items: {len(order.get('items', []))}\n"
            f"- Total: {order.get('total')}\n\n"
            f"Respond with JSON: "
            f'{{"quality": "good|issues|critical", "issues": ["list"]}}'
        )

    def process_output(self, ctx: GraphContext, agent_output: str) -> NodeResult:
        import json

        try:
            result = json.loads(agent_output.strip())
        except json.JSONDecodeError:
            result = {"quality": "issues", "issues": ["Could not parse"]}
        return NodeResult(
            output=result,
            next_node="RouteByQuality",
        )
```
Key points:

- `build_prompt()` constructs the prompt from accumulated graph state
- Previous nodes' outputs are at `ctx.state["NodeName"]`
- `process_output()` parses the LLM's text response
- The agent is automatically created with the tenant's LLM config and tools
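LLMs often wrap JSON answers in markdown code fences despite instructions, so `json.loads(agent_output.strip())` alone can reject otherwise-valid output. A defensive parser you could call from `process_output()` — a sketch, not part of the framework:

```python
import json
import re

def parse_agent_json(agent_output: str, fallback: dict) -> dict:
    """Strip an optional ```json fence, then parse; return fallback on failure."""
    text = agent_output.strip()
    # Models frequently wrap JSON in ```json ... ``` fences.
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    if match:
        text = match.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return fallback
```

This keeps the fallback-routing behavior of `CheckQuality` above while tolerating fenced responses.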
### BaseNode — Pure Logic
Use for routing decisions without AI or tools:
```python
class RouteByQuality(BaseNode):
    """Route based on quality assessment."""

    async def run(self, ctx: GraphContext) -> NodeResult:
        quality = ctx.state["CheckQuality"].get("quality", "issues")
        if quality == "good":
            return NodeResult(output={"action": "pass"}, next_node=None)  # End flow
        elif quality == "critical":
            return NodeResult(output={"action": "review"}, next_node="ManualReview")
        else:
            return NodeResult(output={"action": "fix"}, next_node="AutoFix")
```
Key points:

- `next_node=None` ends the flow
- Use this for if/else routing without AI calls (fast, free)
### HumanInputNode — Pause for Input
Use when a human needs to review/approve:
```python
class ManualReview(HumanInputNode):
    """Pause for human review of critical quality issues."""

    def get_input_schema(self, ctx: GraphContext) -> dict:
        # JSON Schema describing the input the human should provide
        return {
            "type": "object",
            "properties": {
                "decision": {
                    "type": "string",
                    "enum": ["fix", "accept", "cancel"],
                },
                "notes": {"type": "string"},
            },
            "required": ["decision"],
        }

    def get_prompt(self, ctx: GraphContext) -> str:
        # Human-readable description shown in the UI
        issues = ctx.state["CheckQuality"].get("issues", [])
        return "Critical quality issues found:\n" + "\n".join(f"- {i}" for i in issues)

    def process_input(self, ctx: GraphContext, human_input: dict) -> NodeResult:
        # Called when the human responds (graph resumes)
        decision = human_input["decision"]
        if decision == "cancel":
            return NodeResult(output={"cancelled": True}, next_node=None)
        return NodeResult(
            output={"reviewed": True, "decision": decision},
            next_node="AutoFix",
        )
```
Key points:

- The first call raises `HumanInputRequired` (the graph pauses, state is persisted)
- The second call (after resume) receives `ctx.human_input` and processes it
- `get_input_schema()` returns JSON Schema — the frontend renders a form from this
- `get_prompt()` provides context for the reviewer
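The pause/resume mechanics can be pictured as follows — a simplified sketch with a stand-in exception class (the real `HumanInputRequired` lives in the engine and may carry more fields):

```python
class HumanInputRequired(Exception):
    """Stand-in for the engine's exception; carries what the frontend needs."""
    def __init__(self, schema: dict, prompt: str):
        super().__init__(prompt)
        self.schema = schema
        self.prompt = prompt

def run_human_node(node, ctx):
    """First pass: no human input yet, so raise and let the engine persist state.
    Second pass (after resume): hand ctx.human_input to process_input()."""
    if getattr(ctx, "human_input", None) is None:
        raise HumanInputRequired(node.get_input_schema(ctx), node.get_prompt(ctx))
    return node.process_input(ctx, ctx.human_input)
```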
## Step 3: Register the Flow
Wire the nodes together:
```python
register_flow(FlowDefinition(
    name="order_quality_check",
    description="Check order data quality with AI and optional human review",
    entry_node="FetchOrder",
    nodes={
        "FetchOrder": FetchOrder(),
        "CheckQuality": CheckQuality(),
        "RouteByQuality": RouteByQuality(),
        "ManualReview": ManualReview(),
        "AutoFix": AutoFix(),  # Your implementation
    },
))
```
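Because `next_node` values are plain strings, a typo only surfaces at runtime when the engine cannot find the target node. A cheap unit-test guard — the `referenced` set is maintained by hand here, since node transitions are decided dynamically:

```python
def missing_node_targets(defined: set[str], referenced: set[str]) -> set[str]:
    """Return next_node names that have no matching entry in the FlowDefinition."""
    return referenced - defined

# Keys of the FlowDefinition above, and every next_node the nodes can emit.
defined = {"FetchOrder", "CheckQuality", "RouteByQuality", "ManualReview", "AutoFix"}
referenced = {"CheckQuality", "RouteByQuality", "ManualReview", "AutoFix"}
assert missing_node_targets(defined, referenced) == set()
```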
## Step 4: Import in `main.py`
```python
# services/ai-agent/src/autocom_agent/main.py
import autocom_agent.graphs.flows.ndr_resolution  # noqa: F401
import autocom_agent.graphs.flows.fraud_review  # noqa: F401
import autocom_agent.graphs.flows.refund_approval  # noqa: F401
import autocom_agent.graphs.flows.order_quality_check  # noqa: F401  # ← new
```
Rebuild the Docker image and the flow is available.
## Step 5: Use It
```http
POST /api/v1/ai/agent/graph/run

{
  "flow_type": "order_quality_check",
  "input_data": {"order_id": "uuid"}
}
```
## How State Flows
Every node's output is stored at `ctx.state[node_name]`:
```python
# Node execution order: FetchOrder → CheckQuality → RouteByQuality

# Inside CheckQuality:
ctx.state["FetchOrder"]    # ← output from previous node
# {"order": {"id": "...", "total": 2500, ...}}

# Inside RouteByQuality:
ctx.state["FetchOrder"]    # ← still available
ctx.state["CheckQuality"]  # ← just completed
# {"quality": "critical", "issues": ["Missing address"]}
```
Each node can read all previous nodes' outputs. State accumulates as the graph executes.
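The accumulation above can be sketched as a loop — a simplified model of the engine, not its actual code (persistence, error handling, and human-input pauses are omitted):

```python
import asyncio
from types import SimpleNamespace

async def run_flow(flow, ctx):
    """Follow next_node pointers from the entry node, storing each node's
    output under its name so later nodes can read it from ctx.state."""
    name = flow.entry_node
    while name is not None:
        result = await flow.nodes[name].run(ctx)
        ctx.state[name] = result.output
        name = result.next_node
    return ctx.state

# Two toy nodes: B reads A's output from ctx.state.
class A:
    async def run(self, ctx):
        return SimpleNamespace(output={"x": 1}, next_node="B")

class B:
    async def run(self, ctx):
        return SimpleNamespace(output={"y": ctx.state["A"]["x"] + 1}, next_node=None)

flow = SimpleNamespace(entry_node="A", nodes={"A": A(), "B": B()})
state = asyncio.run(run_flow(flow, SimpleNamespace(state={})))
```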
## Available Input Data
Nodes access the original flow input via `ctx.input_data`:
```python
order_id = ctx.input_data["order_id"]
```
This is the `input_data` from the `POST /graph/run` request — it never changes during execution.
## Error Handling
If a node raises an exception, the graph engine:

- Marks the run as `failed` with the error message
- Persists partial results (outputs of nodes completed before the failure)
- Returns the error in the response
Nodes should handle expected errors gracefully and use `NodeResult` to route to error-handling nodes when possible.
## Testing Flows
```python
# tests/test_quality_flow.py
import pytest

from autocom_agent.graphs.flows.order_quality_check import FetchOrder
from autocom_agent.graphs.nodes.base import GraphContext


@pytest.mark.asyncio  # assumes pytest-asyncio is configured
async def test_fetch_order_routes_to_check(mock_context, mock_bridge):
    # mock_context / mock_bridge: test-double fixtures (not shown here)
    ctx = GraphContext(
        tenant_context=mock_context,
        bridge=mock_bridge,
        input_data={"order_id": "test-123"},
    )
    mock_bridge.mock_tool("orders.get", {"id": "test-123", "total": 100})

    node = FetchOrder()
    result = await node.run(ctx)

    assert result.next_node == "CheckQuality"
    assert "order" in result.output
```