Streaming, Approvals & Checkpoints
Real-time visibility into running workflows, a centralized approval queue for human-in-the-loop steps, and checkpoint save/restore for long-running orchestrations.
SSE Streaming
Subscribe to real-time execution progress for any instance using Server-Sent Events. The stream emits state changes, step outputs, and a terminal event when the instance completes.
Endpoint
GET /instances/{instance_id}/stream?poll_ms=500
Headers:
Accept: text/event-stream
Query Parameters:
poll_ms — Polling interval in milliseconds (default: 500, min: 100, max: 5000)
Incremental
Only new outputs and state changes are emitted. No duplicate data.
Auto-Close
The stream closes automatically when the instance reaches a terminal state.
Keepalive
An SSE comment is sent every 15 seconds to prevent proxy and load-balancer timeouts.
Backpressure
Events pass through a 32-slot channel buffer. If the client reads more slowly than the poll rate, the oldest events are dropped.
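The poll_ms bounds listed under Query Parameters can be applied on the client before the stream is opened. The sketch below assumes out-of-range values should be clamped rather than rejected (how the server treats them is not stated here); `buildStreamUrl` is a hypothetical helper name, not part of the API.

```javascript
// Hypothetical helper: builds the stream path for an instance.
// The limits mirror the poll_ms parameter description (default 500, min 100, max 5000).
const POLL_MS_DEFAULT = 500;
const POLL_MS_MIN = 100;
const POLL_MS_MAX = 5000;

function buildStreamUrl(instanceId, pollMs = POLL_MS_DEFAULT) {
  // Assumption: clamp on the client; non-numeric input falls back to the default.
  const requested = Number.isFinite(pollMs) ? Math.round(pollMs) : POLL_MS_DEFAULT;
  const clamped = Math.min(POLL_MS_MAX, Math.max(POLL_MS_MIN, requested));
  return `/instances/${encodeURIComponent(instanceId)}/stream?poll_ms=${clamped}`;
}
```

The result can be passed straight to a client, for example `new EventSource(buildStreamUrl("abc-123", 300))`.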
Event Types
state — Instance state change
event: state
data: {"instance_id": "abc-123", "state": "running"}
output — Step completed with output
event: output
data: {"block_id": "fetch-data", "output": {"status": 200, "body": {...}}, "created_at": "..."}
done — Instance reached terminal state
event: done
data: {"state": "completed"}
error — Instance not found or disappeared
event: error
data: {"error": "instance not found"}
Client Example
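The event types above share one wire format: an `event:` line, a `data:` line carrying JSON, and a blank line that ends the frame. Clients that cannot use EventSource (for example, because the request needs custom headers) can parse this by hand. The following is a minimal sketch rather than an official client; `parseSseFrames` is a hypothetical name, and it assumes the input text contains only complete frames.

```javascript
// Parses SSE text into { event, data } objects.
// Assumption: the data line(s) of each frame join into one JSON document.
function parseSseFrames(text) {
  const events = [];
  // Frames are separated by a blank line.
  for (const frame of text.split(/\r?\n\r?\n/)) {
    let event = "message"; // SSE default when no "event:" field is present
    const dataLines = [];
    for (const line of frame.split(/\r?\n/)) {
      // Lines starting with ":" are comments, such as the 15-second keepalive.
      if (line === "" || line.startsWith(":")) continue;
      const sep = line.indexOf(":");
      const field = sep === -1 ? line : line.slice(0, sep);
      // One leading space after the colon is not part of the value.
      const value = sep === -1 ? "" : line.slice(sep + 1).replace(/^ /, "");
      if (field === "event") event = value;
      else if (field === "data") dataLines.push(value);
    }
    if (dataLines.length > 0) {
      events.push({ event, data: JSON.parse(dataLines.join("\n")) });
    }
  }
  return events;
}
```

When reading from a raw response body, incoming chunks would need to be buffered until a blank line arrives before being handed to this function; that buffering is left out here.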
// Browser / Node.js with EventSource
const es = new EventSource("/instances/abc-123/stream?poll_ms=300");
es.addEventListener("state", (e) => {
  const { state } = JSON.parse(e.data);
  console.log("State:", state);
});
es.addEventListener("output", (e) => {
  const output = JSON.parse(e.data);
  console.log("Step completed:", output.block_id);
});
es.addEventListener("done", (e) => {
  const { state } = JSON.parse(e.data);
  console.log("Finished:", state);
  // Close explicitly; otherwise EventSource reconnects after the server ends the stream.
  es.close();
});
es.addEventListener("error", (e) => {
  // Server-sent "error" events carry a JSON payload; connection failures do not.
  const detail = e.data ? JSON.parse(e.data).error : "connection error";
  console.error("Stream error:", detail);
  es.close();
});
Approvals Queue
A centralized endpoint that lists all workflow instances currently waiting on human review. Build approval dashboards, Slack bots, or mobile notifications on top of this API.
API
GET /approvals?tenant_id=tenant-1&limit=50&offset=0
Optional query params:
tenant_id — Filter by tenant
namespace — Filter by namespace
offset — Pagination offset (default: 0)
limit — Page size (default: 100)
Response:
{
  "items": [
    {
      "instance_id": "abc-123",
      "tenant_id": "tenant-1",
      "namespace": "production",
      "sequence_id": "expense-approval",
      "sequence_name": "Expense Approval Flow",
      "block_id": "manager-review",
      "prompt": "Approve expense of $2,500 for Q1 marketing?",
      "choices": [
        { "label": "Approve", "value": "approved" },
        { "label": "Reject", "value": "rejected" },
        { "label": "Need More Info", "value": "info_needed" }
      ],
      "store_as": "decision",
      "timeout_seconds": 86400,
      "escalation_handler": "escalate-to-director",
      "waiting_since": "2024-01-15T09:00:00Z",
      "deadline": "2024-01-16T09:00:00Z",
      "metadata": { "requester": "alice@company.com", "amount": 2500 }
    }
  ],
  "total": 12
}
| Field | Description |
|---|---|
| prompt | Human-readable question displayed to the reviewer |
| choices | Available options with label (display) and value (stored). Defaults to Yes/No if not specified. |
| store_as | Context key where the chosen value is stored after approval |
| deadline | Computed from waiting_since + timeout_seconds. Null if no timeout. |
| escalation_handler | Handler invoked if deadline passes without a decision |
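The deadline rule in the table (waiting_since + timeout_seconds) is easy to reproduce on the client, for instance to show a countdown in a dashboard. This is a sketch under one assumption: an item without a timeout has a null or missing `timeout_seconds`. The function names are hypothetical.

```javascript
// Returns the deadline as a Date, or null when the item has no timeout.
function computeDeadline(item) {
  // Assumption: "no timeout" means timeout_seconds is null or absent.
  if (item.timeout_seconds == null) return null;
  const start = Date.parse(item.waiting_since); // milliseconds since the epoch
  return new Date(start + item.timeout_seconds * 1000);
}

// Milliseconds left before the escalation handler would fire;
// negative once the deadline has passed, null when there is no deadline.
function msRemaining(item, now = Date.now()) {
  const deadline = computeDeadline(item);
  return deadline === null ? null : deadline.getTime() - now;
}
```

For the sample item in the response, 86400 seconds after 2024-01-15T09:00:00Z gives the same instant as the `deadline` field, 2024-01-16T09:00:00Z.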
Completing an Approval
To approve or reject, send a signal to the waiting instance:
POST /instances/abc-123/signals
{
  "signal_type": "human_input:manager-review",
  "payload": {
    "decision": "approved",
    "reviewer": "bob@company.com",
    "notes": "Within budget, approved."
  }
}
The signal type must match the pattern human_input:{block_id}. The payload becomes the step output and is accessible to subsequent steps.
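A dashboard would typically derive the signal from an approvals queue item rather than hard-coding it. The sketch below builds the request from an item; `buildApprovalSignal` is a hypothetical helper. Two assumptions are made: the chosen value is sent under the item's `store_as` key (mirroring the `decision` field in the example payload), and values outside the item's `choices` are rejected on the client.

```javascript
// Hypothetical helper: describes the request that completes an approval item.
function buildApprovalSignal(item, value, extra = {}) {
  const allowed = (item.choices ?? []).map((c) => c.value);
  // Assumption: validate against the listed choices when the item provides any.
  if (allowed.length > 0 && !allowed.includes(value)) {
    throw new Error(`"${value}" is not a valid choice for ${item.block_id}`);
  }
  return {
    method: "POST",
    path: `/instances/${item.instance_id}/signals`,
    body: {
      // Pattern from the docs: human_input:{block_id}
      signal_type: `human_input:${item.block_id}`,
      // Assumption: the decision goes under the store_as key.
      payload: { ...extra, [item.store_as]: value },
    },
  };
}
```

The returned object is transport-agnostic: `path` and `body` can be handed to whichever HTTP client the application already uses.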
Checkpoints
Save and restore execution state at any point. Checkpoints are useful for long-running workflows where you want recovery points, debugging snapshots, or the ability to replay from a known state.
API
Save Checkpoint
POST /instances/{instance_id}/checkpoints
{
  "checkpoint_data": {
    "progress": 75,
    "processed_items": 1500,
    "last_cursor": "cursor_abc123",
    "accumulated_results": [...]
  }
}
Response (201):
{
  "id": "checkpoint-uuid"
}
List Checkpoints
GET /instances/{instance_id}/checkpoints
Response:
[
  {
    "id": "cp-1",
    "instance_id": "abc-123",
    "checkpoint_data": {...},
    "created_at": "2024-01-15T10:30:00Z"
  },
  {
    "id": "cp-2",
    "instance_id": "abc-123",
    "checkpoint_data": {...},
    "created_at": "2024-01-15T11:00:00Z"
  }
]
Get Latest Checkpoint
GET /instances/{instance_id}/checkpoints/latest
Response: A single checkpoint object (404 if none exist)
Prune Checkpoints
POST /instances/{instance_id}/checkpoints/prune
{
  "keep": 3
}
Response:
{
  "count": 7
}
// Keeps the 3 most recent checkpoints, deletes 7 older ones
Checkpoint Patterns
Batch Processing Recovery
When processing 100K records, save a checkpoint every 1,000 records. On failure, resume from the cursor in the latest checkpoint instead of reprocessing everything.
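The recovery loop can be sketched as follows. This is an illustration under stated assumptions, not the platform's implementation: the cursor is simply the index of the next unprocessed record, and `saveCheckpoint` is an injected function that in practice might POST its argument as `checkpoint_data` to /instances/{instance_id}/checkpoints. All names are hypothetical.

```javascript
// Processes records in order, saving a checkpoint every `every` records.
// `resumeFrom` is the checkpoint_data of the latest checkpoint, or null for a fresh run.
async function processWithCheckpoints(records, handle, saveCheckpoint, resumeFrom = null, every = 1000) {
  // Assumption: last_cursor holds the index of the next record to process.
  let cursor = resumeFrom ? resumeFrom.last_cursor : 0;
  while (cursor < records.length) {
    await handle(records[cursor]); // may throw; progress since the last checkpoint is lost
    cursor += 1;
    // Checkpoint at every interval boundary and once more at the very end.
    if (cursor % every === 0 || cursor === records.length) {
      await saveCheckpoint({ last_cursor: cursor, processed_items: cursor });
    }
  }
  return cursor;
}
```

Records handled after the last checkpoint but before a failure are processed again on retry, so `handle` should be idempotent.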
Debug Snapshots
Save a checkpoint before a complex decision step. If the result is unexpected, inspect the checkpoint data to understand the state that led to that decision.
Periodic Pruning
For workflows that run for days, save a checkpoint every hour and prune to keep only the 5 most recent. This balances recovery capability against storage use.
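To preview what a prune would leave behind, the selection can be reproduced on the client from the List Checkpoints response. The sketch assumes "most recent" is judged by `created_at`; the server's actual ordering is not specified here, and `planPrune` is a hypothetical name.

```javascript
// Splits checkpoints into the `keep` most recent and the rest, newest first.
// Assumption: recency is determined by the created_at timestamp.
function planPrune(checkpoints, keep) {
  const newestFirst = [...checkpoints].sort(
    (a, b) => Date.parse(b.created_at) - Date.parse(a.created_at)
  );
  return {
    kept: newestFirst.slice(0, keep),
    deleted: newestFirst.slice(keep),
  };
}
```

With hourly checkpoints and `keep` set to 5, `deleted.length` should match the `count` returned by the prune endpoint if the assumption about ordering holds.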
Migration Rollback
Before a sequence version upgrade, checkpoint all running instances. If the new version introduces issues, restore from the checkpoint on the previous version.