176 lines
6.0 KiB
Python
176 lines
6.0 KiB
Python
from fastapi import FastAPI, Request, Response, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.security import APIKeyHeader
|
|
from fastapi import Security
|
|
import httpx
|
|
import os
|
|
from dotenv import load_dotenv
|
|
import json
|
|
from typing import Optional, Set
|
|
import time
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI()
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Load API keys from environment
|
|
API_KEYS: Set[str] = set(os.getenv("API_KEYS", "").split(","))
|
|
if not API_KEYS:
|
|
raise ValueError("API_KEYS environment variable must be set")
|
|
|
|
api_key_header = APIKeyHeader(name="X-API-Key")
|
|
|
|
# Global variables to store authentication data
|
|
auth_data = {
|
|
"session_id": None,
|
|
"omadac_id": None,
|
|
"token": None,
|
|
"last_auth": 0
|
|
}
|
|
|
|
OMADA_BASE_URL = os.getenv("OMADA_BASE_URL", "https://omada.sarlink.link")
|
|
USERNAME = os.getenv("OMADA_USERNAME")
|
|
PASSWORD = os.getenv("OMADA_PASSWORD")
|
|
|
|
if not all([USERNAME, PASSWORD]):
|
|
raise ValueError("OMADA_USERNAME and OMADA_PASSWORD must be set in environment")
|
|
|
|
async def validate_api_key(api_key: str = Security(api_key_header)) -> str:
|
|
"""Validate the API key"""
|
|
if api_key not in API_KEYS:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid API key",
|
|
headers={"WWW-Authenticate": "API key"},
|
|
)
|
|
return api_key
|
|
|
|
async def login_to_omada() -> bool:
|
|
"""Login to Omada controller and update auth data"""
|
|
try:
|
|
async with httpx.AsyncClient(verify=False) as client:
|
|
response = await client.post(
|
|
f"{OMADA_BASE_URL}/api/v2/login",
|
|
json={"username": USERNAME, "password": PASSWORD},
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
print(f"Login failed: {response.text}")
|
|
return False
|
|
|
|
data = response.json()
|
|
if data.get("errorCode") != 0:
|
|
print(f"Login error: {data}")
|
|
return False
|
|
|
|
# Extract session ID from cookies
|
|
cookies = response.headers.get("set-cookie")
|
|
if cookies:
|
|
for cookie in cookies.split(", "):
|
|
if "TPOMADA_SESSIONID" in cookie:
|
|
session_id = cookie.split(";")[0].split("=")[1]
|
|
auth_data["session_id"] = session_id
|
|
break
|
|
|
|
auth_data["omadac_id"] = data["result"]["omadacId"]
|
|
auth_data["token"] = data["result"]["token"]
|
|
auth_data["last_auth"] = time.time()
|
|
|
|
print("Successfully authenticated with Omada controller")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Login error: {str(e)}")
|
|
return False
|
|
|
|
def is_auth_valid() -> bool:
|
|
"""Check if current authentication is valid"""
|
|
return all([
|
|
auth_data["session_id"],
|
|
auth_data["omadac_id"],
|
|
auth_data["token"],
|
|
time.time() - auth_data["last_auth"] < 3600 # 1 hour timeout
|
|
])
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
return {"status": "healthy"}
|
|
|
|
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
|
|
async def proxy_request(request: Request, path: str, api_key: str = Security(validate_api_key)):
|
|
"""Forward requests to Omada controller with proper authentication"""
|
|
# Ensure we're authenticated with Omada
|
|
if not is_auth_valid():
|
|
if not await login_to_omada():
|
|
raise HTTPException(status_code=500, detail="Failed to authenticate with Omada controller")
|
|
|
|
# Get the request body if any
|
|
body = None
|
|
if request.method not in ["GET", "HEAD"]:
|
|
body = await request.body()
|
|
|
|
# Prepare headers
|
|
headers = dict(request.headers)
|
|
headers.pop("host", None)
|
|
headers.pop("x-api-key", None) # Remove our API key from forwarded request
|
|
headers["Cookie"] = f"TPOMADA_SESSIONID={auth_data['session_id']}"
|
|
headers["csrf-token"] = auth_data["token"]
|
|
|
|
# Construct the target URL
|
|
path = path.lstrip("/")
|
|
if "api/v2" in path and auth_data["omadac_id"] not in path:
|
|
# Insert omadac_id for API requests if not present
|
|
parts = path.split("api/v2", 1)
|
|
target_url = f"{OMADA_BASE_URL}/{auth_data['omadac_id']}/api/v2{parts[1]}"
|
|
else:
|
|
target_url = f"{OMADA_BASE_URL}/{path}"
|
|
|
|
# Add query parameters
|
|
query = request.url.query
|
|
if query:
|
|
target_url = f"{target_url}?{query}"
|
|
|
|
try:
|
|
async with httpx.AsyncClient(verify=False) as client:
|
|
response = await client.request(
|
|
method=request.method,
|
|
url=target_url,
|
|
headers=headers,
|
|
content=body,
|
|
follow_redirects=False # Don't automatically follow redirects
|
|
)
|
|
|
|
# Handle redirects to login page
|
|
if response.status_code == 302 and "/login" in response.headers.get("location", ""):
|
|
if await login_to_omada():
|
|
# Retry the request with new credentials
|
|
headers["Cookie"] = f"TPOMADA_SESSIONID={auth_data['session_id']}"
|
|
headers["csrf-token"] = auth_data["token"]
|
|
response = await client.request(
|
|
method=request.method,
|
|
url=target_url,
|
|
headers=headers,
|
|
content=body,
|
|
follow_redirects=True
|
|
)
|
|
|
|
# Create response with same status code and headers
|
|
return Response(
|
|
content=response.content,
|
|
status_code=response.status_code,
|
|
headers=dict(response.headers)
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|