feat(server): API proxy routes for services and nodes (TDD)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 22:24:20 -06:00
parent 9af81e3001
commit f13d6151e3
5 changed files with 558 additions and 0 deletions
+33
View File
@@ -0,0 +1,33 @@
"""Configuration loader for the Farm Manager server.
Reads node definitions from a JSON config file. The path defaults to
``/app/config.json`` but can be overridden via the ``CONFIG_PATH``
environment variable.
"""
import json
import os
_config = None
def _load_config():
global _config
if _config is None:
config_path = os.environ.get("CONFIG_PATH", "/app/config.json")
with open(config_path) as f:
_config = json.load(f)
return _config
def get_nodes() -> list[dict]:
"""Return the list of node definitions from the config file."""
return _load_config()["nodes"]
def get_node_url(node_name: str) -> str | None:
"""Return the agent base URL for *node_name*, or ``None`` if unknown."""
for node in get_nodes():
if node["name"] == node_name:
return f"http://{node['host']}:{node['agent_port']}"
return None
+27
View File
@@ -0,0 +1,27 @@
"""Farm Manager API server — main FastAPI application."""
import os
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from server.routes.nodes import router as nodes_router
from server.routes.services import router as services_router
app = FastAPI(title="Farm Manager")
app.include_router(nodes_router)
app.include_router(services_router)
@app.get("/health")
def health():
return {"status": "ok"}
static_dir = os.path.join(os.path.dirname(__file__), "static")
if os.path.isdir(static_dir):
app.mount("/", StaticFiles(directory=static_dir, html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8888)
+44
View File
@@ -0,0 +1,44 @@
"""Node health endpoints for the Farm Manager API server."""
import asyncio
import httpx
from fastapi import APIRouter
from server.config import get_nodes
router = APIRouter()
async def _check_node_health(http_client: httpx.AsyncClient, node: dict) -> dict:
"""Query a single agent's /health endpoint and return a NodeStatus dict."""
url = f"http://{node['host']}:{node['agent_port']}/health"
try:
resp = await http_client.get(url, timeout=5.0)
resp.raise_for_status()
data = resp.json()
return {
"name": node["name"],
"host": node["host"],
"healthy": True,
"containers_total": data.get("containers_total", 0),
"error": None,
}
except Exception as exc:
return {
"name": node["name"],
"host": node["host"],
"healthy": False,
"containers_total": 0,
"error": str(exc),
}
@router.get("/api/nodes")
async def list_nodes():
"""Return all configured nodes with their current health status."""
nodes = get_nodes()
async with httpx.AsyncClient() as client:
tasks = [_check_node_health(client, node) for node in nodes]
results = await asyncio.gather(*tasks)
return list(results)
+107
View File
@@ -0,0 +1,107 @@
"""Service proxy endpoints for the Farm Manager API server.
Each endpoint proxies to the appropriate node agent based on the
``{node}`` path parameter.
"""
import asyncio
import httpx
from fastapi import APIRouter, HTTPException, Query
from server.config import get_nodes, get_node_url
router = APIRouter()
def _require_node_url(node: str) -> str:
"""Return the agent URL for *node* or raise 404."""
url = get_node_url(node)
if url is None:
raise HTTPException(status_code=404, detail=f"Unknown node: {node}")
return url
# ---------------------------------------------------------------------------
# GET /api/services — aggregate containers from all nodes
# ---------------------------------------------------------------------------
async def _fetch_containers(http_client: httpx.AsyncClient, node: dict) -> list[dict]:
"""Fetch containers from a single node agent, adding the node field."""
url = f"http://{node['host']}:{node['agent_port']}/containers"
try:
resp = await http_client.get(url, timeout=10.0)
resp.raise_for_status()
containers = resp.json()
for c in containers:
c["node"] = node["name"]
return containers
except Exception:
return []
@router.get("/api/services")
async def list_services():
"""Return containers from all nodes with a ``node`` field added."""
nodes = get_nodes()
async with httpx.AsyncClient() as client:
tasks = [_fetch_containers(client, node) for node in nodes]
results = await asyncio.gather(*tasks)
# Flatten list of lists
all_containers = []
for node_containers in results:
all_containers.extend(node_containers)
return all_containers
# ---------------------------------------------------------------------------
# Container actions — proxy to the correct agent
# ---------------------------------------------------------------------------
@router.post("/api/services/{node}/{container_id}/start")
async def start_service(node: str, container_id: str):
"""Proxy start request to the agent on *node*."""
base = _require_node_url(node)
async with httpx.AsyncClient() as client:
resp = await client.post(f"{base}/containers/{container_id}/start", timeout=30.0)
return resp.json()
@router.post("/api/services/{node}/{container_id}/stop")
async def stop_service(node: str, container_id: str):
"""Proxy stop request to the agent on *node*."""
base = _require_node_url(node)
async with httpx.AsyncClient() as client:
resp = await client.post(f"{base}/containers/{container_id}/stop", timeout=30.0)
return resp.json()
@router.post("/api/services/{node}/{container_id}/restart")
async def restart_service(node: str, container_id: str):
"""Proxy restart request to the agent on *node*."""
base = _require_node_url(node)
async with httpx.AsyncClient() as client:
resp = await client.post(f"{base}/containers/{container_id}/restart", timeout=30.0)
return resp.json()
@router.get("/api/services/{node}/{container_id}/logs")
async def get_service_logs(node: str, container_id: str, tail: int = Query(default=200)):
"""Proxy logs request to the agent on *node*."""
base = _require_node_url(node)
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{base}/containers/{container_id}/logs",
params={"tail": tail},
timeout=10.0,
)
return resp.json()
@router.post("/api/services/{node}/{container_id}/pull")
async def pull_service(node: str, container_id: str):
"""Proxy pull request to the agent on *node*."""
base = _require_node_url(node)
async with httpx.AsyncClient() as client:
resp = await client.post(f"{base}/containers/{container_id}/pull", timeout=60.0)
return resp.json()
+347
View File
@@ -0,0 +1,347 @@
"""Tests for server API proxy routes — nodes and services.
Mocks httpx.AsyncClient to avoid real network calls to agents.
"""
import os
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
os.environ.setdefault("CONFIG_PATH", os.path.join(
os.path.dirname(__file__), "..", "config.json"
))
from fastapi.testclient import TestClient
@pytest.fixture(autouse=True)
def _reset_config():
"""Reset the config module cache between tests."""
import server.config as cfg
cfg._config = None
yield
cfg._config = None
@pytest.fixture
def client():
from server.main import app
return TestClient(app)
# ---------------------------------------------------------------------------
# Mock helpers
# ---------------------------------------------------------------------------
def _mock_response(json_data, status_code=200):
"""Build a MagicMock httpx.Response with sync .json() method."""
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = json_data
resp.raise_for_status = MagicMock()
return resp
def _make_mock_client(**side_effects):
"""Create a mock httpx.AsyncClient context manager.
*side_effects* maps method names ('get', 'post') to lists of
return values or Exception instances for side_effect.
"""
mock_instance = AsyncMock()
for method, values in side_effects.items():
getattr(mock_instance, method).side_effect = values
return mock_instance
def _patch_httpx(module_path, mock_instance):
"""Return a context-manager patch for httpx.AsyncClient in *module_path*."""
patcher = patch(f"{module_path}.httpx.AsyncClient")
MockClient = patcher.start()
MockClient.return_value.__aenter__ = AsyncMock(return_value=mock_instance)
MockClient.return_value.__aexit__ = AsyncMock(return_value=False)
return patcher
# ---------------------------------------------------------------------------
# GET /api/nodes
# ---------------------------------------------------------------------------
class TestGetNodes:
def test_returns_all_three_nodes(self, client):
"""GET /api/nodes returns a list of 3 nodes with health status."""
mock = _make_mock_client(get=[
_mock_response({"status": "healthy", "hostname": "hf-pdocker-01", "containers_total": 10}),
_mock_response({"status": "healthy", "hostname": "hf-pdocker-02", "containers_total": 5}),
_mock_response({"status": "healthy", "hostname": "bart", "containers_total": 8}),
])
p = _patch_httpx("server.routes.nodes", mock)
try:
resp = client.get("/api/nodes")
finally:
p.stop()
assert resp.status_code == 200
data = resp.json()
assert len(data) == 3
names = [n["name"] for n in data]
assert "hf-pdocker-01" in names
assert "hf-pdocker-02" in names
assert "bart" in names
def test_node_contains_health_fields(self, client):
"""Each node has name, host, healthy, and containers_total."""
mock = _make_mock_client(get=[
_mock_response({"status": "healthy", "hostname": "hf-pdocker-01", "containers_total": 10}),
_mock_response({"status": "healthy", "hostname": "hf-pdocker-02", "containers_total": 5}),
_mock_response({"status": "healthy", "hostname": "bart", "containers_total": 8}),
])
p = _patch_httpx("server.routes.nodes", mock)
try:
resp = client.get("/api/nodes")
finally:
p.stop()
data = resp.json()
node = data[0]
assert "name" in node
assert "host" in node
assert "healthy" in node
assert "containers_total" in node
def test_unhealthy_node_on_agent_error(self, client):
"""If an agent is unreachable, the node shows healthy=False."""
mock = _make_mock_client(get=[
_mock_response({"status": "healthy", "hostname": "hf-pdocker-01", "containers_total": 10}),
Exception("Connection refused"),
_mock_response({"status": "healthy", "hostname": "bart", "containers_total": 8}),
])
p = _patch_httpx("server.routes.nodes", mock)
try:
resp = client.get("/api/nodes")
finally:
p.stop()
data = resp.json()
assert resp.status_code == 200
unhealthy = [n for n in data if n["name"] == "hf-pdocker-02"]
assert len(unhealthy) == 1
assert unhealthy[0]["healthy"] is False
assert unhealthy[0]["error"] is not None
# ---------------------------------------------------------------------------
# GET /api/services
# ---------------------------------------------------------------------------
class TestGetServices:
def test_returns_containers_from_all_nodes(self, client):
"""GET /api/services aggregates containers from all nodes, adding node field."""
containers_node1 = [
{"id": "abc1", "name": "app1", "status": "running",
"image": "nginx:latest", "created": "2026-03-05T00:00:00Z",
"uptime": "1h", "is_swarm": False, "swarm_service": None},
]
containers_node2 = [
{"id": "abc2", "name": "app2", "status": "running",
"image": "redis:latest", "created": "2026-03-05T00:00:00Z",
"uptime": "2h", "is_swarm": False, "swarm_service": None},
]
containers_node3 = [
{"id": "abc3", "name": "app3", "status": "exited",
"image": "postgres:15", "created": "2026-03-05T00:00:00Z",
"uptime": None, "is_swarm": False, "swarm_service": None},
]
mock = _make_mock_client(get=[
_mock_response(containers_node1),
_mock_response(containers_node2),
_mock_response(containers_node3),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.get("/api/services")
finally:
p.stop()
assert resp.status_code == 200
data = resp.json()
assert len(data) == 3
nodes = [c["node"] for c in data]
assert "hf-pdocker-01" in nodes
assert "hf-pdocker-02" in nodes
assert "bart" in nodes
def test_skips_unreachable_node(self, client):
"""If a node agent is down, its containers are omitted."""
containers_node1 = [
{"id": "abc1", "name": "app1", "status": "running",
"image": "nginx:latest", "created": "2026-03-05T00:00:00Z",
"uptime": "1h", "is_swarm": False, "swarm_service": None},
]
mock = _make_mock_client(get=[
_mock_response(containers_node1),
Exception("Connection refused"),
_mock_response([]),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.get("/api/services")
finally:
p.stop()
assert resp.status_code == 200
data = resp.json()
assert len(data) == 1
# ---------------------------------------------------------------------------
# POST /api/services/{node}/{container_id}/start
# ---------------------------------------------------------------------------
class TestStartService:
def test_start_proxies_to_agent(self, client):
"""POST start sends request to correct agent and returns result."""
mock = _make_mock_client(post=[
_mock_response({"success": True, "message": "Started container app1"}),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.post("/api/services/hf-pdocker-01/abc123/start")
finally:
p.stop()
assert resp.status_code == 200
assert resp.json()["success"] is True
def test_start_invalid_node_returns_404(self, client):
"""POST start with unknown node returns 404."""
resp = client.post("/api/services/nonexistent-node/abc123/start")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/services/{node}/{container_id}/stop
# ---------------------------------------------------------------------------
class TestStopService:
def test_stop_proxies_to_agent(self, client):
"""POST stop sends request to correct agent."""
mock = _make_mock_client(post=[
_mock_response({"success": True, "message": "Stopped container app1"}),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.post("/api/services/bart/abc123/stop")
finally:
p.stop()
assert resp.status_code == 200
assert resp.json()["success"] is True
def test_stop_invalid_node_returns_404(self, client):
resp = client.post("/api/services/nonexistent-node/abc123/stop")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/services/{node}/{container_id}/restart
# ---------------------------------------------------------------------------
class TestRestartService:
def test_restart_proxies_to_agent(self, client):
mock = _make_mock_client(post=[
_mock_response({"success": True, "message": "Restarted container app1"}),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.post("/api/services/hf-pdocker-02/abc123/restart")
finally:
p.stop()
assert resp.status_code == 200
assert resp.json()["success"] is True
def test_restart_invalid_node_returns_404(self, client):
resp = client.post("/api/services/nonexistent-node/abc123/restart")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# GET /api/services/{node}/{container_id}/logs
# ---------------------------------------------------------------------------
class TestGetServiceLogs:
def test_logs_proxies_to_agent(self, client):
mock = _make_mock_client(get=[
_mock_response({"container": "test-app", "logs": "log line 1\nlog line 2"}),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.get("/api/services/hf-pdocker-01/abc123/logs")
finally:
p.stop()
assert resp.status_code == 200
assert "log line 1" in resp.json()["logs"]
def test_logs_passes_tail_param(self, client):
mock = _make_mock_client(get=[
_mock_response({"container": "test-app", "logs": "log data"}),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.get("/api/services/hf-pdocker-01/abc123/logs?tail=50")
finally:
p.stop()
assert resp.status_code == 200
# Verify tail param was passed through to the agent
call_args = mock.get.call_args
assert "tail" in str(call_args)
def test_logs_invalid_node_returns_404(self, client):
resp = client.get("/api/services/nonexistent-node/abc123/logs")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/services/{node}/{container_id}/pull
# ---------------------------------------------------------------------------
class TestPullService:
def test_pull_proxies_to_agent(self, client):
mock = _make_mock_client(post=[
_mock_response({"success": True, "message": "Pulled nginx:latest"}),
])
p = _patch_httpx("server.routes.services", mock)
try:
resp = client.post("/api/services/bart/abc123/pull")
finally:
p.stop()
assert resp.status_code == 200
assert resp.json()["success"] is True
def test_pull_invalid_node_returns_404(self, client):
resp = client.post("/api/services/nonexistent-node/abc123/pull")
assert resp.status_code == 404