Workflow YAML Reference

This page describes the complete YAML schema for defining Yagra workflows.

Overview

A workflow YAML file defines:

  • Nodes: Processing units (e.g., classifiers, generators, evaluators)

  • Edges: Transitions between nodes (unconditional or conditional)

  • Start/End points: Entry and exit nodes

Yagra validates the YAML against a Pydantic schema (GraphSpec) and builds a LangGraph StateGraph.

Top-Level Structure

version: "1.0"           # Required: Schema version
start_at: "node_id"      # Required: Entry node ID
end_at:                  # Required: List of exit node IDs
  - "finish"
  - "error_handler"
state_schema:            # Optional: Typed state field definitions
  field_name:
    type: str            # str | int | float | bool | list | dict | messages
nodes:                   # Required: List of node definitions
  - id: "node_1"
    handler: "handler_name"
    params: {}
edges:                   # Required: List of edge definitions
  - source: "node_1"
    target: "node_2"
    condition: null      # Optional: Conditional branching
params: {}               # Optional: Global parameters
interrupt_before:        # Optional: Pause before these nodes (HITL)
  - "review_node"
interrupt_after:         # Optional: Pause after these nodes (HITL)
  - "generate_node"

State Schema

The optional state_schema section defines typed fields for the workflow’s state. Yagra uses these definitions to build a typed TypedDict and configure LangGraph reducers.

Basic Field

state_schema:
  query:
    type: str
  count:
    type: int
  active:
    type: bool
  tags:
    type: list

Supported types: str, int, float, bool, list, dict, messages
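As a rough illustration, the schema above corresponds to a TypedDict along these lines (a sketch only; the class name `WorkflowState` and the exact class Yagra generates are assumptions):

```python
from typing import TypedDict

# Hypothetical equivalent of the state_schema above; illustrative,
# not Yagra's actual generated class.
class WorkflowState(TypedDict, total=False):
    query: str
    count: int
    active: bool
    tags: list

state: WorkflowState = {"query": "hello", "count": 1, "active": True, "tags": []}
```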

Fan-In with Reducer

Use reducer: add on list fields to enable parallel fan-in (combines outputs from multiple concurrent nodes):

state_schema:
  results:
    type: list
    reducer: add    # operator.add: merges lists from parallel nodes
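The merge semantics come from operator.add on lists: each parallel node returns a partial update, and the reducer concatenates the list values instead of overwriting them. A minimal plain-Python illustration (no LangGraph involved):

```python
import operator

# Partial updates returned by two parallel nodes
update_a = {"results": ["from_node_a"]}
update_b = {"results": ["from_node_b"]}

# The add reducer concatenates list values rather than replacing them
merged = operator.add(update_a["results"], update_b["results"])
print(merged)  # ['from_node_a', 'from_node_b']
```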

Chat History (MessagesState)

Use type: messages to enable LangGraph’s add_messages reducer for conversational state:

state_schema:
  messages:
    type: messages    # Activates add_messages: new messages are appended, not overwritten

Handlers should return {"messages": [new_message]} to append to the conversation history.
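For example, a handler might append an assistant turn like this (a sketch using message-like dicts; real handlers may return LangChain message objects instead):

```python
def chat_node(state: dict) -> dict:
    # Return only the new message; the add_messages reducer appends it
    # to the existing history instead of replacing it.
    reply = {"role": "assistant", "content": "Hello!"}
    return {"messages": [reply]}

update = chat_node({"messages": [{"role": "user", "content": "Hi"}]})
print(update)  # {'messages': [{'role': 'assistant', 'content': 'Hello!'}]}
```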

Node Specification

Each node defines a processing unit.

Basic Node

nodes:
  - id: "classifier"
    handler: "classify_intent"
  • id (str, required): Unique node identifier


  • handler (str, required): Handler function name (resolved via registry)

Node with Parameters

nodes:
  - id: "generator"
    handler: "generate_answer"
    params:
      prompt_ref: "../prompts/generator.yaml#system"
      model:
        provider: "openai"
        name: "gpt-4.1-mini"
        kwargs:
          temperature: 0.7
          max_tokens: 1000
  • params (dict, optional): Parameters passed to the handler


    • prompt_ref: External prompt reference (see Prompt & Model)

    • model: Model configuration (inline definition)

    • Custom parameters: Any additional data your handler needs

Subgraph Node

Use handler: "subgraph" with params.workflow_ref to embed another workflow YAML as a nested subgraph:

nodes:
  - id: "sub_agent"
    handler: "subgraph"
    params:
      workflow_ref: ./sub_workflow.yaml   # Relative path from this workflow file

The subgraph shares the parent’s registry and checkpointer. All handlers referenced in both YAMLs must be registered in the same registry when building the graph.

Resilience (Retry, Timeout, Fallback)

Nodes can declare retry, timeout, and fallback behavior directly in YAML:

nodes:
  - id: "translate"
    handler: "llm"
    retry:
      max_attempts: 3
      backoff: exponential    # exponential | fixed
      base_delay_seconds: 2
    timeout_seconds: 60
    fallback: fallback_translate
    params:
      prompt_ref: "prompts.yaml#translate"
      model: { provider: openai, name: gpt-4o-mini }

Fields:

Field                       Type                     Description
retry                       object, optional         Retry configuration block
retry.max_attempts          int (1–10)               Maximum number of retry attempts. Default: 3
retry.backoff               exponential | fixed      Backoff strategy. Default: exponential
retry.base_delay_seconds    float (0–60)             Initial delay in seconds between retries. Default: 2.0
timeout_seconds             int (1–600), optional    Maximum execution time for the node in seconds
fallback                    str, optional            Node ID to execute if this node fails after all retries

Retry behavior: When a node raises an exception, the retry wrapper re-executes it up to max_attempts times with backoff delays:

  • Exponential: delays are base * 2^(attempt-1) seconds (e.g., 2s, 4s, 8s)

  • Fixed: delays are always base_delay_seconds
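Assuming those formulas, the per-attempt delay can be computed as follows (a sketch, not Yagra's internal code):

```python
def retry_delay(attempt: int, base: float, strategy: str) -> float:
    # Delay applied before re-executing attempt number `attempt`
    if strategy == "exponential":
        return base * 2 ** (attempt - 1)
    return base

print([retry_delay(a, 2, "exponential") for a in (1, 2, 3)])  # [2, 4, 8]
print([retry_delay(a, 2, "fixed") for a in (1, 2, 3)])        # [2, 2, 2]
```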

Fallback behavior: If a node fails after all retries (or without retry), and fallback is specified, the error is captured in state["__error__"] and execution continues to the fallback node.

Schema validation:

  • The fallback node ID must exist in the workflow’s node list

  • Self-referencing fallbacks (e.g., fallback: translate on node translate) are rejected

  • Fuzzy-match suggestions are provided for typos in fallback references

Backward compatibility: All fields are optional with None defaults. Existing workflows without retry/timeout/fallback are unaffected.

Node Handler Signature

Your handler function receives (state, params) or just (state):

def my_handler(state: AgentState, params: dict) -> dict:
    prompt = params.get("prompt", {})
    model = params.get("model", {})
    # ... process state and return updates
    return {"key": "value"}

Yagra tries calling the handler with (state, params) first and falls back to the state-only form if that call fails.
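A plausible dispatch pattern for that fallback (illustrative only; Yagra's actual resolution logic may differ):

```python
def invoke_handler(handler, state: dict, params: dict) -> dict:
    # Prefer the two-argument form; fall back to state-only handlers.
    try:
        return handler(state, params)
    except TypeError:
        return handler(state)

def two_arg(state, params):
    return {"mode": "two-arg"}

def one_arg(state):
    return {"mode": "one-arg"}

print(invoke_handler(two_arg, {}, {}))  # {'mode': 'two-arg'}
print(invoke_handler(one_arg, {}, {}))  # {'mode': 'one-arg'}
```

Note that a handler which itself raises TypeError internally would also trigger the fallback; a robust implementation would inspect the signature instead.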

Edge Specification

Edges define transitions between nodes.

Unconditional Edge

edges:
  - source: "node_1"
    target: "node_2"

Always transitions from node_1 to node_2.

Conditional Edge

edges:
  - source: "classifier"
    target: "faq_bot"
    condition: "faq"
  - source: "classifier"
    target: "general_bot"
    condition: "general"
  • condition (str, optional): Branching label


  • The source node must return {"__next__": "faq"} or {"__next__": "general"}

Contract: The source node is responsible for setting __next__ in the state.

Example:

def classifier(state: AgentState, params: dict) -> dict:
    intent = "faq" if "pricing" in state["query"] else "general"
    return {"intent": intent, "__next__": intent}

Fan-Out Edge (Parallel Dispatch)

Use fan_out to dispatch items from a list in parallel using LangGraph’s Send API:

edges:
  - source: "prepare"
    target: "process_item"
    fan_out:
      items_key: items    # State key containing the list (e.g., state["items"])
      item_key: item      # Key passed to each parallel invocation (e.g., state["item"])
  • fan_out is mutually exclusive with condition


  • The target node receives {item_key: single_item} for each element in state[items_key]

  • Use reducer: add on the output state field to merge results from all parallel executions
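Conceptually, the fan_out edge expands into one Send-style dispatch per list element. A plain-Python sketch of that expansion (illustrating the semantics, not LangGraph's actual Send API):

```python
def expand_fan_out(state: dict, target: str, items_key: str, item_key: str):
    # One (target, payload) dispatch per element of the source list
    return [(target, {item_key: item}) for item in state[items_key]]

dispatches = expand_fan_out({"items": [1, 2, 3]}, "process_item", "items", "item")
print(dispatches)
# [('process_item', {'item': 1}), ('process_item', {'item': 2}), ('process_item', {'item': 3})]
```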

Example: Map-Reduce

state_schema:
  items:
    type: list
  results:
    type: list
    reducer: add

edges:
  - source: "prepare"
    target: "process_item"
    fan_out:
      items_key: items
      item_key: item
  - source: "process_item"
    target: "aggregate"

Handler for process_item:

def process_handler(state: dict) -> dict:
    item = state["item"]   # Single item from the fan-out
    result = do_work(item)
    return {"results": [result]}   # Appended to state["results"] via reducer: add

Start and End Points

start_at

The node ID where execution begins.

start_at: "classifier"

end_at

A list of node IDs where execution can terminate.

end_at:
  - "finish"
  - "error"

Yagra registers these nodes as LangGraph finish points. When execution reaches any of these nodes, the graph stops.

Note: Do not write END explicitly in YAML—Yagra handles this internally.

Global Parameters

Optional top-level params apply to all nodes unless overridden.

params:
  default_temperature: 0.7
  retry_limit: 3

nodes:
  - id: "node_1"
    handler: "handler_1"
    params:
      temperature: 0.9  # Overrides default
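Assuming a simple shallow merge in which node-level keys win, the effective parameters for a node could look like this sketch (the exact merge Yagra performs is not specified here):

```python
global_params = {"default_temperature": 0.7, "retry_limit": 3}
node_params = {"temperature": 0.9}

# Node-level keys override global ones; remaining global keys fill in the rest.
effective = {**global_params, **node_params}
print(effective)  # {'default_temperature': 0.7, 'retry_limit': 3, 'temperature': 0.9}
```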

HITL / Interrupt

Use interrupt_before and interrupt_after to pause execution at specific nodes for Human-in-the-Loop (HITL) review. Yagra passes these lists directly to LangGraph’s compile(interrupt_before=..., interrupt_after=...).

interrupt_before

Pause execution before the listed nodes run. Use this to require human approval before a critical action.

interrupt_before:
  - "send_email"
  - "deploy"

When the graph reaches send_email, execution suspends. The human can inspect the state and then call resume() on the Yagra app to continue.

interrupt_after

Pause execution after the listed nodes run. Use this to allow humans to review or modify the node’s output before the workflow continues.

interrupt_after:
  - "generate_draft"

Resume after Interrupt

from yagra import Yagra

app = Yagra(registry=registry, checkpointer=checkpointer)
app.run(workflow_path="workflow.yaml", state={"query": "hello"}, config={"configurable": {"thread_id": "1"}})

# --- human reviews state here ---

app.resume(config={"configurable": {"thread_id": "1"}})

A checkpointer is required for interrupt/resume to work. Yagra passes it to StateGraph.compile().

Note: interrupt_before and interrupt_after node IDs must exist in the nodes list.

Validation Rules

Yagra validates workflows before building the graph:

  1. Schema compliance: YAML must match GraphSpec Pydantic model

  2. Node ID uniqueness: No duplicate node IDs

  3. Edge references: All source/target must reference existing nodes

  4. Start/End validity: start_at and end_at nodes must exist

  5. Prompt references: prompt_ref paths must resolve to valid files

  6. Edge rules: Mixed conditional and unconditional edges from the same source are not allowed; fan_out edges cannot be combined with other edge types from the same source

  7. State schema: reducer: add requires type: list or type: messages

  8. Fallback references: fallback must reference an existing node; self-references are rejected

  9. Prompt-state consistency (warning): {variable} placeholders in prompts should exist in state_schema or upstream output_key; output_key should be declared in state_schema when defined

Use yagra validate to check compliance:

yagra validate --workflow workflow.yaml --format json

Example: Complete Workflow

version: "1.0"
start_at: "retrieve"
end_at:
  - "generate"

nodes:
  - id: "retrieve"
    handler: "retrieve_documents"
    params:
      top_k: 5
  - id: "rerank"
    handler: "rerank_documents"
    params:
      prompt_ref: "../prompts/rerank.yaml#system"
  - id: "generate"
    handler: "generate_answer"
    params:
      prompt_ref: "../prompts/generate.yaml#system"
      model:
        provider: "anthropic"
        name: "claude-3-sonnet"

edges:
  - source: "retrieve"
    target: "rerank"
  - source: "rerank"
    target: "generate"

Next Steps