Files
farm-manager/agent/docker_ops.py
T
pluto fe76ca7456 fix: agent Dockerfile package structure and Dead container crash
- Dockerfile: COPY to /app/agent/ and use agent.main:app for proper
  package imports
- docker_ops: use low-level API in get_health() to avoid NotFound on
  containers stuck in Docker Dead state
- Add comprehensive README with architecture, API docs, and usage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 22:48:20 -06:00

231 lines
8.8 KiB
Python

"""Docker operations wrapper for managing Swarm and Compose containers."""
import socket
from datetime import datetime, timezone
import docker
from docker.errors import NotFound
# Container label used to detect Swarm-managed containers; the label's value
# is read as the name of the Swarm service that owns the container.
SWARM_SERVICE_LABEL = "com.docker.swarm.service.name"
class DockerOps:
"""Wraps the Docker SDK to manage containers on the local node.
Handles both standalone Compose containers and Swarm-managed
containers. Swarm containers are detected via the
``com.docker.swarm.service.name`` label and receive special
treatment: start/stop scale the service, restart force-updates it.
"""
def __init__(self) -> None:
self.client = docker.DockerClient.from_env()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _is_swarm(self, container) -> bool:
return SWARM_SERVICE_LABEL in container.labels
def _swarm_service_name(self, container) -> str | None:
return container.labels.get(SWARM_SERVICE_LABEL)
def _get_service(self, service_name):
"""Look up a Swarm service by name."""
services = self.client.services.list()
for svc in services:
if svc.name == service_name:
return svc
return None
@staticmethod
def _format_uptime(started_at: str) -> str:
"""Return a human-readable uptime string from an ISO timestamp."""
try:
# Docker timestamps may have nanosecond precision; truncate to
# microseconds so fromisoformat can parse them.
clean = started_at.replace("Z", "+00:00")
# Truncate fractional seconds to 6 digits if longer
if "." in clean:
parts = clean.split(".")
frac_and_tz = parts[1]
# Separate fractional part from timezone
for i, ch in enumerate(frac_and_tz):
if ch in ("+", "-"):
frac = frac_and_tz[:i][:6]
tz = frac_and_tz[i:]
clean = f"{parts[0]}.{frac}{tz}"
break
start = datetime.fromisoformat(clean)
delta = datetime.now(timezone.utc) - start
total_seconds = int(delta.total_seconds())
if total_seconds < 0:
return "just started"
days, remainder = divmod(total_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, _ = divmod(remainder, 60)
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
return " ".join(parts) if parts else "just started"
except Exception:
return "unknown"
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def list_containers(self) -> list[dict]:
"""List all containers (running + stopped) with metadata.
Returns a list of dicts, each containing:
id, name, status, image, created, uptime, is_swarm,
swarm_service
"""
containers = self.client.containers.list(all=True)
result = []
for c in containers:
image_tags = c.image.tags
image_name = image_tags[0] if image_tags else "<none>"
swarm_service = self._swarm_service_name(c)
started_at = c.attrs.get("State", {}).get("StartedAt", "")
result.append(
{
"id": c.short_id,
"name": c.name,
"status": c.status,
"image": image_name,
"created": c.attrs.get("Created", ""),
"uptime": self._format_uptime(started_at) if c.status == "running" else None,
"is_swarm": self._is_swarm(c),
"swarm_service": swarm_service,
}
)
return result
def start_container(self, container_id: str) -> dict:
"""Start a container. For Swarm services, scale to 1 replica."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.scale(1)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 1 replica",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.start()
return {"success": True, "message": f"Started container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def stop_container(self, container_id: str) -> dict:
"""Stop a container. For Swarm services, scale to 0 replicas."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.scale(0)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 0 replicas",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.stop()
return {"success": True, "message": f"Stopped container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def restart_container(self, container_id: str) -> dict:
"""Restart a container. For Swarm services, force-update."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.force_update()
return {
"success": True,
"message": f"Force-updated Swarm service {svc_name}",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.restart()
return {"success": True, "message": f"Restarted container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def get_logs(self, container_id: str, tail: int = 100) -> dict:
"""Get recent logs from a container.
Returns:
dict with keys ``container`` (name) and ``logs`` (string).
"""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"container": container_id, "logs": f"Container {container_id} not found"}
raw = container.logs(tail=tail)
return {
"container": container.name,
"logs": raw.decode("utf-8", errors="replace"),
}
def pull_image(self, container_id: str) -> dict:
"""Pull the latest version of the container's image."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
tags = container.image.tags
if not tags:
return {"success": False, "message": f"No image tag found for container {container.name}"}
image_name = tags[0]
try:
self.client.images.pull(image_name)
return {"success": True, "message": f"Pulled image {image_name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def get_health(self) -> dict:
"""Return node health info: hostname and container count.
Uses the low-level API to avoid NotFound errors from containers
stuck in Docker's 'Dead' state.
"""
containers = self.client.api.containers(all=True)
return {
"status": "healthy",
"hostname": socket.gethostname(),
"containers_total": len(containers),
}