# lokiai / main.py
# Author: ParthSadaria — commit c23079f (verified): "Update main.py"
import os
import re
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Depends, Security, Query
from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse, FileResponse, PlainTextResponse
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
import httpx
from functools import lru_cache
from pathlib import Path
import json
import datetime
import time
import asyncio
from starlette.status import HTTP_403_FORBIDDEN
import cloudscraper
from concurrent.futures import ThreadPoolExecutor
import uvloop
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.cors import CORSMiddleware
import contextlib
import requests
# --- Process-wide setup ---------------------------------------------------
# Use uvloop as the asyncio event-loop implementation for better throughput.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Shared thread pool for blocking work (e.g. cloudscraper calls in /scraper).
executor = ThreadPoolExecutor(max_workers=16)
# Load environment variables from a local .env file, if present.
load_dotenv()
# API keys arrive in the standard "Authorization" header; auto_error=False so
# verify_api_key can raise its own 403 with a clearer message.
api_key_header = APIKeyHeader(name="Authorization", auto_error=False)
from usage_tracker import UsageTracker
usage_tracker = UsageTracker()  # records per-request usage statistics
app = FastAPI()
# Compress larger responses; CORS is fully open (public API).
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@lru_cache(maxsize=1)
def get_env_vars():
    """Collect every configuration value this service reads from the
    environment, cached for the lifetime of the process.

    Returns:
        dict: configuration values. Unset variables yield None, except
        'api_keys' (always a list), the pollinations endpoint and the
        Mistral API base, which have defaults.
    """
    env = os.getenv
    config = {}
    config['api_keys'] = env('API_KEYS', '').split(',')
    config['secret_api_endpoint'] = env('SECRET_API_ENDPOINT')
    config['secret_api_endpoint_2'] = env('SECRET_API_ENDPOINT_2')
    config['secret_api_endpoint_3'] = env('SECRET_API_ENDPOINT_3')
    config['secret_api_endpoint_4'] = env('SECRET_API_ENDPOINT_4', "https://text.pollinations.ai/openai")
    config['secret_api_endpoint_5'] = env('SECRET_API_ENDPOINT_5')
    config['secret_api_endpoint_6'] = env('SECRET_API_ENDPOINT_6')
    config['mistral_api'] = env('MISTRAL_API', "https://api.mistral.ai")
    config['mistral_key'] = env('MISTRAL_KEY')
    config['gemini_key'] = env('GEMINI_KEY')
    config['endpoint_origin'] = env('ENDPOINT_ORIGIN')
    config['new_img'] = env('NEW_IMG')
    return config
# Model ids routed to the Mistral platform (MISTRAL_API + MISTRAL_KEY).
mistral_models = {
    "codestral-latest",
    "ministral-3b-latest",
    "ministral-8b-latest",
    "mistral-large-latest",
    "mistral-moderation-latest",
    "mistral-saba-latest",
    "mistral-small-latest",
    "open-mistral-nemo",
    "pixtral-large-latest",
}
# Model ids proxied to the Pollinations endpoint (SECRET_API_ENDPOINT_4).
pollinations_models = {
    "claude-hybridspace", "deepseek", "deepseek-r1", "deepseek-reasoner",
    "gemini", "gemini-thinking", "hormoz", "llama", "llama-scaleway",
    "llamalight", "mistral", "openai", "openai-audio", "openai-fast",
    "openai-large", "openai-reasoning", "openai-xlarge", "phi", "phi-mini",
    "qwen-coder", "searchgpt",
}
# Model ids proxied to the secondary endpoint (SECRET_API_ENDPOINT_2).
alternate_models = {
    "grok-3", "grok-3-fast", "llama-4-scout", "o1", "o3", "o4-mini",
    "r1-1776", "sonar", "sonar-pro", "sonar-reasoning",
    "sonar-reasoning-pro",
}
# Model ids proxied to the fifth endpoint (SECRET_API_ENDPOINT_5).
# FIX: the original literal listed "claude 3.5 haiku" twice; set literals
# silently deduplicate, so removing the repeat changes nothing at runtime
# but keeps the source honest about the member count.
claude_3_models = {
    "claude-3-7-sonnet", "claude-3-7-sonnet-thinking", "claude 3.5 haiku",
    "claude 3.5 sonnet", "o3-mini-medium", "o3-mini-high",
    "grok-3", "grok-3-thinking", "grok 2"
}
# Gemini model ids proxied to SECRET_API_ENDPOINT_6 (requires GEMINI_KEY).
gemini_models = {
    # --- Gemini 1.5 series ---
    "gemini-1.5-pro",             # alias for the latest stable 1.5 Pro
    "gemini-1.5-pro-002",         # latest 1.5 Pro stable build
    "gemini-1.5-flash",           # alias for the latest stable 1.5 Flash
    "gemini-1.5-flash-002",       # latest 1.5 Flash stable build
    "gemini-1.5-flash-8b",        # 1.5 Flash 8B variant
    # --- Gemini 2.0 series ---
    "gemini-2.0-flash-lite",      # cost-efficient model
    "gemini-2.0-flash-lite-preview",  # preview version
    "gemini-2.0-flash",           # 2.0 Flash stable
    "gemini-2.0-flash-exp",       # experimental version
    "gemini-2.0-flash-thinking",  # exposes model reasoning
    "gemini-2.0-flash-thinking-exp-01-21",  # experimental thinking build
    "gemini-2.0-flash-preview-image-generation",  # image generation
    "gemini-2.0-pro-exp-02-05",   # 2.0 Pro experimental
    # --- Gemini 2.5 series ---
    "gemini-2.5-flash",           # 2.5 Flash stable
    "gemini-2.5-flash-preview-05-20",  # 2.5 Flash preview
    "gemini-2.5-flash-preview-native-audio-dialog",  # native audio output
    "gemini-2.5-flash-exp-native-audio-thinking-dialog",  # audio + thinking
    "gemini-2.5-pro",             # 2.5 Pro (most advanced)
    "gemini-2.5-pro-preview-06-05",  # latest 2.5 Pro preview
    "gemini-2.5-pro-preview-03-25",  # earlier 2.5 Pro preview
    "gemini-2.5-pro-exp-03-25",   # 2.5 Pro experimental
    "gemini-2.5-pro-preview-tts", # speech generation
    "gemini-2.5-flash-preview-tts",  # speech generation
    # --- Experimental and special models ---
    "gemini-exp-1206",            # experimental 2.0 Pro
    "gemini-embedding-exp-03-07", # experimental embeddings
}
# Models accepted by /images/generations (proxied to the NEW_IMG endpoint).
supported_image_models = {
    "Flux Dev", "Flux Pro", "Flux Pro Ultra", "Flux Pro Ultra Raw",
    "Flux Realism", "Flux Schnell", "dall-e-3", "grok-2-aurora",
    "sdxl-lightning-4step", "stable-diffusion-3-large-turbo",
    "stable-diffusion-ultra",
}
class Payload(BaseModel):
    """Request body for /chat/completions (OpenAI-compatible subset)."""
    model: str        # model id; drives upstream routing in get_completion
    messages: list    # chat history, e.g. [{"role": ..., "content": ...}]
    stream: bool = False  # when True, the reply is streamed as SSE
