Python SDK Overview

The CloudCruise Python SDK lets you launch browser automation workflows, stream run events, verify webhooks, and manage vault credentials directly from Python services. Contributions are welcome at https://github.com/CloudCruise/cloudcruise-python. Report bugs via GitHub issues or Discord.

Installation

pip install cloudcruise

Set the required credentials in code or via environment variables (CLOUDCRUISE_API_KEY, CLOUDCRUISE_ENCRYPTION_KEY). Retrieve them from CloudCruise Settings.

from cloudcruise import CloudCruise, CloudCruiseParams

client = CloudCruise(
    CloudCruiseParams(
        api_key="your-api-key",
        encryption_key="your-encryption-key",
    )
)
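
If you prefer to keep keys out of source code, the sketch below reads the two environment variables named above and fails early when one is missing. `load_credentials` and `REQUIRED_VARS` are hypothetical names introduced for illustration and are not part of the SDK.

```python
import os
from typing import Mapping, Optional

REQUIRED_VARS = ("CLOUDCRUISE_API_KEY", "CLOUDCRUISE_ENCRYPTION_KEY")


def load_credentials(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return both credentials, or raise if either variable is unset or empty.

    Hypothetical helper, not provided by the cloudcruise package.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "api_key": env["CLOUDCRUISE_API_KEY"],
        "encryption_key": env["CLOUDCRUISE_ENCRYPTION_KEY"],
    }
```

The returned keys mirror the keyword arguments used in the client setup above, so `CloudCruiseParams(**load_credentials())` should slot in, assuming those are the only required parameters.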

1. Creating a Run

from cloudcruise import StartRunRequest

request = StartRunRequest(
    workflow_id="workflow-123",
    run_input_variables={
        "variable_1": "https://example.com",
        "variable_2": "john_doe",
    },
)

run = client.runs.start(request)
print("Session ID:", run.sessionId)

2. Listening to Run Events

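Attach per-event handlers to read event fields through attribute access. Each handler receives a FlattenedRunEvent with the fields type, payload, timestamp, expires_at, and raw.

# The handlers below read the module-level `handle`, which is assigned further down
# before any event can fire.
def on_execution_step(event):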
    payload = event.payload
    current_step = payload.get("current_step")
    next_step = payload.get("next_step")
    print(f"[SESSION: {handle.sessionId}] STEP: {current_step} -> {next_step} @ {event.timestamp}")


def on_execution_start(event):
    workflow_id = event.payload.get("workflow_id")
    print(f"[SESSION: {handle.sessionId}] START: {workflow_id} @ {event.timestamp}")


handle = client.runs.start(
    StartRunRequest(
        workflow_id="workflow-123",
        run_input_variables={"target": "https://example.com"},
    )
)

handle.on("execution.step", on_execution_step)
handle.on("execution.start", on_execution_start)

handle.on("error", lambda err: print(f"[SESSION: {handle.sessionId}] ERROR: {err}"))
handle.on("end", lambda info: print(f"[SESSION: {handle.sessionId}] Workflow completed with status: {info.get('type')}"))

You can also subscribe to run.event once and switch on event.type:

def on_run_event(event):
    if event.type == "execution.step":
        payload = event.payload
        print("Execution step:", payload.get("current_step"), "->", payload.get("next_step"))
    else:
        print("Event:", event.type)


handle.on("run.event", on_run_event)
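
As the number of event types grows, an if/else chain on event.type can become hard to scan. One possible alternative is a dispatch table, sketched here with a stand-in event object so it runs without the SDK. `HANDLERS`, `dispatch`, `handle_step`, and `handle_start` are illustrative names, and the sample payload values are made up.

```python
from types import SimpleNamespace


def handle_step(event):
    payload = event.payload
    return f"step {payload.get('current_step')} -> {payload.get('next_step')}"


def handle_start(event):
    return f"start {event.payload.get('workflow_id')}"


# Map event types to handlers; unknown types fall through to a default message.
HANDLERS = {
    "execution.step": handle_step,
    "execution.start": handle_start,
}


def dispatch(event):
    handler = HANDLERS.get(event.type)
    if handler is None:
        return f"unhandled {event.type}"
    return handler(event)


# Stand-in event for a local check; real events come from the run handle.
sample = SimpleNamespace(
    type="execution.step",
    payload={"current_step": "login", "next_step": "search"},
)
print(dispatch(sample))
```

With a real handle, this could be registered as `handle.on("run.event", lambda event: print(dispatch(event)))`.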

Async iteration (streaming)

Prefer a loop? Iterate over the handle to process the underlying SSE messages. Each item is the original SSE envelope.

for message in handle:
    data = message.get("data") or {}
    event_type = data.get("event")
    if event_type != "execution.success":
        print("Received:", event_type)
        continue

    print("Workflow completed successfully")
    break
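
The loop above can be factored into a small function that works on any iterable of envelopes, which makes the logic easy to check against recorded or fake messages. This is a sketch under assumptions: `drain_until_terminal` is a hypothetical helper, and "execution.failed" is an assumed event name (only "execution.success" appears on this page).

```python
from typing import Iterable, List, Mapping, Optional, Tuple

# "execution.success" is taken from the loop above; "execution.failed" is assumed.
TERMINAL_EVENTS = {"execution.success", "execution.failed"}


def drain_until_terminal(messages: Iterable[Mapping]) -> Tuple[List[str], Optional[str]]:
    """Collect event names until a terminal event arrives.

    Returns (events seen before the terminal one, terminal event or None).
    """
    seen: List[str] = []
    for message in messages:
        data = message.get("data") or {}  # tolerate envelopes without data
        event_type = data.get("event")
        if event_type in TERMINAL_EVENTS:
            return seen, event_type
        if event_type is not None:
            seen.append(event_type)
    return seen, None


# Stand-in envelopes shaped like the ones the loop above reads.
fake_stream = [
    {"data": {"event": "execution.start"}},
    {"data": None},
    {"data": {"event": "execution.step"}},
    {"data": {"event": "execution.success"}},
    {"data": {"event": "never.reached"}},
]
print(drain_until_terminal(fake_stream))
```

Because the function only needs an iterable, passing the run handle itself (`drain_until_terminal(handle)`) should behave like the inline loop.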

Other available events

  • open: connection established
  • ping: heartbeat messages
  • reconnect: reconnect attempts with {"attemptDelayMs": ...}
  • message: catch-all mirror of run.event and ping
  • error: SSE errors
  • end: terminal status for the session

Example:

handle.on("ping", lambda payload: print("ping:", payload))
handle.on("reconnect", lambda payload: print("reconnect in", payload.get("attemptDelayMs"), "ms"))
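
For connection-level events, bound methods can replace lambdas when you want to keep state between callbacks. The sketch below counts pings and reconnects and remembers the last announced delay. `ConnectionMonitor` is a hypothetical class, and the callbacks are invoked directly here to simulate events.

```python
from collections import Counter


class ConnectionMonitor:
    """Counts connection-level events and remembers the last reconnect delay."""

    def __init__(self):
        self.counts = Counter()
        self.last_delay_ms = None

    def on_ping(self, payload):
        self.counts["ping"] += 1

    def on_reconnect(self, payload):
        self.counts["reconnect"] += 1
        # The payload shape {"attemptDelayMs": ...} follows the event list above.
        self.last_delay_ms = (payload or {}).get("attemptDelayMs")


monitor = ConnectionMonitor()
# Simulated callbacks; with a real handle these would be registered via
# handle.on("ping", monitor.on_ping) and handle.on("reconnect", monitor.on_reconnect).
monitor.on_ping({})
monitor.on_reconnect({"attemptDelayMs": 1500})
print(dict(monitor.counts), monitor.last_delay_ms)
```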

3. Wait for Completion

You can block until the workflow completes. Long-running workflows can hold the calling thread for a while, so use this cautiously. The returned value is a RunResult dataclass.

run = client.runs.start(
    StartRunRequest(
        workflow_id="workflow-123",
        run_input_variables={"target": "https://example.com"},
    )
)
result = run.wait()
print("Run completed:", result.status)
print("Output data:", result.data)
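
If an unbounded block is a concern, one generic option is to run the blocking call in a worker thread and stop waiting after a deadline. This page does not say whether wait() accepts its own timeout, so the sketch uses only the standard library. `wait_with_timeout` and `slow_call` are hypothetical names.

```python
import concurrent.futures
import time


def wait_with_timeout(blocking_call, timeout_s):
    """Run `blocking_call` in a worker thread and give up after `timeout_s` seconds.

    The worker is not cancelled on timeout; it keeps running in the background.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(blocking_call)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        raise TimeoutError(f"no result within {timeout_s} seconds") from None
    finally:
        # Do not block here; let the worker finish on its own.
        executor.shutdown(wait=False)


def slow_call():
    time.sleep(0.2)  # stand-in for a long-running run.wait()
    return "finished"


print(wait_with_timeout(slow_call, timeout_s=2.0))
```

With a real run, the call would look like `result = wait_with_timeout(run.wait, 300)`. Note that a timeout here only stops the caller from waiting. It does not stop the workflow itself.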

Next Steps

  • Explore the Run API for detailed workflow execution options
  • Learn about the Vault API for secure credential management
  • Check out the Workflow API to manage your workflows
  • Set up Webhooks for event notifications