f13d6151e3
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
108 lines
3.8 KiB
Python
108 lines
3.8 KiB
Python
"""Service proxy endpoints for the Farm Manager API server.
|
|
|
|
Each endpoint proxies to the appropriate node agent based on the
|
|
``{node}`` path parameter.
|
|
"""
|
|
|
|
import asyncio
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
|
|
from server.config import get_nodes, get_node_url
|
|
|
|
# Router on which this module's endpoints are registered through the
# ``@router.get`` / ``@router.post`` decorators below.
router = APIRouter()
|
|
|
|
|
|
def _require_node_url(node: str) -> str:
    """Look up the agent base URL registered for *node*.

    Args:
        node: Node name taken from the request path.

    Returns:
        The value returned by ``get_node_url(node)``.

    Raises:
        HTTPException: 404 when ``get_node_url`` returns ``None`` for *node*.
    """
    agent_url = get_node_url(node)
    if agent_url is not None:
        return agent_url
    raise HTTPException(status_code=404, detail=f"Unknown node: {node}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# GET /api/services — aggregate containers from all nodes
# ---------------------------------------------------------------------------
|
|
|
|
async def _fetch_containers(http_client: httpx.AsyncClient, node: dict) -> list[dict]:
    """Fetch containers from a single node agent, adding the node field.

    This is deliberately best-effort: any failure while talking to the agent
    or decoding its response yields an empty list, so one broken node does
    not abort the aggregate listing. Unlike a silent swallow, the failure is
    logged so an unreachable node can be told apart from an empty one.

    Args:
        http_client: Shared async HTTP client used to issue the request.
        node: Node description; the ``host``, ``agent_port`` and ``name``
            keys are read.

    Returns:
        The container dicts reported by the agent, each with ``"node"`` set
        to the node's name, or ``[]`` if the request or decoding failed.
    """
    # Local import keeps this block self-contained; the module does not
    # otherwise import ``logging``.
    import logging

    url = f"http://{node['host']}:{node['agent_port']}/containers"
    try:
        resp = await http_client.get(url, timeout=10.0)
        resp.raise_for_status()
        containers = resp.json()
        if not isinstance(containers, list):
            # Anything other than a JSON array cannot be tagged per-container.
            raise ValueError(
                f"expected a JSON array, got {type(containers).__name__}"
            )
        for c in containers:
            c["node"] = node["name"]
        return containers
    except Exception as exc:
        # Broad on purpose: this runs under asyncio.gather and must never
        # take the whole listing down because of a single node.
        logging.getLogger(__name__).warning(
            "Failed to fetch containers from node %r (%s): %s",
            node.get("name"),
            url,
            exc,
        )
        return []
|
|
|
|
|
|
@router.get("/api/services")
async def list_services():
    """Return containers from all nodes with a ``node`` field added.

    Every node returned by ``get_nodes()`` is queried concurrently through
    ``_fetch_containers`` using one shared HTTP client, and the per-node
    lists are concatenated in node order.
    """
    known_nodes = get_nodes()
    async with httpx.AsyncClient() as client:
        per_node = await asyncio.gather(
            *(_fetch_containers(client, entry) for entry in known_nodes)
        )

    # Flatten the list of per-node lists into a single list.
    return [container for batch in per_node for container in batch]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Container actions — proxy to the correct agent
# ---------------------------------------------------------------------------
|
|
|
|
async def _proxy_to_agent(method: str, node: str, path: str, *, timeout: float, params=None):
    """Send one request to the agent on *node* and return its decoded JSON body.

    Shared by all container-action endpoints so that they report agent
    failures the same way instead of surfacing them as unhandled exceptions.

    Args:
        method: HTTP method to use against the agent (e.g. ``"POST"``).
        node: Node name from the request path.
        path: Path on the agent, starting with ``/``.
        timeout: Request timeout in seconds.
        params: Optional query parameters forwarded to the agent.

    Returns:
        The JSON-decoded body of a successful agent response.

    Raises:
        HTTPException: 404 for an unknown node (from ``_require_node_url``);
            504 if the agent times out; 502 if the agent cannot be reached
            or answers with a body that is not JSON; the agent's own status
            code if it answers with a 4xx/5xx response.
    """
    base = _require_node_url(node)
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.request(
                method, f"{base}{path}", params=params, timeout=timeout
            )
    except httpx.TimeoutException as exc:
        # Must come first: TimeoutException is a subclass of RequestError.
        raise HTTPException(
            status_code=504, detail=f"Agent on node {node} timed out"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(
            status_code=502, detail=f"Agent on node {node} is unreachable: {exc}"
        ) from exc

    try:
        payload = resp.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Agent on node {node} returned a non-JSON response",
        ) from exc

    if resp.is_error:
        # Relay the agent's failure with its real status code rather than
        # wrapping an error body in a 200. Unwrap a top-level "detail" key so
        # the message is not nested twice in the response.
        detail = payload.get("detail", payload) if isinstance(payload, dict) else payload
        raise HTTPException(status_code=resp.status_code, detail=detail)
    return payload


@router.post("/api/services/{node}/{container_id}/start")
async def start_service(node: str, container_id: str):
    """Proxy start request to the agent on *node* (30 s timeout)."""
    return await _proxy_to_agent(
        "POST", node, f"/containers/{container_id}/start", timeout=30.0
    )


@router.post("/api/services/{node}/{container_id}/stop")
async def stop_service(node: str, container_id: str):
    """Proxy stop request to the agent on *node* (30 s timeout)."""
    return await _proxy_to_agent(
        "POST", node, f"/containers/{container_id}/stop", timeout=30.0
    )


@router.post("/api/services/{node}/{container_id}/restart")
async def restart_service(node: str, container_id: str):
    """Proxy restart request to the agent on *node* (30 s timeout)."""
    return await _proxy_to_agent(
        "POST", node, f"/containers/{container_id}/restart", timeout=30.0
    )


@router.get("/api/services/{node}/{container_id}/logs")
async def get_service_logs(node: str, container_id: str, tail: int = Query(default=200)):
    """Proxy logs request to the agent on *node* (10 s timeout).

    *tail* is forwarded unchanged to the agent as the ``tail`` query
    parameter.
    """
    return await _proxy_to_agent(
        "GET",
        node,
        f"/containers/{container_id}/logs",
        timeout=10.0,
        params={"tail": tail},
    )


@router.post("/api/services/{node}/{container_id}/pull")
async def pull_service(node: str, container_id: str):
    """Proxy pull request to the agent on *node* (60 s timeout)."""
    return await _proxy_to_agent(
        "POST", node, f"/containers/{container_id}/pull", timeout=60.0
    )
|