class ImageGenerationPayload(BaseModel):
    """Request body for /images/generations."""
    model: str            # must be one of supported_image_models
    prompt: str           # text prompt forwarded to the image backend
    size: str = "1024x1024"  # image dimensions, "WxH"
    number: int = 1       # number of images; forwarded upstream as "n"
# Global service toggle: when False, chat/image endpoints answer 503.
server_status = True
# Optional whitelist of model ids; an empty list means "accept any model".
available_model_ids: list[str] = []
@lru_cache(maxsize=1)
def get_async_client():
    """Return the shared process-wide httpx.AsyncClient (created once).

    A single client reuses connections across all proxied requests.
    """
    pool_limits = httpx.Limits(max_keepalive_connections=50, max_connections=200)
    return httpx.AsyncClient(timeout=60.0, limits=pool_limits)
# Lazily-built pool of cloudscraper sessions, rotated on each call.
scraper_pool = []
MAX_SCRAPERS = 20
def get_scraper():
    """Return one scraper session, filling the pool on first use.

    Rotation keys off the wall-clock millisecond so consecutive calls are
    spread across sessions.
    """
    if not scraper_pool:
        scraper_pool.extend(cloudscraper.create_scraper() for _ in range(MAX_SCRAPERS))
    index = int(time.time() * 1000) % MAX_SCRAPERS
    return scraper_pool[index]
async def verify_api_key(
    request: Request,
    api_key: str = Security(api_key_header)
):
    """FastAPI dependency that authorises a request.

    Requests referred from the hosted playground pages pass without a key;
    all others must present a key listed in the API_KEYS env var (with or
    without a "Bearer " prefix). Raises HTTP 403 on any failure.
    """
    # Playground pages are exempt from authentication.
    playground_prefixes = (
        "https://parthsadaria-lokiai.hf.space/playground",
        "https://parthsadaria-lokiai.hf.space/image-playground",
    )
    referer = request.headers.get("referer", "")
    if referer.startswith(playground_prefixes):
        return True
    if not api_key:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail="No API key provided"
        )
    # Accept both raw keys and "Bearer <key>" headers.
    if api_key.startswith('Bearer '):
        api_key = api_key[7:]
    valid_api_keys = get_env_vars().get('api_keys', [])
    # os.getenv('API_KEYS', '').split(',') yields [''] when unset.
    if not valid_api_keys or valid_api_keys == ['']:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail="API keys not configured on server"
        )
    if api_key not in set(valid_api_keys):
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail="Invalid API key"
        )
    return True
@lru_cache(maxsize=1)
def load_models_data():
    """Load the model catalogue from models.json next to this file.

    Returns the parsed JSON, or [] when the file is missing or malformed
    (the error is printed, not raised). Result is cached.
    """
    models_path = Path(__file__).parent / 'models.json'
    try:
        with open(models_path, 'r') as handle:
            return json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError) as exc:
        print(f"Error loading models.json: {str(exc)}")
        return []
@app.get("/api/v1/models")
@app.get("/models")
async def get_models():
    """Return the model catalogue; 500 when models.json is unavailable."""
    catalogue = load_models_data()
    if not catalogue:
        raise HTTPException(status_code=500, detail="Error loading available models")
    return catalogue
