Graph Flows

Graph flows are multi-step AI workflows in which each step is a node that calls tools, runs an AI agent, pauses for human input, or makes a routing decision. The graph engine executes nodes one at a time, persists state after each step, and can pause and resume a run across server restarts.

When to Use Graph Flows

Use Case                              Use Chat   Use Graph Flow
"Where is my order?"                  Yes        No
"Process this NDR with approval"      No         Yes
"Score this order for fraud"          No         Yes
"Evaluate and process this refund"    No         Yes
Any multi-step workflow with gates    No         Yes

Rule of thumb: if a task needs human approval, multiple tool calls in sequence, or conditional branching, use a graph flow.

Built-in Flows

NDR Resolution

Handles non-delivery reports through investigation, AI analysis, and optional approval:

FetchOrderData → AnalyzeNDR → DecideAction → [ApproveAction] → ExecuteAction
                                                   ↑
                                        High-value (>₹5000) or
                                        low AI confidence (<70%)

POST /api/v1/ai/agent/graph/run
{
  "flow_type": "ndr_resolution",
  "input_data": {
    "order_id": "uuid",
    "ndr_reason": "customer_unavailable",
    "delivery_attempts": 2,
    "customer_phone": "+919876543210"
  }
}

Possible outcomes: REATTEMPT (schedule re-delivery), RTO (return to origin), CONTACT (reach out to customer), ESCALATE (manual handling).
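
The approval gate in the diagram can be read as a simple predicate. The following is a minimal sketch, assuming the order total is in rupees and the AI confidence is a fraction between 0 and 1. The function name and constants are illustrative and are not taken from the flow's actual code.

```python
# Hypothetical helper; the two thresholds come from the NDR diagram.
HIGH_VALUE_THRESHOLD_INR = 5000
MIN_AUTO_CONFIDENCE = 0.70


def ndr_needs_approval(order_total: float, ai_confidence: float) -> bool:
    """Return True when the run should pause at ApproveAction."""
    is_high_value = order_total > HIGH_VALUE_THRESHOLD_INR
    is_low_confidence = ai_confidence < MIN_AUTO_CONFIDENCE
    return is_high_value or is_low_confidence
```

With these assumptions, an order of 8500 with 62% confidence is gated on both conditions, while an order of 1200 with 85% confidence goes straight to ExecuteAction.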

Fraud Review

Scores orders for fraud risk with AI and routes for review:

AssessOrder → ScoreFraudRisk → FraudDecision → [ReviewFraud] → TakeAction
                                    │
                        Low risk (<0.3) → auto-approve
                        Medium (0.3-0.7) → human review
                        High (>0.7) → auto-hold + review

POST /api/v1/ai/agent/graph/run
{
  "flow_type": "fraud_review",
  "input_data": {
    "order_id": "uuid"
  }
}

Possible outcomes: APPROVE (confirm order), HOLD (keep pending for investigation), CANCEL (cancel the order).
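
The three risk bands can be sketched as a routing function. This is an illustration only: the function name and the returned labels are hypothetical, and treating scores of exactly 0.3 and 0.7 as medium risk is an assumption based on how the bands are written in the diagram.

```python
def route_fraud_score(score: float) -> str:
    """Map a fraud score in [0, 1] to one of the three routes in the diagram."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"fraud score out of range: {score}")
    if score < 0.3:
        return "auto_approve"          # low risk
    if score <= 0.7:
        return "human_review"          # medium risk, boundaries included
    return "auto_hold_and_review"      # high risk
```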

Refund Approval

Evaluates refund requests against policy:

LoadRefundContext → EvaluateRefund → RefundDecision → [ApproveRefund] → ProcessRefund
                                          │
                           Amount ≤ ₹500 + return approved → auto-process
                           Amount > ₹500 or edge case → human approval

POST /api/v1/ai/agent/graph/run
{
  "flow_type": "refund_approval",
  "input_data": {
    "order_id": "uuid",
    "refund_amount": 2500,
    "reason": "Product defective",
    "return_id": "return-uuid"
  }
}

Possible outcomes: APPROVE (full refund), PARTIAL (reduced amount), DENY (rejected).
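
The refund gate can be sketched the same way. The diagram does not define what counts as an edge case, so the sketch takes that as a flag supplied by the caller. All names here are hypothetical.

```python
AUTO_PROCESS_LIMIT_INR = 500  # threshold from the refund diagram


def refund_needs_approval(
    refund_amount: float,
    return_approved: bool,
    is_edge_case: bool = False,
) -> bool:
    """Return True when the run should pause at ApproveRefund."""
    can_auto_process = (
        refund_amount <= AUTO_PROCESS_LIMIT_INR
        and return_approved
        and not is_edge_case
    )
    return not can_auto_process
```

Under these assumptions, the sample request above (refund_amount 2500) would pause for human approval.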

Graph Run Lifecycle

1. Start a Flow

POST /api/v1/ai/agent/graph/run
{
  "flow_type": "ndr_resolution",
  "input_data": { ... }
}

Response (completed — no approval needed):

{
  "run_id": "uuid",
  "flow_type": "ndr_resolution",
  "status": "completed",
  "node_results": [
    {"node_name": "FetchOrderData", "status": "completed", "output": {...}},
    {"node_name": "AnalyzeNDR", "status": "completed", "output": {...}},
    {"node_name": "DecideAction", "status": "completed", "output": {...}},
    {"node_name": "ExecuteAction", "status": "completed", "output": {...}}
  ],
  "final_output": {"action_taken": "REATTEMPT", "message": "NDR resolved: REATTEMPT"},
  "total_tokens": 1250,
  "total_cost": 0.0185
}

Response (paused — needs approval):

{
  "run_id": "uuid",
  "flow_type": "ndr_resolution",
  "status": "paused",
  "current_node": "ApproveAction",
  "requires_human_input": true,
  "human_input_schema": {
    "type": "object",
    "properties": {
      "decision": {"type": "string", "enum": ["approve", "reject", "override"]},
      "override_action": {"type": "string", "enum": ["REATTEMPT", "RTO", "CONTACT", "ESCALATE"]},
      "notes": {"type": "string"}
    },
    "required": ["decision"]
  },
  "human_input_prompt": "NDR Resolution requires approval.\n\nOrder: ORD-123 (Total: 8500)\nAI Recommendation: REATTEMPT (confidence: 62%)..."
}
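
A client has to distinguish between the two response shapes. The sketch below builds the request with the Python standard library and classifies a response by its status field. The base URL is a placeholder, authentication is omitted because it is not described here, and the function names and return labels are hypothetical.

```python
import json
import urllib.request

RUN_PATH = "/api/v1/ai/agent/graph/run"


def build_run_request(base_url: str, flow_type: str, input_data: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that starts a flow."""
    body = json.dumps({"flow_type": flow_type, "input_data": input_data})
    return urllib.request.Request(
        base_url.rstrip("/") + RUN_PATH,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def next_step(response: dict) -> str:
    """Decide what the caller should do with a run response."""
    status = response.get("status")
    if status == "completed":
        return "done"
    if status == "paused" and response.get("requires_human_input"):
        return "await_human_input"
    return "check_status"
```

To send the request, pass it to urllib.request.urlopen and decode the body with json.load. When next_step returns "await_human_input", show human_input_prompt to the reviewer and collect a value matching human_input_schema.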

2. Resume with Human Input

POST /api/v1/ai/agent/graph/resume
{
  "run_id": "uuid-from-paused-response",
  "human_input": {
    "decision": "approve",
    "notes": "Customer confirmed availability"
  }
}

The graph resumes from the paused node, processes the human input, and continues to completion.
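
Before calling the resume endpoint, a client may want to check the reviewer's input against the human_input_schema returned in the paused response. The sketch below covers only the schema features used in that example (required fields, string types, and enums). It is a hypothetical helper and not a full JSON Schema validator. A real client would more likely use a dedicated library.

```python
from __future__ import annotations


def validate_human_input(schema: dict, human_input: dict) -> list[str]:
    """Return a list of problems; an empty list means the input looks valid."""
    errors = []
    for name in schema.get("required", []):
        if name not in human_input:
            errors.append(f"missing required field: {name}")
    properties = schema.get("properties", {})
    for name, value in human_input.items():
        spec = properties.get(name)
        if spec is None:
            continue  # fields not in the schema are ignored in this sketch
        if spec.get("type") == "string" and not isinstance(value, str):
            errors.append(f"{name} must be a string")
        elif "enum" in spec and value not in spec["enum"]:
            errors.append(f"{name} must be one of {spec['enum']}")
    return errors


def build_resume_body(run_id: str, schema: dict, human_input: dict) -> dict:
    """Build the JSON body for the resume call, or raise on invalid input."""
    errors = validate_human_input(schema, human_input)
    if errors:
        raise ValueError("; ".join(errors))
    return {"run_id": run_id, "human_input": human_input}
```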

3. Check Status

GET /api/v1/ai/agent/graph/{run_id}/status

4. List Runs

GET /api/v1/ai/agent/graph/runs?flow_type=ndr_resolution&status=paused
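
The status and list URLs can be assembled with the standard library. In this sketch the helper names are hypothetical, and treating both query filters as optional is an assumption. Only the combined form shown above is documented.

```python
from typing import Optional
from urllib.parse import quote, urlencode

GRAPH_PATH = "/api/v1/ai/agent/graph"


def status_url(base_url: str, run_id: str) -> str:
    """URL for checking a single run."""
    return f"{base_url.rstrip('/')}{GRAPH_PATH}/{quote(run_id, safe='')}/status"


def list_runs_url(
    base_url: str,
    flow_type: Optional[str] = None,
    status: Optional[str] = None,
) -> str:
    """URL for listing runs, with any filters that were provided."""
    filters = {"flow_type": flow_type, "status": status}
    query = urlencode({k: v for k, v in filters.items() if v is not None})
    url = f"{base_url.rstrip('/')}{GRAPH_PATH}/runs"
    return f"{url}?{query}" if query else url
```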

How the Engine Works

The graph engine is the core orchestrator. It is flow-agnostic and runs any registered flow definition the same way:

  1. Start: Creates a run record in tenant DB, begins at the entry node
  2. Execute: Runs each node and stores its output in the shared state dict under the node's name
  3. Persist: After each node, saves state + result to tenant DB via callback
  4. Pause: If a HumanInputNode raises HumanInputRequired, persists state and returns what input is needed
  5. Resume: Loads persisted state, injects human input, continues from the paused node
  6. Complete: Marks run as completed with final output
  7. Safety: 20-node execution limit prevents infinite loops
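
The steps above can be sketched as a small in-memory loop. This illustrates the control flow only and is not the engine's actual code. The Run class, the node signature (a callable returning its output and the next node name), and the persist callback are assumptions. Marking a run as failed when it hits the 20-node limit is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

MAX_NODE_EXECUTIONS = 20  # safety limit from step 7


class HumanInputRequired(Exception):
    """Raised by a human-input node that has nothing to process yet."""


@dataclass
class Run:
    current_node: Optional[str]          # entry node at start, None when done
    state: dict = field(default_factory=dict)
    status: str = "running"
    human_input: Optional[dict] = None
    executed: int = 0                    # nodes executed so far in this run


def execute(run: Run, nodes: dict, persist: Callable[[Run], None]) -> Run:
    """Run nodes from run.current_node until completion, a pause, or the limit.

    Each node is a callable taking the run and returning
    (output, next_node_name), with next_node_name None at the end of the flow.
    """
    run.status = "running"
    while run.current_node is not None:
        if run.executed >= MAX_NODE_EXECUTIONS:
            run.status = "failed"        # assumption: hitting the limit fails the run
            persist(run)
            return run
        name = run.current_node
        try:
            output, next_node = nodes[name](run)
        except HumanInputRequired:
            run.status = "paused"        # current_node stays on the paused node
            persist(run)
            return run
        run.executed += 1
        run.state[name] = output         # output stored under the node's name
        run.current_node = next_node
        persist(run)                     # persist after every node
    run.status = "completed"
    persist(run)
    return run


def resume(run: Run, human_input: dict, nodes: dict,
           persist: Callable[[Run], None]) -> Run:
    """Inject human input and continue from the paused node."""
    run.human_input = human_input
    return execute(run, nodes, persist)
```

Because a paused run keeps current_node pointing at the human-input node, resuming re-enters that same node, which now finds human_input set and can proceed.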

State Flow

Each node's output is stored in state[node_name]:

# After FetchOrderData completes:
state = {
    "FetchOrderData": {"order": {...}, "ndr_reason": "customer_unavailable", ...}
}

# After AnalyzeNDR:
state = {
    "FetchOrderData": {"order": {...}, ...},
    "AnalyzeNDR": {"action": "REATTEMPT", "confidence": 0.85, "reasoning": "..."}
}

Every node can read previous nodes' outputs from ctx.state.
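
As an illustration of reading earlier outputs, the sketch below combines values from two previous nodes. The Context class is a hypothetical stand-in that exposes only a state attribute. The real ctx object may carry more than this.

```python
from dataclasses import dataclass, field


@dataclass
class Context:
    """Hypothetical stand-in for the engine's ctx object."""
    state: dict = field(default_factory=dict)


def summarize_analysis(ctx: Context) -> str:
    """Build a one-line summary from the outputs of two earlier nodes."""
    ndr_reason = ctx.state["FetchOrderData"]["ndr_reason"]
    analysis = ctx.state["AnalyzeNDR"]
    return (
        f"{analysis['action']} (confidence {analysis['confidence']:.0%}) "
        f"for NDR reason '{ndr_reason}'"
    )
```

A node that runs before AnalyzeNDR has completed would hit a KeyError here, so a node should read only from nodes that precede it in the flow.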

Node Types

Type    Base Class       Purpose                      What You Implement
Tool    ToolNode         Calls tools via the bridge   get_tool_calls(), process_results()
Agent   AgentNode        Runs PydanticAI agent        build_prompt(), process_output()
Human   HumanInputNode   Pauses for human input       get_input_schema(), get_prompt(), process_input()
Logic   BaseNode         Pure routing/decision        run()

See Adding Flows for how to build new flows with these node types.
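
To show how the two ToolNode hooks divide the work, here is a self-contained sketch built on a stand-in base class. The real ToolNode signatures are not shown in this section, so SketchToolNode, the call_tool callback, the get_order tool name, and the input_data key are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Callable


class SketchToolNode(ABC):
    """Stand-in for ToolNode; the real base class may differ."""

    @abstractmethod
    def get_tool_calls(self, state: dict) -> list:
        """Return (tool_name, arguments) pairs to execute."""

    @abstractmethod
    def process_results(self, results: list) -> dict:
        """Turn raw tool results into this node's output."""

    def run(self, state: dict, call_tool: Callable[[str, dict], dict]) -> dict:
        # The base class owns the call sequence; subclasses fill in the hooks.
        calls = self.get_tool_calls(state)
        results = [call_tool(name, args) for name, args in calls]
        return self.process_results(results)


class FetchOrder(SketchToolNode):
    def get_tool_calls(self, state: dict) -> list:
        return [("get_order", {"order_id": state["input_data"]["order_id"]})]

    def process_results(self, results: list) -> dict:
        return {"order": results[0]}
```

The other node types in the table follow the same pattern. The base class decides when the hooks run, and the subclass supplies only the flow-specific parts.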

Persistence

Graph state is persisted in the ai_graph_runs tenant table:

Field                  Type      Description
id                     UUID      Run identifier
flow_type              string    ndr_resolution, fraud_review, etc.
status                 string    running, paused, completed, failed, cancelled
current_node           string    Where execution is (null when complete)
input_data             jsonb     Original input
state                  jsonb     Accumulated node outputs
node_results           jsonb     Array of completed node results
final_output           jsonb     Result when flow completes
pending_input_schema   jsonb     JSON Schema for human input (when paused)
pending_input_prompt   string    Description for the human reviewer
total_tokens           int       Total LLM tokens used
total_cost             decimal   Total estimated cost

This means:

  • Flows survive server restarts (state is in PostgreSQL)
  • You can list paused flows and show them in a dashboard
  • Completed flows provide an audit trail of every step