PY
Python SDK
pip install orch8-io-sdkFeatures
- ✓Async client built on httpx with async context manager support
- ✓76 methods covering all engine domains — sequences, instances, pools, credentials, circuit breakers, approvals, and more
- ✓Pydantic models matching the Rust engine types
- ✓Polling worker with circuit breaker awareness — skips handlers with open breakers
- ✓Exponential backoff on poll failures (doubles up to 30s, resets on success)
- ✓on_task_complete / on_task_fail observability callbacks
- ✓Semaphore-based concurrency limiting (default max_concurrent=5)
- ✓Orch8Error class with status and structured body
Client methods (76)
Full management surface covering all engine API domains.
- list_sequences / create_sequence / get_sequence / get_sequence_by_name / delete_sequence / deprecate_sequence / list_sequence_versions / migrate_instance
- create_instance / batch_create_instances / get_instance / list_instances / update_instance_state / update_instance_context / send_signal / retry_instance / inject_blocks
- get_outputs / get_execution_tree / list_audit_log
- list_checkpoints / save_checkpoint / get_latest_checkpoint / prune_checkpoints
- bulk_update_state / bulk_reschedule / list_dlq
- list_approvals
- list_worker_tasks / get_worker_task_stats / poll_tasks / poll_tasks_from_queue / complete_task / fail_task / heartbeat_task
- create_cron / list_cron / get_cron / update_cron / delete_cron
- create_trigger / list_triggers / get_trigger / delete_trigger / fire_trigger
- create_plugin / list_plugins / get_plugin / update_plugin / delete_plugin
- create_session / get_session / get_session_by_key / update_session_data / update_session_state / list_session_instances
- list_pools / create_pool / get_pool / delete_pool / list_pool_resources / create_pool_resource / update_pool_resource / delete_pool_resource
- list_credentials / create_credential / get_credential / delete_credential / update_credential
- list_circuit_breakers / get_circuit_breaker / reset_circuit_breaker / list_tenant_circuit_breakers / get_tenant_circuit_breaker / reset_tenant_circuit_breaker
- list_cluster_nodes / drain_node / health
API coverage
The engine exposes 122+ REST endpoints across 20 domains. ✓ = full, ~ = partial, W = via worker SDK, — = use REST API directly.
| Domain | Endpoints | This SDK |
|---|---|---|
| Sequences | 7 | ✓ |
| Instances | 13 | ✓ |
| Checkpoints | 4 | ✓ |
| Audit log | 1 | ✓ |
| Bulk operations | 2 | ✓ |
| Dead letter queue | 1 | ✓ |
| Worker tasks | 7 | ✓ |
| Cron schedules | 5 | ✓ |
| Triggers | 5 | ✓ |
| Plugins | 5 | ✓ |
| Sessions | 6 | ✓ |
| Credentials | 5 | ✓ |
| Resource pools | 8 | ✓ |
| Approvals | 1 | ✓ |
| Circuit breakers | 6 | ✓incl. per-tenant |
| Cluster | 2 | ✓ |
| Health | 2 | ~ready only |
| SSE streaming | 1 | — |
| Instance migration | 1 | ✓ |
| Block injection | 1 | ✓ |
Management Client
Full API surface — sequences, instances, pools, credentials, cron, triggers, sessions, DLQ, circuit breakers, and more.
from orch8 import Orch8Client
async with Orch8Client(
base_url="http://localhost:8080",
tenant_id="my-tenant",
) as client:
# Sequences
seq = await client.create_sequence({
"tenant_id": "my-tenant",
"namespace": "default",
"name": "onboarding-drip",
"version": 1,
"blocks": [
{"type": "Step", "handler": "send_welcome_email"},
{"type": "Step", "handler": "wait_48h", "delay": "48h"},
],
})
# Instances
instance = await client.create_instance({
"sequence_id": seq.id,
"tenant_id": "my-tenant",
"context": {"data": {"userId": "usr_123", "email": "user@example.com"}},
})
await client.send_signal(instance.id, "update_context", {"preferences": "weekly"})
# Checkpoints
await client.save_checkpoint(instance.id, {"progress": 42})
latest = await client.get_latest_checkpoint(instance.id)
# DLQ + bulk
dlq = await client.list_dlq(limit=10)
await client.bulk_update_state({"sequence_id": seq.id}, "Cancelled")
# Cron
cron = await client.create_cron({
"tenant_id": "my-tenant",
"sequence_id": seq.id,
"expression": "0 9 * * 1",
"timezone": "America/Sao_Paulo",
})
# Triggers
trigger = await client.create_trigger({
"slug": "new-signup",
"tenant_id": "my-tenant",
"sequence_id": seq.id,
"trigger_type": "webhook",
})
# Sessions
session = await client.create_session({
"tenant_id": "my-tenant",
"session_key": "user:usr_123",
})
# Circuit breakers
breakers = await client.list_circuit_breakers()
await client.reset_circuit_breaker("flaky_handler")
# Cluster
nodes = await client.list_cluster_nodes()Polling Worker
Register handler functions and let the worker poll, execute, heartbeat, and report results automatically.
from orch8 import Orch8Client, Orch8Worker
async def send_welcome_email(task):
email = task.context["data"]["email"]
await send_email(email, "Welcome!")
return {"sent": True}
async def check_engagement(task):
score = await get_engagement_score(task.context["data"]["userId"])
return {"route": "engaged" if score > 50 else "inactive"}
async with Orch8Client("http://localhost:8080", tenant_id="my-tenant") as client:
worker = Orch8Worker(
client=client,
worker_id="worker-1",
handlers={
"send_welcome_email": send_welcome_email,
"check_engagement": check_engagement,
},
poll_interval=2.0, # seconds between polls per handler
heartbeat_interval=30.0, # seconds between heartbeats
max_concurrent=5, # semaphore across all handlers
)
await worker.start()Full API reference
All SDKs cover the complete engine API. For the raw REST documentation including request/response schemas and authentication details, see the API reference.
View full API reference (122+ endpoints) →