async def generate_search_async(query: str, systemprompt: str | None = None, stream: bool = True):
    """Run a "searchgpt" query against SECRET_API_ENDPOINT_3.

    Args:
        query: the user's search query.
        systemprompt: optional system message (defaults to a friendly prompt).
        stream: when True, returns an asyncio.Queue the caller drains for SSE
            chunks ({"data": ..., "text": ...} items, {"error": ...} on
            failure, None as end-of-stream sentinel). When False, returns
            {"response": <full text>}.

    Raises:
        HTTPException: when the endpoint is unconfigured, or (non-stream path
            only) when the upstream call fails; stream-path errors are pushed
            onto the queue instead.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    system_message = systemprompt or "Be Helpful and Friendly"
    prompt_messages = [{"role": "user", "content": query}]
    prompt_messages.insert(0, {"content": system_message, "role": "system"})
    payload = {
        "is_vscode_extension": True,
        "message_history": prompt_messages,
        "requested_model": "searchgpt",
        "user_input": prompt_messages[-1]["content"],
    }
    secret_api_endpoint_3 = get_env_vars()['secret_api_endpoint_3']
    if not secret_api_endpoint_3:
        raise HTTPException(status_code=500, detail="Search API endpoint not configured")
    client = get_async_client()
    if stream:
        queue = asyncio.Queue()
        async def _fetch_search_data_stream():
            try:
                async with client.stream("POST", secret_api_endpoint_3, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        # FIX: httpx streaming responses have no awaitable
                        # .text(); the body must be read via aread() first.
                        error_detail = (await response.aread()).decode("utf-8", errors="ignore")
                        await queue.put({"error": f"Search API returned status code {response.status_code}: {error_detail}"})
                        return
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            # FIX: the [DONE] sentinel arrives as "data: [DONE]",
                            # so compare the payload portion, not the whole line
                            # (the old check could never match and treated the
                            # sentinel as an error).
                            if line[6:].strip() == "[DONE]":
                                continue
                            try:
                                json_data = json.loads(line[6:])
                                content = json_data.get("choices", [{}])[0].get("delta", {}).get("content", "")
                                if content.strip():
                                    # Re-shape the delta chunk into a whole-message
                                    # chat.completion object for downstream clients.
                                    cleaned_response = {
                                        "created": json_data.get("created"),
                                        "id": json_data.get("id"),
                                        "model": "searchgpt",
                                        "object": "chat.completion",
                                        "choices": [
                                            {
                                                "message": {
                                                    "content": content
                                                }
                                            }
                                        ]
                                    }
                                    await queue.put({"data": f"data: {json.dumps(cleaned_response)}\n\n", "text": content})
                            except json.JSONDecodeError:
                                print(f"Warning: Could not decode JSON from search API stream: {line}")
                                await queue.put({"error": f"Invalid JSON from search API: {line}"})
                                break
                await queue.put(None)  # end-of-stream sentinel
            except Exception as e:
                print(f"Error in _fetch_search_data_stream: {e}")
                await queue.put({"error": str(e)})
                await queue.put(None)
        asyncio.create_task(_fetch_search_data_stream())
        return queue
    else:
        try:
            response = await client.post(secret_api_endpoint_3, json=payload, headers=headers)
            response.raise_for_status()
            json_data = response.json()
            content = json_data.get("choices", [{}])[0].get("message", {}).get("content", "")
            return {"response": content}
        except httpx.HTTPStatusError as e:
            raise HTTPException(status_code=e.response.status_code, detail=f"Search API returned status {e.response.status_code}: {e.response.text}")
        except httpx.RequestError as e:
            raise HTTPException(status_code=502, detail=f"Failed to connect to search API: {str(e)}")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"An unexpected error occurred during search: {str(e)}")
@lru_cache(maxsize=10)
def read_html_file(file_path):
    """Return the text contents of a static asset, or None when missing.

    Results are cached, so edits to the file require a process restart.
    """
    try:
        return Path(file_path).read_text()
    except FileNotFoundError:
        return None
@app.get("/favicon.ico")
async def favicon():
    """Serve the site favicon from the application directory."""
    icon_path = Path(__file__).parent / "favicon.ico"
    return FileResponse(icon_path, media_type="image/x-icon")
@app.get("/banner.jpg")
async def banner():
    """Serve the banner image from the application directory."""
    image_path = Path(__file__).parent / "banner.jpg"
    return FileResponse(image_path, media_type="image/jpeg")
@app.get("/ping")
async def ping():
    """Liveness probe. The response_time field is a static placeholder."""
    reply = {"message": "pong", "response_time": "0.000000 seconds"}
    return reply
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the landing page (index.html); 404 when the file is missing."""
    page = read_html_file("index.html")
    if page is None:
        raise HTTPException(status_code=404, detail="index.html not found")
    return HTMLResponse(content=page)
@app.get("/script.js", response_class=HTMLResponse)
async def script():
    """Serve the landing page's JavaScript; 404 when the file is missing."""
    asset = read_html_file("script.js")
    if asset is None:
        raise HTTPException(status_code=404, detail="script.js not found")
    return HTMLResponse(content=asset)
@app.get("/style.css", response_class=HTMLResponse)
async def style():
    """Serve the landing page's stylesheet; 404 when the file is missing."""
    asset = read_html_file("style.css")
    if asset is None:
        raise HTTPException(status_code=404, detail="style.css not found")
    return HTMLResponse(content=asset)
