omada-api-proxy/proxy.py

176 lines
6.0 KiB
Python
Raw Normal View History

2024-12-20 06:02:52 +05:00
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader
from fastapi import Security
import httpx
import os
from dotenv import load_dotenv
import json
from typing import Optional, Set
import time
load_dotenv()
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Load API keys from environment
API_KEYS: Set[str] = set(os.getenv("API_KEYS", "").split(","))
if not API_KEYS:
raise ValueError("API_KEYS environment variable must be set")
api_key_header = APIKeyHeader(name="X-API-Key")
# Global variables to store authentication data
auth_data = {
"session_id": None,
"omadac_id": None,
"token": None,
"last_auth": 0
}
OMADA_BASE_URL = os.getenv("OMADA_BASE_URL", "https://omada.sarlink.link")
USERNAME = os.getenv("OMADA_USERNAME")
PASSWORD = os.getenv("OMADA_PASSWORD")
if not all([USERNAME, PASSWORD]):
raise ValueError("OMADA_USERNAME and OMADA_PASSWORD must be set in environment")
async def validate_api_key(api_key: str = Security(api_key_header)) -> str:
"""Validate the API key"""
if api_key not in API_KEYS:
raise HTTPException(
status_code=401,
detail="Invalid API key",
headers={"WWW-Authenticate": "API key"},
)
return api_key
async def login_to_omada() -> bool:
"""Login to Omada controller and update auth data"""
try:
async with httpx.AsyncClient(verify=False) as client:
response = await client.post(
f"{OMADA_BASE_URL}/api/v2/login",
json={"username": USERNAME, "password": PASSWORD},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
print(f"Login failed: {response.text}")
return False
data = response.json()
if data.get("errorCode") != 0:
print(f"Login error: {data}")
return False
# Extract session ID from cookies
cookies = response.headers.get("set-cookie")
if cookies:
for cookie in cookies.split(", "):
if "TPOMADA_SESSIONID" in cookie:
session_id = cookie.split(";")[0].split("=")[1]
auth_data["session_id"] = session_id
break
auth_data["omadac_id"] = data["result"]["omadacId"]
auth_data["token"] = data["result"]["token"]
auth_data["last_auth"] = time.time()
print("Successfully authenticated with Omada controller")
return True
except Exception as e:
print(f"Login error: {str(e)}")
return False
def is_auth_valid() -> bool:
"""Check if current authentication is valid"""
return all([
auth_data["session_id"],
auth_data["omadac_id"],
auth_data["token"],
time.time() - auth_data["last_auth"] < 3600 # 1 hour timeout
])
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def proxy_request(request: Request, path: str, api_key: str = Security(validate_api_key)):
"""Forward requests to Omada controller with proper authentication"""
# Ensure we're authenticated with Omada
if not is_auth_valid():
if not await login_to_omada():
raise HTTPException(status_code=500, detail="Failed to authenticate with Omada controller")
# Get the request body if any
body = None
if request.method not in ["GET", "HEAD"]:
body = await request.body()
# Prepare headers
headers = dict(request.headers)
headers.pop("host", None)
headers.pop("x-api-key", None) # Remove our API key from forwarded request
headers["Cookie"] = f"TPOMADA_SESSIONID={auth_data['session_id']}"
headers["csrf-token"] = auth_data["token"]
# Construct the target URL
path = path.lstrip("/")
if "api/v2" in path and auth_data["omadac_id"] not in path:
# Insert omadac_id for API requests if not present
parts = path.split("api/v2", 1)
target_url = f"{OMADA_BASE_URL}/{auth_data['omadac_id']}/api/v2{parts[1]}"
else:
target_url = f"{OMADA_BASE_URL}/{path}"
# Add query parameters
query = request.url.query
if query:
target_url = f"{target_url}?{query}"
try:
async with httpx.AsyncClient(verify=False) as client:
response = await client.request(
method=request.method,
url=target_url,
headers=headers,
content=body,
follow_redirects=False # Don't automatically follow redirects
)
# Handle redirects to login page
if response.status_code == 302 and "/login" in response.headers.get("location", ""):
if await login_to_omada():
# Retry the request with new credentials
headers["Cookie"] = f"TPOMADA_SESSIONID={auth_data['session_id']}"
headers["csrf-token"] = auth_data["token"]
response = await client.request(
method=request.method,
url=target_url,
headers=headers,
content=body,
follow_redirects=True
)
# Create response with same status code and headers
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers)
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)