"""Docker operations wrapper for managing Swarm and Compose containers.""" import socket from datetime import datetime, timezone import docker from docker.errors import NotFound SWARM_SERVICE_LABEL = "com.docker.swarm.service.name" class DockerOps: """Wraps the Docker SDK to manage containers on the local node. Handles both standalone Compose containers and Swarm-managed containers. Swarm containers are detected via the ``com.docker.swarm.service.name`` label and receive special treatment: start/stop scale the service, restart force-updates it. """ def __init__(self) -> None: self.client = docker.DockerClient.from_env() # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _is_swarm(self, container) -> bool: return SWARM_SERVICE_LABEL in container.labels def _swarm_service_name(self, container) -> str | None: return container.labels.get(SWARM_SERVICE_LABEL) def _get_service(self, service_name): """Look up a Swarm service by name.""" services = self.client.services.list() for svc in services: if svc.name == service_name: return svc return None @staticmethod def _format_uptime(started_at: str) -> str: """Return a human-readable uptime string from an ISO timestamp.""" try: # Docker timestamps may have nanosecond precision; truncate to # microseconds so fromisoformat can parse them. clean = started_at.replace("Z", "+00:00") # Truncate fractional seconds to 6 digits if longer if "." in clean: parts = clean.split(".") frac_and_tz = parts[1] # Separate fractional part from timezone for i, ch in enumerate(frac_and_tz): if ch in ("+", "-"): frac = frac_and_tz[:i][:6] tz = frac_and_tz[i:] clean = f"{parts[0]}.{frac}{tz}" break start = datetime.fromisoformat(clean) delta = datetime.now(timezone.utc) - start total_seconds = int(delta.total_seconds()) if total_seconds < 0: return "just started" days, remainder = divmod(total_seconds, 86400) hours, remainder = divmod(remainder, 3600) minutes, _ = divmod(remainder, 60) parts = [] if days: parts.append(f"{days}d") if hours: parts.append(f"{hours}h") if minutes: parts.append(f"{minutes}m") return " ".join(parts) if parts else "just started" except Exception: return "unknown" # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def list_containers(self) -> list[dict]: """List all containers (running + stopped) with metadata. Also includes Swarm services scaled to 0 replicas (which have no containers) so they remain visible in the dashboard. Returns a list of dicts, each containing: id, name, status, image, created, uptime, is_swarm, swarm_service Uses the low-level API to get container IDs to avoid NotFound errors from containers that disappear between list and inspect (e.g. Swarm task containers in the Dead state). """ raw_ids = [r["Id"] for r in self.client.api.containers(all=True)] result = [] swarm_services_with_containers = set() for cid in raw_ids: try: c = self.client.containers.get(cid) except NotFound: continue image_tags = c.image.tags image_name = image_tags[0] if image_tags else "" swarm_service = self._swarm_service_name(c) if swarm_service: swarm_services_with_containers.add(swarm_service) started_at = c.attrs.get("State", {}).get("StartedAt", "") result.append( { "id": c.short_id, "name": c.name, "status": c.status, "image": image_name, "created": c.attrs.get("Created", ""), "uptime": self._format_uptime(started_at) if c.status == "running" else None, "is_swarm": self._is_swarm(c), "swarm_service": swarm_service, } ) # Include Swarm services with 0 replicas (only works on manager nodes) try: for svc in self.client.services.list(): if svc.name not in swarm_services_with_containers: image = ( svc.attrs.get("Spec", {}) .get("TaskTemplate", {}) .get("ContainerSpec", {}) .get("Image", "") ) if "@" in image: image = image.split("@")[0] result.append( { "id": f"swarm:{svc.name}", "name": svc.name, "status": "stopped", "image": image, "created": svc.attrs.get("CreatedAt", ""), "uptime": None, "is_swarm": True, "swarm_service": svc.name, } ) except Exception: pass # Not a manager node or Swarm not active return result def start_container(self, container_id: str) -> dict: """Start a container. For Swarm services, scale to 1 replica.""" # Handle 0-replica Swarm services (synthetic swarm: IDs) if container_id.startswith("swarm:"): svc_name = container_id[6:] try: service = self._get_service(svc_name) if service: service.scale(1) return { "success": True, "message": f"Scaled Swarm service {svc_name} to 1 replica", } return {"success": False, "message": f"Swarm service {svc_name} not found"} except Exception as exc: return {"success": False, "message": str(exc)} try: container = self.client.containers.get(container_id) except NotFound: return {"success": False, "message": f"Container {container_id} not found"} try: if self._is_swarm(container): svc_name = self._swarm_service_name(container) service = self._get_service(svc_name) if service: service.scale(1) return { "success": True, "message": f"Scaled Swarm service {svc_name} to 1 replica", } return {"success": False, "message": f"Swarm service {svc_name} not found"} container.start() return {"success": True, "message": f"Started container {container.name}"} except Exception as exc: return {"success": False, "message": str(exc)} def stop_container(self, container_id: str) -> dict: """Stop a container. For Swarm services, scale to 0 replicas.""" if container_id.startswith("swarm:"): svc_name = container_id[6:] try: service = self._get_service(svc_name) if service: service.scale(0) return { "success": True, "message": f"Scaled Swarm service {svc_name} to 0 replicas", } return {"success": False, "message": f"Swarm service {svc_name} not found"} except Exception as exc: return {"success": False, "message": str(exc)} try: container = self.client.containers.get(container_id) except NotFound: return {"success": False, "message": f"Container {container_id} not found"} try: if self._is_swarm(container): svc_name = self._swarm_service_name(container) service = self._get_service(svc_name) if service: service.scale(0) return { "success": True, "message": f"Scaled Swarm service {svc_name} to 0 replicas", } return {"success": False, "message": f"Swarm service {svc_name} not found"} container.stop() return {"success": True, "message": f"Stopped container {container.name}"} except Exception as exc: return {"success": False, "message": str(exc)} def restart_container(self, container_id: str) -> dict: """Restart a container. For Swarm services, force-update.""" if container_id.startswith("swarm:"): svc_name = container_id[6:] try: service = self._get_service(svc_name) if service: service.force_update() return { "success": True, "message": f"Force-updated Swarm service {svc_name}", } return {"success": False, "message": f"Swarm service {svc_name} not found"} except Exception as exc: return {"success": False, "message": str(exc)} try: container = self.client.containers.get(container_id) except NotFound: return {"success": False, "message": f"Container {container_id} not found"} try: if self._is_swarm(container): svc_name = self._swarm_service_name(container) service = self._get_service(svc_name) if service: service.force_update() return { "success": True, "message": f"Force-updated Swarm service {svc_name}", } return {"success": False, "message": f"Swarm service {svc_name} not found"} container.restart() return {"success": True, "message": f"Restarted container {container.name}"} except Exception as exc: return {"success": False, "message": str(exc)} def get_logs(self, container_id: str, tail: int = 100) -> dict: """Get recent logs from a container. Returns: dict with keys ``container`` (name) and ``logs`` (string). """ if container_id.startswith("swarm:"): svc_name = container_id[6:] return { "container": svc_name, "logs": "Service is scaled to 0 replicas — no logs available.\nStart the service to view logs.", } try: container = self.client.containers.get(container_id) except NotFound: return {"container": container_id, "logs": f"Container {container_id} not found"} raw = container.logs(tail=tail) return { "container": container.name, "logs": raw.decode("utf-8", errors="replace"), } def pull_image(self, container_id: str) -> dict: """Pull the latest version of the container's image.""" if container_id.startswith("swarm:"): svc_name = container_id[6:] service = self._get_service(svc_name) if not service: return {"success": False, "message": f"Swarm service {svc_name} not found"} image = ( service.attrs.get("Spec", {}) .get("TaskTemplate", {}) .get("ContainerSpec", {}) .get("Image", "") ) if "@" in image: image = image.split("@")[0] if not image: return {"success": False, "message": f"No image found for service {svc_name}"} try: self.client.images.pull(image) return {"success": True, "message": f"Pulled image {image}"} except Exception as exc: return {"success": False, "message": str(exc)} try: container = self.client.containers.get(container_id) except NotFound: return {"success": False, "message": f"Container {container_id} not found"} tags = container.image.tags if not tags: return {"success": False, "message": f"No image tag found for container {container.name}"} image_name = tags[0] try: self.client.images.pull(image_name) return {"success": True, "message": f"Pulled image {image_name}"} except Exception as exc: return {"success": False, "message": str(exc)} def get_health(self) -> dict: """Return node health info: hostname and container count. Uses the low-level API to avoid NotFound errors from containers stuck in Docker's 'Dead' state. """ containers = self.client.api.containers(all=True) return { "status": "healthy", "hostname": socket.gethostname(), "containers_total": len(containers), }