feat(server): Group CRUD with JSON persistence and bulk actions (TDD)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 22:26:20 -06:00
parent f13d6151e3
commit 6e9465942a
4 changed files with 322 additions and 0 deletions
+101
View File
@@ -0,0 +1,101 @@
"""Singleton store for service groups with JSON file persistence.
Groups are persisted to a JSON file specified by the ``GROUPS_PATH``
environment variable (default ``/app/groups.json``).
"""
import json
import os
import threading
from server.models import Group, GroupsData
class GroupStore:
"""Thread-safe singleton that manages groups and persists to disk."""
_instance: "GroupStore | None" = None
_lock = threading.Lock()
def __new__(cls) -> "GroupStore":
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self) -> None:
if self._initialized:
return
self._initialized = True
self._data_lock = threading.Lock()
self._path = os.environ.get("GROUPS_PATH", "/app/groups.json")
self._groups: list[Group] = []
self._load()
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def _load(self) -> None:
"""Load groups from the JSON file on disk."""
try:
with open(self._path) as f:
raw = json.load(f)
data = GroupsData(**raw)
self._groups = data.groups
except (FileNotFoundError, json.JSONDecodeError):
self._groups = []
def _save(self) -> None:
"""Persist the current groups list to the JSON file."""
data = GroupsData(groups=self._groups)
with open(self._path, "w") as f:
json.dump(data.model_dump(), f, indent=2)
# ------------------------------------------------------------------
# CRUD operations
# ------------------------------------------------------------------
def list_groups(self) -> list[Group]:
"""Return all groups."""
with self._data_lock:
return list(self._groups)
def get_group(self, group_id: str) -> Group | None:
"""Return a single group by id, or None."""
with self._data_lock:
for g in self._groups:
if g.id == group_id:
return g
return None
def create_group(self, group: Group) -> Group:
"""Add a new group. Raises ValueError if duplicate id."""
with self._data_lock:
for g in self._groups:
if g.id == group.id:
raise ValueError(f"Group '{group.id}' already exists")
self._groups.append(group)
self._save()
return group
def update_group(self, group_id: str, group: Group) -> Group | None:
"""Replace a group by id. Returns updated group or None."""
with self._data_lock:
for i, g in enumerate(self._groups):
if g.id == group_id:
self._groups[i] = group
self._save()
return group
return None
def delete_group(self, group_id: str) -> bool:
"""Remove a group by id. Returns True if found and deleted."""
with self._data_lock:
for i, g in enumerate(self._groups):
if g.id == group_id:
self._groups.pop(i)
self._save()
return True
return False
+2
View File
@@ -5,10 +5,12 @@ import os
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from server.routes.groups import router as groups_router
from server.routes.nodes import router as nodes_router
from server.routes.services import router as services_router
app = FastAPI(title="Farm Manager")
app.include_router(groups_router)
app.include_router(nodes_router)
app.include_router(services_router)
+109
View File
@@ -0,0 +1,109 @@
"""Group CRUD and bulk action endpoints for the Farm Manager API server."""
import asyncio
import httpx
from fastapi import APIRouter, HTTPException
from server.config import get_node_url
from server.group_store import GroupStore
from server.models import Group
router = APIRouter()
def _get_store() -> GroupStore:
return GroupStore()
def _require_group(group_id: str) -> Group:
"""Return the group or raise 404."""
group = _get_store().get_group(group_id)
if group is None:
raise HTTPException(status_code=404, detail=f"Group not found: {group_id}")
return group
# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------
@router.get("/api/groups")
def list_groups():
"""Return all groups."""
return [g.model_dump() for g in _get_store().list_groups()]
@router.post("/api/groups")
def create_group(group: Group):
"""Create a new group. Returns 409 if duplicate id."""
try:
created = _get_store().create_group(group)
return created.model_dump()
except ValueError:
raise HTTPException(status_code=409, detail=f"Group '{group.id}' already exists")
@router.put("/api/groups/{group_id}")
def update_group(group_id: str, group: Group):
"""Update an existing group by id. Returns 404 if not found."""
updated = _get_store().update_group(group_id, group)
if updated is None:
raise HTTPException(status_code=404, detail=f"Group not found: {group_id}")
return updated.model_dump()
@router.delete("/api/groups/{group_id}")
def delete_group(group_id: str):
"""Delete a group by id. Returns 404 if not found."""
if not _get_store().delete_group(group_id):
raise HTTPException(status_code=404, detail=f"Group not found: {group_id}")
return {"success": True}
# ---------------------------------------------------------------------------
# Bulk actions — start/stop/restart all services in a group
# ---------------------------------------------------------------------------
async def _send_action(
http_client: httpx.AsyncClient, node: str, container: str, action: str
) -> dict:
"""Send a start/stop/restart action to a single container via its node agent."""
url = get_node_url(node)
if url is None:
return {"node": node, "container": container, "success": False, "error": f"Unknown node: {node}"}
try:
resp = await http_client.post(f"{url}/containers/{container}/{action}", timeout=30.0)
return {"node": node, "container": container, **resp.json()}
except Exception as exc:
return {"node": node, "container": container, "success": False, "error": str(exc)}
async def _bulk_action(group_id: str, action: str) -> list[dict]:
"""Execute *action* on every service in the group."""
group = _require_group(group_id)
async with httpx.AsyncClient() as client:
tasks = [
_send_action(client, svc.node, svc.container, action)
for svc in group.services
]
results = await asyncio.gather(*tasks)
return list(results)
@router.post("/api/groups/{group_id}/start")
async def start_group(group_id: str):
"""Start all services in a group."""
return await _bulk_action(group_id, "start")
@router.post("/api/groups/{group_id}/stop")
async def stop_group(group_id: str):
"""Stop all services in a group."""
return await _bulk_action(group_id, "stop")
@router.post("/api/groups/{group_id}/restart")
async def restart_group(group_id: str):
"""Restart all services in a group."""
return await _bulk_action(group_id, "restart")