@app.get("/dynamo", response_class=HTMLResponse)
async def dynamic_ai_page(request: Request):
    """Generate a one-off HTML page via the service's own chat endpoint.

    Builds a prompt from the caller's user-agent and IP, asks
    mistral-small-latest (through the local /chat/completions route with the
    playground key) for a page wrapped in triple backticks, extracts it, and
    serves it.

    Raises:
        HTTPException: 500 when generation/extraction fails, or the upstream
            status when the internal call errors.
    """
    user_agent = request.headers.get('user-agent', 'Unknown User')
    client_ip = request.client.host if request.client else "Unknown IP"
    location = f"IP: {client_ip}"
    prompt = f"""
    Generate a dynamic HTML page for a user with the following details: with name "LOKI.AI"
    - User-Agent: {user_agent}
    - Location: {location}
    - Style: Cyberpunk, minimalist, or retro
    Make sure the HTML is clean and includes a heading, also have cool animations a motivational message, and a cool background.
    Wrap the generated HTML in triple backticks (```).
    """
    payload_data = {
        "model": "mistral-small-latest",
        "messages": [{"role": "user", "content": prompt}],
        "stream": False
    }
    headers = {
        "Authorization": "Bearer playground"
    }
    try:
        client = get_async_client()
        # FIX: plain string (was an f-string with no placeholders).
        response = await client.post(
            "http://localhost:7860/chat/completions",
            json=payload_data,
            headers=headers,
            timeout=30.0
        )
        response.raise_for_status()
        data = response.json()
        html_content = None
        if data and 'choices' in data and len(data['choices']) > 0:
            message_content = data['choices'][0].get('message', {}).get('content', '')
            # Prefer the fenced block; fall back to the whole message.
            match = re.search(r"```(?:html)?(.*?)```", message_content, re.DOTALL)
            if match:
                html_content = match.group(1).strip()
            else:
                html_content = message_content.strip()
        if not html_content:
            raise HTTPException(status_code=500, detail="Failed to generate HTML content from AI.")
        return HTMLResponse(content=html_content)
    except HTTPException:
        # FIX: previously the generic handler below caught our own
        # HTTPException and re-wrapped it with a misleading message.
        raise
    except httpx.RequestError as e:
        print(f"HTTPX Request Error in /dynamo: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to connect to internal AI service: {e}")
    except httpx.HTTPStatusError as e:
        print(f"HTTPX Status Error in /dynamo: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail=f"Internal AI service responded with error: {e.response.text}")
    except Exception as e:
        print(f"An unexpected error occurred in /dynamo: {e}")
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")
@app.get("/scraper", response_class=PlainTextResponse)
async def scrape_site(url: str = Query(..., description="URL to scrape")):
    """Fetch an arbitrary URL through cloudscraper and return its body.

    The blocking cloudscraper call runs on the shared thread pool so the
    event loop is not blocked.

    Raises:
        HTTPException: 500 when the fetch fails or returns an empty body.
    """
    try:
        loop = asyncio.get_running_loop()
        response_text = await loop.run_in_executor(
            executor,
            lambda: get_scraper().get(url).text
        )
    except Exception as e:
        print(f"Cloudscraper failed: {e}")
        raise HTTPException(status_code=500, detail=f"Cloudscraper failed: {e}")
    if response_text and len(response_text.strip()) > 0:
        return PlainTextResponse(response_text)
    # FIX: raised outside the try block so the generic handler above no
    # longer catches this HTTPException and re-wraps its message.
    raise HTTPException(status_code=500, detail="Scraping returned empty content.")
@app.get("/playground", response_class=HTMLResponse)
async def playground():
    """Serve the chat playground page; 404 when the file is missing."""
    page = read_html_file("playground.html")
    if page is None:
        raise HTTPException(status_code=404, detail="playground.html not found")
    return HTMLResponse(content=page)
@app.get("/image-playground", response_class=HTMLResponse)
async def image_playground():
    """Serve the image playground page; 404 when the file is missing."""
    page = read_html_file("image-playground.html")
    if page is None:
        raise HTTPException(status_code=404, detail="image-playground.html not found")
    return HTMLResponse(content=page)
# Base URL for fetching the Vetra front-end files from GitHub.
# FIX: the URL had been mangled into markdown link syntax
# "[https://...](https://...)", which is not a valid URL and made every
# GitHub fetch fail.
GITHUB_BASE = "https://raw.githubusercontent.com/Parthsadaria/Vetra/main"
# Logical name -> file path within the repo.
FILES = {
    "html": "index.html",
    "css": "style.css",
    "js": "script.js"
}
async def get_github_file(filename: str) -> str | None:
    """Fetch one file from the Vetra repo; return its text, or None on failure.

    FIX: the URL and both log messages had lost their `{filename}`
    interpolation (the source contained the literal "(unknown)"), so every
    request hit the same nonexistent path.
    """
    url = f"{GITHUB_BASE}/{filename}"
    client = get_async_client()
    try:
        res = await client.get(url, follow_redirects=True)
        res.raise_for_status()
        return res.text
    except httpx.HTTPStatusError as e:
        print(f"Error fetching {filename} from GitHub: {e.response.status_code} - {e.response.text}")
        return None
    except httpx.RequestError as e:
        print(f"Request error fetching {filename} from GitHub: {e}")
        return None
@app.get("/vetra", response_class=HTMLResponse)
async def serve_vetra():
    """Serve the Vetra single-page app with its CSS and JS inlined.

    The three files are fetched concurrently (previously sequential awaits —
    three round trips back to back). Missing CSS/JS degrade to a placeholder
    comment; a missing index.html is a hard 404.
    """
    html, css, js = await asyncio.gather(
        get_github_file(FILES["html"]),
        get_github_file(FILES["css"]),
        get_github_file(FILES["js"]),
    )
    if not html:
        raise HTTPException(status_code=404, detail="index.html not found on GitHub")
    final_html = html.replace(
        "</head>",
        f"<style>{css or '/* CSS not found */'}</style></head>"
    ).replace(
        "</body>",
        f"<script>{js or '// JS not found'}</script></body>"
    )
    return HTMLResponse(content=final_html)
@app.get("/searchgpt")
async def search_gpt(q: str, request: Request, stream: bool = False, systemprompt: str | None = None):
    """Public search endpoint backed by generate_search_async.

    With stream=True the reply is SSE; otherwise a single JSON object.
    Raises 400 when the query parameter is empty.
    """
    if not q:
        raise HTTPException(status_code=400, detail="Query parameter 'q' is required")
    usage_tracker.record_request(request=request, model="searchgpt", endpoint="/searchgpt")
    if not stream:
        result = await generate_search_async(q, systemprompt=systemprompt, stream=False)
        return JSONResponse(content=result)
    queue = await generate_search_async(q, systemprompt=systemprompt, stream=True)
    async def stream_generator():
        collected_text = ""
        while True:
            item = await queue.get()
            if item is None:  # end-of-stream sentinel
                break
            if "error" in item:
                yield f"data: {json.dumps({'error': item['error']})}\n\n"
                break
            if "data" in item:
                yield item["data"]
                collected_text += item.get("text", "")
    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream"
    )
