Files
pluto 6338621ad4 feat(agent): show and manage Swarm services scaled to 0 replicas
Zero-replica Swarm services now appear in the dashboard as "stopped"
instead of being invisible. Start/stop/restart/pull/logs all handle
the synthetic `swarm:<service>` IDs correctly. Uses the low-level API for
container listing to avoid NotFound errors from Dead task containers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 21:13:58 -06:00

345 lines
14 KiB
Python

"""Docker operations wrapper for managing Swarm and Compose containers."""
import socket
from datetime import datetime, timezone
import docker
from docker.errors import NotFound
# Label used to recognise Swarm-managed containers; its value is read as the
# name of the owning Swarm service.
SWARM_SERVICE_LABEL = "com.docker.swarm.service.name"
class DockerOps:
"""Wraps the Docker SDK to manage containers on the local node.
Handles both standalone Compose containers and Swarm-managed
containers. Swarm containers are detected via the
``com.docker.swarm.service.name`` label and receive special
treatment: start/stop scale the service, restart force-updates it.
"""
def __init__(self) -> None:
self.client = docker.DockerClient.from_env()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _is_swarm(self, container) -> bool:
return SWARM_SERVICE_LABEL in container.labels
def _swarm_service_name(self, container) -> str | None:
return container.labels.get(SWARM_SERVICE_LABEL)
def _get_service(self, service_name):
"""Look up a Swarm service by name."""
services = self.client.services.list()
for svc in services:
if svc.name == service_name:
return svc
return None
@staticmethod
def _format_uptime(started_at: str) -> str:
"""Return a human-readable uptime string from an ISO timestamp."""
try:
# Docker timestamps may have nanosecond precision; truncate to
# microseconds so fromisoformat can parse them.
clean = started_at.replace("Z", "+00:00")
# Truncate fractional seconds to 6 digits if longer
if "." in clean:
parts = clean.split(".")
frac_and_tz = parts[1]
# Separate fractional part from timezone
for i, ch in enumerate(frac_and_tz):
if ch in ("+", "-"):
frac = frac_and_tz[:i][:6]
tz = frac_and_tz[i:]
clean = f"{parts[0]}.{frac}{tz}"
break
start = datetime.fromisoformat(clean)
delta = datetime.now(timezone.utc) - start
total_seconds = int(delta.total_seconds())
if total_seconds < 0:
return "just started"
days, remainder = divmod(total_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, _ = divmod(remainder, 60)
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
return " ".join(parts) if parts else "just started"
except Exception:
return "unknown"
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def list_containers(self) -> list[dict]:
"""List all containers (running + stopped) with metadata.
Also includes Swarm services scaled to 0 replicas (which have no
containers) so they remain visible in the dashboard.
Returns a list of dicts, each containing:
id, name, status, image, created, uptime, is_swarm,
swarm_service
Uses the low-level API to get container IDs to avoid NotFound errors
from containers that disappear between list and inspect (e.g. Swarm
task containers in the Dead state).
"""
raw_ids = [r["Id"] for r in self.client.api.containers(all=True)]
result = []
swarm_services_with_containers = set()
for cid in raw_ids:
try:
c = self.client.containers.get(cid)
except NotFound:
continue
image_tags = c.image.tags
image_name = image_tags[0] if image_tags else "<none>"
swarm_service = self._swarm_service_name(c)
if swarm_service:
swarm_services_with_containers.add(swarm_service)
started_at = c.attrs.get("State", {}).get("StartedAt", "")
result.append(
{
"id": c.short_id,
"name": c.name,
"status": c.status,
"image": image_name,
"created": c.attrs.get("Created", ""),
"uptime": self._format_uptime(started_at) if c.status == "running" else None,
"is_swarm": self._is_swarm(c),
"swarm_service": swarm_service,
}
)
# Include Swarm services with 0 replicas (only works on manager nodes)
try:
for svc in self.client.services.list():
if svc.name not in swarm_services_with_containers:
image = (
svc.attrs.get("Spec", {})
.get("TaskTemplate", {})
.get("ContainerSpec", {})
.get("Image", "<none>")
)
if "@" in image:
image = image.split("@")[0]
result.append(
{
"id": f"swarm:{svc.name}",
"name": svc.name,
"status": "stopped",
"image": image,
"created": svc.attrs.get("CreatedAt", ""),
"uptime": None,
"is_swarm": True,
"swarm_service": svc.name,
}
)
except Exception:
pass # Not a manager node or Swarm not active
return result
def start_container(self, container_id: str) -> dict:
"""Start a container. For Swarm services, scale to 1 replica."""
# Handle 0-replica Swarm services (synthetic swarm: IDs)
if container_id.startswith("swarm:"):
svc_name = container_id[6:]
try:
service = self._get_service(svc_name)
if service:
service.scale(1)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 1 replica",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
except Exception as exc:
return {"success": False, "message": str(exc)}
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.scale(1)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 1 replica",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.start()
return {"success": True, "message": f"Started container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def stop_container(self, container_id: str) -> dict:
"""Stop a container. For Swarm services, scale to 0 replicas."""
if container_id.startswith("swarm:"):
svc_name = container_id[6:]
try:
service = self._get_service(svc_name)
if service:
service.scale(0)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 0 replicas",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
except Exception as exc:
return {"success": False, "message": str(exc)}
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.scale(0)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 0 replicas",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.stop()
return {"success": True, "message": f"Stopped container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def restart_container(self, container_id: str) -> dict:
"""Restart a container. For Swarm services, force-update."""
if container_id.startswith("swarm:"):
svc_name = container_id[6:]
try:
service = self._get_service(svc_name)
if service:
service.force_update()
return {
"success": True,
"message": f"Force-updated Swarm service {svc_name}",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
except Exception as exc:
return {"success": False, "message": str(exc)}
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.force_update()
return {
"success": True,
"message": f"Force-updated Swarm service {svc_name}",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.restart()
return {"success": True, "message": f"Restarted container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def get_logs(self, container_id: str, tail: int = 100) -> dict:
"""Get recent logs from a container.
Returns:
dict with keys ``container`` (name) and ``logs`` (string).
"""
if container_id.startswith("swarm:"):
svc_name = container_id[6:]
return {
"container": svc_name,
"logs": "Service is scaled to 0 replicas — no logs available.\nStart the service to view logs.",
}
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"container": container_id, "logs": f"Container {container_id} not found"}
raw = container.logs(tail=tail)
return {
"container": container.name,
"logs": raw.decode("utf-8", errors="replace"),
}
def pull_image(self, container_id: str) -> dict:
"""Pull the latest version of the container's image."""
if container_id.startswith("swarm:"):
svc_name = container_id[6:]
service = self._get_service(svc_name)
if not service:
return {"success": False, "message": f"Swarm service {svc_name} not found"}
image = (
service.attrs.get("Spec", {})
.get("TaskTemplate", {})
.get("ContainerSpec", {})
.get("Image", "")
)
if "@" in image:
image = image.split("@")[0]
if not image:
return {"success": False, "message": f"No image found for service {svc_name}"}
try:
self.client.images.pull(image)
return {"success": True, "message": f"Pulled image {image}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
tags = container.image.tags
if not tags:
return {"success": False, "message": f"No image tag found for container {container.name}"}
image_name = tags[0]
try:
self.client.images.pull(image_name)
return {"success": True, "message": f"Pulled image {image_name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def get_health(self) -> dict:
"""Return node health info: hostname and container count.
Uses the low-level API to avoid NotFound errors from containers
stuck in Docker's 'Dead' state.
"""
containers = self.client.api.containers(all=True)
return {
"status": "healthy",
"hostname": socket.gethostname(),
"containers_total": len(containers),
}