dfa32412 commited on
Commit
47fb5b5
·
verified ·
1 Parent(s): 9ed0434

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +303 -288
app.py CHANGED
@@ -1,288 +1,303 @@
1
- import os
2
- import httpx
3
- import json
4
- from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
- from fastapi.security import APIKeyHeader
6
- from fastapi.responses import StreamingResponse, JSONResponse
7
- import logging
8
- from contextlib import asynccontextmanager
9
- import typing
10
- import itertools # For key rotation
11
- import asyncio # For potential sleep during retry
12
-
13
- # --- Configuration ---
14
-
15
- # --- Client Authentication (Proxy Access) ---
16
- # Load Allowed Client API Keys (for clients talking to this proxy)
17
- ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
18
- if not ALLOWED_API_KEYS_STR:
19
- raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
20
- ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
21
- if not ALLOWED_KEYS:
22
- raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
23
- logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
24
-
25
- # --- Upstream API Configuration ---
26
- # URL to fetch upstream API keys from (one key per line)
27
- UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
28
-
29
- # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
30
- # Or required if the upstream target needs a key in a different way sometimes.
31
- # Let's make it optional now.
32
- DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
33
-
34
- # Upstream API Base URL
35
- OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://freeaichatplayground.com/api/v1")
36
- OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
37
-
38
- if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
39
- raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
40
-
41
- # --- Logging ---
42
- logging.basicConfig(level=logging.INFO)
43
- logger = logging.getLogger(__name__)
44
-
45
- # --- Authentication Dependency (Client -> Proxy) ---
46
- api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
47
- async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
48
- """Dependency to verify the client's API key provided to this proxy."""
49
- if not api_key_header:
50
- logger.warning("Missing Authorization header from client")
51
- raise HTTPException(status_code=401, detail="Missing Authorization header")
52
- parts = api_key_header.split()
53
- if len(parts) != 2 or parts[0].lower() != "bearer":
54
- logger.warning(f"Invalid Authorization header format from client.")
55
- raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
56
- client_api_key = parts[1]
57
- if client_api_key not in ALLOWED_KEYS:
58
- truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
59
- logger.warning(f"Invalid Client API Key received: {truncated_key}")
60
- raise HTTPException(status_code=403, detail="Invalid API Key provided")
61
- logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
62
- return client_api_key
63
-
64
- # --- Key Fetching and Rotation Logic ---
65
- async def fetch_upstream_keys(url: str) -> list[str]:
66
- """Fetches keys from the given URL, one key per line."""
67
- keys = []
68
- try:
69
- async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
70
- logger.info(f"Fetching upstream API keys from: {url}")
71
- response = await client.get(url)
72
- response.raise_for_status() # Raise exception for 4xx/5xx status codes
73
- content = response.text
74
- keys = [line.strip() for line in content.splitlines() if line.strip()]
75
- logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
76
- if not keys:
77
- logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
78
- return keys
79
- except httpx.RequestError as e:
80
- logger.error(f"Error fetching upstream keys from {url}: {e}")
81
- return [] # Return empty list on fetch error
82
- except httpx.HTTPStatusError as e:
83
- logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
84
- logger.error(f"Response body: {e.response.text}")
85
- return [] # Return empty list on bad status
86
-
87
- # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
- @asynccontextmanager
89
- async def lifespan(app: FastAPI):
90
- # --- Initialize Upstream Key Iterator ---
91
- upstream_keys = []
92
- if UPSTREAM_KEYS_URL:
93
- upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
94
-
95
- if not upstream_keys:
96
- logger.warning("No upstream keys fetched from URL or URL not provided.")
97
- if DEFAULT_OPENAI_API_KEY:
98
- logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
99
- upstream_keys = [DEFAULT_OPENAI_API_KEY]
100
- else:
101
- # Critical failure - no keys available
102
- logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
103
- # In a real scenario, you might want a more graceful shutdown or retry mechanism
104
- # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
105
- # Or raise an exception here to prevent startup:
106
- raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
107
-
108
- # Store keys and create the cycling iterator in app.state
109
- app.state.upstream_api_keys = upstream_keys
110
- app.state.key_iterator = itertools.cycle(upstream_keys)
111
- logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
112
-
113
- # --- Initialize HTTPX Client ---
114
- logger.info("Initializing main HTTPX client...")
115
- timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
116
- client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
117
- app.state.http_client = client # Store client in app.state
118
- logger.info("HTTPX client initialized.")
119
-
120
- yield # Application runs here
121
-
122
- # --- Cleanup ---
123
- logger.info("Closing HTTPX client...")
124
- await app.state.http_client.aclose()
125
- logger.info("HTTPX client closed.")
126
- app.state.upstream_api_keys = [] # Clear keys
127
- app.state.key_iterator = None
128
- logger.info("Upstream keys cleared.")
129
-
130
- # --- FastAPI App ---
131
- app = FastAPI(lifespan=lifespan)
132
-
133
- # --- Streaming Helper ---
134
- async def yield_openai_chunks(response: httpx.Response):
135
- """Asynchronously yields chunks from the upstream response stream."""
136
- # (Content remains the same as before)
137
- logger.info("Starting to stream chunks from upstream...")
138
- try:
139
- async for chunk in response.aiter_bytes():
140
- yield chunk
141
- except Exception as e:
142
- logger.error(f"Error during streaming upstream response: {e}")
143
- finally:
144
- await response.aclose()
145
- logger.info("Upstream streaming response closed.")
146
-
147
- # --- Proxy Endpoint ---
148
- @app.post("/v1/chat/completions")
149
- async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
150
- """
151
- Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
152
- Uses rotated keys for upstream authentication.
153
- """
154
- client: httpx.AsyncClient = request.app.state.http_client
155
- key_iterator = request.app.state.key_iterator
156
-
157
- if not client or not key_iterator:
158
- logger.error("HTTPX client or Key Iterator not available (app state issue).")
159
- raise HTTPException(status_code=503, detail="Service temporarily unavailable")
160
-
161
- # --- Get Next Upstream API Key ---
162
- try:
163
- current_upstream_key = next(key_iterator)
164
- # Log rotation (optional, consider security of logging key info)
165
- # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
166
- except StopIteration:
167
- # This should not happen if lifespan logic is correct and keys were loaded
168
- logger.error("Upstream key iterator exhausted unexpectedly.")
169
- raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
170
- except Exception as e:
171
- logger.error(f"Unexpected error getting next key: {e}")
172
- raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
173
-
174
- # --- Get Request Data ---
175
- try:
176
- request_body = await request.body()
177
- payload = json.loads(request_body)
178
- except json.JSONDecodeError:
179
- raise HTTPException(status_code=400, detail="Invalid JSON body")
180
-
181
- is_streaming = payload.get("stream", False)
182
-
183
- # --- Prepare Upstream Request ---
184
- upstream_headers = {
185
- "Content-Type": request.headers.get("Content-Type", "application/json"),
186
- "Accept": request.headers.get("Accept", "application/json"),
187
- }
188
-
189
- # --- Upstream Authentication (Using Rotated Key) ---
190
- # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
191
- if "freeaichatplayground.com" in OPENAI_API_BASE:
192
- logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
193
- payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
194
- else:
195
- # Default to standard Bearer token authentication for upstream
196
- logger.debug("Using Authorization header for upstream authentication.")
197
- upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
198
-
199
- if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
200
- logger.info("Adding 'Accept: text/event-stream' for streaming request")
201
- upstream_headers["Accept"] = "text/event-stream, application/json"
202
-
203
- logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
204
-
205
- # --- Make Request to Upstream ---
206
- response = None # Define response here to ensure it's available in finally block
207
- try:
208
- req = client.build_request(
209
- "POST",
210
- OPENAI_CHAT_ENDPOINT, # Use the full URL
211
- json=payload,
212
- headers=upstream_headers,
213
- )
214
- response = await client.send(req, stream=True)
215
-
216
- # Check for immediate errors *before* processing body/stream
217
- if response.status_code >= 400:
218
- error_body = await response.aread() # Read error fully
219
- await response.aclose()
220
- logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
221
- try: detail = json.loads(error_body)
222
- except json.JSONDecodeError: detail = error_body.decode()
223
- raise HTTPException(status_code=response.status_code, detail=detail)
224
-
225
- # --- Handle Streaming Response ---
226
- if is_streaming:
227
- logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
228
- return StreamingResponse(
229
- yield_openai_chunks(response), # Generator handles closing response
230
- status_code=response.status_code,
231
- media_type=response.headers.get("content-type", "text/event-stream"),
232
- )
233
- # --- Handle Non-Streaming Response ---
234
- else:
235
- logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
236
- response_body = await response.aread()
237
- await response.aclose() # Ensure closed
238
- content_type = response.headers.get("content-type", "application/json")
239
- return Response( # Return raw response, FastAPI handles JSON content type
240
- content=response_body,
241
- status_code=response.status_code,
242
- media_type=content_type,
243
- )
244
-
245
- except httpx.TimeoutException as e:
246
- logger.error(f"Request to upstream timed out: {e}")
247
- if response: await response.aclose()
248
- raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
249
- except httpx.RequestError as e:
250
- logger.error(f"Error requesting upstream API: {e}")
251
- if response: await response.aclose()
252
- raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
253
- except HTTPException as e:
254
- # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
255
- if response and not response.is_closed: await response.aclose()
256
- raise e
257
- except Exception as e:
258
- logger.exception("An unexpected error occurred during response processing.")
259
- if response and not response.is_closed: await response.aclose()
260
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
261
-
262
-
263
- # --- Health Check Endpoint ---
264
- @app.get("/health")
265
- async def health_check():
266
- """Simple health check endpoint."""
267
- # Could add checks here, e.g., if keys were loaded
268
- key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
269
- return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
270
-
271
- # --- Main Execution Guard ---
272
- if __name__ == "__main__":
273
- import uvicorn
274
- # Startup checks are implicitly handled by config loading at the top
275
- print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
276
- print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
277
- if UPSTREAM_KEYS_URL:
278
- print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
279
- elif DEFAULT_OPENAI_API_KEY:
280
- print("Using single OPENAI_API_KEY for upstream.")
281
- else:
282
- print("ERROR: No upstream key source configured!") # Should have failed earlier
283
- print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
284
- print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
285
- print("---")
286
-
287
- uvicorn.run(app, host="0.0.0.0", port=7860)
288
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import httpx
3
+ import json
4
+ from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
+ from fastapi.security import APIKeyHeader
6
+ from fastapi.responses import StreamingResponse, JSONResponse
7
+ import logging
8
+ from contextlib import asynccontextmanager
9
+ import typing
10
+ import itertools # For key rotation
11
+ import asyncio # For potential sleep during retry
12
+
13
+ # --- Configuration ---
14
+
15
+ # --- Client Authentication (Proxy Access) ---
16
+ # Load Allowed Client API Keys (for clients talking to this proxy)
17
+ ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
18
+ if not ALLOWED_API_KEYS_STR:
19
+ raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
20
+ ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
21
+ if not ALLOWED_KEYS:
22
+ raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
23
+ logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
24
+
25
+ # --- Upstream API Configuration ---
26
+ # URL to fetch upstream API keys from (one key per line)
27
+ UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
28
+
29
+ # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
30
+ # Or required if the upstream target needs a key in a different way sometimes.
31
+ # Let's make it optional now.
32
+ DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
33
+
34
+ # Upstream API Base URL
35
+ OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://freeaichatplayground.com/api/v1")
36
+ OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
37
+
38
+ if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
39
+ raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
40
+
41
+ # --- Logging ---
42
+ logging.basicConfig(level=logging.INFO)
43
+ logger = logging.getLogger(__name__)
44
+
45
+ # --- Authentication Dependency (Client -> Proxy) ---
46
+ api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
47
+ async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
48
+ """Dependency to verify the client's API key provided to this proxy."""
49
+ if not api_key_header:
50
+ logger.warning("Missing Authorization header from client")
51
+ raise HTTPException(status_code=401, detail="Missing Authorization header")
52
+ parts = api_key_header.split()
53
+ if len(parts) != 2 or parts[0].lower() != "bearer":
54
+ logger.warning(f"Invalid Authorization header format from client.")
55
+ raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
56
+ client_api_key = parts[1]
57
+ if client_api_key not in ALLOWED_KEYS:
58
+ truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
59
+ logger.warning(f"Invalid Client API Key received: {truncated_key}")
60
+ raise HTTPException(status_code=403, detail="Invalid API Key provided")
61
+ logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
62
+ return client_api_key
63
+
64
+ # --- Key Fetching and Rotation Logic ---
65
+ async def fetch_upstream_keys(url: str) -> list[str]:
66
+ """Fetches keys from the given URL, one key per line."""
67
+ keys = []
68
+ try:
69
+ async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
70
+ logger.info(f"Fetching upstream API keys from: {url}")
71
+ response = await client.get(url)
72
+ response.raise_for_status() # Raise exception for 4xx/5xx status codes
73
+ content = response.text
74
+ keys = [line.strip() for line in content.splitlines() if line.strip()]
75
+ logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
76
+ if not keys:
77
+ logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
78
+ return keys
79
+ except httpx.RequestError as e:
80
+ logger.error(f"Error fetching upstream keys from {url}: {e}")
81
+ return [] # Return empty list on fetch error
82
+ except httpx.HTTPStatusError as e:
83
+ logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
84
+ logger.error(f"Response body: {e.response.text}")
85
+ return [] # Return empty list on bad status
86
+
87
+ # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
+ @asynccontextmanager
89
+ async def lifespan(app: FastAPI):
90
+ # --- Initialize Upstream Key Iterator ---
91
+ upstream_keys = []
92
+ if UPSTREAM_KEYS_URL:
93
+ upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
94
+
95
+ if not upstream_keys:
96
+ logger.warning("No upstream keys fetched from URL or URL not provided.")
97
+ if DEFAULT_OPENAI_API_KEY:
98
+ logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
99
+ upstream_keys = [DEFAULT_OPENAI_API_KEY]
100
+ else:
101
+ # Critical failure - no keys available
102
+ logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
103
+ # In a real scenario, you might want a more graceful shutdown or retry mechanism
104
+ # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
105
+ # Or raise an exception here to prevent startup:
106
+ raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
107
+
108
+ # Store keys and create the cycling iterator in app.state
109
+ app.state.upstream_api_keys = upstream_keys
110
+ app.state.key_iterator = itertools.cycle(upstream_keys)
111
+ logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
112
+
113
+ # --- Initialize HTTPX Client ---
114
+ logger.info("Initializing main HTTPX client...")
115
+ timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
116
+ client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
117
+ app.state.http_client = client # Store client in app.state
118
+ logger.info("HTTPX client initialized.")
119
+
120
+ yield # Application runs here
121
+
122
+ # --- Cleanup ---
123
+ logger.info("Closing HTTPX client...")
124
+ await app.state.http_client.aclose()
125
+ logger.info("HTTPX client closed.")
126
+ app.state.upstream_api_keys = [] # Clear keys
127
+ app.state.key_iterator = None
128
+ logger.info("Upstream keys cleared.")
129
+
130
+ # --- FastAPI App ---
131
+ app = FastAPI(lifespan=lifespan)
132
+
133
+ # --- Streaming Helper ---
134
+ async def yield_openai_chunks(response: httpx.Response):
135
+ """Asynchronously yields chunks from the upstream response stream."""
136
+ # (Content remains the same as before)
137
+ logger.info("Starting to stream chunks from upstream...")
138
+ try:
139
+ buffer = ''
140
+ async for chunk in response.aiter_bytes():
141
+ buffer += chunk.decode()
142
+ while True:
143
+ index = buffer.find("\n")
144
+ if index != -1:
145
+ content = buffer[:index]
146
+ buffer = buffer[index + 1:]
147
+
148
+ if content.startswith("0:"):
149
+ content = content[2:].strip('"').replace("\\n","\n")
150
+ data = {"id":"123456-456789-123456","object":"chat.completion.chunk","choices":[{"delta":{"content":content},"index":0,"finish_reason":None}]}
151
+ yield "data: " + json.dumps(data) + "\n\n"
152
+ else:
153
+ break
154
+
155
+ yield "data: [DONE]\n\n"
156
+ except Exception as e:
157
+ logger.error(f"Error during streaming upstream response: {e}")
158
+ finally:
159
+ await response.aclose()
160
+ logger.info("Upstream streaming response closed.")
161
+
162
+ # --- Proxy Endpoint ---
163
+ @app.post("/v1/chat/completions")
164
+ async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
165
+ """
166
+ Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
167
+ Uses rotated keys for upstream authentication.
168
+ """
169
+ client: httpx.AsyncClient = request.app.state.http_client
170
+ key_iterator = request.app.state.key_iterator
171
+
172
+ if not client or not key_iterator:
173
+ logger.error("HTTPX client or Key Iterator not available (app state issue).")
174
+ raise HTTPException(status_code=503, detail="Service temporarily unavailable")
175
+
176
+ # --- Get Next Upstream API Key ---
177
+ try:
178
+ current_upstream_key = next(key_iterator)
179
+ # Log rotation (optional, consider security of logging key info)
180
+ # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
181
+ except StopIteration:
182
+ # This should not happen if lifespan logic is correct and keys were loaded
183
+ logger.error("Upstream key iterator exhausted unexpectedly.")
184
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
185
+ except Exception as e:
186
+ logger.error(f"Unexpected error getting next key: {e}")
187
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
188
+
189
+ # --- Get Request Data ---
190
+ try:
191
+ request_body = await request.body()
192
+ payload = json.loads(request_body)
193
+ except json.JSONDecodeError:
194
+ raise HTTPException(status_code=400, detail="Invalid JSON body")
195
+
196
+ is_streaming = payload.get("stream", False)
197
+
198
+ # --- Prepare Upstream Request ---
199
+ upstream_headers = {
200
+ "Content-Type": request.headers.get("Content-Type", "application/json"),
201
+ "Accept": request.headers.get("Accept", "application/json"),
202
+ }
203
+
204
+ # --- Upstream Authentication (Using Rotated Key) ---
205
+ # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
206
+ if "freeaichatplayground.com" in OPENAI_API_BASE:
207
+ logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
208
+ payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
209
+ else:
210
+ # Default to standard Bearer token authentication for upstream
211
+ logger.debug("Using Authorization header for upstream authentication.")
212
+ upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
213
+
214
+ if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
215
+ logger.info("Adding 'Accept: text/event-stream' for streaming request")
216
+ upstream_headers["Accept"] = "text/event-stream, application/json"
217
+
218
+ logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
219
+
220
+ # --- Make Request to Upstream ---
221
+ response = None # Define response here to ensure it's available in finally block
222
+ try:
223
+ req = client.build_request(
224
+ "POST",
225
+ OPENAI_CHAT_ENDPOINT, # Use the full URL
226
+ json=payload,
227
+ headers=upstream_headers,
228
+ )
229
+ response = await client.send(req, stream=True)
230
+
231
+ # Check for immediate errors *before* processing body/stream
232
+ if response.status_code >= 400:
233
+ error_body = await response.aread() # Read error fully
234
+ await response.aclose()
235
+ logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
236
+ try: detail = json.loads(error_body)
237
+ except json.JSONDecodeError: detail = error_body.decode()
238
+ raise HTTPException(status_code=response.status_code, detail=detail)
239
+
240
+ # --- Handle Streaming Response ---
241
+ if is_streaming:
242
+ logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
243
+ return StreamingResponse(
244
+ yield_openai_chunks(response), # Generator handles closing response
245
+ status_code=response.status_code,
246
+ media_type=response.headers.get("content-type", "text/event-stream"),
247
+ )
248
+ # --- Handle Non-Streaming Response ---
249
+ else:
250
+ logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
251
+ response_body = await response.aread()
252
+ await response.aclose() # Ensure closed
253
+ content_type = response.headers.get("content-type", "application/json")
254
+ return Response( # Return raw response, FastAPI handles JSON content type
255
+ content=response_body,
256
+ status_code=response.status_code,
257
+ media_type=content_type,
258
+ )
259
+
260
+ except httpx.TimeoutException as e:
261
+ logger.error(f"Request to upstream timed out: {e}")
262
+ if response: await response.aclose()
263
+ raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
264
+ except httpx.RequestError as e:
265
+ logger.error(f"Error requesting upstream API: {e}")
266
+ if response: await response.aclose()
267
+ raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
268
+ except HTTPException as e:
269
+ # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
270
+ if response and not response.is_closed: await response.aclose()
271
+ raise e
272
+ except Exception as e:
273
+ logger.exception("An unexpected error occurred during response processing.")
274
+ if response and not response.is_closed: await response.aclose()
275
+ raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
276
+
277
+
278
+ # --- Health Check Endpoint ---
279
+ @app.get("/health")
280
+ async def health_check():
281
+ """Simple health check endpoint."""
282
+ # Could add checks here, e.g., if keys were loaded
283
+ key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
284
+ return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
285
+
286
+ # --- Main Execution Guard ---
287
+ if __name__ == "__main__":
288
+ import uvicorn
289
+ # Startup checks are implicitly handled by config loading at the top
290
+ print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
291
+ print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
292
+ if UPSTREAM_KEYS_URL:
293
+ print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
294
+ elif DEFAULT_OPENAI_API_KEY:
295
+ print("Using single OPENAI_API_KEY for upstream.")
296
+ else:
297
+ print("ERROR: No upstream key source configured!") # Should have failed earlier
298
+ print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
299
+ print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
300
+ print("---")
301
+
302
+ uvicorn.run(app, host="0.0.0.0", port=7860)
303
+