Documentation Index
Fetch the complete documentation index at: https://docs.cloudcruise.com/llms.txt
Use this file to discover all available pages before exploring further.
Python SDK Overview
Contributions are welcome at https://github.com/CloudCruise/cloudcruise-python.
Report bugs via GitHub issues or Discord.
The CloudCruise Python SDK lets you launch browser automation workflows, stream run events, verify webhooks, and manage vault credentials directly from Python services.
Installation
Set the required credentials in code or via environment variables (CLOUDCRUISE_API_KEY, CLOUDCRUISE_ENCRYPTION_KEY). Retrieve them from CloudCruise Settings.
from cloudcruise import CloudCruise, CloudCruiseParams
client = CloudCruise(
CloudCruiseParams(
api_key="your-api-key",
encryption_key="your-encryption-key",
)
)
1. Creating a Run
from cloudcruise import StartRunRequest
request = StartRunRequest(
workflow_id="workflow-123",
run_input_variables={
"variable_1": "https://example.com",
"variable_2": "john_doe",
},
)
run = client.runs.start(request)
print("Session ID:", run.sessionId)
2. Listening to run events
Attach per-event handlers for a strongly typed experience. Each handler receives a flattened payload with type, payload, timestamp, and the original _raw message.
def on_execution_step(event):
payload = event.get("payload", {})
current_step = payload.get("current_step")
next_step = payload.get("next_step")
print(f"[SESSION: {handle.sessionId}] STEP: {current_step} -> {next_step} @ {event.get('timestamp')}")
def on_execution_start(event):
workflow_id = event.get("payload", {}).get("workflow_id")
print(f"[SESSION: {handle.sessionId}] START: {workflow_id} @ {event.get('timestamp')}")
handle = client.runs.start(
StartRunRequest(
workflow_id="workflow-123",
run_input_variables={"target": "https://example.com"},
)
)
handle.on("execution.step", on_execution_step)
handle.on("execution.start", on_execution_start)
handle.on("error", lambda err: print(f"[SESSION: {handle.sessionId}] ERROR: {err}"))
handle.on("end", lambda info: print(f"[SESSION: {handle.sessionId}] Workflow completed with status: {info.get('type')}"))
You can also subscribe to run.event once and switch on event["type"]:
def on_run_event(event):
if event.get("type") == "execution.step":
payload = event.get("payload", {})
print("Execution step:", payload.get("current_step"), "->", payload.get("next_step"))
else:
print("Event:", event.get("type"))
handle.on("run.event", on_run_event)
Async iteration (streaming)
Prefer a loop? Iterate over the handle to process the underlying SSE messages. Each item is the original SSE envelope.
for message in handle:
data = (message.get("data") or {})
event_type = data.get("event")
if event_type != "execution.success":
print("Received:", event_type)
continue
print("Workflow completed successfully")
break
Other available events
open: connection established
ping: heartbeat messages
reconnect: reconnect attempts with {"attemptDelayMs": ...}
message: catch-all mirror of run.event and ping
error: SSE errors
end: terminal status for the session
Example:
handle.on("ping", lambda payload: print("ping:", payload))
handle.on("reconnect", lambda payload: print("reconnect in", payload.get("attemptDelayMs"), "ms"))
3. Wait for Completion
You can block until the workflow completes. For long-running workflows this may take a while, so use cautiously. The returned value is the raw JSON response from GET /run/{session_id}.
run = client.runs.start(
StartRunRequest(
workflow_id="workflow-123",
run_input_variables={"target": "https://example.com"},
)
)
result = run.wait()
print("Run completed:", result.get("status"))
print("Output data:", result.get("data"))
Next Steps
- Explore the Run API for detailed workflow execution options
- Learn about Vault API for secure credential management
- Check out Workflow API to manage your workflows
- Set up Webhooks for event notifications