Skip to content

Streaming, Approvals & Checkpoints

Real-time visibility into running workflows, a centralized approval queue for human-in-the-loop steps, and checkpoint save/restore for long-running orchestrations.

SSE Streaming

Subscribe to real-time execution progress for any instance using Server-Sent Events. The stream emits state changes, step outputs, and a terminal event when the instance completes.

Endpoint

GET /instances/{instance_id}/stream?poll_ms=500

Headers:
  Accept: text/event-stream

Query Parameters:
  poll_ms  — Polling interval in milliseconds (default: 500, min: 100, max: 5000)

Incremental

Only new outputs and state changes are emitted. No duplicate data.

Auto-Close

Stream closes automatically when instance reaches a terminal state.

Keepalive

SSE comment sent every 15 seconds to prevent proxy/load-balancer timeouts.

Backpressure

32-slot channel buffer. If client is slower than poll rate, oldest events are dropped.

Event Types

state — Instance state change

event: state
data: {"instance_id": "abc-123", "state": "running"}

output — Step completed with output

event: output
data: {"block_id": "fetch-data", "output": {"status": 200, "body": {...}}, "created_at": "..."}

done — Instance reached terminal state

event: done
data: {"state": "completed"}

error — Instance not found or disappeared

event: error
data: {"error": "instance not found"}

Client Example

// Browser / Node.js with EventSource
const es = new EventSource("/instances/abc-123/stream?poll_ms=300");

es.addEventListener("state", (e) => {
  const { state } = JSON.parse(e.data);
  console.log("State:", state);
});

es.addEventListener("output", (e) => {
  const output = JSON.parse(e.data);
  console.log("Step completed:", output.block_id);
});

es.addEventListener("done", (e) => {
  const { state } = JSON.parse(e.data);
  console.log("Finished:", state);
  es.close();
});

es.addEventListener("error", (e) => {
  console.error("Stream error");
  es.close();
});

Approvals Queue

A centralized endpoint that lists all workflow instances currently waiting on human review. Build approval dashboards, Slack bots, or mobile notifications on top of this API.

API

GET /approvals?tenant_id=tenant-1&limit=50&offset=0

Optional query params:
  tenant_id   Filter by tenant
  namespace   Filter by namespace
  offset      Pagination offset (default: 0)
  limit       Page size (default: 100)

Response:
{
  "items": [
    {
      "instance_id": "abc-123",
      "tenant_id": "tenant-1",
      "namespace": "production",
      "sequence_id": "expense-approval",
      "sequence_name": "Expense Approval Flow",
      "block_id": "manager-review",
      "prompt": "Approve expense of $2,500 for Q1 marketing?",
      "choices": [
        { "label": "Approve", "value": "approved" },
        { "label": "Reject", "value": "rejected" },
        { "label": "Need More Info", "value": "info_needed" }
      ],
      "store_as": "decision",
      "timeout_seconds": 86400,
      "escalation_handler": "escalate-to-director",
      "waiting_since": "2024-01-15T09:00:00Z",
      "deadline": "2024-01-16T09:00:00Z",
      "metadata": { "requester": "alice@company.com", "amount": 2500 }
    }
  ],
  "total": 12
}
FieldDescription
promptHuman-readable question displayed to the reviewer
choicesAvailable options with label (display) and value (stored). Defaults to Yes/No if not specified.
store_asContext key where the chosen value is stored after approval
deadlineComputed from waiting_since + timeout_seconds. Null if no timeout.
escalation_handlerHandler invoked if deadline passes without a decision

Completing an Approval

To approve or reject, send a signal to the waiting instance:

POST /instances/abc-123/signals

{
  "signal_type": "human_input:manager-review",
  "payload": {
    "decision": "approved",
    "reviewer": "bob@company.com",
    "notes": "Within budget, approved."
  }
}

The signal type must match the pattern human_input:{block_id}. The payload becomes the step output and is accessible to subsequent steps.

Checkpoints

Save and restore execution state at any point. Checkpoints are useful for long-running workflows where you want recovery points, debugging snapshots, or the ability to replay from a known state.

API

Save Checkpoint

POST /instances/{instance_id}/checkpoints

{
  "checkpoint_data": {
    "progress": 75,
    "processed_items": 1500,
    "last_cursor": "cursor_abc123",
    "accumulated_results": [...]
  }
}

Response (201):
{
  "id": "checkpoint-uuid"
}

List Checkpoints

GET /instances/{instance_id}/checkpoints

Response:
[
  {
    "id": "cp-1",
    "instance_id": "abc-123",
    "checkpoint_data": {...},
    "created_at": "2024-01-15T10:30:00Z"
  },
  {
    "id": "cp-2",
    "instance_id": "abc-123",
    "checkpoint_data": {...},
    "created_at": "2024-01-15T11:00:00Z"
  }
]

Get Latest Checkpoint

GET /instances/{instance_id}/checkpoints/latest

Response: Single Checkpoint object (404 if none exist)

Prune Checkpoints

POST /instances/{instance_id}/checkpoints/prune

{
  "keep": 3
}

Response:
{
  "count": 7
}

// Keeps the 3 most recent checkpoints, deletes 7 older ones

Checkpoint Patterns

Batch Processing Recovery

When processing 100K records, checkpoint every 1000. On failure, retry from the last checkpoint cursor instead of reprocessing everything.

Debug Snapshots

Save a checkpoint before a complex decision step. If the result is unexpected, inspect the checkpoint data to understand the state that led to that decision.

Periodic Pruning

For workflows that run for days, save checkpoints every hour and prune to keep only the 5 most recent. Balances recovery capability with storage efficiency.

Migration Rollback

Before a sequence version upgrade, checkpoint all running instances. If the new version introduces issues, restore from the checkpoint on the previous version.