proper status code, added detail in reposne

This commit is contained in:
2025-05-31 21:12:51 +05:00
parent b56cfd59a3
commit 247eece2a2

View File

@@ -1,144 +1,120 @@
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import httpx
from fastapi.responses import JSONResponse
import re
import os
from typing import Dict, Optional
import httpx
from urllib.parse import unquote
local_mac_vendors: Dict[str, str] = {}
app = FastAPI(title="MAC Address Vendor Lookup API")
@asynccontextmanager
async def lifespan(app: FastAPI):
load_local_mac_vendors()
yield
def normalize_mac(mac_address: str) -> str:
mac_clean = unquote(mac_address)
mac_clean = re.sub(r'[^a-fA-F0-9]', '', mac_clean)
if len(mac_clean) < 6:
return None
return '-'.join([mac_clean[i:i+2].upper() for i in range(0, len(mac_clean), 2)])
app = FastAPI(title="MAC Address Vendor Lookup", version="1.0.0", lifespan=lifespan)
def is_valid_mac(mac_address: str) -> bool:
mac_clean = unquote(mac_address)
mac_clean = re.sub(r'[^a-fA-F0-9]', '', mac_clean)
return len(mac_clean) >= 6 and len(mac_clean) <= 12 and all(c in '0123456789abcdefABCDEF' for c in mac_clean)
local_mac_vendors: Dict[str, str] = {}
def get_mac_prefix(mac_address: str) -> str:
mac_clean = re.sub(r'[^a-fA-F0-9]', '', mac_address)
return mac_clean[:6].upper()
def load_local_mac_vendors():
"""Load MAC vendor data from local file"""
global local_mac_vendors
if local_mac_vendors or not os.path.exists("mac_list.txt"):
return
def lookup_local_file(mac_prefix: str) -> str:
if not os.path.exists('mac_list.txt'):
return None
try:
with open("mac_list.txt", "r", encoding="utf-8") as file:
for line in file:
with open('mac_list.txt', 'r') as f:
for line in f:
line = line.strip()
if line:
parts = line.split('\t', 1)
if len(parts) == 2:
oui = parts[0].strip().upper()
vendor = parts[1].strip()
local_mac_vendors[oui] = vendor
if line and ' ' in line:
prefix, vendor = line.split(' ', 1)
if prefix.upper() == mac_prefix:
return vendor
except Exception:
pass
return None
async def lookup_from_github(oui: str) -> Optional[str]:
"""Lookup vendor from GitHub gist"""
async def lookup_remote_gist(mac_prefix: str) -> str:
url = "https://gist.githubusercontent.com/aallan/b4bb86db86079509e6159810ae9bd3e4/raw/846ae1b646ab0f4d646af9115e47365f4118e5f6/mac-vendor.txt"
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
for line in response.text.strip().split('\n'):
if line.strip():
parts = line.split('\t', 1)
if len(parts) == 2 and parts[0].strip().upper() == oui:
return parts[1].strip()
except Exception:
pass
return None
async def lookup_from_macvendors_api(mac_address: str) -> Optional[str]:
"""Lookup vendor from macvendors.com API"""
try:
formatted_mac = normalize_mac_for_api(mac_address)
url = f"https://api.macvendors.com/{formatted_mac}"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response = await client.get(url, timeout=5.0)
if response.status_code == 200:
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return None
else:
return response.text.strip()
for line in response.text.split('\n'):
line = line.strip()
if line and ' ' in line:
prefix, vendor = line.split(' ', 1)
if prefix.upper() == mac_prefix:
return vendor
except Exception:
pass
return None
def normalize_mac_for_api(mac_address: str) -> str:
"""Normalize MAC address for external API (XX-XX-XX format)"""
clean_mac = re.sub(r'[^a-fA-F0-9]', '', mac_address.upper())
if len(clean_mac) >= 6:
return f"{clean_mac[0:2]}-{clean_mac[2:4]}-{clean_mac[4:6]}"
else:
raise ValueError("Invalid MAC address format")
def normalize_mac_response(mac_address: str) -> str:
"""Normalize MAC address for response format"""
clean_mac = re.sub(r'[^a-fA-F0-9]', '', mac_address.upper())
async def lookup_remote_api(normalized_mac: str) -> str:
mac_for_api = normalized_mac
if len(mac_for_api.replace('-', '')) == 6:
mac_for_api = mac_for_api + '-00-00-00'
if len(clean_mac) == 6:
return f"{clean_mac[0:2]}-{clean_mac[2:4]}-{clean_mac[4:6]}"
elif len(clean_mac) == 12:
return f"{clean_mac[0:2]}-{clean_mac[2:4]}-{clean_mac[4:6]}-{clean_mac[6:8]}-{clean_mac[8:10]}-{clean_mac[10:12]}"
else:
raise ValueError("Invalid MAC address format")
def get_oui(mac_address: str) -> str:
"""Extract OUI (first 6 hex digits) from MAC address"""
clean_mac = re.sub(r'[^a-fA-F0-9]', '', mac_address.upper())
if len(clean_mac) >= 6:
return clean_mac[:6]
else:
raise ValueError("MAC address too short")
@app.get("/")
async def root():
return {"message": "MAC Address Vendor Lookup API"}
url = f"https://api.macvendors.com/{mac_for_api}"
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=5.0)
if response.status_code == 200:
return response.text.strip()
except Exception:
pass
return None
@app.get("/lookup/{mac_address}")
async def lookup_vendor(mac_address: str):
try:
oui = get_oui(mac_address)
formatted_mac = normalize_mac_response(mac_address)
vendor = None
# 1. Try local file first
if oui in local_mac_vendors:
vendor = local_mac_vendors[oui]
# 2. Try GitHub gist if not found locally
if not vendor:
vendor = await lookup_from_github(oui)
# 3. Try macvendors.com API if still not found
if not vendor:
vendor = await lookup_from_macvendors_api(mac_address)
# Default to Unknown if nothing found
if not vendor:
vendor = "Unknown"
async def lookup_mac_vendor(mac_address: str):
if not is_valid_mac(mac_address):
return JSONResponse(
status_code=400,
content={
"mac_address": mac_address,
"vendor": None,
"detail": "Invalid MAC address format"
}
)
normalized_mac = normalize_mac(mac_address)
mac_prefix = get_mac_prefix(mac_address)
vendor = lookup_local_file(mac_prefix)
if not vendor:
vendor = await lookup_remote_gist(mac_prefix)
if not vendor:
vendor = await lookup_remote_api(normalized_mac)
if vendor:
return {
"mac_address": formatted_mac,
"vendor": vendor
"mac_address": normalized_mac,
"vendor": vendor,
"detail": None
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail="Internal server error")
else:
return JSONResponse(
status_code=404,
content={
"mac_address": normalized_mac,
"vendor": None,
"detail": "Vendor not found"
}
)
if __name__ == "__main__":
import uvicorn