classifier/frontend/api_client.py

65 lines
No EOL
2 KiB
Python

# api_client.py - httpx async
import httpx
from config import BACKEND_INTERNAL_URL
TIMEOUT_HEALTH = 5
TIMEOUT_UPLOAD = 3600
TIMEOUT_DEFAULT = 15
async def check_health() -> dict:
async with httpx.AsyncClient() as client:
r = await client.get(f"{BACKEND_INTERNAL_URL}/health", timeout=TIMEOUT_HEALTH)
r.raise_for_status()
return r.json()
async def upload_file(filename: str, data: bytes) -> dict:
async with httpx.AsyncClient() as client:
r = await client.post(
f"{BACKEND_INTERNAL_URL}/upload",
files={"file": (filename, data)},
timeout=TIMEOUT_UPLOAD,
)
if r.status_code != 200:
raise RuntimeError(f"Erreur backend ({r.status_code}) : {r.text}")
return r.json()
async def get_debug(pc_id: str) -> dict:
async with httpx.AsyncClient() as client:
r = await client.get(
f"{BACKEND_INTERNAL_URL}/debug/{pc_id}", timeout=TIMEOUT_DEFAULT
)
r.raise_for_status()
return r.json()
async def list_pointclouds() -> list[dict]:
async with httpx.AsyncClient() as client:
r = await client.get(f"{BACKEND_INTERNAL_URL}/list", timeout=TIMEOUT_DEFAULT)
r.raise_for_status()
data = r.json()
return data.get("pointclouds", []) if data else []
async def delete_pointcloud(pc_id: str) -> dict:
async with httpx.AsyncClient() as client:
r = await client.delete(
f"{BACKEND_INTERNAL_URL}/delete/{pc_id}", timeout=TIMEOUT_DEFAULT
)
r.raise_for_status()
return r.json()
async def crop_pointcloud(pc_id: str, payload: dict) -> dict:
async with httpx.AsyncClient() as client:
r = await client.post(
f"{BACKEND_INTERNAL_URL}/crop/{pc_id}",
json=payload,
timeout=TIMEOUT_UPLOAD,
)
if r.status_code != 200:
raise RuntimeError(f"Erreur crop ({r.status_code}) : {r.text}")
return r.json()