Python API reference

Every public client and method in the crownest Python package, with parameters and return shapes.

This page lists every public method on the CrowNest and AsyncCrowNest clients, grouped by resource. Keyword arguments are snake_case; returned objects are JSON dictionaries whose field names match the REST API (camelCase), so a command result exposes result["exitCode"].

Sync and async clients have full parity: every method below exists on both, and the async variant takes the same arguments with await. stream_logs is a regular iterator on the sync client and an async iterator on the async client.
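As a rough illustration of the parity described above, the sketch below starts a command on an async client and drains stream_logs with async for. The helper names run_and_collect and collect_output are hypothetical and not part of the SDK; the client argument is expected to be an AsyncCrowNest instance.

```python
async def collect_output(client, command_id):
    """Gather log text from an async client's stream_logs until it finishes."""
    chunks = []
    exit_code = None
    # On the async client stream_logs is an async iterator, so use `async for`.
    async for event in client.commands.stream_logs(command_id):
        if event["type"] == "log":
            chunks.append(event["data"])
        elif event["type"] == "terminal":
            exit_code = event["command"]["exitCode"]
    return "".join(chunks), exit_code


async def run_and_collect(client, sandbox_id, command):
    """Hypothetical helper: start a command, then return (output, exit_code)."""
    # Same arguments as the sync client, with await added.
    cmd = await client.commands.start(sandbox_id, command)
    return await collect_output(client, cmd["id"])
```

Inside an async with AsyncCrowNest() as client: block, this would be called as await run_and_collect(client, sandbox.id, "pytest -q").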

Clients

Construct a client directly or as a context manager.

from crownest import CrowNest, AsyncCrowNest, CrowNestApiError

client = CrowNest(api_key=None, base_url="https://api.crownest.dev", timeout=660)
  • api_key falls back to the CROWNEST_API_KEY env var.
  • The sync client supports with and .close(); the async client supports async with.
  • Failed calls raise CrowNestApiError with .status, .code, and .details.
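The points above might be combined as in the following sketch: a client opened as a context manager, with failures reported through the documented .status, .code, and .details attributes. The helpers describe_api_error and list_sandbox_ids are hypothetical, and the sketch assumes each listed sandbox dictionary carries an "id" key. The client class and error type are passed in as arguments so the snippet stays self-contained.

```python
def describe_api_error(err):
    """Render .status, .code, and .details as a single line."""
    line = f"HTTP {getattr(err, 'status', None)} {getattr(err, 'code', None)}"
    details = getattr(err, "details", None)
    if details:
        line += f" ({details})"
    return line


def list_sandbox_ids(make_client, api_error_type):
    """Hypothetical helper: list sandbox ids, or return [] if the API call fails."""
    try:
        # The context manager closes the client on exit, even on error.
        with make_client() as client:
            # Assumption: each sandbox dictionary has an "id" key.
            return [sandbox["id"] for sandbox in client.sandboxes.list()]
    except api_error_type as err:
        print(describe_api_error(err))
        return []
```

With the real package this would be called as list_sandbox_ids(CrowNest, CrowNestApiError).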

client.sandboxes

Manages sandbox lifecycle. create and get return a SandboxHandle; list and kill return plain dictionaries.

create

client.sandboxes.create(
    *,
    template=None,            # "python" | "node" | "python-node" | "base"
    template_version_id=None, # pin an immutable TemplateVersion ("tplv_...")
    ttl_ms=None,              # requested lifetime in ms (plan-limited)
    metadata=None,            # dict[str, str], max 16 keys
    project_id=None,          # "prj_..."
    idempotency_key=None,     # auto-generated when omitted
) -> SandboxHandle

Returns a handle whose expiresAt reflects the resolved TTL. Raises CrowNestApiError with code sandbox_ttl_exceeded if ttl_ms exceeds your plan limit, or quota_exceeded when you are at your concurrency limit.
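One way to react to sandbox_ttl_exceeded is sketched below: request a lifetime, and if the plan rejects it, retry without ttl_ms. The helper create_with_ttl_fallback is hypothetical, and the sketch assumes that omitting ttl_ms lets the service resolve a lifetime the plan allows. Pass CrowNestApiError as api_error_type.

```python
def create_with_ttl_fallback(client, ttl_ms, api_error_type, **kwargs):
    """Hypothetical helper: request ttl_ms, retrying once without it if rejected."""
    try:
        return client.sandboxes.create(ttl_ms=ttl_ms, **kwargs)
    except api_error_type as err:
        if err.code != "sandbox_ttl_exceeded":
            # quota_exceeded and every other failure propagate unchanged.
            raise
    # Assumption: with ttl_ms omitted, the service picks a plan-compatible TTL.
    return client.sandboxes.create(**kwargs)
```

Because idempotency_key is auto-generated when omitted, the retry is sent as a fresh request rather than a replay of the rejected one.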

get

client.sandboxes.get(sandbox_id) -> SandboxHandle

list

client.sandboxes.list() -> list[dict]

Returns live sandboxes; destroyed and failed records are excluded by default.

kill

client.sandboxes.kill(sandbox_id) -> dict

Destroys the sandbox and returns the record with status destroyed. Idempotent by state.

SandboxHandle

The handle returned by create and get wraps the sandbox object:

  • Attribute and mapping access: sandbox.id, sandbox["status"].
  • .to_dict() returns the plain dictionary.
  • .kill() destroys the sandbox.
  • .commands, .files, .artifacts, .previews are sub-clients bound to the sandbox — the same methods as below without the sandbox_id argument.
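The bound sub-clients can be illustrated with a short sketch that creates a sandbox, runs one command through the handle, and always cleans up. The helper run_once is hypothetical; the exitCode and stdout fields it reads are the ones commands.run is documented to return.

```python
def run_once(client, command, template="python"):
    """Hypothetical helper: run one command in a throwaway sandbox."""
    sandbox = client.sandboxes.create(template=template)
    try:
        # Bound sub-client: equivalent to client.commands.run(sandbox.id, command).
        result = sandbox.commands.run(command)
        return result["exitCode"], result["stdout"]
    finally:
        # Destroy the sandbox even if the command call raised.
        sandbox.kill()
```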

client.commands

Runs and tracks processes inside a sandbox. A non-zero exit code doesn't raise; check the returned dictionary's status and exitCode.

run

client.commands.run(
    sandbox_id,
    command,
    *,
    cwd=None,             # working directory, default /workspace
    env=None,             # dict[str, str]; CROWNEST* keys rejected
    timeout_ms=None,      # default 60000, max 600000
    collect=None,         # list of {"path": str, "name": str?}
    collect_on=None,      # "success" (default) | "always"
    idempotency_key=None, # auto-generated when omitted
) -> dict

Waits for the command to exit and returns the full command record including exitCode, stdout, and stderr.
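Since a non-zero exit code does not raise, callers who prefer exceptions can wrap run themselves. The sketch below is one possible wrapper; CommandFailed and run_checked are hypothetical names, not part of the SDK.

```python
class CommandFailed(RuntimeError):
    """Hypothetical error raised when a command exits with a non-zero code."""

    def __init__(self, result):
        stderr = (result.get("stderr") or "").strip()
        super().__init__(f"command exited with {result.get('exitCode')}: {stderr}")
        self.result = result  # keep the full command record for inspection


def run_checked(client, sandbox_id, command, **kwargs):
    """Run a command and raise CommandFailed unless exitCode is 0."""
    result = client.commands.run(sandbox_id, command, **kwargs)
    if result.get("exitCode") != 0:
        raise CommandFailed(result)
    return result
```

A missing or null exitCode is treated as a failure here, which is a design choice of this sketch rather than documented behavior.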

start

client.commands.start(
    sandbox_id,
    command,
    *,
    cwd=None,
    env=None,
    timeout_ms=None,
    idempotency_key=None,
) -> dict

Returns immediately with status queued or starting. start doesn't accept collect or collect_on.

get

client.commands.get(command_id) -> dict

cancel

client.commands.cancel(command_id, *, mode=None) -> dict

mode is "graceful" (default, SIGTERM) or "force" (SIGKILL).

logs

client.commands.logs(command_id, *, after_seq=None, limit=None) -> list[dict]

Returns stored log chunks ({"commandId", "createdAt", "data", "seq", "stream"}). limit defaults to 50 with a maximum of 100.
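Because each call returns at most 100 chunks, reading a long log means paging with after_seq. The generator below is a minimal sketch of that loop; iter_log_chunks is a hypothetical name, and the code assumes chunks arrive in ascending seq order and that after_seq is exclusive (only chunks with a larger seq are returned).

```python
def iter_log_chunks(client, command_id, page_size=100):
    """Hypothetical helper: yield every stored log chunk, one page at a time."""
    after_seq = None
    while True:
        page = client.commands.logs(command_id, after_seq=after_seq, limit=page_size)
        if not page:
            return
        yield from page
        # Assumption: the next page starts strictly after the last seq seen.
        after_seq = page[-1]["seq"]
        if len(page) < page_size:
            # A short page means there is nothing further to fetch.
            return
```

Joining the output is then "".join(chunk["data"] for chunk in iter_log_chunks(client, cmd["id"])).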

stream_logs

client.commands.stream_logs(command_id, *, after_seq=None)
# sync: Iterator[dict]; async: AsyncIterator[dict]

Streams events live and blocks until the terminal event. Each event is a dictionary whose type field is log, heartbeat, terminal, or error:

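from crownest import CrowNest

client = CrowNest()  # api_key falls back to CROWNEST_API_KEY
sandbox = client.sandboxes.create(template="python")
# Start without waiting, then follow the output until the terminal event.
cmd = client.commands.start(sandbox.id, "pytest -q")
for event in client.commands.stream_logs(cmd["id"]):
    if event["type"] == "log":
        print(event["data"], end="")
    elif event["type"] == "terminal":
        print("exit code:", event["command"]["exitCode"])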

client.files

Manages files inside /workspace. Direct reads and writes are capped at 256 KB; use download_url for larger files. Paths outside the workspace raise path_outside_workspace.

read

client.files.read(sandbox_id, path, *, encoding=None) -> str

encoding is "utf8" (default) or "base64".

write

client.files.write(
    sandbox_id,
    path,
    content,
    *,
    create_parents=None,  # default False
    encoding=None,        # "utf8" (default) | "base64"
    overwrite=None,       # default True
) -> dict

Returns the file's stat record.
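For binary content, the base64 encoding can be paired with Python's base64 module. The sketch below shows one possible round trip. The helpers write_bytes and read_bytes are hypothetical, and the size check assumes that "256 KB" means 262,144 bytes measured on the raw data, which this page does not specify.

```python
import base64

# Assumption: the 256 KB cap is 256 * 1024 bytes of raw (unencoded) content.
MAX_DIRECT_BYTES = 256 * 1024


def write_bytes(client, sandbox_id, path, data):
    """Hypothetical helper: write raw bytes by sending them base64-encoded."""
    if len(data) > MAX_DIRECT_BYTES:
        raise ValueError(f"{len(data)} bytes exceeds the direct write cap")
    encoded = base64.b64encode(data).decode("ascii")
    return client.files.write(sandbox_id, path, encoded, encoding="base64")


def read_bytes(client, sandbox_id, path):
    """Hypothetical helper: read a file as base64 and decode it to bytes."""
    encoded = client.files.read(sandbox_id, path, encoding="base64")
    return base64.b64decode(encoded)
```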

delete

client.files.delete(sandbox_id, path) -> None

Deletes a file or empty directory. A non-empty directory raises directory_not_empty; there's no recursive delete.
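With no recursive delete, removing a directory tree means emptying it first. The sketch below does this by retrying after directory_not_empty. The helper delete_tree is hypothetical, and the code assumes each entry returned by files.list has a "path" key holding the entry's full path, which this page does not confirm. Pass CrowNestApiError as api_error_type.

```python
def delete_tree(client, sandbox_id, path, api_error_type):
    """Hypothetical helper: delete a file, or a directory and all its contents."""
    try:
        client.files.delete(sandbox_id, path)
        return
    except api_error_type as err:
        if err.code != "directory_not_empty":
            raise  # unrelated failures propagate unchanged
    # Assumption: list entries expose their full path under "path".
    for entry in client.files.list(sandbox_id, path):
        delete_tree(client, sandbox_id, entry["path"], api_error_type)
    # The directory is now empty, so the plain delete succeeds.
    client.files.delete(sandbox_id, path)
```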

list

client.files.list(sandbox_id, path="/workspace") -> list[dict]

stat

client.files.stat(sandbox_id, path) -> dict

mkdir

client.files.mkdir(sandbox_id, path, *, parents=None) -> dict

parents defaults to False.

move

client.files.move(sandbox_id, from_path, to_path, *, overwrite=None) -> dict

overwrite defaults to False.

download_url

client.files.download_url(sandbox_id, path) -> dict

Returns {"url", "method": "GET", "authMode": "api_key"} — a short-lived signed URL for files of any size.

client.artifacts

Exports workspace files to durable storage and manages them afterward. Artifacts outlive their sandbox.

create

client.artifacts.create(
    sandbox_id,
    path,                  # source path inside /workspace
    name=None,             # display name, derived from path when omitted
    idempotency_key=None,  # auto-generated when omitted
) -> dict

get

client.artifacts.get(artifact_id) -> dict

list

client.artifacts.list(sandbox_id) -> list[dict]

download

client.artifacts.download(artifact_id) -> bytes

download_url

client.artifacts.download_url(artifact_id) -> dict

Returns {"url", "method", "headers", "authMode"} for a short-lived signed download.
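The returned record carries what is needed to build an HTTP request with the standard library. The sketch below assumes "headers" is a plain dictionary of header names to values; build_download_request is a hypothetical helper, not part of the SDK.

```python
import urllib.request


def build_download_request(record):
    """Hypothetical helper: turn a download_url record into a urllib Request."""
    return urllib.request.Request(
        record["url"],
        method=record.get("method", "GET"),
        # Assumption: "headers" is a dict of header name to value (or absent).
        headers=record.get("headers") or {},
    )
```

The request can then be opened with urllib.request.urlopen(build_download_request(record)) and streamed to disk, which avoids holding a large artifact in memory the way download does.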

delete

client.artifacts.delete(artifact_id) -> dict

Idempotent by state; returns the artifact with deletedAt set.

client.previews

Exposes authenticated HTTP services from a sandbox at URLs like https://p-a1b2c3.preview.crownest.dev.

create

client.previews.create(sandbox_id, port, auth_mode=None) -> dict

port is the in-sandbox port (1–65535). auth_mode accepts only "authenticated" in v1, which is also the default. One active preview exists per sandbox and port; creating the same one again returns the existing preview.

get

client.previews.get(preview_id) -> dict

list

client.previews.list(sandbox_id) -> list[dict]

revoke

client.previews.revoke(preview_id) -> dict

Idempotent; returns the preview with revokedAt set.

client.projects

list

client.projects.list() -> list[dict]

Returns the projects your API key can access, each with id, orgId, name, and createdAt.
