feat(agent): Docker operations with Swarm/Compose detection (TDD)

Add DockerOps class wrapping the Docker SDK with methods for listing,
starting, stopping, restarting containers, fetching logs, and pulling
images.  Swarm containers are detected via labels and receive special
handling (scale/force-update instead of direct container ops).
Includes 16 passing tests using a mocked Docker client.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 18:57:48 -06:00
parent fdfb9edfb0
commit 8b3e2e5067
2 changed files with 615 additions and 0 deletions
+226
View File
@@ -0,0 +1,226 @@
"""Docker operations wrapper for managing Swarm and Compose containers."""
import socket
from datetime import datetime, timezone
import docker
from docker.errors import NotFound
SWARM_SERVICE_LABEL = "com.docker.swarm.service.name"
class DockerOps:
"""Wraps the Docker SDK to manage containers on the local node.
Handles both standalone Compose containers and Swarm-managed
containers. Swarm containers are detected via the
``com.docker.swarm.service.name`` label and receive special
treatment: start/stop scale the service, restart force-updates it.
"""
def __init__(self) -> None:
self.client = docker.DockerClient.from_env()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _is_swarm(self, container) -> bool:
return SWARM_SERVICE_LABEL in container.labels
def _swarm_service_name(self, container) -> str | None:
return container.labels.get(SWARM_SERVICE_LABEL)
def _get_service(self, service_name):
"""Look up a Swarm service by name."""
services = self.client.services.list()
for svc in services:
if svc.name == service_name:
return svc
return None
@staticmethod
def _format_uptime(started_at: str) -> str:
"""Return a human-readable uptime string from an ISO timestamp."""
try:
# Docker timestamps may have nanosecond precision; truncate to
# microseconds so fromisoformat can parse them.
clean = started_at.replace("Z", "+00:00")
# Truncate fractional seconds to 6 digits if longer
if "." in clean:
parts = clean.split(".")
frac_and_tz = parts[1]
# Separate fractional part from timezone
for i, ch in enumerate(frac_and_tz):
if ch in ("+", "-"):
frac = frac_and_tz[:i][:6]
tz = frac_and_tz[i:]
clean = f"{parts[0]}.{frac}{tz}"
break
start = datetime.fromisoformat(clean)
delta = datetime.now(timezone.utc) - start
total_seconds = int(delta.total_seconds())
if total_seconds < 0:
return "just started"
days, remainder = divmod(total_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, _ = divmod(remainder, 60)
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
return " ".join(parts) if parts else "just started"
except Exception:
return "unknown"
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def list_containers(self) -> list[dict]:
"""List all containers (running + stopped) with metadata.
Returns a list of dicts, each containing:
id, name, status, image, created, uptime, is_swarm,
swarm_service
"""
containers = self.client.containers.list(all=True)
result = []
for c in containers:
image_tags = c.image.tags
image_name = image_tags[0] if image_tags else "<none>"
swarm_service = self._swarm_service_name(c)
started_at = c.attrs.get("State", {}).get("StartedAt", "")
result.append(
{
"id": c.short_id,
"name": c.name,
"status": c.status,
"image": image_name,
"created": c.attrs.get("Created", ""),
"uptime": self._format_uptime(started_at) if c.status == "running" else None,
"is_swarm": self._is_swarm(c),
"swarm_service": swarm_service,
}
)
return result
def start_container(self, container_id: str) -> dict:
"""Start a container. For Swarm services, scale to 1 replica."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.scale(1)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 1 replica",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.start()
return {"success": True, "message": f"Started container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def stop_container(self, container_id: str) -> dict:
"""Stop a container. For Swarm services, scale to 0 replicas."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.scale(0)
return {
"success": True,
"message": f"Scaled Swarm service {svc_name} to 0 replicas",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.stop()
return {"success": True, "message": f"Stopped container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def restart_container(self, container_id: str) -> dict:
"""Restart a container. For Swarm services, force-update."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
try:
if self._is_swarm(container):
svc_name = self._swarm_service_name(container)
service = self._get_service(svc_name)
if service:
service.force_update()
return {
"success": True,
"message": f"Force-updated Swarm service {svc_name}",
}
return {"success": False, "message": f"Swarm service {svc_name} not found"}
container.restart()
return {"success": True, "message": f"Restarted container {container.name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def get_logs(self, container_id: str, tail: int = 100) -> dict:
"""Get recent logs from a container.
Returns:
dict with keys ``container`` (name) and ``logs`` (string).
"""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"container": container_id, "logs": f"Container {container_id} not found"}
raw = container.logs(tail=tail)
return {
"container": container.name,
"logs": raw.decode("utf-8", errors="replace"),
}
def pull_image(self, container_id: str) -> dict:
"""Pull the latest version of the container's image."""
try:
container = self.client.containers.get(container_id)
except NotFound:
return {"success": False, "message": f"Container {container_id} not found"}
tags = container.image.tags
if not tags:
return {"success": False, "message": f"No image tag found for container {container.name}"}
image_name = tags[0]
try:
self.client.images.pull(image_name)
return {"success": True, "message": f"Pulled image {image_name}"}
except Exception as exc:
return {"success": False, "message": str(exc)}
def get_health(self) -> dict:
"""Return node health info: hostname and container count."""
containers = self.client.containers.list(all=True)
return {
"status": "healthy",
"hostname": socket.gethostname(),
"containers_total": len(containers),
}
+389
View File
@@ -0,0 +1,389 @@
"""Tests for agent.docker_ops.DockerOps — Docker operations wrapper.
Uses unittest.mock to mock the Docker SDK client so no real Docker
daemon is required.
"""
import datetime
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
from docker.errors import NotFound
# ---------------------------------------------------------------------------
# Helpers to build mock container objects
# ---------------------------------------------------------------------------
def _make_container(
id="abc123def456",
name="my-app",
status="running",
image_tag="nginx:latest",
created=None,
labels=None,
):
"""Return a MagicMock that behaves like a docker Container object."""
c = MagicMock()
c.id = id
c.short_id = id[:12]
c.name = name
c.status = status
img = MagicMock()
img.tags = [image_tag]
c.image = img
c.attrs = {
"Created": created or "2026-03-04T12:00:00.000000000Z",
"State": {
"StartedAt": "2026-03-04T12:00:00.000000000Z",
},
}
c.labels = labels or {}
return c
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestListContainers:
    """DockerOps.list_containers returns structured container info."""

    @staticmethod
    def _list_with(mock_from_env, container):
        """Serve *container* from a mocked client; return (client, listing)."""
        mock_client = MagicMock()
        mock_from_env.return_value = mock_client
        mock_client.containers.list.return_value = [container]
        # Imported lazily, after the patch is active, as in the other tests.
        from agent.docker_ops import DockerOps

        return mock_client, DockerOps().list_containers()

    @patch("docker.DockerClient.from_env")
    def test_list_containers_returns_expected_fields(self, mock_from_env):
        """Each dict has id, name, status, image, created, uptime,
        is_swarm, swarm_service."""
        container = _make_container()
        _, result = self._list_with(mock_from_env, container)

        assert len(result) == 1
        c = result[0]
        assert c["id"] == container.short_id
        assert c["name"] == "my-app"
        assert c["status"] == "running"
        assert c["image"] == "nginx:latest"
        # Check values, not just key presence: "created" is passed through
        # from attrs and a running container gets a formatted uptime string.
        assert c["created"] == container.attrs["Created"]
        assert isinstance(c["uptime"], str)
        assert c["is_swarm"] is False
        assert c["swarm_service"] is None

    @patch("docker.DockerClient.from_env")
    def test_list_containers_detects_swarm(self, mock_from_env):
        """Containers with com.docker.swarm.service.name label are
        marked is_swarm=True with the service name populated."""
        container = _make_container(
            name="mystack_web.1.xyz",
            labels={"com.docker.swarm.service.name": "mystack_web"},
        )
        _, result = self._list_with(mock_from_env, container)

        assert result[0]["is_swarm"] is True
        assert result[0]["swarm_service"] == "mystack_web"

    @patch("docker.DockerClient.from_env")
    def test_list_containers_includes_stopped(self, mock_from_env):
        """Exited containers should still appear with status 'exited'."""
        mock_client, result = self._list_with(
            mock_from_env, _make_container(status="exited")
        )

        assert result[0]["status"] == "exited"
        # Uptime is only reported for running containers.
        assert result[0]["uptime"] is None
        # Verify list was called with all=True to include stopped containers
        mock_client.containers.list.assert_called_once_with(all=True)
class TestStartContainer:
    """DockerOps.start_container on standalone containers."""

    @staticmethod
    def _ops_and_client(mock_from_env):
        """Return a DockerOps bound to a fresh mocked client, plus the client."""
        client = MagicMock()
        mock_from_env.return_value = client
        from agent.docker_ops import DockerOps

        return DockerOps(), client

    @patch("docker.DockerClient.from_env")
    def test_start_container_success(self, mock_from_env):
        """start_container calls container.start() and returns success."""
        ops, client = self._ops_and_client(mock_from_env)
        stopped = _make_container(status="exited")
        client.containers.get.return_value = stopped

        outcome = ops.start_container("abc123def456")

        stopped.start.assert_called_once()
        assert outcome["success"] is True
        assert "message" in outcome

    @patch("docker.DockerClient.from_env")
    def test_start_nonexistent_container(self, mock_from_env):
        """start_container on missing container returns success=False."""
        ops, client = self._ops_and_client(mock_from_env)
        client.containers.get.side_effect = NotFound("not found")

        outcome = ops.start_container("nonexistent")

        assert outcome["success"] is False
        assert "message" in outcome
class TestStopContainer:
    """DockerOps.stop_container on standalone containers."""

    @staticmethod
    def _ops_and_client(mock_from_env):
        """Return a DockerOps bound to a fresh mocked client, plus the client."""
        client = MagicMock()
        mock_from_env.return_value = client
        from agent.docker_ops import DockerOps

        return DockerOps(), client

    @patch("docker.DockerClient.from_env")
    def test_stop_container_success(self, mock_from_env):
        """stop_container calls container.stop() and returns success."""
        ops, client = self._ops_and_client(mock_from_env)
        container = _make_container()
        client.containers.get.return_value = container

        result = ops.stop_container("abc123def456")

        container.stop.assert_called_once()
        assert result["success"] is True
        assert "message" in result

    @patch("docker.DockerClient.from_env")
    def test_stop_nonexistent_container(self, mock_from_env):
        """stop_container on missing container returns success=False."""
        ops, client = self._ops_and_client(mock_from_env)
        client.containers.get.side_effect = NotFound("not found")

        result = ops.stop_container("nonexistent")

        assert result["success"] is False
        # Same contract as the start_container not-found test.
        assert "message" in result
class TestRestartContainer:
    """DockerOps.restart_container on standalone containers."""

    @staticmethod
    def _ops_and_client(mock_from_env):
        """Return a DockerOps bound to a fresh mocked client, plus the client."""
        client = MagicMock()
        mock_from_env.return_value = client
        from agent.docker_ops import DockerOps

        return DockerOps(), client

    @patch("docker.DockerClient.from_env")
    def test_restart_container_success(self, mock_from_env):
        """restart_container calls container.restart() and returns success."""
        ops, client = self._ops_and_client(mock_from_env)
        container = _make_container()
        client.containers.get.return_value = container

        result = ops.restart_container("abc123def456")

        container.restart.assert_called_once()
        assert result["success"] is True
        assert "message" in result

    @patch("docker.DockerClient.from_env")
    def test_restart_nonexistent_container(self, mock_from_env):
        """restart_container on missing container returns success=False."""
        # Mirrors the not-found tests for start_container/stop_container.
        ops, client = self._ops_and_client(mock_from_env)
        client.containers.get.side_effect = NotFound("not found")

        result = ops.restart_container("nonexistent")

        assert result["success"] is False
        assert "message" in result
class TestGetLogs:
    """DockerOps.get_logs returns decoded log output for a container."""

    @staticmethod
    def _ops_with_logs(mock_from_env, payload):
        """Return (ops, container) where the container's logs yield *payload*."""
        client = MagicMock()
        mock_from_env.return_value = client
        container = _make_container()
        container.logs.return_value = payload
        client.containers.get.return_value = container
        from agent.docker_ops import DockerOps

        return DockerOps(), container

    @patch("docker.DockerClient.from_env")
    def test_get_logs_returns_container_and_logs(self, mock_from_env):
        """get_logs returns {container: name, logs: string}."""
        ops, container = self._ops_with_logs(mock_from_env, b"line1\nline2\nline3")

        fetched = ops.get_logs("abc123def456")

        assert fetched["container"] == "my-app"
        assert "line1" in fetched["logs"]
        container.logs.assert_called_once_with(tail=100)

    @patch("docker.DockerClient.from_env")
    def test_get_logs_custom_tail(self, mock_from_env):
        """get_logs respects the tail parameter."""
        ops, container = self._ops_with_logs(mock_from_env, b"log data")

        ops.get_logs("abc123def456", tail=50)

        container.logs.assert_called_once_with(tail=50)
class TestPullImage:
    """DockerOps.pull_image pulls by the container's first image tag."""

    @staticmethod
    def _ops_for(mock_from_env, container):
        """Return (ops, client) with *container* served by containers.get."""
        client = MagicMock()
        mock_from_env.return_value = client
        client.containers.get.return_value = container
        from agent.docker_ops import DockerOps

        return DockerOps(), client

    @patch("docker.DockerClient.from_env")
    def test_pull_image_success(self, mock_from_env):
        """pull_image pulls the container's current image tag."""
        ops, client = self._ops_for(
            mock_from_env, _make_container(image_tag="nginx:1.25")
        )

        outcome = ops.pull_image("abc123def456")

        client.images.pull.assert_called_once_with("nginx:1.25")
        assert outcome["success"] is True
        assert "message" in outcome

    @patch("docker.DockerClient.from_env")
    def test_pull_image_no_tags(self, mock_from_env):
        """pull_image handles containers with no image tags gracefully."""
        untagged = _make_container()
        untagged.image.tags = []
        ops, _ = self._ops_for(mock_from_env, untagged)

        outcome = ops.pull_image("abc123def456")

        assert outcome["success"] is False
        assert "message" in outcome
class TestSwarmServiceOperations:
    """Swarm-labelled containers are managed through their service."""

    SERVICE_NAME = "mystack_web"

    @classmethod
    def _swarm_fixture(cls, mock_from_env, **container_kwargs):
        """Return (ops, container, service) for a Swarm-labelled container.

        The mocked client serves the container from ``containers.get`` and
        a single matching service from ``services.list``.
        """
        client = MagicMock()
        mock_from_env.return_value = client
        container = _make_container(
            labels={"com.docker.swarm.service.name": cls.SERVICE_NAME},
            **container_kwargs,
        )
        client.containers.get.return_value = container
        service = MagicMock()
        service.name = cls.SERVICE_NAME
        client.services.list.return_value = [service]
        from agent.docker_ops import DockerOps

        return DockerOps(), container, service

    @patch("docker.DockerClient.from_env")
    def test_stop_swarm_service_scales_to_zero(self, mock_from_env):
        """Stopping a Swarm container scales the service to 0 replicas."""
        ops, container, service = self._swarm_fixture(mock_from_env)

        outcome = ops.stop_container("abc123def456")

        # Should scale service to 0 instead of calling container.stop()
        service.scale.assert_called_once_with(0)
        container.stop.assert_not_called()
        assert outcome["success"] is True

    @patch("docker.DockerClient.from_env")
    def test_start_swarm_service_scales_to_one(self, mock_from_env):
        """Starting a Swarm container scales the service to 1 replica."""
        ops, container, service = self._swarm_fixture(mock_from_env, status="exited")

        outcome = ops.start_container("abc123def456")

        # Should scale service to 1 instead of calling container.start()
        service.scale.assert_called_once_with(1)
        container.start.assert_not_called()
        assert outcome["success"] is True

    @patch("docker.DockerClient.from_env")
    def test_restart_swarm_service_force_updates(self, mock_from_env):
        """Restarting a Swarm container force-updates the service."""
        ops, container, service = self._swarm_fixture(mock_from_env)

        outcome = ops.restart_container("abc123def456")

        service.force_update.assert_called_once()
        container.restart.assert_not_called()
        assert outcome["success"] is True
class TestGetHealth:
    """DockerOps.get_health reports status, hostname and container count."""

    def test_get_health(self):
        """get_health returns hostname and container count."""
        fake_containers = [
            _make_container(),
            _make_container(id="def456", name="another"),
        ]
        # Context managers replace the stacked decorators; both patches are
        # active for the whole body, exactly as before.
        with patch("docker.DockerClient.from_env") as mock_from_env, patch(
            "socket.gethostname", return_value="hf-pdocker-01"
        ):
            mock_client = MagicMock()
            mock_from_env.return_value = mock_client
            mock_client.containers.list.return_value = fake_containers
            from agent.docker_ops import DockerOps

            health = DockerOps().get_health()

            assert health["status"] == "healthy"
            assert health["hostname"] == "hf-pdocker-01"
            assert health["containers_total"] == 2