from fastapi import FastAPI, Request, Response, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader from fastapi import Security import httpx import os from dotenv import load_dotenv import json from typing import Optional, Set import time load_dotenv() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Load API keys from environment API_KEYS: Set[str] = set(os.getenv("API_KEYS", "").split(",")) if not API_KEYS: raise ValueError("API_KEYS environment variable must be set") api_key_header = APIKeyHeader(name="X-API-Key") # Global variables to store authentication data auth_data = { "session_id": None, "omadac_id": None, "token": None, "last_auth": 0 } OMADA_BASE_URL = os.getenv("OMADA_BASE_URL", "https://omada.sarlink.link") USERNAME = os.getenv("OMADA_USERNAME") PASSWORD = os.getenv("OMADA_PASSWORD") if not all([USERNAME, PASSWORD]): raise ValueError("OMADA_USERNAME and OMADA_PASSWORD must be set in environment") async def validate_api_key(api_key: str = Security(api_key_header)) -> str: """Validate the API key""" if api_key not in API_KEYS: raise HTTPException( status_code=401, detail="Invalid API key", headers={"WWW-Authenticate": "API key"}, ) return api_key async def login_to_omada() -> bool: """Login to Omada controller and update auth data""" try: async with httpx.AsyncClient(verify=False) as client: response = await client.post( f"{OMADA_BASE_URL}/api/v2/login", json={"username": USERNAME, "password": PASSWORD}, headers={"Content-Type": "application/json"} ) if response.status_code != 200: print(f"Login failed: {response.text}") return False data = response.json() if data.get("errorCode") != 0: print(f"Login error: {data}") return False # Extract session ID from cookies cookies = response.headers.get("set-cookie") if cookies: for cookie in cookies.split(", "): if "TPOMADA_SESSIONID" in cookie: session_id = cookie.split(";")[0].split("=")[1] auth_data["session_id"] = session_id break auth_data["omadac_id"] = data["result"]["omadacId"] auth_data["token"] = data["result"]["token"] auth_data["last_auth"] = time.time() print("Successfully authenticated with Omada controller") return True except Exception as e: print(f"Login error: {str(e)}") return False def is_auth_valid() -> bool: """Check if current authentication is valid""" return all([ auth_data["session_id"], auth_data["omadac_id"], auth_data["token"], time.time() - auth_data["last_auth"] < 3600 # 1 hour timeout ]) @app.get("/health") async def health_check(): return {"status": "healthy"} @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]) async def proxy_request(request: Request, path: str, api_key: str = Security(validate_api_key)): """Forward requests to Omada controller with proper authentication""" # Ensure we're authenticated with Omada if not is_auth_valid(): if not await login_to_omada(): raise HTTPException(status_code=500, detail="Failed to authenticate with Omada controller") # Get the request body if any body = None if request.method not in ["GET", "HEAD"]: body = await request.body() # Prepare headers headers = dict(request.headers) headers.pop("host", None) headers.pop("x-api-key", None) # Remove our API key from forwarded request headers["Cookie"] = f"TPOMADA_SESSIONID={auth_data['session_id']}" headers["csrf-token"] = auth_data["token"] # Construct the target URL path = path.lstrip("/") if "api/v2" in path and auth_data["omadac_id"] not in path: # Insert omadac_id for API requests if not present parts = path.split("api/v2", 1) target_url = f"{OMADA_BASE_URL}/{auth_data['omadac_id']}/api/v2{parts[1]}" else: target_url = f"{OMADA_BASE_URL}/{path}" # Add query parameters query = request.url.query if query: target_url = f"{target_url}?{query}" try: async with httpx.AsyncClient(verify=False) as client: response = await client.request( method=request.method, url=target_url, headers=headers, content=body, follow_redirects=False # Don't automatically follow redirects ) # Handle redirects to login page if response.status_code == 302 and "/login" in response.headers.get("location", ""): if await login_to_omada(): # Retry the request with new credentials headers["Cookie"] = f"TPOMADA_SESSIONID={auth_data['session_id']}" headers["csrf-token"] = auth_data["token"] response = await client.request( method=request.method, url=target_url, headers=headers, content=body, follow_redirects=True ) # Create response with same status code and headers return Response( content=response.content, status_code=response.status_code, headers=dict(response.headers) ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)