dfa32412 commited on
Commit
9ed0434
·
verified ·
1 Parent(s): 44b958f

Upload 2 files

Browse files
Files changed (2) hide show
  1. app.py +288 -0
  2. requirements.txt +3 -0
app.py ADDED
@@ -0,0 +1,288 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import httpx
3
+ import json
4
+ from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
+ from fastapi.security import APIKeyHeader
6
+ from fastapi.responses import StreamingResponse, JSONResponse
7
+ import logging
8
+ from contextlib import asynccontextmanager
9
+ import typing
10
+ import itertools # For key rotation
11
+ import asyncio # For potential sleep during retry
12
+
13
+ # --- Configuration ---
14
+
15
+ # --- Client Authentication (Proxy Access) ---
16
+ # Load Allowed Client API Keys (for clients talking to this proxy)
17
+ ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
18
+ if not ALLOWED_API_KEYS_STR:
19
+ raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
20
+ ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
21
+ if not ALLOWED_KEYS:
22
+ raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
23
+ logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
24
+
25
+ # --- Upstream API Configuration ---
26
+ # URL to fetch upstream API keys from (one key per line)
27
+ UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
28
+
29
+ # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
30
+ # Or required if the upstream target needs a key in a different way sometimes.
31
+ # Let's make it optional now.
32
+ DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
33
+
34
+ # Upstream API Base URL
35
+ OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://freeaichatplayground.com/api/v1")
36
+ OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
37
+
38
+ if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
39
+ raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
40
+
41
+ # --- Logging ---
42
+ logging.basicConfig(level=logging.INFO)
43
+ logger = logging.getLogger(__name__)
44
+
45
+ # --- Authentication Dependency (Client -> Proxy) ---
46
+ api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
47
+ async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
48
+ """Dependency to verify the client's API key provided to this proxy."""
49
+ if not api_key_header:
50
+ logger.warning("Missing Authorization header from client")
51
+ raise HTTPException(status_code=401, detail="Missing Authorization header")
52
+ parts = api_key_header.split()
53
+ if len(parts) != 2 or parts[0].lower() != "bearer":
54
+ logger.warning(f"Invalid Authorization header format from client.")
55
+ raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
56
+ client_api_key = parts[1]
57
+ if client_api_key not in ALLOWED_KEYS:
58
+ truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
59
+ logger.warning(f"Invalid Client API Key received: {truncated_key}")
60
+ raise HTTPException(status_code=403, detail="Invalid API Key provided")
61
+ logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
62
+ return client_api_key
63
+
64
+ # --- Key Fetching and Rotation Logic ---
65
+ async def fetch_upstream_keys(url: str) -> list[str]:
66
+ """Fetches keys from the given URL, one key per line."""
67
+ keys = []
68
+ try:
69
+ async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
70
+ logger.info(f"Fetching upstream API keys from: {url}")
71
+ response = await client.get(url)
72
+ response.raise_for_status() # Raise exception for 4xx/5xx status codes
73
+ content = response.text
74
+ keys = [line.strip() for line in content.splitlines() if line.strip()]
75
+ logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
76
+ if not keys:
77
+ logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
78
+ return keys
79
+ except httpx.RequestError as e:
80
+ logger.error(f"Error fetching upstream keys from {url}: {e}")
81
+ return [] # Return empty list on fetch error
82
+ except httpx.HTTPStatusError as e:
83
+ logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
84
+ logger.error(f"Response body: {e.response.text}")
85
+ return [] # Return empty list on bad status
86
+
87
+ # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
+ @asynccontextmanager
89
+ async def lifespan(app: FastAPI):
90
+ # --- Initialize Upstream Key Iterator ---
91
+ upstream_keys = []
92
+ if UPSTREAM_KEYS_URL:
93
+ upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
94
+
95
+ if not upstream_keys:
96
+ logger.warning("No upstream keys fetched from URL or URL not provided.")
97
+ if DEFAULT_OPENAI_API_KEY:
98
+ logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
99
+ upstream_keys = [DEFAULT_OPENAI_API_KEY]
100
+ else:
101
+ # Critical failure - no keys available
102
+ logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
103
+ # In a real scenario, you might want a more graceful shutdown or retry mechanism
104
+ # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
105
+ # Or raise an exception here to prevent startup:
106
+ raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
107
+
108
+ # Store keys and create the cycling iterator in app.state
109
+ app.state.upstream_api_keys = upstream_keys
110
+ app.state.key_iterator = itertools.cycle(upstream_keys)
111
+ logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
112
+
113
+ # --- Initialize HTTPX Client ---
114
+ logger.info("Initializing main HTTPX client...")
115
+ timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
116
+ client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
117
+ app.state.http_client = client # Store client in app.state
118
+ logger.info("HTTPX client initialized.")
119
+
120
+ yield # Application runs here
121
+
122
+ # --- Cleanup ---
123
+ logger.info("Closing HTTPX client...")
124
+ await app.state.http_client.aclose()
125
+ logger.info("HTTPX client closed.")
126
+ app.state.upstream_api_keys = [] # Clear keys
127
+ app.state.key_iterator = None
128
+ logger.info("Upstream keys cleared.")
129
+
130
+ # --- FastAPI App ---
131
+ app = FastAPI(lifespan=lifespan)
132
+
133
+ # --- Streaming Helper ---
134
+ async def yield_openai_chunks(response: httpx.Response):
135
+ """Asynchronously yields chunks from the upstream response stream."""
136
+ # (Content remains the same as before)
137
+ logger.info("Starting to stream chunks from upstream...")
138
+ try:
139
+ async for chunk in response.aiter_bytes():
140
+ yield chunk
141
+ except Exception as e:
142
+ logger.error(f"Error during streaming upstream response: {e}")
143
+ finally:
144
+ await response.aclose()
145
+ logger.info("Upstream streaming response closed.")
146
+
147
+ # --- Proxy Endpoint ---
148
+ @app.post("/v1/chat/completions")
149
+ async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
150
+ """
151
+ Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
152
+ Uses rotated keys for upstream authentication.
153
+ """
154
+ client: httpx.AsyncClient = request.app.state.http_client
155
+ key_iterator = request.app.state.key_iterator
156
+
157
+ if not client or not key_iterator:
158
+ logger.error("HTTPX client or Key Iterator not available (app state issue).")
159
+ raise HTTPException(status_code=503, detail="Service temporarily unavailable")
160
+
161
+ # --- Get Next Upstream API Key ---
162
+ try:
163
+ current_upstream_key = next(key_iterator)
164
+ # Log rotation (optional, consider security of logging key info)
165
+ # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
166
+ except StopIteration:
167
+ # This should not happen if lifespan logic is correct and keys were loaded
168
+ logger.error("Upstream key iterator exhausted unexpectedly.")
169
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
170
+ except Exception as e:
171
+ logger.error(f"Unexpected error getting next key: {e}")
172
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
173
+
174
+ # --- Get Request Data ---
175
+ try:
176
+ request_body = await request.body()
177
+ payload = json.loads(request_body)
178
+ except json.JSONDecodeError:
179
+ raise HTTPException(status_code=400, detail="Invalid JSON body")
180
+
181
+ is_streaming = payload.get("stream", False)
182
+
183
+ # --- Prepare Upstream Request ---
184
+ upstream_headers = {
185
+ "Content-Type": request.headers.get("Content-Type", "application/json"),
186
+ "Accept": request.headers.get("Accept", "application/json"),
187
+ }
188
+
189
+ # --- Upstream Authentication (Using Rotated Key) ---
190
+ # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
191
+ if "freeaichatplayground.com" in OPENAI_API_BASE:
192
+ logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
193
+ payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
194
+ else:
195
+ # Default to standard Bearer token authentication for upstream
196
+ logger.debug("Using Authorization header for upstream authentication.")
197
+ upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
198
+
199
+ if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
200
+ logger.info("Adding 'Accept: text/event-stream' for streaming request")
201
+ upstream_headers["Accept"] = "text/event-stream, application/json"
202
+
203
+ logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
204
+
205
+ # --- Make Request to Upstream ---
206
+ response = None # Define response here to ensure it's available in finally block
207
+ try:
208
+ req = client.build_request(
209
+ "POST",
210
+ OPENAI_CHAT_ENDPOINT, # Use the full URL
211
+ json=payload,
212
+ headers=upstream_headers,
213
+ )
214
+ response = await client.send(req, stream=True)
215
+
216
+ # Check for immediate errors *before* processing body/stream
217
+ if response.status_code >= 400:
218
+ error_body = await response.aread() # Read error fully
219
+ await response.aclose()
220
+ logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
221
+ try: detail = json.loads(error_body)
222
+ except json.JSONDecodeError: detail = error_body.decode()
223
+ raise HTTPException(status_code=response.status_code, detail=detail)
224
+
225
+ # --- Handle Streaming Response ---
226
+ if is_streaming:
227
+ logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
228
+ return StreamingResponse(
229
+ yield_openai_chunks(response), # Generator handles closing response
230
+ status_code=response.status_code,
231
+ media_type=response.headers.get("content-type", "text/event-stream"),
232
+ )
233
+ # --- Handle Non-Streaming Response ---
234
+ else:
235
+ logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
236
+ response_body = await response.aread()
237
+ await response.aclose() # Ensure closed
238
+ content_type = response.headers.get("content-type", "application/json")
239
+ return Response( # Return raw response, FastAPI handles JSON content type
240
+ content=response_body,
241
+ status_code=response.status_code,
242
+ media_type=content_type,
243
+ )
244
+
245
+ except httpx.TimeoutException as e:
246
+ logger.error(f"Request to upstream timed out: {e}")
247
+ if response: await response.aclose()
248
+ raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
249
+ except httpx.RequestError as e:
250
+ logger.error(f"Error requesting upstream API: {e}")
251
+ if response: await response.aclose()
252
+ raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
253
+ except HTTPException as e:
254
+ # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
255
+ if response and not response.is_closed: await response.aclose()
256
+ raise e
257
+ except Exception as e:
258
+ logger.exception("An unexpected error occurred during response processing.")
259
+ if response and not response.is_closed: await response.aclose()
260
+ raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
261
+
262
+
263
+ # --- Health Check Endpoint ---
264
+ @app.get("/health")
265
+ async def health_check():
266
+ """Simple health check endpoint."""
267
+ # Could add checks here, e.g., if keys were loaded
268
+ key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
269
+ return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
270
+
271
+ # --- Main Execution Guard ---
272
+ if __name__ == "__main__":
273
+ import uvicorn
274
+ # Startup checks are implicitly handled by config loading at the top
275
+ print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
276
+ print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
277
+ if UPSTREAM_KEYS_URL:
278
+ print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
279
+ elif DEFAULT_OPENAI_API_KEY:
280
+ print("Using single OPENAI_API_KEY for upstream.")
281
+ else:
282
+ print("ERROR: No upstream key source configured!") # Should have failed earlier
283
+ print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
284
+ print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
285
+ print("---")
286
+
287
+ uvicorn.run(app, host="0.0.0.0", port=7860)
288
+
requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ fastapi==0.115.12
2
+ httpx==0.28.1
3
+ uvicorn==0.34.2