sameernotes commited on
Commit
c2d88be
·
verified ·
1 Parent(s): be2097f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +26 -21
app.py CHANGED
@@ -5,7 +5,7 @@ from fastapi.responses import FileResponse, JSONResponse, HTMLResponse
5
  from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
6
  from fastapi.templating import Jinja2Templates
7
  from pydantic import BaseModel, EmailStr, Field
8
- from typing import List, Optional # Import Optional from typing
9
  import cv2
10
  import numpy as np
11
  import tensorflow as tf
@@ -92,7 +92,7 @@ class Token(BaseModel):
92
  token_type: str
93
 
94
  class TokenData(BaseModel):
95
- username: Optional[str] = None # Use Optional[str] instead of str | None
96
 
97
  class OCRResponse(BaseModel):
98
  sakshi_output: str
@@ -114,6 +114,7 @@ def get_db():
114
  db.close()
115
 
116
  async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
 
117
  user = get_user_by_username(db, username=token)
118
  if not user:
119
  raise HTTPException(
@@ -123,17 +124,19 @@ async def get_current_user(db: Session = Depends(get_db), token: str = Depends(o
123
  )
124
  return user
125
 
126
- async def get_current_active_user(current_user: UserResponse = Depends(get_current_user)):
 
127
  if not current_user.is_active:
128
  raise HTTPException(status_code=400, detail="Inactive user")
129
  return current_user
130
 
131
- async def get_current_admin_user(current_user: UserResponse = Depends(get_current_active_user)):
132
  if not current_user.is_admin:
133
  raise HTTPException(status_code=403, detail="Not an administrator")
134
  return current_user
135
 
136
 
 
137
  # --- CRUD Operations ---
138
  def get_user(db: Session, user_id: int):
139
  return db.query(UserModel).filter(UserModel.id == user_id).first()
@@ -340,18 +343,20 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
340
  return {"access_token": access_token, "token_type": "bearer"}
341
 
342
  @app.post("/signup", response_model=UserResponse)
343
- async def signup(user: UserCreate = Depends(), db: Session = Depends(get_db)):
344
- db_user = get_user_by_username(db, username=user.username)
345
- if db_user:
346
- raise HTTPException(status_code=400, detail="Username already registered")
347
- db_user = get_user_by_email(db, email=user.email)
348
- if db_user:
349
- raise HTTPException(status_code=400, detail="Email already registered")
350
- return create_user(db=db, user=user)
 
 
351
 
352
  # OCR Endpoint
353
  @app.post("/process/", response_model=OCRResponse)
354
- async def process(file: UploadFile = File(...), current_user: UserResponse = Depends(get_current_active_user)):
355
  if not file.content_type.startswith("image/"):
356
  raise HTTPException(status_code=400, detail="File must be an image")
357
 
@@ -381,44 +386,44 @@ async def process(file: UploadFile = File(...), current_user: UserResponse = De
381
  os.unlink(temp_file.name)
382
 
383
  @app.get("/word-detection/")
384
- async def get_word_detection(current_user: UserResponse = Depends(get_current_active_user)):
385
  if 'word_detection' not in session_files or not os.path.exists(session_files['word_detection']):
386
  raise HTTPException(status_code=404, detail="Word detection image not found")
387
  return FileResponse(session_files['word_detection'])
388
 
389
  @app.get("/prediction/")
390
- async def get_prediction(current_user: UserResponse = Depends(get_current_active_user)):
391
  if 'prediction' not in session_files or not os.path.exists(session_files['prediction']):
392
  raise HTTPException(status_code=404, detail="Prediction image not found")
393
  return FileResponse(session_files['prediction'])
394
 
395
  # Feedback Endpoint
396
  @app.post("/feedback/", response_model=FeedbackResponse)
397
- async def create_feedback_route(feedback: FeedbackCreate, current_user: UserResponse = Depends(get_current_active_user),db: Session = Depends(get_db)):
398
  return create_feedback(db=db, feedback=feedback)
399
 
400
  # Admin Endpoints
401
  @app.get("/admin/users/", response_model=List[UserResponse])
402
- async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: UserResponse = Depends(get_current_admin_user)):
403
  users = get_users(db, skip=skip, limit=limit)
404
  return users
405
 
406
  @app.get("/admin/users/{user_id}", response_model=UserResponse)
407
- async def read_user(user_id: int, db: Session = Depends(get_db), current_user: UserResponse = Depends(get_current_admin_user)):
408
  db_user = get_user(db, user_id=user_id)
409
  if db_user is None:
410
  raise HTTPException(status_code=404, detail="User not found")
411
  return db_user
412
 
413
  @app.put("/admin/users/{user_id}", response_model=UserResponse)
414
- async def update_user_route(user_id: int, user: UserUpdate, db: Session = Depends(get_db), current_user: UserResponse = Depends(get_current_admin_user)):
415
  updated_user = update_user(db=db, user_id=user_id, user=user)
416
  if updated_user is None:
417
  raise HTTPException(status_code=404, detail="User not found")
418
  return updated_user
419
 
420
  @app.delete("/admin/users/{user_id}", response_model=dict)
421
- async def delete_user_route(user_id: int, db: Session = Depends(get_db), current_user: UserResponse = Depends(get_current_admin_user)):
422
  if delete_user(db=db, user_id=user_id):
423
  return {"message": "User deleted successfully"}
424
  else:
@@ -426,7 +431,7 @@ async def delete_user_route(user_id: int, db: Session = Depends(get_db), current
426
 
427
 
428
  @app.get("/admin/feedback/", response_model=List[FeedbackResponse])
429
- async def read_feedback(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: UserResponse = Depends(get_current_admin_user)):
430
  feedback = get_feedback(db, skip=skip, limit=limit)
431
  return feedback
432
 
 
5
  from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
6
  from fastapi.templating import Jinja2Templates
7
  from pydantic import BaseModel, EmailStr, Field
8
+ from typing import List, Optional # Import Optional from typing
9
  import cv2
10
  import numpy as np
11
  import tensorflow as tf
 
92
  token_type: str
93
 
94
  class TokenData(BaseModel):
95
+ username: Optional[str] = None # Use Optional[str] instead of str | None
96
 
97
  class OCRResponse(BaseModel):
98
  sakshi_output: str
 
114
  db.close()
115
 
116
  async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
117
+ # Correctly retrieve user by matching token with username.
118
  user = get_user_by_username(db, username=token)
119
  if not user:
120
  raise HTTPException(
 
124
  )
125
  return user
126
 
127
+
128
+ async def get_current_active_user(current_user: UserModel = Depends(get_current_user)): #Use UserModel
129
  if not current_user.is_active:
130
  raise HTTPException(status_code=400, detail="Inactive user")
131
  return current_user
132
 
133
+ async def get_current_admin_user(current_user: UserModel = Depends(get_current_active_user)): #Use UserModel
134
  if not current_user.is_admin:
135
  raise HTTPException(status_code=403, detail="Not an administrator")
136
  return current_user
137
 
138
 
139
+
140
  # --- CRUD Operations ---
141
  def get_user(db: Session, user_id: int):
142
  return db.query(UserModel).filter(UserModel.id == user_id).first()
 
343
  return {"access_token": access_token, "token_type": "bearer"}
344
 
345
  @app.post("/signup", response_model=UserResponse)
346
+ async def signup(user: UserCreate, db: Session = Depends(get_db)):
347
+ db_user_username = get_user_by_username(db, username=user.username)
348
+ if db_user_username:
349
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
350
+ db_user_email = get_user_by_email(db, email=user.email)
351
+ if db_user_email:
352
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
353
+ created = create_user(db=db, user=user)
354
+ return created
355
+
356
 
357
  # OCR Endpoint
358
  @app.post("/process/", response_model=OCRResponse)
359
+ async def process(file: UploadFile = File(...), current_user: UserModel = Depends(get_current_active_user)):
360
  if not file.content_type.startswith("image/"):
361
  raise HTTPException(status_code=400, detail="File must be an image")
362
 
 
386
  os.unlink(temp_file.name)
387
 
388
  @app.get("/word-detection/")
389
+ async def get_word_detection(current_user: UserModel = Depends(get_current_active_user)):
390
  if 'word_detection' not in session_files or not os.path.exists(session_files['word_detection']):
391
  raise HTTPException(status_code=404, detail="Word detection image not found")
392
  return FileResponse(session_files['word_detection'])
393
 
394
  @app.get("/prediction/")
395
+ async def get_prediction(current_user: UserModel = Depends(get_current_active_user)):
396
  if 'prediction' not in session_files or not os.path.exists(session_files['prediction']):
397
  raise HTTPException(status_code=404, detail="Prediction image not found")
398
  return FileResponse(session_files['prediction'])
399
 
400
  # Feedback Endpoint
401
  @app.post("/feedback/", response_model=FeedbackResponse)
402
+ async def create_feedback_route(feedback: FeedbackCreate, current_user: UserModel = Depends(get_current_active_user),db: Session = Depends(get_db)):
403
  return create_feedback(db=db, feedback=feedback)
404
 
405
  # Admin Endpoints
406
  @app.get("/admin/users/", response_model=List[UserResponse])
407
+ async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_admin_user)):
408
  users = get_users(db, skip=skip, limit=limit)
409
  return users
410
 
411
  @app.get("/admin/users/{user_id}", response_model=UserResponse)
412
+ async def read_user(user_id: int, db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_admin_user)):
413
  db_user = get_user(db, user_id=user_id)
414
  if db_user is None:
415
  raise HTTPException(status_code=404, detail="User not found")
416
  return db_user
417
 
418
  @app.put("/admin/users/{user_id}", response_model=UserResponse)
419
+ async def update_user_route(user_id: int, user: UserUpdate, db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_admin_user)):
420
  updated_user = update_user(db=db, user_id=user_id, user=user)
421
  if updated_user is None:
422
  raise HTTPException(status_code=404, detail="User not found")
423
  return updated_user
424
 
425
  @app.delete("/admin/users/{user_id}", response_model=dict)
426
+ async def delete_user_route(user_id: int, db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_admin_user)):
427
  if delete_user(db=db, user_id=user_id):
428
  return {"message": "User deleted successfully"}
429
  else:
 
431
 
432
 
433
  @app.get("/admin/feedback/", response_model=List[FeedbackResponse])
434
+ async def read_feedback(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_admin_user)):
435
  feedback = get_feedback(db, skip=skip, limit=limit)
436
  return feedback
437