# Python SDK

Full reference for the `ciderstack` package.

```shell
pip install ciderstack
```

Requires Python 3.8+ and `requests >= 2.28.0`.

For pairing support: `pip install ciderstack[pairing]` (adds `cryptography >= 41.0.0`).
## Client configuration

```python
from ciderstack import FleetClient

client = FleetClient(
    "192.168.1.100",            # Required — IP or hostname
    api_token="csk_abc123...",  # Auth: API token (or use node_id)
    port=9473,                  # Optional — default 9473
    timeout=30,                 # Optional — request timeout in seconds
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | required | IP address or hostname of the Fleet node |
| `node_id` | `str` | `None` | Trusted node ID from pairing |
| `api_token` | `str` | `None` | API token string |
| `port` | `int` | `9473` | Fleet HTTP server port |
| `timeout` | `int` | `30` | Request timeout in seconds |

Provide exactly one of `node_id` or `api_token`.

Class constants:

| Constant | Value |
|---|---|
| `FleetClient.DEFAULT_PORT` | `9473` |
| `FleetClient.DEFAULT_TIMEOUT` | `30` (seconds) |
| `FleetClient.DEFAULT_EXEC_TIMEOUT` | `300` (5 min) |
## Pairing

```python
creds = FleetClient.pair(host, code, name=None, port=9473)
# Returns dict with node_id, keys, responder info
```

See Authentication for full details.
## Node information

```python
# Get node hardware and software info
info = client.get_node_info()
print(f"{info.name} ({info.machine_model})")
print(f"macOS {info.os_version}, Fleet v{info.fleet_version}")
print(f"{info.cpu_cores} cores, {info.total_memory_gb} GB RAM")

# Get real-time resource stats
stats = client.get_node_stats()
print(f"CPU: {stats.cpu_usage_percent:.1f}%")
print(f"Memory: {stats.memory_used_gb:.1f}/{stats.memory_total_gb} GB")
print(f"Disk: {stats.disk_used_gb:.1f}/{stats.disk_total_gb} GB")
print(f"VMs: {stats.running_vm_count}/{stats.total_vm_count}")
```

| Method | Returns | RPC |
|---|---|---|
| `get_node_info()` | `NodeInfo` | `GetNodeInfo` |
| `get_node_stats()` | `NodeStats` | `GetNodeStats` |
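The `NodeStats` fields above are enough for a simple admission check before placing another VM on a node. A minimal sketch; the 80% CPU ceiling and 2 GB memory reserve are arbitrary thresholds, not SDK defaults:

```python
def has_headroom(stats, memory_needed_gb: float,
                 cpu_ceiling: float = 80.0, reserve_gb: float = 2.0) -> bool:
    """Return True if the node can likely host another VM.

    `stats` is any object with the NodeStats fields documented above.
    """
    free_gb = stats.memory_total_gb - stats.memory_used_gb
    return (stats.cpu_usage_percent < cpu_ceiling
            and free_gb - reserve_gb >= memory_needed_gb)
```

Typical use: `if has_headroom(client.get_node_stats(), memory_needed_gb=8): ...`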
## VM management

```python
# List and find VMs
vms = client.list_vms()
vm = client.get_vm("vm-uuid")  # None if not found

# Lifecycle
client.start_vm(vm.id)
client.suspend_vm(vm.id)         # Pause (stays in memory)
client.start_vm(vm.id)           # Resume from suspend
client.stop_vm(vm.id)
client.start_vm_recovery(vm.id)  # Boot into macOS Recovery

# Clone, rename, delete
clone = client.clone_vm(vm.id, "my-clone")
client.rename_vm(clone.id, "new-name")
client.delete_vm(clone.id)

# Create a new VM from an OCI image
vm_id = client.create_vm(
    name="ci-runner-01",
    cpu_count=4,
    memory_mb=8192,
    disk_gb=64,
    oci_image="ghcr.io/myorg/macos-base:latest",
)
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `list_vms()` | `List[VM]` | `ListVMs` | No |
| `get_vm(vm_id)` | `Optional[VM]` | — | No |
| `start_vm(vm_id)` | `bool` | `StartVM` | Yes |
| `stop_vm(vm_id)` | `bool` | `StopVM` | Yes |
| `suspend_vm(vm_id)` | `bool` | `SuspendVM` | Yes |
| `start_vm_recovery(vm_id)` | `bool` | `StartVMRecovery` | Yes |
| `clone_vm(vm_id, new_name)` | `VM` | `CloneVM` | Yes |
| `rename_vm(vm_id, new_name)` | `bool` | `RenameVM` | Yes |
| `delete_vm(vm_id)` | `bool` | `DeleteVM` | Yes |
| `create_vm(name, ...)` | `str` | `CreateVMOnNode` | Yes |

`create_vm` parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | VM name |
| `cpu_count` | `int` | `4` | Virtual CPUs |
| `memory_mb` | `int` | `8192` | Memory in MB |
| `disk_gb` | `int` | `64` | Disk size in GB |
| `ipsw_path` | `str` | `None` | Path to IPSW on the node |
| `oci_image` | `str` | `None` | OCI image reference |
## VM settings

```python
# Get current settings
settings = client.get_vm_settings(vm_id)

# Update settings (only provided fields are changed)
client.update_vm_settings(vm_id, cpu_count=8, memory_size=16384)

# Set display resolution
client.update_vm_settings(vm_id, display_width=1920, display_height=1080)

# Mark as disposable with 1-hour TTL
client.update_vm_settings(vm_id, intent="disposable", ttl_seconds=3600)

# Clear the TTL
client.update_vm_settings(vm_id, ttl_seconds=-1)
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `get_vm_settings(vm_id)` | `dict` | `GetVMSettings` | No |
| `update_vm_settings(vm_id, ...)` | `bool` | `UpdateVMSettings` | Yes |

Settings parameters:

| Parameter | Type | Description |
|---|---|---|
| `cpu_count` | `int` | Number of virtual CPUs |
| `memory_size` | `int` | Memory in MB |
| `display_width` | `int` | Display width in pixels |
| `display_height` | `int` | Display height in pixels |
| `intent` | `str` | `"disposable"`, `"persistent"`, `"interactive"`, or `"ci"` |
| `ttl_seconds` | `int` | Time-to-live in seconds. `-1` to clear. |
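A common CI pattern is to mark a freshly created runner disposable with a TTL so that a later `cleanup_vms()` call can reap it. A small convenience wrapper; the hour-based interface is our own, not part of the SDK:

```python
def make_disposable(client, vm_id: str, hours: float = 1.0) -> bool:
    """Mark a VM disposable with a TTL, after which cleanup_vms() may delete it."""
    return client.update_vm_settings(
        vm_id,
        intent="disposable",
        ttl_seconds=int(hours * 3600),
    )
```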
## Snapshots

```python
# Create a snapshot
snapshot = client.create_snapshot(vm_id, "pre-update", "Before system update")

# List snapshots
snapshots = client.list_snapshots(vm_id)
for s in snapshots:
    print(f"{s.name}: {s.created_at.isoformat()} ({s.size_bytes} bytes)")

# Restore (VM must be stopped)
client.restore_snapshot(vm_id, snapshot.id)

# Delete
client.delete_snapshot(vm_id, snapshot.id)
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `list_snapshots(vm_id)` | `List[Snapshot]` | `ListSnapshots` | No |
| `create_snapshot(vm_id, name, description?)` | `Snapshot` | `CreateSnapshot` | Yes |
| `restore_snapshot(vm_id, snapshot_id)` | `bool` | `RestoreSnapshot` | Yes |
| `delete_snapshot(vm_id, snapshot_id)` | `bool` | `DeleteSnapshot` | Yes |
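Because `restore_snapshot` requires a stopped VM, a rollback helper usually stops, restores, then restarts. A sketch that assumes `stop_vm` returns once the VM has actually stopped; if it is asynchronous on your build, poll `get_vm` until the state reaches `stopped` first:

```python
def rollback(client, vm_id: str, snapshot_id: str, restart: bool = True) -> None:
    """Stop the VM, restore the given snapshot, and optionally start it again."""
    vm = client.get_vm(vm_id)
    if vm is not None and vm.state != "stopped":
        client.stop_vm(vm_id)  # restore requires a stopped VM
    client.restore_snapshot(vm_id, snapshot_id)
    if restart:
        client.start_vm(vm_id)
```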
## Command execution

Execute shell commands on running VMs via SSH. The HTTP timeout is automatically extended for long-running commands.

```python
result = client.exec_command(
    vm_id, "uname -a",
    ssh_user="admin", ssh_password="password",
)
if result.success:
    print(result.stdout)
    print(f"Completed in {result.duration}s")
else:
    print(f"Exit {result.exit_code}: {result.stderr}")

# Long-running command with extended timeout
build = client.exec_command(
    vm_id,
    "xcodebuild -scheme MyApp build",
    ssh_user="admin",
    ssh_password="password",
    timeout=600,  # 10 minutes
)
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `exec_command(vm_id, command, ...)` | `ExecResult` | `ExecCommand` | Yes |

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `vm_id` | `str` | required | Target VM |
| `command` | `str` | required | Shell command |
| `ssh_user` | `str` | `"cideradmin"` | SSH username |
| `ssh_password` | `str` | `None` | SSH password |
| `timeout` | `int` | `300` | Command timeout in seconds |
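`exec_command` reports failure through `ExecResult` rather than by raising, so scripts that should abort on a failed command need an explicit check. One possible wrapper; raising `RuntimeError` is our choice here, not SDK behavior:

```python
def run_or_raise(client, vm_id: str, command: str, **kwargs) -> str:
    """Run a command on the VM and return stdout, raising if it exited non-zero."""
    result = client.exec_command(vm_id, command, **kwargs)
    if not result.success:
        raise RuntimeError(
            f"{command!r} failed with exit {result.exit_code}: {result.stderr}"
        )
    return result.stdout
```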
## Templates

```python
# List templates
templates = client.list_templates()
for t in templates:
    print(f"{t.name} ({t.category}) — {t.source_type}")
    print(f"  Defaults: {t.default_cpu} CPU, {t.default_memory_mb} MB, {t.default_disk_gb} GB")

# Look up by ID or name
template = client.get_template("template-uuid")
by_name = client.get_template_by_name("CI Runner Base")

# Create from a stopped VM
new_template = client.create_template_from_vm(
    vm_id="abc-123-...",
    name="Clean macOS 15 Base",
    description="Fresh macOS Sequoia with Xcode CLI tools",
    category="base",
    keep_original_vm=True,
)

# Delete
client.delete_template(new_template.id)
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `list_templates()` | `List[Template]` | `ListTemplates` | No |
| `get_template(template_id)` | `Optional[Template]` | `GetTemplate` | No |
| `get_template_by_name(name)` | `Optional[Template]` | `GetTemplate` | No |
| `create_template_from_vm(vm_id, name, ...)` | `Template` | `CreateTemplateFromVM` | Yes |
| `delete_template(template_id)` | `bool` | `DeleteTemplate` | Yes |

`create_template_from_vm` parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `vm_id` | `str` | required | Source VM (must be stopped) |
| `name` | `str` | required | Template name |
| `description` | `str` | `None` | Description |
| `category` | `str` | `None` | `"base"`, `"dev"`, `"cicd"`, `"testing"`, `"custom"` |
| `keep_original_vm` | `bool` | `True` | Keep the source VM after template creation |
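There is no single create-VM-from-template call in this reference, but a template's `default_*` fields map directly onto `create_vm` parameters. A sketch; the OCI image reference is supplied by the caller, since `Template` does not expose one:

```python
def create_from_template(client, template_name: str, vm_name: str,
                         oci_image: str) -> str:
    """Create a VM sized by a template's defaults; returns the new VM's ID."""
    template = client.get_template_by_name(template_name)
    if template is None:
        raise ValueError(f"no template named {template_name!r}")
    return client.create_vm(
        name=vm_name,
        cpu_count=template.default_cpu,
        memory_mb=template.default_memory_mb,
        disk_gb=template.default_disk_gb,
        oci_image=oci_image,
    )
```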
## Shared folders

Share directories between the host and VM using VirtioFS.

```python
# List shared folders
folders = client.list_shared_folders(vm_id)
for f in folders:
    mode = "ro" if f.get("readOnly") else "rw"
    status = "enabled" if f.get("isEnabled") else "disabled"
    print(f"{f['name']}: {f['hostPath']} ({mode}, {status})")

# Add a shared folder
client.add_shared_folder(
    vm_id=vm_id,
    name="project-src",
    host_path="/Users/dev/project",
    read_only=False,
)

# Toggle on/off
client.set_shared_folder_enabled(vm_id, "project-src", False)
client.set_shared_folder_enabled(vm_id, "project-src", True)

# Remove
client.remove_shared_folder(vm_id, "project-src")
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `list_shared_folders(vm_id)` | `List[dict]` | `ListSharedFolders` | No |
| `add_shared_folder(vm_id, name, host_path, ...)` | `bool` | `AddSharedFolder` | Yes |
| `remove_shared_folder(vm_id, name)` | `bool` | `RemoveSharedFolder` | Yes |
| `set_shared_folder_enabled(vm_id, name, enabled)` | `bool` | `SetSharedFolderEnabled` | Yes |

`add_shared_folder` parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `vm_id` | `str` | required | Target VM |
| `name` | `str` | required | Display name |
| `host_path` | `str` | required | Absolute path on the host |
| `read_only` | `bool` | `False` | Mount as read-only |
| `mount_tag` | `str` | `None` | VirtioFS mount tag |
## Image management

```python
# List available IPSWs and OCI images
ipsws = client.list_ipsws()
images = client.list_oci_images()

# Download an IPSW to the node
client.download_ipsw(
    "https://updates.cdn-apple.com/.../restore.ipsw",
    "macOS 15.1",
    "15.1",
)

# Get IPSW metadata
info = client.get_ipsw_info("/path/to/restore.ipsw")

# Pull an OCI image (public)
client.pull_oci_image("ghcr.io/myorg/macos-base:latest")

# Pull an OCI image (private registry)
client.pull_oci_image(
    "ghcr.io/myorg/private:v1",
    username="user",
    password="ghp_token...",
)

# Push a VM as an OCI image (returns task ID)
task_id = client.push_image(vm_id, "ghcr.io/myorg/my-vm:latest")
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `list_ipsws()` | `List[dict]` | `ListRemoteIPSWs` | No |
| `list_oci_images()` | `List[dict]` | `ListRemoteOCIImages` | No |
| `download_ipsw(url, name, version)` | `bool` | `DownloadIPSWOnNode` | Yes |
| `get_ipsw_info(path)` | `dict` | `GetIPSWInfo` | No |
| `pull_oci_image(image_ref, username?, password?)` | `bool` | `PullOCIImageOnNode` | Yes |
| `push_image(vm_id, image_name, insecure?)` | `str` | `PushImage` | Yes |
## Tasks

Monitor background tasks like VM creation, image pulls, and IPSW downloads.

```python
tasks = client.get_tasks()
for task in tasks:
    print(f"[{task.status}] {task.title}: {task.progress:.0f}%")
    if task.progress_text:
        print(f"  {task.progress_text}")
    if task.error_message:
        print(f"  ERROR: {task.error_message}")

# Include completed tasks
all_tasks = client.get_tasks(include_completed=True)
```

| Method | Returns | RPC |
|---|---|---|
| `get_tasks(include_completed?)` | `List[Task]` | `GetTasks` |
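`get_tasks` is a point-in-time listing, so waiting on a background task (for example, the task ID returned by `push_image`) means polling. A sketch that assumes finished tasks stay visible under `include_completed=True`; the poll interval and timeout are arbitrary:

```python
import time

def wait_for_task(client, task_id: str, poll_seconds: float = 2.0,
                  timeout: float = 600.0):
    """Poll until the task finishes; return it, or raise on failure/timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        tasks = {t.id: t for t in client.get_tasks(include_completed=True)}
        task = tasks.get(task_id)
        # TaskStatus is a str-valued enum, so raw-string comparison works
        if task is not None and task.status in ("completed", "failed", "cancelled"):
            if task.status != "completed":
                raise RuntimeError(task.error_message or f"task {task_id} {task.status}")
            return task
        time.sleep(poll_seconds)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")
```

Typical use: `wait_for_task(client, client.push_image(vm_id, "ghcr.io/myorg/my-vm:latest"))`.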
## Fleet overview

Get aggregated fleet status. Only available on the manager node.

```python
overview = client.get_fleet_overview()

# Fleet-wide stats (raw dict)
stats = overview["overview"]["stats"]
print(f"Nodes: {stats['connectedNodes']}/{stats['totalNodes']}")
print(f"VMs: {stats['runningVMs']}/{stats['totalVMs']} running")
print(f"CPU: {stats['aggregatedCPUUsage']:.1f}%")

# Manager node
mgr = overview["overview"]["managerNode"]
print(f"Manager: {mgr['name']} ({mgr['hostname']})")

# Worker nodes
for worker in overview["overview"]["workerNodes"]:
    print(f"{worker['name']}: {worker['connectionState']}")

# All VMs across the fleet
for vm in overview["overview"]["vms"]:
    print(f"{vm['name']} on {vm['hostName']}: {vm['state']}")

# Activity log
events = client.get_fleet_events(limit=50)
for e in events:
    print(f"[{e['eventType']}] {e['nodeName']}: {e['message']}")

# Filter by event type
start_events = client.get_fleet_events(limit=20, event_type="vm_started")

# Node resource availability
resources = client.get_remote_resources()
```

| Method | Returns | RPC |
|---|---|---|
| `get_fleet_overview()` | `dict` | `GetFleetOverview` |
| `get_fleet_events(limit?, event_type?)` | `List[dict]` | `GetFleetEvents` |
| `get_remote_resources()` | `dict` | `GetRemoteResources` |
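Since `get_fleet_overview` returns a raw dict, downstream code usually reduces it to the few keys it needs. A small summarizer over the keys shown above; the `"connected"` value for `connectionState` is an assumption, so check it against your deployment:

```python
def summarize_fleet(overview: dict) -> str:
    """One-line fleet health summary from a get_fleet_overview() payload."""
    stats = overview["overview"]["stats"]
    # Assumes "connected" is the healthy connectionState value
    offline = [w["name"] for w in overview["overview"]["workerNodes"]
               if w["connectionState"] != "connected"]
    line = (f"{stats['connectedNodes']}/{stats['totalNodes']} nodes, "
            f"{stats['runningVMs']}/{stats['totalVMs']} VMs running, "
            f"CPU {stats['aggregatedCPUUsage']:.1f}%")
    if offline:
        line += f"; offline: {', '.join(offline)}"
    return line
```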
## API token management

See Authentication for full details.

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `generate_api_token(name, permissions?)` | `APIToken` | `GenerateAPIToken` | Yes |
| `revoke_api_token(token_id)` | `bool` | `RevokeAPIToken` | Yes |
| `list_api_tokens()` | `List[APITokenSummary]` | `ListAPITokens` | No |
## Fleet utilities

```python
# Unpair a node
client.unpair("node-uuid")

# Clean up VMs with expired TTL
cleaned = client.cleanup_vms()
print(f"Cleaned {cleaned} VMs")

# Force cleanup (includes persistent VMs)
force_clean = client.cleanup_vms(force=True)
```

| Method | Returns | RPC | Mutating |
|---|---|---|---|
| `unpair(node_id)` | `bool` | `Unpair` | Yes |
| `cleanup_vms(force?)` | `int` | `CleanupVMs` | Yes |
| `get_ipsw_info(path)` | `dict` | `GetIPSWInfo` | No |
## Error handling

All API errors raise `FleetError`:

```python
from ciderstack import FleetClient, FleetError

try:
    client.start_vm("non-existent-id")
except FleetError as e:
    print(f"Fleet error: {e}")
    print(f"Response: {e.response}")
```

`FleetError` has a `response` attribute containing the raw API response for debugging.
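Transient failures (a node mid-restart, a brief network blip) also surface as `FleetError`, so resilient callers often retry. A generic sketch; the linear backoff and its defaults are arbitrary, and retrying is only safe for idempotent calls:

```python
import time

def with_retries(fn, attempts: int = 3, delay: float = 1.0,
                 retry_on: type = Exception):
    """Call fn(), retrying on the given exception type with linear backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            time.sleep(delay * attempt)
```

Typical use: `with_retries(lambda: client.start_vm(vm_id), retry_on=FleetError)`.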
## Types and enums

### Enums

```python
from enum import Enum

class VMState(str, Enum):
    STOPPED = "stopped"
    STARTING = "starting"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPING = "stopping"
    ERROR = "error"

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class NodeRole(str, Enum):
    WORKER = "worker"
    MANAGER = "manager"
    OPERATOR = "operator"

class APITokenPermissions(str, Enum):
    READ_ONLY = "readOnly"
    FULL_ACCESS = "fullAccess"
```

### Dataclasses

All dataclasses include a `from_dict(cls, data: dict)` class method for parsing API responses.
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class NodeInfo:
    node_id: str
    name: str
    hostname: str
    ip_address: str
    port: int
    machine_model: str
    os_version: str
    cpu_cores: int
    total_memory_gb: int
    fleet_version: str

@dataclass
class NodeStats:
    node_id: str
    timestamp: datetime
    cpu_usage_percent: float
    memory_used_gb: float
    memory_total_gb: float
    disk_used_gb: float
    disk_total_gb: float
    running_vm_count: int
    total_vm_count: int

@dataclass
class VM:
    id: str
    name: str
    state: VMState
    cpu_count: int
    memory_mb: int
    disk_size_gb: Optional[int]
    os_version: Optional[str]
    ip_address: Optional[str]

@dataclass
class Snapshot:
    id: str
    name: str
    description: Optional[str]
    created_at: datetime
    size_bytes: int

@dataclass
class ExecResult:
    success: bool
    exit_code: int
    stdout: str
    stderr: str
    duration: float

@dataclass
class Template:
    id: str
    name: str
    description: Optional[str]
    macos_version: Optional[str]
    icon_name: str
    category: str
    source_type: str
    created_at: datetime
    modified_at: datetime
    disk_size_bytes: Optional[int]
    default_cpu: int
    default_memory_mb: int
    default_disk_gb: int

@dataclass
class APIToken:
    id: str
    name: str
    token: str
    permissions: APITokenPermissions
    created_at: datetime
    last_used_at: Optional[datetime]

@dataclass
class APITokenSummary:
    id: str
    name: str
    token_prefix: str
    permissions: APITokenPermissions
    created_at: datetime
    last_used_at: Optional[datetime]

@dataclass
class Task:
    id: str
    type: str
    title: str
    status: TaskStatus
    progress: float
    progress_text: Optional[str]
    started_at: datetime
    completed_at: Optional[datetime]
    vm_id: Optional[str]
    vm_name: Optional[str]
    error_message: Optional[str]
```

### Exports

The SDK exports the following from `ciderstack`:

- Classes: `FleetClient`, `FleetError`
- Enums: `VMState`, `TaskStatus`, `NodeRole`, `APITokenPermissions`
- Dataclasses: `Template`, `APIToken`, `APITokenSummary`

Note: `NodeInfo`, `NodeStats`, `VM`, `Snapshot`, `ExecResult`, and `Task` are returned by methods but not exported in `__all__`. They can be imported directly from `ciderstack.types` if needed for type hints.
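Because those dataclasses live in `ciderstack.types`, annotations can reference them without a runtime import by guarding the import with `typing.TYPE_CHECKING` and quoting the annotation:

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    from ciderstack.types import VM  # evaluated only by type checkers

def running_vm_names(vms: "List[VM]") -> List[str]:
    # VMState is a str-valued enum, so comparing to the raw value works
    return [vm.name for vm in vms if vm.state == "running"]
```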
## See also
- JavaScript / TypeScript SDK — JavaScript equivalent
- API Reference — Side-by-side comparison
- Examples & Workflows — CI/CD, monitoring, provisioning
- Authentication — Tokens and pairing