6338621ad4
Zero-replica Swarm services now appear in the dashboard as "stopped" instead of being invisible. Start/stop/restart/pull/logs all handle the synthetic swarm: IDs correctly. Uses low-level API for container listing to avoid NotFound errors from Dead task containers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
345 lines
14 KiB
Python
345 lines
14 KiB
Python
"""Docker operations wrapper for managing Swarm and Compose containers."""
|
|
|
|
import socket
|
|
from datetime import datetime, timezone
|
|
|
|
import docker
|
|
from docker.errors import NotFound
|
|
|
|
# Container label whose presence marks a container as Swarm-managed; its
# value is the name of the Swarm service the container belongs to.
SWARM_SERVICE_LABEL = "com.docker.swarm.service.name"
|
|
|
|
|
|
class DockerOps:
|
|
"""Wraps the Docker SDK to manage containers on the local node.
|
|
|
|
Handles both standalone Compose containers and Swarm-managed
|
|
containers. Swarm containers are detected via the
|
|
``com.docker.swarm.service.name`` label and receive special
|
|
treatment: start/stop scale the service, restart force-updates it.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.client = docker.DockerClient.from_env()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _is_swarm(self, container) -> bool:
|
|
return SWARM_SERVICE_LABEL in container.labels
|
|
|
|
def _swarm_service_name(self, container) -> str | None:
|
|
return container.labels.get(SWARM_SERVICE_LABEL)
|
|
|
|
def _get_service(self, service_name):
|
|
"""Look up a Swarm service by name."""
|
|
services = self.client.services.list()
|
|
for svc in services:
|
|
if svc.name == service_name:
|
|
return svc
|
|
return None
|
|
|
|
@staticmethod
|
|
def _format_uptime(started_at: str) -> str:
|
|
"""Return a human-readable uptime string from an ISO timestamp."""
|
|
try:
|
|
# Docker timestamps may have nanosecond precision; truncate to
|
|
# microseconds so fromisoformat can parse them.
|
|
clean = started_at.replace("Z", "+00:00")
|
|
# Truncate fractional seconds to 6 digits if longer
|
|
if "." in clean:
|
|
parts = clean.split(".")
|
|
frac_and_tz = parts[1]
|
|
# Separate fractional part from timezone
|
|
for i, ch in enumerate(frac_and_tz):
|
|
if ch in ("+", "-"):
|
|
frac = frac_and_tz[:i][:6]
|
|
tz = frac_and_tz[i:]
|
|
clean = f"{parts[0]}.{frac}{tz}"
|
|
break
|
|
|
|
start = datetime.fromisoformat(clean)
|
|
delta = datetime.now(timezone.utc) - start
|
|
total_seconds = int(delta.total_seconds())
|
|
if total_seconds < 0:
|
|
return "just started"
|
|
days, remainder = divmod(total_seconds, 86400)
|
|
hours, remainder = divmod(remainder, 3600)
|
|
minutes, _ = divmod(remainder, 60)
|
|
parts = []
|
|
if days:
|
|
parts.append(f"{days}d")
|
|
if hours:
|
|
parts.append(f"{hours}h")
|
|
if minutes:
|
|
parts.append(f"{minutes}m")
|
|
return " ".join(parts) if parts else "just started"
|
|
except Exception:
|
|
return "unknown"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def list_containers(self) -> list[dict]:
|
|
"""List all containers (running + stopped) with metadata.
|
|
|
|
Also includes Swarm services scaled to 0 replicas (which have no
|
|
containers) so they remain visible in the dashboard.
|
|
|
|
Returns a list of dicts, each containing:
|
|
id, name, status, image, created, uptime, is_swarm,
|
|
swarm_service
|
|
|
|
Uses the low-level API to get container IDs to avoid NotFound errors
|
|
from containers that disappear between list and inspect (e.g. Swarm
|
|
task containers in the Dead state).
|
|
"""
|
|
raw_ids = [r["Id"] for r in self.client.api.containers(all=True)]
|
|
result = []
|
|
swarm_services_with_containers = set()
|
|
|
|
for cid in raw_ids:
|
|
try:
|
|
c = self.client.containers.get(cid)
|
|
except NotFound:
|
|
continue
|
|
image_tags = c.image.tags
|
|
image_name = image_tags[0] if image_tags else "<none>"
|
|
swarm_service = self._swarm_service_name(c)
|
|
if swarm_service:
|
|
swarm_services_with_containers.add(swarm_service)
|
|
started_at = c.attrs.get("State", {}).get("StartedAt", "")
|
|
result.append(
|
|
{
|
|
"id": c.short_id,
|
|
"name": c.name,
|
|
"status": c.status,
|
|
"image": image_name,
|
|
"created": c.attrs.get("Created", ""),
|
|
"uptime": self._format_uptime(started_at) if c.status == "running" else None,
|
|
"is_swarm": self._is_swarm(c),
|
|
"swarm_service": swarm_service,
|
|
}
|
|
)
|
|
|
|
# Include Swarm services with 0 replicas (only works on manager nodes)
|
|
try:
|
|
for svc in self.client.services.list():
|
|
if svc.name not in swarm_services_with_containers:
|
|
image = (
|
|
svc.attrs.get("Spec", {})
|
|
.get("TaskTemplate", {})
|
|
.get("ContainerSpec", {})
|
|
.get("Image", "<none>")
|
|
)
|
|
if "@" in image:
|
|
image = image.split("@")[0]
|
|
result.append(
|
|
{
|
|
"id": f"swarm:{svc.name}",
|
|
"name": svc.name,
|
|
"status": "stopped",
|
|
"image": image,
|
|
"created": svc.attrs.get("CreatedAt", ""),
|
|
"uptime": None,
|
|
"is_swarm": True,
|
|
"swarm_service": svc.name,
|
|
}
|
|
)
|
|
except Exception:
|
|
pass # Not a manager node or Swarm not active
|
|
|
|
return result
|
|
|
|
def start_container(self, container_id: str) -> dict:
|
|
"""Start a container. For Swarm services, scale to 1 replica."""
|
|
# Handle 0-replica Swarm services (synthetic swarm: IDs)
|
|
if container_id.startswith("swarm:"):
|
|
svc_name = container_id[6:]
|
|
try:
|
|
service = self._get_service(svc_name)
|
|
if service:
|
|
service.scale(1)
|
|
return {
|
|
"success": True,
|
|
"message": f"Scaled Swarm service {svc_name} to 1 replica",
|
|
}
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
try:
|
|
container = self.client.containers.get(container_id)
|
|
except NotFound:
|
|
return {"success": False, "message": f"Container {container_id} not found"}
|
|
|
|
try:
|
|
if self._is_swarm(container):
|
|
svc_name = self._swarm_service_name(container)
|
|
service = self._get_service(svc_name)
|
|
if service:
|
|
service.scale(1)
|
|
return {
|
|
"success": True,
|
|
"message": f"Scaled Swarm service {svc_name} to 1 replica",
|
|
}
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
|
|
container.start()
|
|
return {"success": True, "message": f"Started container {container.name}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
def stop_container(self, container_id: str) -> dict:
|
|
"""Stop a container. For Swarm services, scale to 0 replicas."""
|
|
if container_id.startswith("swarm:"):
|
|
svc_name = container_id[6:]
|
|
try:
|
|
service = self._get_service(svc_name)
|
|
if service:
|
|
service.scale(0)
|
|
return {
|
|
"success": True,
|
|
"message": f"Scaled Swarm service {svc_name} to 0 replicas",
|
|
}
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
try:
|
|
container = self.client.containers.get(container_id)
|
|
except NotFound:
|
|
return {"success": False, "message": f"Container {container_id} not found"}
|
|
|
|
try:
|
|
if self._is_swarm(container):
|
|
svc_name = self._swarm_service_name(container)
|
|
service = self._get_service(svc_name)
|
|
if service:
|
|
service.scale(0)
|
|
return {
|
|
"success": True,
|
|
"message": f"Scaled Swarm service {svc_name} to 0 replicas",
|
|
}
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
|
|
container.stop()
|
|
return {"success": True, "message": f"Stopped container {container.name}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
def restart_container(self, container_id: str) -> dict:
|
|
"""Restart a container. For Swarm services, force-update."""
|
|
if container_id.startswith("swarm:"):
|
|
svc_name = container_id[6:]
|
|
try:
|
|
service = self._get_service(svc_name)
|
|
if service:
|
|
service.force_update()
|
|
return {
|
|
"success": True,
|
|
"message": f"Force-updated Swarm service {svc_name}",
|
|
}
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
try:
|
|
container = self.client.containers.get(container_id)
|
|
except NotFound:
|
|
return {"success": False, "message": f"Container {container_id} not found"}
|
|
|
|
try:
|
|
if self._is_swarm(container):
|
|
svc_name = self._swarm_service_name(container)
|
|
service = self._get_service(svc_name)
|
|
if service:
|
|
service.force_update()
|
|
return {
|
|
"success": True,
|
|
"message": f"Force-updated Swarm service {svc_name}",
|
|
}
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
|
|
container.restart()
|
|
return {"success": True, "message": f"Restarted container {container.name}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
def get_logs(self, container_id: str, tail: int = 100) -> dict:
|
|
"""Get recent logs from a container.
|
|
|
|
Returns:
|
|
dict with keys ``container`` (name) and ``logs`` (string).
|
|
"""
|
|
if container_id.startswith("swarm:"):
|
|
svc_name = container_id[6:]
|
|
return {
|
|
"container": svc_name,
|
|
"logs": "Service is scaled to 0 replicas — no logs available.\nStart the service to view logs.",
|
|
}
|
|
|
|
try:
|
|
container = self.client.containers.get(container_id)
|
|
except NotFound:
|
|
return {"container": container_id, "logs": f"Container {container_id} not found"}
|
|
|
|
raw = container.logs(tail=tail)
|
|
return {
|
|
"container": container.name,
|
|
"logs": raw.decode("utf-8", errors="replace"),
|
|
}
|
|
|
|
def pull_image(self, container_id: str) -> dict:
|
|
"""Pull the latest version of the container's image."""
|
|
if container_id.startswith("swarm:"):
|
|
svc_name = container_id[6:]
|
|
service = self._get_service(svc_name)
|
|
if not service:
|
|
return {"success": False, "message": f"Swarm service {svc_name} not found"}
|
|
image = (
|
|
service.attrs.get("Spec", {})
|
|
.get("TaskTemplate", {})
|
|
.get("ContainerSpec", {})
|
|
.get("Image", "")
|
|
)
|
|
if "@" in image:
|
|
image = image.split("@")[0]
|
|
if not image:
|
|
return {"success": False, "message": f"No image found for service {svc_name}"}
|
|
try:
|
|
self.client.images.pull(image)
|
|
return {"success": True, "message": f"Pulled image {image}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
try:
|
|
container = self.client.containers.get(container_id)
|
|
except NotFound:
|
|
return {"success": False, "message": f"Container {container_id} not found"}
|
|
|
|
tags = container.image.tags
|
|
if not tags:
|
|
return {"success": False, "message": f"No image tag found for container {container.name}"}
|
|
|
|
image_name = tags[0]
|
|
try:
|
|
self.client.images.pull(image_name)
|
|
return {"success": True, "message": f"Pulled image {image_name}"}
|
|
except Exception as exc:
|
|
return {"success": False, "message": str(exc)}
|
|
|
|
def get_health(self) -> dict:
|
|
"""Return node health info: hostname and container count.
|
|
|
|
Uses the low-level API to avoid NotFound errors from containers
|
|
stuck in Docker's 'Dead' state.
|
|
"""
|
|
containers = self.client.api.containers(all=True)
|
|
return {
|
|
"status": "healthy",
|
|
"hostname": socket.gethostname(),
|
|
"containers_total": len(containers),
|
|
}
|