# Origin/Referer header value sent when proxying to the default upstream.
header_url = os.getenv('HEADER_URL')
@app.post("/chat/completions")
@app.post("/api/v1/chat/completions")
async def get_completion(payload: Payload, request: Request, authenticated: bool = Depends(verify_api_key)):
    """Proxy an OpenAI-style chat completion to the matching upstream.

    Routing: the model id selects one of the configured upstream endpoints
    (Mistral, Pollinations, alternate, Claude, Gemini, or the default).
    Supports both SSE streaming and plain JSON responses; upstream errors
    are translated into HTTPExceptions with a descriptive detail.
    """
    if not server_status:
        raise HTTPException(
            status_code=503,
            detail="Server is under maintenance. Please try again later."
        )
    # Fall back to gpt-4o-mini when the client sends an empty model string.
    model_to_use = payload.model or "gpt-4o-mini"
    # Enforce the optional whitelist (empty list = accept anything).
    if available_model_ids and model_to_use not in set(available_model_ids):
        raise HTTPException(
            status_code=400,
            detail=f"Model '{model_to_use}' is not available. Check /models for the available model list."
        )
    usage_tracker.record_request(request=request, model=model_to_use, endpoint="/chat/completions")
    payload_dict = payload.dict()
    payload_dict["model"] = model_to_use
    # NOTE(review): Payload defaults stream to False, so the key is always
    # present here and the True fallback is effectively dead.
    stream_enabled = payload_dict.get("stream", True)
    env_vars = get_env_vars()
    endpoint = None
    custom_headers = {}
    # Most upstreams expose the OpenAI-compatible path; Gemini overrides it.
    target_url_path = "/v1/chat/completions"
    if model_to_use in mistral_models:
        endpoint = env_vars['mistral_api']
        custom_headers = {
            "Authorization": f"Bearer {env_vars['mistral_key']}"
        }
    elif model_to_use in pollinations_models:
        endpoint = env_vars['secret_api_endpoint_4']
        custom_headers = {}
    elif model_to_use in alternate_models:
        endpoint = env_vars['secret_api_endpoint_2']
        custom_headers = {}
    elif model_to_use in claude_3_models:
        endpoint = env_vars['secret_api_endpoint_5']
        custom_headers = {}
    elif model_to_use in gemini_models:
        endpoint = env_vars['secret_api_endpoint_6']
        if not endpoint:
            raise HTTPException(status_code=500, detail="Gemini API endpoint (SECRET_API_ENDPOINT_6) not configured.")
        if not env_vars['gemini_key']:
            raise HTTPException(status_code=500, detail="GEMINI_KEY not configured for Gemini models.")
        custom_headers = {
            "Authorization": f"Bearer {env_vars['gemini_key']}"
        }
        target_url_path = "/chat/completions"
    else:
        # Default upstream; spoof Origin/Referer with the configured URL.
        endpoint = env_vars['secret_api_endpoint']
        custom_headers = {
            "Origin": header_url,
            "Priority": "u=1, i",
            "Referer": header_url
        }
    if not endpoint:
        raise HTTPException(status_code=500, detail=f"No API endpoint configured for model: {model_to_use}")
    print(f"Proxying request for model '{model_to_use}' to endpoint: {endpoint}{target_url_path}")
    client = get_async_client()
    if stream_enabled:
        async def real_time_stream_generator():
            try:
                async with client.stream("POST", f"{endpoint}{target_url_path}", json=payload_dict, headers=custom_headers) as response:
                    if response.status_code >= 400:
                        # Map common upstream statuses to friendlier messages.
                        error_messages = {
                            400: "Bad request. Verify input data.",
                            401: "Unauthorized. Invalid API key for upstream service.",
                            403: "Forbidden. You do not have access to this resource on upstream.",
                            404: "The requested resource was not found on upstream.",
                            422: "Unprocessable entity. Check your payload for upstream API.",
                            500: "Internal server error from upstream API."
                        }
                        detail_message = error_messages.get(response.status_code, f"Upstream error code: {response.status_code}")
                        # Best-effort: append upstream's own error detail.
                        try:
                            error_body = await response.aread()
                            error_json = json.loads(error_body.decode('utf-8'))
                            if 'error' in error_json and 'message' in error_json['error']:
                                detail_message += f" - Upstream detail: {error_json['error']['message']}"
                            elif 'detail' in error_json:
                                detail_message += f" - Upstream detail: {error_json['detail']}"
                            else:
                                detail_message += f" - Upstream raw: {error_body.decode('utf-8')[:200]}..."
                        except (json.JSONDecodeError, UnicodeDecodeError):
                            detail_message += f" - Upstream raw: {error_body.decode('utf-8', errors='ignore')[:200]}"
                        raise HTTPException(status_code=response.status_code, detail=detail_message)
                    # Relay SSE lines verbatim to the client.
                    async for line in response.aiter_lines():
                        if line:
                            yield line + "\n"
            except httpx.TimeoutException:
                raise HTTPException(status_code=504, detail="Request to upstream AI service timed out.")
            except httpx.RequestError as e:
                raise HTTPException(status_code=502, detail=f"Failed to connect to upstream AI service: {str(e)}")
            except Exception as e:
                # Let deliberate HTTPExceptions pass through unchanged.
                if isinstance(e, HTTPException):
                    raise e
                print(f"An unexpected error occurred during chat completion proxy: {e}")
                raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
        return StreamingResponse(
            real_time_stream_generator(),
            media_type="text/event-stream",
            headers={
                "Content-Type": "text/event-stream",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no"
            }
        )
    else:
        try:
            response = await client.post(f"{endpoint}{target_url_path}", json=payload_dict, headers=custom_headers)
            response.raise_for_status()
            return JSONResponse(content=response.json())
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Request to upstream AI service timed out.")
        except httpx.RequestError as e:
            raise HTTPException(status_code=502, detail=f"Failed to connect to upstream AI service: {str(e)}")
        except httpx.HTTPStatusError as e:
            # Same status -> message mapping as the streaming path.
            error_messages = {
                400: "Bad request. Verify input data.",
                401: "Unauthorized. Invalid API key for upstream service.",
                403: "Forbidden. You do not have access to this resource on upstream.",
                404: "The requested resource was not found on upstream.",
                422: "Unprocessable entity. Check your payload for upstream API.",
                500: "Internal server error from upstream API."
            }
            detail_message = error_messages.get(e.response.status_code, f"Upstream error code: {e.response.status_code}")
            try:
                error_body = e.response.json()
                if 'error' in error_body and 'message' in error_body['error']:
                    detail_message += f" - Upstream detail: {error_body['error']['message']}"
                elif 'detail' in error_body:
                    detail_message += f" - Upstream detail: {error_body['detail']}"
            except json.JSONDecodeError:
                detail_message += f" - Upstream raw: {e.response.text[:200]}"
            raise HTTPException(status_code=e.response.status_code, detail=detail_message)
        except Exception as e:
            print(f"An unexpected error occurred during non-streaming chat completion proxy: {e}")
            raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
@app.post("/images/generations")
async def create_image(payload: ImageGenerationPayload, request: Request, authenticated: bool = Depends(verify_api_key)):
    """Proxy an image-generation request to the NEW_IMG endpoint.

    Validates the model against supported_image_models, records usage, and
    forwards {model, prompt, size, n} upstream; upstream errors become
    HTTPExceptions with the upstream's detail where available.
    """
    if not server_status:
        # FIX: HTTPException takes `detail`, not `content` — the original
        # raised TypeError whenever maintenance mode was active.
        raise HTTPException(
            status_code=503,
            detail="Server is under maintenance. Please try again later."
        )
    if payload.model not in supported_image_models:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{payload.model}' is not supported for image generation. Supported models are: {', '.join(supported_image_models)}"
        )
    usage_tracker.record_request(request=request, model=payload.model, endpoint="/images/generations")
    api_payload = {
        "model": payload.model,
        "prompt": payload.prompt,
        "size": payload.size,
        "n": payload.number
    }
    target_api_url = get_env_vars().get('new_img')
    if not target_api_url:
        raise HTTPException(status_code=500, detail="Image generation API endpoint (NEW_IMG) not configured.")
    try:
        client = get_async_client()
        response = await client.post(target_api_url, json=api_payload)
        response.raise_for_status()
        return JSONResponse(content=response.json())
    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="Image generation request timed out.")
    except httpx.RequestError as e:
        raise HTTPException(status_code=502, detail=f"Error connecting to image generation service: {e}")
    except httpx.HTTPStatusError as e:
        error_detail = e.response.json().get("detail", f"Image generation failed with status code: {e.response.status_code}")
        raise HTTPException(status_code=e.response.status_code, detail=error_detail)
    except Exception as e:
        print(f"An unexpected error occurred during image generation: {e}")
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during image generation: {e}")
@app.get("/usage")
async def get_usage_json(days: int = 7):
    """Return the raw usage summary for the last `days` days as JSON."""
    summary = usage_tracker.get_usage_summary(days)
    return summary
