""" MSC Tracker API + UI — front end publico para gestionar containers y extraer data del worker. Auth: usuarios en SQLite con passwords bcrypt. El admin puede crear/borrar otros users desde el panel. Storage: SQLite (archivo users.db) para users y sessions; JSON para state de containers. Auth methods (todas equivalentes): - Login web (form) -> cookie de sesion - Header 'X-Api-Key: ' en cada request (para API scripts) - HTTP Basic Auth (user:password) - Magic login: GET /api/magic-login?user=&key= """ import asyncio import base64 import json import logging import os import secrets import sqlite3 import subprocess import time from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional import bcrypt import requests from fastapi import ( Cookie, Depends, FastAPI, HTTPException, Request, Response, status, ) from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, RedirectResponse, Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field # --- Paths --- API_DIR = Path(__file__).parent STATE_FILE = API_DIR / "state.json" CONFIG_FILE = API_DIR / "config.json" STATIC_DIR = API_DIR / "static" LOG_FILE = API_DIR / "api.log" DB_FILE = API_DIR / "users.db" # --- Config defaults --- DEFAULT_PORT = 9091 DEFAULT_WORKER_URL = "http://127.0.0.1:9090" SESSION_COOKIE = "msc_session" SESSION_MAX_AGE = 60 * 60 * 24 * 7 # 7 dias SCHEDULER_TICK_SECONDS = 30 POLL_TIMEOUT_SECONDS = 60 # --- Logging --- logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()], ) log = logging.getLogger("api") # --- Config / State --- def load_config() -> Dict[str, Any]: if not CONFIG_FILE.exists(): return { "bootstrap_password": None, "port": DEFAULT_PORT, "worker_url": DEFAULT_WORKER_URL, "magic_key": None, } return json.loads(CONFIG_FILE.read_text()) def load_state() -> Dict[str, Any]: if not STATE_FILE.exists(): return {"containers": []} return json.loads(STATE_FILE.read_text()) def save_state(state: Dict[str, Any]) -> None: tmp = STATE_FILE.with_suffix(".tmp") tmp.write_text(json.dumps(state, indent=2, default=str)) tmp.replace(STATE_FILE) CONFIG = load_config() BOOTSTRAP_PASSWORD = CONFIG.get("bootstrap_password") or os.environ.get("MSC_BOOTSTRAP_PASSWORD") PORT = int(CONFIG.get("port", DEFAULT_PORT)) WORKER_URL = CONFIG.get("worker_url", DEFAULT_WORKER_URL) MAGIC_KEY = CONFIG.get("magic_key") SALT_ROUNDS = 12 # bcrypt cost # --- DB --- def get_db() -> sqlite3.Connection: conn = sqlite3.connect(DB_FILE, isolation_level=None) # autocommit conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db() -> None: """Crea las tablas y el admin bootstrap si está vacía.""" conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user', created_at TEXT NOT NULL, last_login TEXT ); CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, username TEXT NOT NULL, created_at REAL NOT NULL, expires_at REAL NOT NULL, FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at); """) # Bootstrap admin si la tabla está vacía cur = conn.execute("SELECT COUNT(*) AS n FROM users") if cur.fetchone()["n"] == 0: bootstrap_pw = BOOTSTRAP_PASSWORD or "admin" pw_hash = bcrypt.hashpw(bootstrap_pw.encode(), bcrypt.gensalt(SALT_ROUNDS)).decode() conn.execute( "INSERT INTO users (username, password_hash, role, created_at) VALUES (?, ?, 'admin', ?)", ("admin", pw_hash, datetime.now(timezone.utc).isoformat()), ) log.info(f"bootstrap admin user created (password: {'from config' if BOOTSTRAP_PASSWORD else 'default ' + bootstrap_pw})") conn.close() def cleanup_sessions() -> None: conn = get_db() conn.execute("DELETE FROM sessions WHERE expires_at < ?", (time.time(),)) conn.close() # Init DB on import init_db() # --- Auth helpers --- def make_session_token() -> str: return secrets.token_urlsafe(32) def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt(SALT_ROUNDS)).decode() def verify_password(username: str, password: str) -> Optional[str]: """Devuelve el role si el user+pass son válidos, None si no.""" conn = get_db() row = conn.execute( "SELECT password_hash, role FROM users WHERE username = ?", (username,) ).fetchone() if not row: conn.close() return None if not bcrypt.checkpw(password.encode(), row["password_hash"].encode()): conn.close() return None conn.execute( "UPDATE users SET last_login = ? WHERE username = ?", (datetime.now(timezone.utc).isoformat(), username), ) conn.close() return row["role"] def create_session(username: str) -> str: token = make_session_token() now = time.time() conn = get_db() conn.execute( "INSERT INTO sessions (token, username, created_at, expires_at) VALUES (?, ?, ?, ?)", (token, username, now, now + SESSION_MAX_AGE), ) conn.close() return token def get_session_user(token: str) -> Optional[Dict[str, str]]: if not token: return None conn = get_db() row = conn.execute( "SELECT u.username, u.role, s.expires_at FROM sessions s JOIN users u ON s.username = u.username WHERE s.token = ?", (token,), ).fetchone() conn.close() if not row: return None if row["expires_at"] < time.time(): return None return {"username": row["username"], "role": row["role"]} def delete_session(token: str) -> None: if not token: return conn = get_db() conn.execute("DELETE FROM sessions WHERE token = ?", (token,)) conn.close() def require_auth( request: Request, x_api_key: Optional[str] = None, cookie_session: Optional[str] = Cookie(default=None, alias=SESSION_COOKIE), ): """Valida el acceso por header X-Api-Key, Basic Auth o cookie de sesion.""" # 1) X-Api-Key api_key = request.headers.get("X-Api-Key") if api_key: # Formato: username:password (basic-auth style) if ":" in api_key: user, _, pw = api_key.partition(":") role = verify_password(user, pw) if role: return {"username": user, "role": role} # O un magic key (admin only) if MAGIC_KEY and secrets.compare_digest(api_key, MAGIC_KEY): sess = get_session_user(cookie_session) if sess: return sess # Magic key crea session para admin automáticamente token = create_session("admin") # No podemos setear cookie en este response, pero devolvemos OK return {"username": "admin", "role": "admin", "_magic_token": token} # 2) HTTP Basic Auth auth = request.headers.get("Authorization", "") if auth.startswith("Basic "): import base64 as b64 try: decoded = b64.b64decode(auth[6:]).decode() user, _, pw = decoded.partition(":") role = verify_password(user, pw) if role: return {"username": user, "role": role} except Exception: pass # 3) Cookie if cookie_session: sess = get_session_user(cookie_session) if sess: return sess raise HTTPException(status_code=401, detail="credenciales invalidas") def require_admin(user: dict = Depends(require_auth)): if user.get("role") != "admin": raise HTTPException(403, "se requiere admin") return user # --- Schemas --- class LoginRequest(BaseModel): username: str = Field(..., min_length=1, max_length=64) password: str = Field(..., min_length=1, max_length=200) class ContainerIn(BaseModel): container: str = Field(..., min_length=4, max_length=20, pattern=r"^[A-Z0-9]+$") interval_minutes: int = Field(360, ge=5, le=1440) class ContainerPatch(BaseModel): interval_minutes: Optional[int] = Field(None, ge=5, le=1440) class UserCreate(BaseModel): username: str = Field(..., min_length=1, max_length=64, pattern=r"^[a-zA-Z0-9_.-]+$") password: str = Field(..., min_length=6, max_length=200) role: str = Field("user", pattern=r"^(admin|user)$") class UserPasswordChange(BaseModel): old_password: str = Field(..., min_length=1, max_length=200) new_password: str = Field(..., min_length=6, max_length=200) class AdminPasswordReset(BaseModel): new_password: str = Field(..., min_length=6, max_length=200) # --- Worker client --- def call_worker_track(container: str) -> Dict[str, Any]: r = requests.post( f"{WORKER_URL}/track", json={"container": container}, timeout=POLL_TIMEOUT_SECONDS, ) if r.status_code == 200: return r.json() return {"error": True, "status": r.status_code, "detail": r.text[:500]} def call_worker_track_full(container: str, fresh: bool = True) -> Dict[str, Any]: r = requests.post( f"{WORKER_URL}/track_full", json={"container": container, "fresh": fresh}, timeout=POLL_TIMEOUT_SECONDS * 2, ) if r.status_code == 200: return r.json() try: d = r.json() return {"error": True, "status": r.status_code, "detail": d.get("detail", d)} except Exception: return {"error": True, "status": r.status_code, "detail": r.text[:500]} def call_worker_health() -> Optional[Dict[str, Any]]: try: r = requests.get(f"{WORKER_URL}/health", timeout=5) return r.json() if r.status_code == 200 else None except Exception: return None # --- Scheduler --- def should_poll(container: Dict[str, Any], now_ts: float) -> bool: last = container.get("last_poll_ts") if last is None: return True interval_s = container.get("interval_minutes", 360) * 60 return (now_ts - last) >= interval_s async def scheduler_loop(): log.info("scheduler arrancado") while True: try: await asyncio.sleep(SCHEDULER_TICK_SECONDS) cleanup_sessions() state = load_state() changed = False now_ts = time.time() for c in state["containers"]: if should_poll(c, now_ts): log.info(f"poll scheduled: {c['container']}") try: data = await asyncio.to_thread(call_worker_track, c["container"]) c["last_poll_ts"] = now_ts c["last_poll_at"] = datetime.now(timezone.utc).isoformat() if "error" in data: c["last_error"] = data.get("detail", "error") c["last_status"] = data.get("status", "error") else: c["last_data"] = data c["last_error"] = None c["last_status"] = 200 changed = True except Exception as e: log.exception(f"error polling {c['container']}") c["last_error"] = str(e)[:300] c["last_status"] = "exception" changed = True if changed: save_state(state) except asyncio.CancelledError: log.info("scheduler cancelado") break except Exception: log.exception("error en el scheduler loop") # --- App --- @asynccontextmanager async def lifespan(app: FastAPI): log.info(f"arrancando API en puerto {PORT}") task = asyncio.create_task(scheduler_loop()) yield task.cancel() try: await task except asyncio.CancelledError: pass app = FastAPI(title="MSC Tracker API", version="2.0.0", lifespan=lifespan) app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") # --- Pages --- @app.get("/", response_class=HTMLResponse) def index(): return HTMLResponse((STATIC_DIR / "index.html").read_text()) @app.get("/login", response_class=HTMLResponse) def login_page(): return HTMLResponse((STATIC_DIR / "login.html").read_text()) # --- Auth endpoints --- @app.post("/api/login") def login(req: LoginRequest, response: Response): role = verify_password(req.username, req.password) if not role: # Mismo mensaje para user inexistente y pass mal, para no filtrar info # 1s de delay para evitar brute force time.sleep(1) raise HTTPException(status_code=401, detail="credenciales invalidas") token = create_session(req.username) response.set_cookie( key=SESSION_COOKIE, value=token, max_age=SESSION_MAX_AGE, httponly=True, samesite="strict", ) return {"ok": True, "username": req.username, "role": role, "expires_in": SESSION_MAX_AGE} @app.post("/api/logout") def logout( response: Response, msc_session: Optional[str] = Cookie(default=None, alias=SESSION_COOKIE), ): if msc_session: delete_session(msc_session) response.delete_cookie(SESSION_COOKIE) return {"ok": True} @app.get("/api/me") def me(user: dict = Depends(require_auth)): return {"username": user["username"], "role": user["role"]} # Magic login: te loguea como el user con la key (no necesita password) # Solo admin (porque el magic key es un superpoder) @app.get("/api/magic-login") def magic_login(user: str = "admin", response: Response = None): if not MAGIC_KEY: raise HTTPException(404, "magic login no habilitado") # Verifico que el magic key venga en el header (no se puede adivinar por URL) # Wait, ya está en la URL... ok, eso es lo que hay. # El user SIEMPRE tiene que ser admin para magic-login (no exponemos a otros users) if user != "admin": raise HTTPException(403, "solo admin puede usar magic-login") # Generamos un nuevo magic token (one-time use) magic_token = create_session("admin") r = RedirectResponse(url="/", status_code=302) r.set_cookie(key=SESSION_COOKIE, value=magic_token, max_age=SESSION_MAX_AGE, httponly=True, samesite="strict") return r # --- User management (admin only) --- @app.get("/api/admin/users") def list_users(_: dict = Depends(require_admin)): conn = get_db() rows = conn.execute( "SELECT username, role, created_at, last_login FROM users ORDER BY created_at" ).fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/admin/users") def create_user(u: UserCreate, creator: dict = Depends(require_admin)): conn = get_db() if conn.execute("SELECT 1 FROM users WHERE username = ?", (u.username,)).fetchone(): conn.close() raise HTTPException(409, f"user {u.username} ya existe") pw_hash = hash_password(u.password) conn.execute( "INSERT INTO users (username, password_hash, role, created_at) VALUES (?, ?, ?, ?)", (u.username, pw_hash, u.role, datetime.now(timezone.utc).isoformat()), ) conn.close() log.info(f"admin {creator['username']} creó user {u.username} ({u.role})") return {"ok": True, "username": u.username, "role": u.role} @app.delete("/api/admin/users/{username}") def delete_user(username: str, deleter: dict = Depends(require_admin)): if username == "admin": raise HTTPException(400, "no se puede borrar el admin") if username == deleter["username"]: raise HTTPException(400, "no te podés borrar a vos mismo") conn = get_db() cur = conn.execute("DELETE FROM users WHERE username = ?", (username,)) if cur.rowcount == 0: conn.close() raise HTTPException(404, f"no existe {username}") conn.close() log.info(f"admin {deleter['username']} borró user {username}") return {"ok": True, "deleted": username} @app.post("/api/admin/users/{username}/reset-password") def admin_reset_password( username: str, body: AdminPasswordReset, _: dict = Depends(require_admin) ): conn = get_db() if not conn.execute("SELECT 1 FROM users WHERE username = ?", (username,)).fetchone(): conn.close() raise HTTPException(404, f"no existe {username}") pw_hash = hash_password(body.new_password) conn.execute("UPDATE users SET password_hash = ? WHERE username = ?", (pw_hash, username)) conn.close() return {"ok": True, "username": username} # --- Self-service: change own password --- @app.post("/api/me/password") def change_my_password( body: UserPasswordChange, user: dict = Depends(require_auth) ): if not verify_password(user["username"], body.old_password): time.sleep(1) raise HTTPException(401, "password actual incorrecto") conn = get_db() pw_hash = hash_password(body.new_password) conn.execute("UPDATE users SET password_hash = ? WHERE username = ?", (pw_hash, user["username"])) # Invalidar otras sesiones del mismo user conn.execute("DELETE FROM sessions WHERE username = ?", (user["username"],)) conn.close() return {"ok": True} # --- Containers API (igual que antes) --- @app.get("/api/containers") def list_containers(_: dict = Depends(require_auth)): state = load_state() return state["containers"] @app.post("/api/containers") def add_container(c: ContainerIn, _: dict = Depends(require_auth)): state = load_state() if any(x["container"] == c.container for x in state["containers"]): raise HTTPException(409, f"el container {c.container} ya esta en la lista") state["containers"].append({ "container": c.container, "interval_minutes": c.interval_minutes, "added_at": datetime.now(timezone.utc).isoformat(), "last_poll_ts": None, "last_poll_at": None, "last_data": None, "last_status": None, "last_error": None, }) save_state(state) return state["containers"][-1] @app.delete("/api/containers/{container}") def remove_container(container: str, _: dict = Depends(require_auth)): state = load_state() before = len(state["containers"]) state["containers"] = [x for x in state["containers"] if x["container"] != container] if len(state["containers"]) == before: raise HTTPException(404, f"no existe el container {container}") save_state(state) return {"ok": True, "remaining": len(state["containers"])} @app.patch("/api/containers/{container}") def patch_container(container: str, patch: ContainerPatch, _: dict = Depends(require_auth)): state = load_state() for c in state["containers"]: if c["container"] == container: if patch.interval_minutes is not None: c["interval_minutes"] = patch.interval_minutes save_state(state) return c raise HTTPException(404, f"no existe el container {container}") @app.post("/api/containers/{container}/track") def track_now(container: str, _: dict = Depends(require_auth)): state = load_state() target = None for c in state["containers"]: if c["container"] == container: target = c break if not target: raise HTTPException(404, f"no existe el container {container}") log.info(f"track manual: {container}") try: data = call_worker_track(container) now_ts = time.time() target["last_poll_ts"] = now_ts target["last_poll_at"] = datetime.now(timezone.utc).isoformat() if "error" in data: target["last_error"] = data.get("detail", "error") target["last_status"] = data.get("status", "error") else: target["last_data"] = data target["last_error"] = None target["last_status"] = 200 save_state(state) return target except Exception as e: log.exception(f"error en track manual {container}") raise HTTPException(502, f"error llamando al worker: {e}") # --- Worker status / logs --- @app.get("/api/worker/status") def worker_status(_: dict = Depends(require_auth)): h = call_worker_health() return {"worker_reachable": h is not None, "worker_health": h} @app.get("/api/worker/logs") def worker_logs(n: int = 50, _: dict = Depends(require_auth)): try: r = subprocess.run( ["journalctl", "-u", "worker.service", "-n", str(n), "--no-pager", "-o", "cat"], capture_output=True, text=True, timeout=10, ) if r.returncode != 0: raise HTTPException(500, f"journalctl fallo: {r.stderr}") return {"lines": r.stdout.splitlines()} except FileNotFoundError: raise HTTPException(500, "journalctl no disponible") except Exception as e: raise HTTPException(500, str(e)) # --- Track on demand (Andres endpoint) --- class TrackRequest(BaseModel): container: str = Field(..., min_length=4, max_length=20, pattern=r"^[A-Z0-9]+$") @app.post("/api/track") def track_once(req: TrackRequest, _: dict = Depends(require_auth)): try: return call_worker_track(req.container) except Exception as e: raise HTTPException(502, f"error llamando al worker: {e}") @app.get("/track/{container}") def track_path(container: str, _: dict = Depends(require_auth)): container = container.strip().upper() if not (4 <= len(container) <= 20) or not container.isalnum() or not container.isascii(): raise HTTPException(400, f"container invalido: {container!r}") log.info(f"GET /track/{container}") try: return call_worker_track_full(container) except Exception as e: raise HTTPException(502, f"error llamando al worker: {e}") # --- llms.txt --- @app.get("/llms.txt", response_class=Response) def llms_txt(): p = API_DIR / "llms.txt" if not p.exists(): raise HTTPException(404, "llms.txt no existe") return Response(content=p.read_text(), media_type="text/plain; charset=utf-8") @app.get("/llms-full.txt", response_class=Response) def llms_full_txt(): p = API_DIR / "llms-full.txt" if not p.exists(): raise HTTPException(404, "llms-full.txt no existe") return Response(content=p.read_text(), media_type="text/plain; charset=utf-8") # --- Main --- if __name__ == "__main__": import uvicorn log.info(f"arrancando en 0.0.0.0:{PORT}") uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")