Python SDK

pip install orch8-io-sdk

Features

  • Async client built on httpx with async context manager support
  • 76 methods covering all engine domains — sequences, instances, pools, credentials, circuit breakers, approvals, and more
  • Pydantic models matching the Rust engine types
  • Polling worker with circuit breaker awareness — skips handlers with open breakers
  • Exponential backoff on poll failures (doubles up to 30s, resets on success)
  • on_task_complete / on_task_fail observability callbacks
  • Semaphore-based concurrency limiting (default max_concurrent=5)
  • Orch8Error class with status and structured body
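
The backoff rule in the list above (double on each failed poll, cap at 30s, reset on success) can be sketched in a few lines. This is an illustration of the rule as described, not the SDK's internal code: the function name `next_backoff` and the 1-second starting delay are assumptions, and only the 30-second cap comes from the feature list.

```python
def next_backoff(current: float, *, success: bool,
                 base: float = 1.0, cap: float = 30.0) -> float:
    """Return the delay (in seconds) to wait before the next poll.

    base is an assumed starting delay; cap=30.0 matches the documented limit.
    """
    if success:
        # A successful poll resets the delay to its starting value.
        return base
    # A failed poll doubles the delay, never exceeding the cap.
    return min(current * 2, cap)


delay = 1.0
history = []
# Six consecutive poll failures followed by one success.
for ok in [False, False, False, False, False, False, True]:
    delay = next_backoff(delay, success=ok)
    history.append(delay)

print(history)  # [2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 1.0]
```

The delay saturates at the cap after the fifth failure and drops back to the base value as soon as a poll succeeds.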

Client methods (76)

Full management surface covering all engine API domains.

  • list_sequences / create_sequence / get_sequence / get_sequence_by_name / delete_sequence / deprecate_sequence / list_sequence_versions / migrate_instance
  • create_instance / batch_create_instances / get_instance / list_instances / update_instance_state / update_instance_context / send_signal / retry_instance / inject_blocks
  • get_outputs / get_execution_tree / list_audit_log
  • list_checkpoints / save_checkpoint / get_latest_checkpoint / prune_checkpoints
  • bulk_update_state / bulk_reschedule / list_dlq
  • list_approvals
  • list_worker_tasks / get_worker_task_stats / poll_tasks / poll_tasks_from_queue / complete_task / fail_task / heartbeat_task
  • create_cron / list_cron / get_cron / update_cron / delete_cron
  • create_trigger / list_triggers / get_trigger / delete_trigger / fire_trigger
  • create_plugin / list_plugins / get_plugin / update_plugin / delete_plugin
  • create_session / get_session / get_session_by_key / update_session_data / update_session_state / list_session_instances
  • list_pools / create_pool / get_pool / delete_pool / list_pool_resources / create_pool_resource / update_pool_resource / delete_pool_resource
  • list_credentials / create_credential / get_credential / delete_credential / update_credential
  • list_circuit_breakers / get_circuit_breaker / reset_circuit_breaker / list_tenant_circuit_breakers / get_tenant_circuit_breaker / reset_tenant_circuit_breaker
  • list_cluster_nodes / drain_node / health

API coverage

The engine exposes 122+ REST endpoints across 20 domains. The "This SDK" column rates coverage of each domain: full, partial (~), via the worker SDK (W), or not covered (use the REST API directly).

Domain               Endpoints  This SDK
Sequences            7
Instances            13
Checkpoints          4
Audit log            1
Bulk operations      2
Dead letter queue    1
Worker tasks         7
Cron schedules       5
Triggers             5
Plugins              5
Sessions             6
Credentials          5
Resource pools       8
Approvals            1
Circuit breakers     6          incl. per-tenant
Cluster              2
Health               2          ~ ready only
SSE streaming        1
Instance migration   1
Block injection      1

Management Client

Full API surface — sequences, instances, pools, credentials, cron, triggers, sessions, DLQ, circuit breakers, and more.

import asyncio

from orch8 import Orch8Client


async def main():
    async with Orch8Client(
        base_url="http://localhost:8080",
        tenant_id="my-tenant",
    ) as client:
        # Sequences
        seq = await client.create_sequence({
            "tenant_id": "my-tenant",
            "namespace": "default",
            "name": "onboarding-drip",
            "version": 1,
            "blocks": [
                {"type": "Step", "handler": "send_welcome_email"},
                {"type": "Step", "handler": "wait_48h", "delay": "48h"},
            ],
        })

        # Instances
        instance = await client.create_instance({
            "sequence_id": seq.id,
            "tenant_id": "my-tenant",
            "context": {"data": {"userId": "usr_123", "email": "user@example.com"}},
        })
        await client.send_signal(instance.id, "update_context", {"preferences": "weekly"})

        # Checkpoints
        await client.save_checkpoint(instance.id, {"progress": 42})
        latest = await client.get_latest_checkpoint(instance.id)

        # DLQ + bulk
        dlq = await client.list_dlq(limit=10)
        await client.bulk_update_state({"sequence_id": seq.id}, "Cancelled")

        # Cron
        cron = await client.create_cron({
            "tenant_id": "my-tenant",
            "sequence_id": seq.id,
            "expression": "0 9 * * 1",
            "timezone": "America/Sao_Paulo",
        })

        # Triggers
        trigger = await client.create_trigger({
            "slug": "new-signup",
            "tenant_id": "my-tenant",
            "sequence_id": seq.id,
            "trigger_type": "webhook",
        })

        # Sessions
        session = await client.create_session({
            "tenant_id": "my-tenant",
            "session_key": "user:usr_123",
        })

        # Circuit breakers
        breakers = await client.list_circuit_breakers()
        await client.reset_circuit_breaker("flaky_handler")

        # Cluster
        nodes = await client.list_cluster_nodes()


asyncio.run(main())
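
Any of the calls above can fail, and the feature list says failures surface as an Orch8Error carrying a status and a structured body. The sketch below shows one way calling code might branch on that status. It runs without the SDK installed, so it defines a stand-in `Orch8Error`; the constructor signature, the attribute names `status` and `body`, the shape of the body dict, and the helpers `is_retryable` and `fake_get_instance` are all assumptions made for illustration.

```python
class Orch8Error(Exception):
    """Stand-in for the SDK error type (assumed attributes: status, body)."""

    def __init__(self, status: int, body: dict):
        super().__init__(f"engine returned HTTP {status}")
        self.status = status
        self.body = body


def is_retryable(err: Orch8Error) -> bool:
    """Treat rate limiting (429) and server errors (5xx) as worth retrying."""
    return err.status == 429 or 500 <= err.status < 600


def fake_get_instance(instance_id: str):
    """Hypothetical call that always fails, standing in for a client method."""
    raise Orch8Error(404, {"error": "instance not found", "id": instance_id})


try:
    fake_get_instance("inst_missing")
except Orch8Error as err:
    # Decide what to do from the HTTP status; keep the body for logging.
    action = "retry" if is_retryable(err) else "give up"
    outcome = (action, err.status, err.body)

print(outcome)
```

With the real client, the `try` block would wrap an awaited call such as `client.get_instance(...)`; the retry policy shown here is a common HTTP convention, not something the engine prescribes.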

Polling Worker

Register handler functions and let the worker poll, execute, heartbeat, and report results automatically.

import asyncio

from orch8 import Orch8Client, Orch8Worker


async def send_welcome_email(task):
    email = task.context["data"]["email"]
    # send_email is your own application code, not part of the SDK
    await send_email(email, "Welcome!")
    return {"sent": True}


async def check_engagement(task):
    # get_engagement_score is your own application code, not part of the SDK
    score = await get_engagement_score(task.context["data"]["userId"])
    return {"route": "engaged" if score > 50 else "inactive"}


async def main():
    async with Orch8Client(base_url="http://localhost:8080", tenant_id="my-tenant") as client:
        worker = Orch8Worker(
            client=client,
            worker_id="worker-1",
            handlers={
                "send_welcome_email": send_welcome_email,
                "check_engagement": check_engagement,
            },
            poll_interval=2.0,       # seconds between polls per handler
            heartbeat_interval=30.0, # seconds between heartbeats
            max_concurrent=5,        # semaphore across all handlers
        )
        await worker.start()


asyncio.run(main())
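
The `max_concurrent` setting is described as a semaphore shared across all handlers. As a rough illustration of that technique using only the standard library (not the worker's actual implementation), the sketch below runs 20 fake tasks through one `asyncio.Semaphore` and records the highest number in flight at once. The names `run_limited` and `fake_task` are hypothetical.

```python
import asyncio


async def run_limited(handlers, max_concurrent=5):
    """Run all handlers, allowing at most max_concurrent at the same time."""
    sem = asyncio.Semaphore(max_concurrent)
    running = 0  # handlers currently executing
    peak = 0     # highest value of running observed

    async def guarded(handler):
        nonlocal running, peak
        async with sem:  # waits here while max_concurrent handlers are active
            running += 1
            peak = max(peak, running)
            try:
                return await handler()
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(h) for h in handlers))
    return results, peak


async def fake_task():
    """Stand-in for a task handler doing a little async work."""
    await asyncio.sleep(0.01)
    return {"ok": True}


results, peak = asyncio.run(run_limited([fake_task] * 20, max_concurrent=5))
print(len(results), peak)  # 20 5
```

All 20 tasks complete, but no more than 5 ever run simultaneously, which is the behavior the `max_concurrent=5` default suggests.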

Full API reference

All SDKs cover the complete engine API. For the raw REST documentation including request/response schemas and authentication details, see the API reference.

View full API reference (122+ endpoints) →