# Graph Flows
Graph flows are multi-step AI workflows where each step is a node that either calls tools, runs an AI agent, or pauses for human input. The graph engine executes nodes sequentially, persists state at each step, and can pause/resume across server restarts.
## When to Use Graph Flows
| Use Case | Use Chat | Use Graph Flow |
|---|---|---|
| "Where is my order?" | Yes | No |
| "Process this NDR with approval" | No | Yes |
| "Score this order for fraud" | No | Yes |
| "Evaluate and process this refund" | No | Yes |
| Any multi-step workflow with gates | No | Yes |
Rule of thumb: If it needs human approval, multiple tool calls in sequence, or conditional branching — use a graph flow.
## Built-in Flows

### NDR Resolution
Handles non-delivery reports through investigation, AI analysis, and optional approval:
```
FetchOrderData → AnalyzeNDR → DecideAction → [ApproveAction] → ExecuteAction
                                                    ↑
                                      High-value (>₹5000) or
                                      low AI confidence (<70%)
```
```http
POST /api/v1/ai/agent/graph/run
{
  "flow_type": "ndr_resolution",
  "input_data": {
    "order_id": "uuid",
    "ndr_reason": "customer_unavailable",
    "delivery_attempts": 2,
    "customer_phone": "+919876543210"
  }
}
```
Possible outcomes: REATTEMPT (schedule re-delivery), RTO (return to origin), CONTACT (reach out to customer), ESCALATE (manual handling).
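The approval gate above can be expressed as a simple predicate. This is a sketch that restates the rule from the diagram (high value or low confidence pauses the run); the function name and constants are illustrative, not the engine's actual code:

```python
# Sketch of the NDR approval gate: a run pauses at ApproveAction when the
# order is high-value (> ₹5000) or AI confidence is low (< 70%).
# Names and thresholds restate the documented rule; they are illustrative.

def needs_approval(order_total: float, ai_confidence: float) -> bool:
    """Return True when an NDR action should pause for human approval."""
    HIGH_VALUE_THRESHOLD = 5000  # rupees
    MIN_CONFIDENCE = 0.70
    return order_total > HIGH_VALUE_THRESHOLD or ai_confidence < MIN_CONFIDENCE
```

For example, an ₹8500 order with 62% confidence pauses on both counts, while a low-value order with high confidence executes straight through.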
### Fraud Review
Scores orders for fraud risk with AI and routes for review:
```
AssessOrder → ScoreFraudRisk → FraudDecision → [ReviewFraud] → TakeAction
                                     │
               Low risk (<0.3)      → auto-approve
               Medium (0.3-0.7)     → human review
               High (>0.7)          → auto-hold + review
```
```http
POST /api/v1/ai/agent/graph/run
{
  "flow_type": "fraud_review",
  "input_data": {
    "order_id": "uuid"
  }
}
```
Possible outcomes: APPROVE (confirm order), HOLD (keep pending for investigation), CANCEL (cancel the order).
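The three-way routing from the diagram can be sketched as a pure function. The thresholds restate the documented bands; the function name and return values are illustrative, not the flow's real implementation:

```python
# Sketch of fraud-score routing: < 0.3 auto-approve, 0.3-0.7 human review,
# > 0.7 auto-hold plus review. Names are illustrative.

def route_fraud_score(score: float) -> tuple[str, bool]:
    """Map a fraud risk score in [0, 1] to (initial_action, needs_review)."""
    if score < 0.3:
        return ("APPROVE", False)  # low risk: auto-approve, no review
    if score <= 0.7:
        return ("PENDING", True)   # medium risk: wait for human review
    return ("HOLD", True)          # high risk: auto-hold and review
```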
### Refund Approval
Evaluates refund requests against policy:
```
LoadRefundContext → EvaluateRefund → RefundDecision → [ApproveRefund] → ProcessRefund
                                           │
                 Amount ≤ ₹500 + return approved → auto-process
                 Amount > ₹500 or edge case      → human approval
```
```http
POST /api/v1/ai/agent/graph/run
{
  "flow_type": "refund_approval",
  "input_data": {
    "order_id": "uuid",
    "refund_amount": 2500,
    "reason": "Product defective",
    "return_id": "return-uuid"
  }
}
```
Possible outcomes: APPROVE (full refund), PARTIAL (reduced amount), DENY (rejected).
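The auto-process rule from the diagram reduces to one predicate. A sketch with illustrative names, restating the documented policy (≤ ₹500 plus an approved return bypasses human approval):

```python
# Sketch of the refund auto-process rule: refunds of ≤ ₹500 with an
# approved return skip human approval. Names are illustrative.

def can_auto_process(refund_amount: float, return_approved: bool) -> bool:
    """Return True when a refund can be processed without human approval."""
    AUTO_LIMIT = 500  # rupees
    return refund_amount <= AUTO_LIMIT and return_approved
```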
## Graph Run Lifecycle

### 1. Start a Flow
```http
POST /api/v1/ai/agent/graph/run
{
  "flow_type": "ndr_resolution",
  "input_data": { ... }
}
```
Response (completed — no approval needed):
```json
{
  "run_id": "uuid",
  "flow_type": "ndr_resolution",
  "status": "completed",
  "node_results": [
    {"node_name": "FetchOrderData", "status": "completed", "output": {...}},
    {"node_name": "AnalyzeNDR", "status": "completed", "output": {...}},
    {"node_name": "DecideAction", "status": "completed", "output": {...}},
    {"node_name": "ExecuteAction", "status": "completed", "output": {...}}
  ],
  "final_output": {"action_taken": "REATTEMPT", "message": "NDR resolved: REATTEMPT"},
  "total_tokens": 1250,
  "total_cost": 0.0185
}
```
Response (paused — needs approval):
```json
{
  "run_id": "uuid",
  "flow_type": "ndr_resolution",
  "status": "paused",
  "current_node": "ApproveAction",
  "requires_human_input": true,
  "human_input_schema": {
    "type": "object",
    "properties": {
      "decision": {"type": "string", "enum": ["approve", "reject", "override"]},
      "override_action": {"type": "string", "enum": ["REATTEMPT", "RTO", "CONTACT", "ESCALATE"]},
      "notes": {"type": "string"}
    },
    "required": ["decision"]
  },
  "human_input_prompt": "NDR Resolution requires approval.\n\nOrder: ORD-123 (Total: 8500)\nAI Recommendation: REATTEMPT (confidence: 62%)..."
}
```
### 2. Resume with Human Input
```http
POST /api/v1/ai/agent/graph/resume
{
  "run_id": "uuid-from-paused-response",
  "human_input": {
    "decision": "approve",
    "notes": "Customer confirmed availability"
  }
}
```
The graph resumes from the paused node, processes the human input, and continues to completion.
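A client can drive the start/resume cycle with a small loop. In this sketch, `start` and `resume` are placeholders for whatever HTTP calls you make to the two endpoints (each returning the parsed JSON response), and `decide` supplies the human input, e.g. by showing `human_input_prompt` to a reviewer. None of these names come from an SDK:

```python
# Sketch of a client-side start/approve loop for the run and resume
# endpoints. `start`, `resume`, and `decide` are injected callables,
# not real SDK functions.
from typing import Callable


def run_with_approval(
    start: Callable[[dict], dict],      # POST /api/v1/ai/agent/graph/run
    resume: Callable[[str, dict], dict],  # POST /api/v1/ai/agent/graph/resume
    flow_type: str,
    input_data: dict,
    decide: Callable[[dict], dict],     # turns a paused run into human_input
) -> dict:
    """Start a flow; while it pauses, collect human input and resume."""
    run = start({"flow_type": flow_type, "input_data": input_data})
    while run["status"] == "paused":
        human_input = decide(run)  # e.g. show run["human_input_prompt"]
        run = resume(run["run_id"], human_input)
    return run
```

Because paused state lives server-side, the loop can also be split across processes: one service starts runs, a dashboard resumes them later.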
### 3. Check Status

```http
GET /api/v1/ai/agent/graph/{run_id}/status
```
### 4. List Runs

```http
GET /api/v1/ai/agent/graph/runs?flow_type=ndr_resolution&status=paused
```
## How the Engine Works
The graph engine is the core orchestrator. It's flow-agnostic — it runs any registered flow definition:
- Start: Creates a run record in the tenant DB, begins at the entry node
- Execute: Runs each node, merges its output into a shared `state` dict
- Persist: After each node, saves state + result to the tenant DB via callback
- Pause: If a `HumanInputNode` raises `HumanInputRequired`, persists state and returns what input is needed
- Resume: Loads persisted state, injects human input, continues from the paused node
- Complete: Marks the run as completed with the final output
- Safety: A 20-node execution limit prevents infinite loops
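The loop above can be sketched in miniature. Everything here is illustrative: the real engine persists to PostgreSQL and its node/flow types are richer, but the shape — execute, merge into state, persist, pause on `HumanInputRequired`, cap at 20 nodes — is the same:

```python
# Minimal sketch of the engine loop: execute nodes, merge outputs into a
# shared state dict, persist via callback, pause for human input, and
# enforce the 20-node safety limit. All names are illustrative.

MAX_NODES = 20


class HumanInputRequired(Exception):
    """Raised by a human-input node to pause the run."""
    def __init__(self, schema: dict, prompt: str):
        self.schema, self.prompt = schema, prompt


def run_graph(nodes: dict, entry: str, state: dict, persist) -> dict:
    """nodes maps name -> (run_fn, next_name); run_fn(state) returns output."""
    current = entry
    for _ in range(MAX_NODES + 1):
        if current is None:
            return {"status": "completed", "state": state}
        run_fn, next_name = nodes[current]
        try:
            output = run_fn(state)
        except HumanInputRequired as pause:
            persist(current, state)  # state survives restarts
            return {"status": "paused", "current_node": current,
                    "schema": pause.schema, "prompt": pause.prompt}
        state[current] = output      # each node's output keyed by node name
        persist(current, state)
        current = next_name
    return {"status": "failed", "error": "node limit exceeded"}
```

Resuming is then just re-entering the loop at the paused node with the human input merged into `state`.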
### State Flow
Each node's output is stored in `state[node_name]`:

```python
# After FetchOrderData completes:
state = {
    "FetchOrderData": {"order": {...}, "ndr_reason": "customer_unavailable", ...}
}

# After AnalyzeNDR:
state = {
    "FetchOrderData": {"order": {...}, ...},
    "AnalyzeNDR": {"action": "REATTEMPT", "confidence": 0.85, "reasoning": "..."}
}
```

Every node can read previous nodes' outputs from `ctx.state`.
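For example, an `AnalyzeNDR`-style node might build its prompt from the fetch step's output. This is a sketch: the real `ctx` object and method signature may differ, and the field names are taken from the state example above:

```python
# Illustrative: inside a later node, read what FetchOrderData produced.
# `ctx.state` is the shared dict of prior node outputs shown above.

def build_prompt(ctx) -> str:
    fetched = ctx.state["FetchOrderData"]
    return (
        f"NDR reason: {fetched['ndr_reason']}. "
        f"Order total: {fetched['order']['total']}. "
        f"Recommend an action."
    )
```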
## Node Types

| Type | Base Class | Purpose | What You Implement |
|---|---|---|---|
| Tool | `ToolNode` | Calls tools via the bridge | `get_tool_calls()`, `process_results()` |
| Agent | `AgentNode` | Runs a PydanticAI agent | `build_prompt()`, `process_output()` |
| Human | `HumanInputNode` | Pauses for human input | `get_input_schema()`, `get_prompt()`, `process_input()` |
| Logic | `BaseNode` | Pure routing/decision | `run()` |
See Adding Flows for how to build new flows with these node types.
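As an illustration, a Human node for the refund flow might implement the three hooks from the table like this. The method names come from the table; the exact signatures, the base-class import, and the schema contents are assumptions, not the real API:

```python
# Illustrative sketch of a Human node implementing the three hooks from
# the table: get_input_schema, get_prompt, process_input. Signatures and
# base class are assumed; subclass the real HumanInputNode in practice.

class ApproveRefund:
    def get_input_schema(self, ctx) -> dict:
        """JSON Schema describing what the reviewer must provide."""
        return {
            "type": "object",
            "properties": {
                "decision": {"type": "string",
                             "enum": ["approve", "partial", "deny"]},
                "notes": {"type": "string"},
            },
            "required": ["decision"],
        }

    def get_prompt(self, ctx) -> str:
        """Human-readable summary shown to the reviewer."""
        amount = ctx.state["LoadRefundContext"]["refund_amount"]
        return f"Refund of ₹{amount} requires approval."

    def process_input(self, ctx, human_input: dict) -> dict:
        """Normalize the reviewer's answer into this node's output."""
        return {"decision": human_input["decision"].upper()}
```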
## Persistence

Graph state is persisted in the `ai_graph_runs` tenant table:

| Field | Type | Description |
|---|---|---|
| `id` | UUID | Run identifier |
| `flow_type` | string | `ndr_resolution`, `fraud_review`, etc. |
| `status` | string | `running`, `paused`, `completed`, `failed`, `cancelled` |
| `current_node` | string | Where execution is (null when complete) |
| `input_data` | jsonb | Original input |
| `state` | jsonb | Accumulated node outputs |
| `node_results` | jsonb | Array of completed node results |
| `final_output` | jsonb | Result when the flow completes |
| `pending_input_schema` | jsonb | JSON Schema for human input (when paused) |
| `pending_input_prompt` | string | Description for the human reviewer |
| `total_tokens` | int | Total LLM tokens used |
| `total_cost` | decimal | Total estimated cost |
This means:
- Flows survive server restarts (state is in PostgreSQL)
- You can list paused flows and show them in a dashboard
- Completed flows provide an audit trail of every step