@app.get("/usage/page", response_class=HTMLResponse)
async def get_usage_page(days: int = Query(7, description="Number of days to include in the usage summary")):
    """Render the usage dashboard as an HTML page."""
    summary = usage_tracker.get_usage_summary(days)
    page = generate_usage_html(summary, days)
    return HTMLResponse(content=page)
def generate_usage_html(usage_data: dict, days: int = 7):
model_labels = list(usage_data['model_usage_period'].keys())
model_counts = list(usage_data['model_usage_period'].values())
endpoint_labels = list(usage_data['endpoint_usage_period'].keys())
endpoint_counts = list(usage_data['endpoint_usage_period'].values())
daily_dates = list(usage_data['daily_usage_period'].keys())
daily_requests = [data['requests'] for data in usage_data['daily_usage_period'].values()]
daily_unique_ips = [data['unique_ips_count'] for data in usage_data['daily_usage_period'].values()]
model_usage_all_time_rows = "\n".join([
f"""
<tr>
<td>{model}</td>
<td>{stats['total_requests']}</td>
<td>{datetime.datetime.fromisoformat(stats['first_used']).strftime("%Y-%m-%d %H:%M")}</td>
<td>{datetime.datetime.fromisoformat(stats['last_used']).strftime("%Y-%m-%d %H:%M")}</td>
</tr>
""" for model, stats in usage_data['all_time_model_usage'].items()
])
api_usage_all_time_rows = "\n".join([
f"""
<tr>
<td>{endpoint}</td>
<td>{stats['total_requests']}</td>
<td>{datetime.datetime.fromisoformat(stats['first_used']).strftime("%Y-%m-%d %H:%M")}</td>
<td>{datetime.datetime.fromisoformat(stats['last_used']).strftime("%Y-%m-%d %H:%M")}</td>
</tr>
""" for endpoint, stats in usage_data['all_time_endpoint_usage'].items()
])
daily_usage_table_rows = "\n".join([
f"""
<tr>
<td>{date}</td>
<td>{data['requests']}</td>
<td>{data['unique_ips_count']}</td>
</tr>
""" for date, data in usage_data['daily_usage_period'].items()
])
recent_requests_rows = "\n".join([
f"""
<tr>
<td>{datetime.datetime.fromisoformat(req['timestamp']).strftime("%Y-%m-%d %H:%M:%S")}</td>
<td>{req['model']}</td>
<td>{req['endpoint']}</td>
<td>{req['ip_address']}</td>
<td>{req['user_agent']}</td>
</tr>
""" for req in usage_data['recent_requests']
])
html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Lokiai AI - Usage Statistics</title>
<link href="[https://fonts.googleapis.com/css2?family=Inter:wght@300;400;600;700&display=swap](https://fonts.googleapis.com/css2?family=Inter:wght@300;400;600;700&display=swap)" rel="stylesheet">
<script src="[https://cdn.jsdelivr.net/npm/chart.js](https://cdn.jsdelivr.net/npm/chart.js)"></script>
<style>
:root {{
--bg-dark: #0f1011;
--bg-darker: #070708;
--text-primary: #e6e6e6;
--text-secondary: #8c8c8c;
--border-color: #2c2c2c;
--accent-color: #3a6ee0;
--accent-hover: #4a7ef0;
--chart-bg-light: rgba(58, 110, 224, 0.2);
--chart-border-light: #3a6ee0;
}}
body {{
font-family: 'Inter', sans-serif;
background-color: var(--bg-dark);
color: var(--text-primary);
max-width: 1200px;
margin: 0 auto;
padding: 40px 20px;
line-height: 1.6;
}}
.logo {{
display: flex;
align-items: center;
justify-content: center;
margin-bottom: 30px;
}}
.logo h1 {{
font-weight: 700;
font-size: 2.8em;
color: var(--text-primary);
margin-left: 15px;
}}
.logo img {{
width: 70px;
height: 70px;
border-radius: 12px;
box-shadow: 0 5px 15px rgba(0,0,0,0.2);
}}
.container {{
background-color: var(--bg-darker);
border-radius: 16px;
padding: 30px;
box-shadow: 0 20px 50px rgba(0,0,0,0.4);
border: 1px solid var(--border-color);
}}
h2, h3 {{
color: var(--text-primary);
border-bottom: 2px solid var(--border-color);
padding-bottom: 12px;
margin-top: 40px;
margin-bottom: 25px;
font-weight: 600;
font-size: 1.8em;
}}
.summary-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin-bottom: 30px;
}}
.summary-card {{
background-color: var(--bg-dark);
border-radius: 10px;
padding: 20px;
text-align: center;
border: 1px solid var(--border-color);
box-shadow: 0 8px 20px rgba(0,0,0,0.2);
transition: transform 0.2s ease-in-out;
}}
.summary-card:hover {{
transform: translateY(-5px);
}}
.summary-card h3 {{
margin-top: 0;
font-size: 1.1em;
color: var(--text-secondary);
border-bottom: none;
padding-bottom: 0;
margin-bottom: 10px;
}}
.summary-card p {{
font-size: 2.2em;
font-weight: 700;
color: var(--accent-color);
margin: 0;
}}
table {{
width: 100%;
border-collapse: separate;
border-spacing: 0;
margin-bottom: 40px;
background-color: var(--bg-dark);
border-radius: 10px;
overflow: hidden;
box-shadow: 0 8px 20px rgba(0,0,0,0.2);
}}
th, td {{
border: 1px solid var(--border-color);
padding: 15px;
text-align: left;
transition: background-color 0.3s ease;
}}
th {{
background-color: #1a1a1a;
color: var(--text-primary);
font-weight: 600;
text-transform: uppercase;
font-size: 0.95em;
}}
tr:nth-child(even) {{
background-color: rgba(255,255,255,0.03);
}}
tr:hover {{
background-color: rgba(62,100,255,0.1);
}}
.chart-container {{
background-color: var(--bg-dark);
border-radius: 10px;
padding: 20px;
margin-bottom: 40px;
border: 1px solid var(--border-color);
box-shadow: 0 8px 20px rgba(0,0,0,0.2);
max-height: 400px;
position: relative;
}}
canvas {{
max-width: 100% !important;
height: auto !important;
}}
@media (max-width: 768px) {{
body {{
padding: 20px 10px;
}}
.container {{
padding: 20px;
}}
.logo h1 {{
font-size: 2em;
}}
.summary-card p {{
font-size: 1.8em;
}}
h2, h3 {{
font-size: 1.5em;
}}
table {{
font-size: 0.85em;
}}
th, td {{
padding: 10px;
}}
}}
</style>
</head>
<body>
<div class="container">
<div class="logo">
<img src="data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMjAwIiBoZWlnaHQ9IjIwMCIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMC9zdmciPjxwYXRoIGQ9Ik0xMDAgMzVMNTAgOTBoMTAwWiIgZmlsbD0iIzNhNmVlMCIvPjxjaXJjbGUgY3g9IjEwMCIgY3k9IjE0MCIgcj0iMzAiIGZpbGw9IiMzYTZlZTAiLz48L3N2Zz4=" alt="Lokiai AI Logo">
<h1>Lokiai AI Usage</h1>
</div>
<div class="summary-grid">
<div class="summary-card">
<h3>Total Requests (All Time)</h3>
<p>{usage_data['total_requests']}</p>
</div>
<div class="summary-card">
<h3>Unique IPs (All Time)</h3>
<p>{usage_data['unique_ips_total_count']}</p>
</div>
<div class="summary-card">
<h3>Models Used (Last {days} Days)</h3>
<p>{len(usage_data['model_usage_period'])}</p>
</div>
<div class="summary-card">
<h3>Endpoints Used (Last {days} Days)</h3>
<p>{len(usage_data['endpoint_usage_period'])}</p>
</div>
</div>
<h2>Daily Usage (Last {days} Days)</h2>
<div class="chart-container">
<canvas id="dailyRequestsChart"></canvas>
</div>
<table>
<thead>
<tr>
<th>Date</th>
<th>Requests</th>
<th>Unique IPs</th>
</tr>
</thead>
<tbody>
{daily_usage_table_rows}
</tbody>
</table>
<h2>Model Usage (Last {days} Days)</h2>
<div class="chart-container">
<canvas id="modelUsageChart"></canvas>
</div>
<h3>Model Usage (All Time Details)</h3>
<table>
<thead>
<tr>
<th>Model</th>
<th>Total Requests</th>
<th>First Used</th>
<th>Last Used</th>
</tr>
</thead>
<tbody>
{model_usage_all_time_rows}
</tbody>
</table>
<h2>API Endpoint Usage (Last {days} Days)</h2>
<div class="chart-container">
<canvas id="endpointUsageChart"></canvas>
</div>
<h3>API Endpoint Usage (All Time Details)</h3>
<table>
<thead>
<tr>
<th>Endpoint</th>
<th>Total Requests</th>
<th>First Used</th>
<th>Last Used</th>
</tr>
</thead>
<tbody>
{api_usage_all_time_rows}
</tbody>
</table>
<h2>Recent Requests (Last 20)</h2>
<table>
<thead>
<tr>
<th>Timestamp</th>
<th>Model</th>
<th>Endpoint</th>
<th>IP Address</th>
<th>User Agent</th>
</tr>
</thead>
<tbody>
{recent_requests_rows}
</tbody>
</table>
</div>
<script>
const modelLabels = {json.dumps(model_labels)};
const modelCounts = {json.dumps(model_counts)};
const endpointLabels = {json.dumps(endpoint_labels)};
const endpointCounts = {json.dumps(endpoint_counts)};
const dailyDates = {json.dumps(daily_dates)};
const dailyRequests = {json.dumps(daily_requests)};
const dailyUniqueIps = {json.dumps(daily_unique_ips)};
new Chart(document.getElementById('modelUsageChart'), {{
type: 'bar',
data: {{
labels: modelLabels,
datasets: [{{
label: 'Requests',
data: modelCounts,
backgroundColor: 'var(--chart-bg-light)',
borderColor: 'var(--chart-border-light)',
borderWidth: 1,
borderRadius: 5,
}}]
}},
options: {{
responsive: true,
maintainAspectRatio: false,
plugins: {{
legend: {{
labels: {{
color: 'var(--text-primary)'
}}
}},
title: {{
display: true,
text: 'Model Usage',
color: 'var(--text-primary)'
}}
}},
scales: {{
x: {{
ticks: {{
color: 'var(--text-secondary)'
}},
grid: {{
color: 'var(--border-color)'
}}
}},
y: {{
beginAtZero: true,
ticks: {{
color: 'var(--text-secondary)'
}},
grid: {{
color: 'var(--border-color)'
}}
}}
}}
}}
}});
new Chart(document.getElementById('endpointUsageChart'), {{
type: 'doughnut',
data: {{
labels: endpointLabels,
datasets: [{{
label: 'Requests',
data: endpointCounts,
backgroundColor: [
'#3a6ee0', '#5b8bff', '#8dc4ff', '#b3d8ff', '#d0e8ff',
'#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF'
],
hoverOffset: 4
}}]
}},
options: {{
responsive: true,
maintainAspectRatio: false,
plugins: {{
legend: {{
position: 'right',
labels: {{
color: 'var(--text-primary)'
}}
}},
title: {{
display: true,
text: 'API Endpoint Usage',
color: 'var(--text-primary)'
}}
}}
}}
}});
new Chart(document.getElementById('dailyRequestsChart'), {{
type: 'line',
data: {{
labels: dailyDates,
datasets: [
{{
label: 'Total Requests',
data: dailyRequests,
borderColor: 'var(--accent-color)',
backgroundColor: 'rgba(58, 110, 224, 0.1)',
fill: true,
tension: 0.3
}},
{{
label: 'Unique IPs',
data: dailyUniqueIps,
borderColor: '#FFCE56',
backgroundColor: 'rgba(255, 206, 86, 0.1)',
fill: true,
tension: 0.3
}}
]
}},
options: {{
responsive: true,
maintainAspectRatio: false,
plugins: {{
legend: {{
labels: {{
color: 'var(--text-primary)'
}}
}},
title: {{
display: true,
text: 'Daily Requests and Unique IPs',
color: 'var(--text-primary)'
}}
}},
scales: {{
x: {{
ticks: {{
color: 'var(--text-secondary)'
}},
grid: {{
color: 'var(--border-color)'
}}
}},
y: {{
beginAtZero: true,
ticks: {{
color: 'var(--text-secondary)'
}},
grid: {{
color: 'var(--border-color)'
}}
}}
}}
}}
}});
</script>
</body>
</html>
"""
return html_content
@app.on_event("startup")
async def startup_event():
    """Initialize the global model registry and scraper pool, then validate config.

    Side effects:
        - Rebinds the module-level ``available_model_ids`` to the union of
          models.json entries and the hard-coded provider model sets.
        - Fills ``scraper_pool`` with ``MAX_SCRAPERS`` cloudscraper instances.
        - Prints warnings for any missing critical environment variables.
    """
    global available_model_ids
    models_data = load_models_data()
    available_model_ids = [m['id'] for m in models_data if isinstance(m, dict) and 'id' in m]
    print(f"Loaded {len(available_model_ids)} model IDs from models.json")
    # Merge in every provider-specific model set, then de-duplicate.
    for provider_models in (pollinations_models, alternate_models, mistral_models,
                            claude_3_models, gemini_models):
        available_model_ids.extend(provider_models)
    available_model_ids = list(set(available_model_ids))
    print(f"Total unique available models after merging: {len(available_model_ids)}")
    for _ in range(MAX_SCRAPERS):
        scraper_pool.append(cloudscraper.create_scraper())
    print(f"Initialized Cloudscraper pool with {MAX_SCRAPERS} instances.")
    env_vars = get_env_vars()
    missing_vars = []
    # API_KEYS is produced by str.split(','), so an unset variable shows up as [''].
    if not env_vars['api_keys'] or env_vars['api_keys'] == ['']:
        missing_vars.append('API_KEYS')
    # Provider-specific settings are only required when at least one model from
    # that provider is actually available; compute each gate once.
    needs_mistral = any(model in mistral_models for model in available_model_ids)
    needs_gemini = any(model in gemini_models for model in available_model_ids)
    # (warning label, get_env_vars() key, required?)
    checks = [
        ('SECRET_API_ENDPOINT', 'secret_api_endpoint', True),
        ('SECRET_API_ENDPOINT_2', 'secret_api_endpoint_2', True),
        ('SECRET_API_ENDPOINT_3', 'secret_api_endpoint_3', True),
        ('SECRET_API_ENDPOINT_4 (Pollinations.ai)', 'secret_api_endpoint_4',
         any(model in pollinations_models for model in available_model_ids)),
        ('SECRET_API_ENDPOINT_5 (Claude 3.x)', 'secret_api_endpoint_5',
         any(model in claude_3_models for model in available_model_ids)),
        ('SECRET_API_ENDPOINT_6 (Gemini)', 'secret_api_endpoint_6', needs_gemini),
        ('MISTRAL_API', 'mistral_api', needs_mistral),
        ('MISTRAL_KEY', 'mistral_key', needs_mistral),
        ('GEMINI_KEY', 'gemini_key', needs_gemini),
        ('NEW_IMG (Image Generation)', 'new_img', len(supported_image_models) > 0),
    ]
    for label, key, required in checks:
        if required and not env_vars[key]:
            missing_vars.append(label)
    if missing_vars:
        print(f"WARNING: The following critical environment variables are missing or empty: {', '.join(missing_vars)}")
        print("Some server functionality (e.g., specific AI models, image generation) may be limited or unavailable.")
    else:
        print("All critical environment variables appear to be configured.")
    print("Server started successfully!")
@app.on_event("shutdown")
async def shutdown_event():
    """Release network resources and persist usage statistics on shutdown."""
    # Close the shared async HTTP client before discarding the scraper pool.
    await get_async_client().aclose()
    scraper_pool.clear()
    # Flush in-memory usage counters so nothing is lost across restarts.
    usage_tracker.save_data()
    print("Server shutdown complete!")
@app.get("/health")
async def health_check():
    """Report server health and any missing critical environment variables.

    Returns:
        JSONResponse with overall status ('healthy'/'unhealthy'), the list of
        missing variables, the current ``server_status`` flag, and a message.
    """
    env_vars = get_env_vars()
    missing_critical_vars = []
    # API_KEYS is produced by str.split(','), so an unset variable shows up as [''].
    if not env_vars['api_keys'] or env_vars['api_keys'] == ['']:
        missing_critical_vars.append('API_KEYS')
    # Provider-specific settings are only required when at least one model from
    # that provider is actually available; compute each gate once.
    needs_mistral = any(model in mistral_models for model in available_model_ids)
    needs_gemini = any(model in gemini_models for model in available_model_ids)
    # (reported label, get_env_vars() key, required?)
    checks = [
        ('SECRET_API_ENDPOINT', 'secret_api_endpoint', True),
        ('SECRET_API_ENDPOINT_2', 'secret_api_endpoint_2', True),
        ('SECRET_API_ENDPOINT_3', 'secret_api_endpoint_3', True),
        ('SECRET_API_ENDPOINT_4 (Pollinations.ai)', 'secret_api_endpoint_4',
         any(model in pollinations_models for model in available_model_ids)),
        ('SECRET_API_ENDPOINT_5 (Claude 3.x)', 'secret_api_endpoint_5',
         any(model in claude_3_models for model in available_model_ids)),
        ('SECRET_API_ENDPOINT_6 (Gemini)', 'secret_api_endpoint_6', needs_gemini),
        ('MISTRAL_API', 'mistral_api', needs_mistral),
        ('MISTRAL_KEY', 'mistral_key', needs_mistral),
        ('GEMINI_KEY', 'gemini_key', needs_gemini),
        ('NEW_IMG (Image Generation)', 'new_img', len(supported_image_models) > 0),
    ]
    for label, key, required in checks:
        if required and not env_vars[key]:
            missing_critical_vars.append(label)
    health_status = {
        "status": "healthy" if not missing_critical_vars else "unhealthy",
        "missing_env_vars": missing_critical_vars,
        "server_status": server_status,
        "message": "Everything's lit! πŸš€" if not missing_critical_vars else "Uh oh, some env vars are missing. 😬"
    }
    return JSONResponse(content=health_status)
if __name__ == "__main__":
    import uvicorn
    # Default to 7860 (the standard Hugging Face Spaces port) but allow the
    # hosting environment to override it via the PORT variable.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "7860")))