dfa32412 commited on
Commit
661798b
·
verified ·
1 Parent(s): 98e086f

Upload app.py

Browse files
Files changed (1) hide show
  1. app.py +321 -303
app.py CHANGED
@@ -1,303 +1,321 @@
1
- import os
2
- import httpx
3
- import json
4
- from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
- from fastapi.security import APIKeyHeader
6
- from fastapi.responses import StreamingResponse, JSONResponse
7
- import logging
8
- from contextlib import asynccontextmanager
9
- import typing
10
- import itertools # For key rotation
11
- import asyncio # For potential sleep during retry
12
-
13
- # --- Configuration ---
14
-
15
- # --- Client Authentication (Proxy Access) ---
16
- # Load Allowed Client API Keys (for clients talking to this proxy)
17
- ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
18
- if not ALLOWED_API_KEYS_STR:
19
- raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
20
- ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
21
- if not ALLOWED_KEYS:
22
- raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
23
- logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
24
-
25
- # --- Upstream API Configuration ---
26
- # URL to fetch upstream API keys from (one key per line)
27
- UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
28
-
29
- # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
30
- # Or required if the upstream target needs a key in a different way sometimes.
31
- # Let's make it optional now.
32
- DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
33
-
34
- # Upstream API Base URL
35
- OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://freeaichatplayground.com/api/v1")
36
- OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
37
-
38
- if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
39
- raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
40
-
41
- # --- Logging ---
42
- logging.basicConfig(level=logging.INFO)
43
- logger = logging.getLogger(__name__)
44
-
45
- # --- Authentication Dependency (Client -> Proxy) ---
46
- api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
47
- async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
48
- """Dependency to verify the client's API key provided to this proxy."""
49
- if not api_key_header:
50
- logger.warning("Missing Authorization header from client")
51
- raise HTTPException(status_code=401, detail="Missing Authorization header")
52
- parts = api_key_header.split()
53
- if len(parts) != 2 or parts[0].lower() != "bearer":
54
- logger.warning(f"Invalid Authorization header format from client.")
55
- raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
56
- client_api_key = parts[1]
57
- if client_api_key not in ALLOWED_KEYS:
58
- truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
59
- logger.warning(f"Invalid Client API Key received: {truncated_key}")
60
- raise HTTPException(status_code=403, detail="Invalid API Key provided")
61
- logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
62
- return client_api_key
63
-
64
- # --- Key Fetching and Rotation Logic ---
65
- async def fetch_upstream_keys(url: str) -> list[str]:
66
- """Fetches keys from the given URL, one key per line."""
67
- keys = []
68
- try:
69
- async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
70
- logger.info(f"Fetching upstream API keys from: {url}")
71
- response = await client.get(url)
72
- response.raise_for_status() # Raise exception for 4xx/5xx status codes
73
- content = response.text
74
- keys = [line.strip() for line in content.splitlines() if line.strip()]
75
- logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
76
- if not keys:
77
- logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
78
- return keys
79
- except httpx.RequestError as e:
80
- logger.error(f"Error fetching upstream keys from {url}: {e}")
81
- return [] # Return empty list on fetch error
82
- except httpx.HTTPStatusError as e:
83
- logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
84
- logger.error(f"Response body: {e.response.text}")
85
- return [] # Return empty list on bad status
86
-
87
- # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
- @asynccontextmanager
89
- async def lifespan(app: FastAPI):
90
- # --- Initialize Upstream Key Iterator ---
91
- upstream_keys = []
92
- if UPSTREAM_KEYS_URL:
93
- upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
94
-
95
- if not upstream_keys:
96
- logger.warning("No upstream keys fetched from URL or URL not provided.")
97
- if DEFAULT_OPENAI_API_KEY:
98
- logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
99
- upstream_keys = [DEFAULT_OPENAI_API_KEY]
100
- else:
101
- # Critical failure - no keys available
102
- logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
103
- # In a real scenario, you might want a more graceful shutdown or retry mechanism
104
- # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
105
- # Or raise an exception here to prevent startup:
106
- raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
107
-
108
- # Store keys and create the cycling iterator in app.state
109
- app.state.upstream_api_keys = upstream_keys
110
- app.state.key_iterator = itertools.cycle(upstream_keys)
111
- logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
112
-
113
- # --- Initialize HTTPX Client ---
114
- logger.info("Initializing main HTTPX client...")
115
- timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
116
- client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
117
- app.state.http_client = client # Store client in app.state
118
- logger.info("HTTPX client initialized.")
119
-
120
- yield # Application runs here
121
-
122
- # --- Cleanup ---
123
- logger.info("Closing HTTPX client...")
124
- await app.state.http_client.aclose()
125
- logger.info("HTTPX client closed.")
126
- app.state.upstream_api_keys = [] # Clear keys
127
- app.state.key_iterator = None
128
- logger.info("Upstream keys cleared.")
129
-
130
- # --- FastAPI App ---
131
- app = FastAPI(lifespan=lifespan)
132
-
133
- # --- Streaming Helper ---
134
- async def yield_openai_chunks(response: httpx.Response):
135
- """Asynchronously yields chunks from the upstream response stream."""
136
- # (Content remains the same as before)
137
- logger.info("Starting to stream chunks from upstream...")
138
- try:
139
- buffer = ''
140
- async for chunk in response.aiter_bytes():
141
- buffer += chunk.decode()
142
- while True:
143
- index = buffer.find("\n")
144
- if index != -1:
145
- content = buffer[:index]
146
- buffer = buffer[index + 1:]
147
-
148
- if content.startswith("0:"):
149
- content = content[2:][1:-1].replace("\\n","\n").replace('\\"','"').replace("\\\\","\\")
150
- data = {"id":"123456-456789-123456","object":"chat.completion.chunk","choices":[{"delta":{"content":content},"index":0,"finish_reason":None}]}
151
- yield "data: " + json.dumps(data) + "\n\n"
152
- else:
153
- break
154
-
155
- yield "data: [DONE]\n\n"
156
- except Exception as e:
157
- logger.error(f"Error during streaming upstream response: {e}")
158
- finally:
159
- await response.aclose()
160
- logger.info("Upstream streaming response closed.")
161
-
162
- # --- Proxy Endpoint ---
163
- @app.post("/v1/chat/completions")
164
- async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
165
- """
166
- Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
167
- Uses rotated keys for upstream authentication.
168
- """
169
- client: httpx.AsyncClient = request.app.state.http_client
170
- key_iterator = request.app.state.key_iterator
171
-
172
- if not client or not key_iterator:
173
- logger.error("HTTPX client or Key Iterator not available (app state issue).")
174
- raise HTTPException(status_code=503, detail="Service temporarily unavailable")
175
-
176
- # --- Get Next Upstream API Key ---
177
- try:
178
- current_upstream_key = next(key_iterator)
179
- # Log rotation (optional, consider security of logging key info)
180
- # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
181
- except StopIteration:
182
- # This should not happen if lifespan logic is correct and keys were loaded
183
- logger.error("Upstream key iterator exhausted unexpectedly.")
184
- raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
185
- except Exception as e:
186
- logger.error(f"Unexpected error getting next key: {e}")
187
- raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
188
-
189
- # --- Get Request Data ---
190
- try:
191
- request_body = await request.body()
192
- payload = json.loads(request_body)
193
- except json.JSONDecodeError:
194
- raise HTTPException(status_code=400, detail="Invalid JSON body")
195
-
196
- is_streaming = payload.get("stream", False)
197
-
198
- # --- Prepare Upstream Request ---
199
- upstream_headers = {
200
- "Content-Type": request.headers.get("Content-Type", "application/json"),
201
- "Accept": request.headers.get("Accept", "application/json"),
202
- }
203
-
204
- # --- Upstream Authentication (Using Rotated Key) ---
205
- # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
206
- if "freeaichatplayground.com" in OPENAI_API_BASE:
207
- logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
208
- payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
209
- else:
210
- # Default to standard Bearer token authentication for upstream
211
- logger.debug("Using Authorization header for upstream authentication.")
212
- upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
213
-
214
- if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
215
- logger.info("Adding 'Accept: text/event-stream' for streaming request")
216
- upstream_headers["Accept"] = "text/event-stream, application/json"
217
-
218
- logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
219
-
220
- # --- Make Request to Upstream ---
221
- response = None # Define response here to ensure it's available in finally block
222
- try:
223
- req = client.build_request(
224
- "POST",
225
- OPENAI_CHAT_ENDPOINT, # Use the full URL
226
- json=payload,
227
- headers=upstream_headers,
228
- )
229
- response = await client.send(req, stream=True)
230
-
231
- # Check for immediate errors *before* processing body/stream
232
- if response.status_code >= 400:
233
- error_body = await response.aread() # Read error fully
234
- await response.aclose()
235
- logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
236
- try: detail = json.loads(error_body)
237
- except json.JSONDecodeError: detail = error_body.decode()
238
- raise HTTPException(status_code=response.status_code, detail=detail)
239
-
240
- # --- Handle Streaming Response ---
241
- if is_streaming:
242
- logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
243
- return StreamingResponse(
244
- yield_openai_chunks(response), # Generator handles closing response
245
- status_code=response.status_code,
246
- media_type=response.headers.get("content-type", "text/event-stream"),
247
- )
248
- # --- Handle Non-Streaming Response ---
249
- else:
250
- logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
251
- response_body = await response.aread()
252
- await response.aclose() # Ensure closed
253
- content_type = response.headers.get("content-type", "application/json")
254
- return Response( # Return raw response, FastAPI handles JSON content type
255
- content=response_body,
256
- status_code=response.status_code,
257
- media_type=content_type,
258
- )
259
-
260
- except httpx.TimeoutException as e:
261
- logger.error(f"Request to upstream timed out: {e}")
262
- if response: await response.aclose()
263
- raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
264
- except httpx.RequestError as e:
265
- logger.error(f"Error requesting upstream API: {e}")
266
- if response: await response.aclose()
267
- raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
268
- except HTTPException as e:
269
- # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
270
- if response and not response.is_closed: await response.aclose()
271
- raise e
272
- except Exception as e:
273
- logger.exception("An unexpected error occurred during response processing.")
274
- if response and not response.is_closed: await response.aclose()
275
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
276
-
277
-
278
- # --- Health Check Endpoint ---
279
- @app.get("/health")
280
- async def health_check():
281
- """Simple health check endpoint."""
282
- # Could add checks here, e.g., if keys were loaded
283
- key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
284
- return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
285
-
286
- # --- Main Execution Guard ---
287
- if __name__ == "__main__":
288
- import uvicorn
289
- # Startup checks are implicitly handled by config loading at the top
290
- print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
291
- print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
292
- if UPSTREAM_KEYS_URL:
293
- print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
294
- elif DEFAULT_OPENAI_API_KEY:
295
- print("Using single OPENAI_API_KEY for upstream.")
296
- else:
297
- print("ERROR: No upstream key source configured!") # Should have failed earlier
298
- print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
299
- print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
300
- print("---")
301
-
302
- uvicorn.run(app, host="0.0.0.0", port=7860)
303
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import httpx
3
+ import json
4
+ from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
+ from fastapi.security import APIKeyHeader
6
+ from fastapi.responses import StreamingResponse, JSONResponse
7
+ import logging
8
+ from contextlib import asynccontextmanager
9
+ import typing
10
+ import itertools # For key rotation
11
+ import asyncio # For potential sleep during retry
12
+
13
+ # --- Configuration ---
14
+
15
+ # --- Client Authentication (Proxy Access) ---
16
+ # Load Allowed Client API Keys (for clients talking to this proxy)
17
+ ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
18
+ if not ALLOWED_API_KEYS_STR:
19
+ raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
20
+ ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
21
+ if not ALLOWED_KEYS:
22
+ raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
23
+ logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
24
+
25
+ # --- Upstream API Configuration ---
26
+ # URL to fetch upstream API keys from (one key per line)
27
+ UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
28
+
29
+ # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
30
+ # Or required if the upstream target needs a key in a different way sometimes.
31
+ # Let's make it optional now.
32
+ DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
33
+
34
+ # Upstream API Base URL
35
+ OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://freeaichatplayground.com/api/v1")
36
+ OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
37
+
38
+ if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
39
+ raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
40
+
41
+ # --- Logging ---
42
+ logging.basicConfig(level=logging.INFO)
43
+ logger = logging.getLogger(__name__)
44
+
45
+ # --- Authentication Dependency (Client -> Proxy) ---
46
+ api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
47
+ async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
48
+ """Dependency to verify the client's API key provided to this proxy."""
49
+ if not api_key_header:
50
+ logger.warning("Missing Authorization header from client")
51
+ raise HTTPException(status_code=401, detail="Missing Authorization header")
52
+ parts = api_key_header.split()
53
+ if len(parts) != 2 or parts[0].lower() != "bearer":
54
+ logger.warning(f"Invalid Authorization header format from client.")
55
+ raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
56
+ client_api_key = parts[1]
57
+ if client_api_key not in ALLOWED_KEYS:
58
+ truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
59
+ logger.warning(f"Invalid Client API Key received: {truncated_key}")
60
+ raise HTTPException(status_code=403, detail="Invalid API Key provided")
61
+ logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
62
+ return client_api_key
63
+
64
+ # --- Key Fetching and Rotation Logic ---
65
+ async def fetch_upstream_keys(url: str) -> list[str]:
66
+ """Fetches keys from the given URL, one key per line."""
67
+ keys = []
68
+ try:
69
+ async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
70
+ logger.info(f"Fetching upstream API keys from: {url}")
71
+ response = await client.get(url)
72
+ response.raise_for_status() # Raise exception for 4xx/5xx status codes
73
+ content = response.text
74
+ keys = [line.strip() for line in content.splitlines() if line.strip()]
75
+ logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
76
+ if not keys:
77
+ logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
78
+ return keys
79
+ except httpx.RequestError as e:
80
+ logger.error(f"Error fetching upstream keys from {url}: {e}")
81
+ return [] # Return empty list on fetch error
82
+ except httpx.HTTPStatusError as e:
83
+ logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
84
+ logger.error(f"Response body: {e.response.text}")
85
+ return [] # Return empty list on bad status
86
+
87
+ # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
+ @asynccontextmanager
89
+ async def lifespan(app: FastAPI):
90
+ # --- Initialize Upstream Key Iterator ---
91
+ upstream_keys = []
92
+ if UPSTREAM_KEYS_URL:
93
+ upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
94
+
95
+ if not upstream_keys:
96
+ logger.warning("No upstream keys fetched from URL or URL not provided.")
97
+ if DEFAULT_OPENAI_API_KEY:
98
+ logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
99
+ upstream_keys = [DEFAULT_OPENAI_API_KEY]
100
+ else:
101
+ # Critical failure - no keys available
102
+ logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
103
+ # In a real scenario, you might want a more graceful shutdown or retry mechanism
104
+ # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
105
+ # Or raise an exception here to prevent startup:
106
+ raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
107
+
108
+ # Store keys and create the cycling iterator in app.state
109
+ app.state.upstream_api_keys = upstream_keys
110
+ app.state.key_iterator = itertools.cycle(upstream_keys)
111
+ logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
112
+
113
+ # --- Initialize HTTPX Client ---
114
+ logger.info("Initializing main HTTPX client...")
115
+ timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
116
+ client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
117
+ app.state.http_client = client # Store client in app.state
118
+ logger.info("HTTPX client initialized.")
119
+
120
+ yield # Application runs here
121
+
122
+ # --- Cleanup ---
123
+ logger.info("Closing HTTPX client...")
124
+ await app.state.http_client.aclose()
125
+ logger.info("HTTPX client closed.")
126
+ app.state.upstream_api_keys = [] # Clear keys
127
+ app.state.key_iterator = None
128
+ logger.info("Upstream keys cleared.")
129
+
130
+ # --- FastAPI App ---
131
+ app = FastAPI(lifespan=lifespan)
132
+
133
+ # --- Streaming Helper ---
134
+ async def yield_openai_chunks(response: httpx.Response):
135
+ """Asynchronously yields chunks from the upstream response stream."""
136
+ # (Content remains the same as before)
137
+ logger.info("Starting to stream chunks from upstream...")
138
+ try:
139
+ buffer = ''
140
+ async for chunk in response.aiter_bytes():
141
+ buffer += chunk.decode()
142
+ while True:
143
+ index = buffer.find("\n")
144
+ if index != -1:
145
+ content = buffer[:index]
146
+ buffer = buffer[index + 1:]
147
+
148
+ if content.startswith("0:"):
149
+ content = content[2:][1:-1].replace("\\n","\n").replace('\\"','"').replace("\\\\","\\")
150
+ data = {"id":"123456-456789-123456","object":"chat.completion.chunk","choices":[{"delta":{"content":content},"index":0,"finish_reason":None}]}
151
+ yield "data: " + json.dumps(data) + "\n\n"
152
+ else:
153
+ break
154
+
155
+ yield "data: [DONE]\n\n"
156
+ except Exception as e:
157
+ logger.error(f"Error during streaming upstream response: {e}")
158
+ finally:
159
+ await response.aclose()
160
+ logger.info("Upstream streaming response closed.")
161
+
162
+ # --- Proxy Endpoint ---
163
+ @app.post("/v1/chat/completions")
164
+ async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
165
+ """
166
+ Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
167
+ Uses rotated keys for upstream authentication.
168
+ """
169
+ client: httpx.AsyncClient = request.app.state.http_client
170
+ key_iterator = request.app.state.key_iterator
171
+
172
+ if not client or not key_iterator:
173
+ logger.error("HTTPX client or Key Iterator not available (app state issue).")
174
+ raise HTTPException(status_code=503, detail="Service temporarily unavailable")
175
+
176
+ # --- Get Next Upstream API Key ---
177
+ try:
178
+ current_upstream_key = next(key_iterator)
179
+ # Log rotation (optional, consider security of logging key info)
180
+ # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
181
+ except StopIteration:
182
+ # This should not happen if lifespan logic is correct and keys were loaded
183
+ logger.error("Upstream key iterator exhausted unexpectedly.")
184
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
185
+ except Exception as e:
186
+ logger.error(f"Unexpected error getting next key: {e}")
187
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
188
+
189
+ # --- Get Request Data ---
190
+ try:
191
+ request_body = await request.body()
192
+ payload = json.loads(request_body)
193
+ config = {}
194
+
195
+ if payload.get("temperature") is not None:
196
+ config["temperature"] = payload.get("temperature")
197
+ del payload["temperature"]
198
+
199
+
200
+ if payload.get("max_tokens") is not None:
201
+ config["maxTokens"] = payload.get("max_tokens")
202
+ del payload["max_tokens"]
203
+
204
+ if payload.get("top_p") is not None:
205
+ config["topP"] = payload.get("top_p")
206
+ del payload["top_p"]
207
+
208
+ if len(config) > 0:
209
+ payload["config"] = config
210
+
211
+ except json.JSONDecodeError:
212
+ raise HTTPException(status_code=400, detail="Invalid JSON body")
213
+
214
+ is_streaming = payload.get("stream", False)
215
+
216
+ # --- Prepare Upstream Request ---
217
+ upstream_headers = {
218
+ "Content-Type": request.headers.get("Content-Type", "application/json"),
219
+ "Accept": request.headers.get("Accept", "application/json"),
220
+ }
221
+
222
+ # --- Upstream Authentication (Using Rotated Key) ---
223
+ # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
224
+ if "freeaichatplayground.com" in OPENAI_API_BASE:
225
+ logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
226
+ payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
227
+ else:
228
+ # Default to standard Bearer token authentication for upstream
229
+ logger.debug("Using Authorization header for upstream authentication.")
230
+ upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
231
+
232
+ if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
233
+ logger.info("Adding 'Accept: text/event-stream' for streaming request")
234
+ upstream_headers["Accept"] = "text/event-stream, application/json"
235
+
236
+ logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
237
+
238
+ # --- Make Request to Upstream ---
239
+ response = None # Define response here to ensure it's available in finally block
240
+ try:
241
+ req = client.build_request(
242
+ "POST",
243
+ OPENAI_CHAT_ENDPOINT, # Use the full URL
244
+ json=payload,
245
+ headers=upstream_headers,
246
+ )
247
+ response = await client.send(req, stream=True)
248
+
249
+ # Check for immediate errors *before* processing body/stream
250
+ if response.status_code >= 400:
251
+ error_body = await response.aread() # Read error fully
252
+ await response.aclose()
253
+ logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
254
+ try: detail = json.loads(error_body)
255
+ except json.JSONDecodeError: detail = error_body.decode()
256
+ raise HTTPException(status_code=response.status_code, detail=detail)
257
+
258
+ # --- Handle Streaming Response ---
259
+ if is_streaming:
260
+ logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
261
+ return StreamingResponse(
262
+ yield_openai_chunks(response), # Generator handles closing response
263
+ status_code=response.status_code,
264
+ media_type=response.headers.get("content-type", "text/event-stream"),
265
+ )
266
+ # --- Handle Non-Streaming Response ---
267
+ else:
268
+ logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
269
+ response_body = await response.aread()
270
+ await response.aclose() # Ensure closed
271
+ content_type = response.headers.get("content-type", "application/json")
272
+ return Response( # Return raw response, FastAPI handles JSON content type
273
+ content=response_body,
274
+ status_code=response.status_code,
275
+ media_type=content_type,
276
+ )
277
+
278
+ except httpx.TimeoutException as e:
279
+ logger.error(f"Request to upstream timed out: {e}")
280
+ if response: await response.aclose()
281
+ raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
282
+ except httpx.RequestError as e:
283
+ logger.error(f"Error requesting upstream API: {e}")
284
+ if response: await response.aclose()
285
+ raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
286
+ except HTTPException as e:
287
+ # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
288
+ if response and not response.is_closed: await response.aclose()
289
+ raise e
290
+ except Exception as e:
291
+ logger.exception("An unexpected error occurred during response processing.")
292
+ if response and not response.is_closed: await response.aclose()
293
+ raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
294
+
295
+
296
+ # --- Health Check Endpoint ---
297
+ @app.get("/health")
298
+ async def health_check():
299
+ """Simple health check endpoint."""
300
+ # Could add checks here, e.g., if keys were loaded
301
+ key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
302
+ return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
303
+
304
+ # --- Main Execution Guard ---
305
+ if __name__ == "__main__":
306
+ import uvicorn
307
+ # Startup checks are implicitly handled by config loading at the top
308
+ print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
309
+ print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
310
+ if UPSTREAM_KEYS_URL:
311
+ print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
312
+ elif DEFAULT_OPENAI_API_KEY:
313
+ print("Using single OPENAI_API_KEY for upstream.")
314
+ else:
315
+ print("ERROR: No upstream key source configured!") # Should have failed earlier
316
+ print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
317
+ print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
318
+ print("---")
319
+
320
+ uvicorn.run(app, host="0.0.0.0", port=7860)
321
+