# NOTE: "Spaces: Sleeping" was a Hugging Face Spaces status banner captured by the
# page scrape — it is not part of the program and is kept here only as a comment.
# Standard library
import asyncio
import base64
import io
import json
import logging
import os
import random
import re
import time

# Third-party
import requests
import PyPDF2
import pytesseract
from bs4 import BeautifulSoup
from docx import Document
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, Request, WebSocket, BackgroundTasks, HTTPException
from PIL import Image
from supabase import create_client, Client
# NOTE: this import intentionally stays AFTER supabase's and shadows its `Client`;
# every later use of `Client` refers to the OpenAI client.
from openai import Client

# Local
from get_gmail_data import GmailDataExtractor
# Application bootstrap: FastAPI app plus OpenAI, Supabase, and Google OAuth config.
app = FastAPI()
load_dotenv()

# OpenAI client. NOTE(review): `Client` here is openai.Client because the openai
# import shadows supabase's `Client`; the `supabase: Client` annotation below is
# therefore the wrong type, but harmless since annotations aren't enforced.
client = Client(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('ORG_ID'))

# Supabase connection settings — None if the env vars are missing, in which case
# create_client will fail at startup. TODO confirm env is always populated.
url: str = os.getenv('SUPABASE_URL')
key: str = os.getenv('SUPABASE_KEY')
supabase: Client = create_client(url, key)

# Google OAuth credentials and redirect endpoints.
GOOGLE_CLIENT_ID = os.getenv('GOOGLE_CLIENT_ID')
GOOGLE_CLIENT_SECRET = os.getenv('GOOGLE_CLIENT_SECRET')
GOOGLE_REDIRECT_URI = os.getenv('GOOGLE_REDIRECT_URI')
GOOGLE_REDIRECT_URI_2 = "https://omkar008-receipt-radar-redirect-uri-test.hf.space/receipt_radar/callback"

# Configure the logger
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Report an OpenAI batch job's status; schedule result processing if done.

    Expects a JSON body containing 'batch_job_id'. When the job is completed,
    the heavy result ingestion is deferred to a background task so the caller
    gets an immediate response.

    Returns:
        dict: {"batch_job_status": "completed" | "notcompleted"}
    """
    body = await request.json()
    batch_id = body.get('batch_job_id')
    # BUGFIX: the original retrieved the batch twice back-to-back; one lookup suffices.
    batch_job = client.batches.retrieve(batch_id)
    logger.debug("Batch %s status: %s", batch_id, batch_job.status)
    if batch_job.status == 'completed':
        # Hand the slow ingestion off and return right away.
        background_tasks.add_task(process_batch_data, batch_id)
        return {"batch_job_status": 'completed'}
    return {'batch_job_status': 'notcompleted'}
async def process_batch_data(batch_id: str):
    """Download a completed batch job's JSONL output and persist it to Supabase.

    Each result line's custom_id has the form 'task-<imdb_id>'; that id is used
    to fetch the original description from `imdb_dataset` and the model's
    categories/summary are inserted into `imdb_outputs`.

    Runs as a FastAPI background task: all failures are logged, never raised.
    """
    try:
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status != 'completed':
            # Nothing to ingest yet; caller only schedules us when completed,
            # but re-check defensively.
            return
        result_file_id = batch_job.output_file_id
        raw = client.files.content(result_file_id).content
        # Batch output is JSON Lines: one JSON object per non-empty line.
        results = []
        for line in raw.decode('utf-8').splitlines():
            if not line.strip():
                continue
            try:
                results.append(json.loads(line))
            except json.JSONDecodeError as e:
                logger.error("Error decoding JSON on line: %s\nError: %s", line, e)
        for resp in results:
            # Avoid shadowing the `id` builtin (original used `id` as a local).
            custom_id = resp.get('custom_id')  # e.g. "task-<imdb_id>"
            res_id = custom_id.split('-')[1]
            output = json.loads(
                resp.get('response').get('body').get('choices')[0].get('message').get('content')
            )
            categories = str(output.get('categories'))
            summary = str(output.get('summary'))
            supabase_resp = (
                supabase.table("imdb_dataset")
                .select("Description")
                .eq("imdb_id", res_id)
                .execute()
            )
            description = supabase_resp.data[0].get('Description')
            supabase.table("imdb_outputs").insert({
                "id": res_id,
                "description": description,
                'categories': categories,
                'summary': summary,
            }).execute()
            logger.info("Inserted data for ID: %s", res_id)
    except Exception:
        # Background task boundary: log with traceback instead of crashing the worker.
        logger.exception("Error in background processing for batch %s", batch_id)
# @app.post("/test/batch_processing_result") | |
# async def batch_processing_result(request:Request): | |
# body = await request.json() | |
# print(body) | |
# res = [] | |
# res_dict = {} | |
# batch_id = body.get('batch_job_id') | |
# batch_job = client.batches.retrieve(batch_id) | |
# # while batch_job.status == 'in_progress': | |
# batch_job = client.batches.retrieve(batch_id) | |
# print(batch_job.status) | |
# if batch_job.status == 'completed': | |
# result_file_id = batch_job.output_file_id | |
# result = client.files.content(result_file_id).content | |
# json_str = result.decode('utf-8') | |
# json_lines = json_str.splitlines() | |
# for line in json_lines: | |
# if line.strip(): # Check if the line is not empty | |
# try: | |
# json_dict = json.loads(line) | |
# res.append(json_dict) | |
# except json.JSONDecodeError as e: | |
# print(f"Error decoding JSON on line: {line}\nError: {e}") | |
# for resp in res: | |
# id = resp.get('custom_id') | |
# res_id = id.split('-')[1] | |
# print(id) | |
# print(res_id) | |
# output = json.loads(resp.get('response').get('body').get('choices')[0].get('message').get('content')) | |
# categories = str(output.get('categories')) | |
# summary = str(output.get('summary')) | |
# prompt_tokens = resp.get('response').get('body').get('usage').get('prompt_tokens') | |
# completion_tokens = resp.get('response').get('body').get('usage').get('completion_tokens') | |
# supabase_resp = supabase.table("imdb_dataset").select("Description").eq("imdb_id",res_id).execute() | |
# print(supabase_resp.data) | |
# description = supabase_resp.data[0].get('Description') | |
# insert_response =( | |
# supabase.table("imdb_outputs") | |
# .insert({"id": res_id, "description": description ,'categories': categories , 'summary': summary }) | |
# .execute() | |
# ) | |
# print(insert_response) | |
# return {"batch_job_status":'true'} | |
# return {'batch_job_status':'false'} | |
async def testv1(request: Request):
    """Create an OpenAI batch job that classifies movie descriptions.

    Request body: {"data": [{"imdb_id": ..., "Description": ...}, ...]}.
    Builds one chat-completion task per movie, uploads the tasks as a JSONL
    batch input file, starts a 24h batch, and records the job id in the
    `batch_processing_details` Supabase table for later polling.
    """
    system_prompt = '''
Your goal is to extract movie categories from movie descriptions, as well as a 1-sentence summary for these movies.
You will be provided with a movie description, and you will output a json object containing the following information:
{
categories: string[] // Array of categories based on the movie description,
summary: string // 1-sentence summary of the movie based on the movie description
}
Categories refer to the genre or type of the movie, like "action", "romance", "comedy", etc. Keep category names simple and use only lower case letters.
Movies can have several categories, but try to keep it under 3-4. Only mention the categories that are the most obvious based on the description.
'''
    dataset = await request.json()
    openai_tasks = []
    for ds in dataset.get('data'):
        # The imdb_id is embedded in custom_id so the batch results can be joined
        # back to the right Supabase row once processing completes.
        imdb_id = ds.get('imdb_id')
        description = ds.get('Description')
        openai_tasks.append({
            "custom_id": f"task-{imdb_id}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "temperature": 0.1,
                # BUGFIX: the original dict listed "response_format" twice; one is kept.
                "response_format": {"type": "json_object"},
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": description},
                ],
            },
        })
    # Serialize the tasks as JSONL into an in-memory file.
    json_obj = io.BytesIO()
    for obj in openai_tasks:
        json_obj.write((json.dumps(obj) + '\n').encode('utf-8'))
    # BUGFIX: rewind before upload — without seek(0) the buffer is read from EOF
    # and an empty batch file gets uploaded.
    json_obj.seek(0)
    batch_file = client.files.create(file=json_obj, purpose="batch")
    batch_job = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    # Persist the job id so a poller can check its status later.
    supabase.table("batch_processing_details").insert({
        'batch_job_id': f"{batch_job.id}",
        "batch_job_status": False,
    }).execute()
    return {'data': 'Batch job is scheduled !'}
async def get_gmail_data(request: Request):
    """Extract Gmail messages using the access token sent in the request body.

    Body shape: {"data": "<oauth access token>"}. Returns whatever the
    extractor produces.
    """
    payload = await request.json()
    token = payload.get("data")
    # No user-supplied query: run the extractor with its defaults.
    extractor = GmailDataExtractor(token, None)
    return extractor.extract_messages()
async def login_google():
    """Return the Google OAuth consent URL.

    The URL asks for openid/profile/email plus read-only Gmail access so the
    user both signs in and grants mailbox read permission in one flow.
    """
    scope = "openid%20profile%20email%20https://www.googleapis.com/auth/gmail.readonly"
    consent_url = (
        "https://accounts.google.com/o/oauth2/auth"
        f"?response_type=code&client_id={GOOGLE_CLIENT_ID}"
        f"&redirect_uri={GOOGLE_REDIRECT_URI}"
        f"&scope={scope}&access_type=offline"
    )
    return {"url_hr": consent_url}
async def test_google(code: str):
    """Exchange a Google OAuth authorization code for tokens.

    Returns {"access_token": <full token-endpoint JSON>} — note the original
    response shape (whole JSON body under the "access_token" key) is preserved.
    """
    token_url = "https://oauth2.googleapis.com/token"
    data = {
        "code": code,
        "client_id": GOOGLE_CLIENT_ID,
        "client_secret": GOOGLE_CLIENT_SECRET,
        "redirect_uri": GOOGLE_REDIRECT_URI,
        "grant_type": "authorization_code",
        "access_type": "offline",
    }
    # Timeout added so a hung token endpoint can't block the worker indefinitely.
    response = requests.post(token_url, data=data, timeout=30)
    # SECURITY: the original printed the raw authorization code and access token
    # to stdout; log only non-sensitive metadata instead.
    logger.debug("Token exchange completed with status %s", response.status_code)
    return {"access_token": response.json()}
# # if not access_token: | |
# # raise HTTPException(status_code=400, detail="Authorization code not provided") | |
# print("Entered this function, for testing purposes") | |
# brand_name = "louis vuitton" | |
# user_info = requests.get("https://www.googleapis.com/oauth2/v1/userinfo", headers={"Authorization": f"Bearer {access_token}"}) | |
# page_token = None | |
# messages = [] | |
# # user_query = f"subject:((receipt {brand_name}) OR (receipts {brand_name}) OR (reçu {brand_name}) OR (reçus {brand_name}) OR (Quittung {brand_name}) OR (Quittungen {brand_name}) OR (aankoopbon {brand_name}) OR (aankoopbonnen {brand_name}) OR (recibo {brand_name}) OR (recibos {brand_name}) OR (ricevuta {brand_name}) OR (ricevute {brand_name}) OR (ontvangstbewijs {brand_name}) OR (ontvangstbewijzen {brand_name})) has:attachment" | |
# # user_query = f"{brand_name} label:^smartlabel_receipt" | |
# user_query = f"(label:^smartlabel_receipt OR (subject:your AND subject:order) OR subject:receipts OR subject:receipt OR subject:invoice OR subject:invoice)) AND subject:amazon" | |
# # user_query = """("invoice" OR (("tracking" OR "track") AND ("delivery" OR "package"))) OR (subject:order OR subject:receipt OR subject:receipts OR subject:invoice OR subject:invoice)""" | |
# while True: | |
# # Construct Gmail API request with pageToken | |
# gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={user_query}" | |
# if page_token: | |
# gmail_url += f"&pageToken={page_token}" | |
# gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"}) | |
# gmail_data = gmail_response.json() | |
# # Check if there are messages in the response | |
# if "messages" in gmail_data: | |
# messages.extend(gmail_data["messages"]) | |
# # Check if there are more pages | |
# if "nextPageToken" in gmail_data: | |
# page_token = gmail_data["nextPageToken"] | |
# else: | |
# break # No more pages, exit the loop | |
# unique_thread_ids = set() | |
# filtered_data_list = [] | |
# for entry in messages: | |
# thread_id = entry['threadId'] | |
# if thread_id not in unique_thread_ids: | |
# unique_thread_ids.add(thread_id) | |
# filtered_data_list.append(entry) | |
# attachments = [] | |
# attachment_no = 0 | |
# data_new = {} | |
# for i,message in enumerate(messages) : | |
# # print(i) | |
# # print(message) | |
# if message: | |
# message_id = message.get("id") | |
# print(message_id) | |
# if message_id: | |
# message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}" | |
# message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) | |
# message_data = message_response.json() | |
# print("printing message_data response json") | |
# print(message_data) | |
# print("Finished printing message_data response json") | |
# subject = '' | |
# body = '' | |
# print("printing body") | |
# print(message_data['snippet']) | |
# if 'payload' in message_data and 'headers' in message_data['payload']: | |
# headers = message_data['payload']['headers'] | |
# for header in headers: | |
# if header['name'] == 'Subject': | |
# subject = header['value'] | |
# if 'parts' in message_data['payload']: | |
# parts = message_data['payload']['parts'] | |
# print("printing parts") | |
# print(parts) | |
# for part in parts: | |
# if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': | |
# body_data = part['body']['data'] | |
# print("printing body data") | |
# print(body_data) | |
# body = base64.urlsafe_b64decode(body_data) | |
# print("Subject:", subject) | |
# if body: | |
# text,links=extract_text_and_links(body) | |
# if text: | |
# print("Printing extracted Text: ") | |
# print(text) | |
# else: | |
# print("No text found or there was some error parsing.") | |
# if links: | |
# print("\nLinks:") | |
# for link_text, link_url in links: | |
# print(f"{link_text}: {link_url}") | |
# else: | |
# print("No links found or there was some error in parsing or maybe don't use for loop.") | |
# # Check for parts in the message payload | |
# if "payload" in message_data and "parts" in message_data["payload"]: | |
# for part in message_data["payload"]["parts"]: | |
# if "body" in part and "attachmentId" in part["body"]: | |
# attachment_id = part["body"]["attachmentId"] | |
# attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}" | |
# attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"}) | |
# attachment_data = attachment_response.json() | |
# data = attachment_data.get("data") | |
# filename = part.get("filename", "untitled.txt") | |
# if data: | |
# data_new[filename]=data[:10] | |
# # attachment_content = base64.urlsafe_b64decode(data) | |
# # extracted_text = await extract_text_from_attachment(filename, attachment_content) | |
# attachment_no+=1 | |
# return {"attachment_count":attachment_no,"attachment_content":data_new} | |
def extract_text_and_links(html_content):
    """Parse HTML and return (plain_text, links).

    plain_text has all whitespace runs collapsed to single spaces; links is a
    list of (anchor_text, href) tuples for every <a> tag carrying an href.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    # Visible text with whitespace normalized.
    cleaned = re.sub(r'\s+', ' ', soup.get_text()).strip()
    anchors = [(a.text, a['href']) for a in soup.find_all('a', href=True)]
    return cleaned, anchors
async def auth_google(request: Request):
    """Search the user's Gmail for brand receipt emails and collect attachments.

    Body: {"access_token": <oauth token>, "brand_name": <brand>}.
    Pages through every message matching a multi-language "receipt" subject
    query (with attachments), then fetches each attachment's base64 payload.

    Returns:
        dict: {"attachment_count": int, "attachment_content": {filename: b64data}}
    Raises:
        HTTPException(400) when no token is supplied. (BUGFIX: the original
        raised a NameError here because HTTPException was never imported —
        now added to the fastapi import.)
    """
    data = await request.json()
    access_token = data.get("access_token")
    brand_name = data.get("brand_name")
    if not access_token:
        raise HTTPException(status_code=400, detail="Authorization code not provided")
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    page_token = None
    messages = []
    user_query = f"subject:((receipt {brand_name}) OR (receipts {brand_name}) OR (reçu {brand_name}) OR (reçus {brand_name}) OR (Quittung {brand_name}) OR (Quittungen {brand_name}) OR (aankoopbon {brand_name}) OR (aankoopbonnen {brand_name}) OR (recibo {brand_name}) OR (recibos {brand_name}) OR (ricevuta {brand_name}) OR (ricevute {brand_name}) OR (ontvangstbewijs {brand_name}) OR (ontvangstbewijzen {brand_name})) has:attachment"
    # Page through all matching message ids.
    while True:
        gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={user_query}"
        if page_token:
            gmail_url += f"&pageToken={page_token}"
        gmail_data = requests.get(gmail_url, headers=auth_headers, timeout=30).json()
        if "messages" in gmail_data:
            messages.extend(gmail_data["messages"])
        if "nextPageToken" in gmail_data:
            page_token = gmail_data["nextPageToken"]
        else:
            break  # No more pages, exit the loop
    attachment_no = 0
    data_new = {}
    for message in messages:
        if not message:
            continue
        message_id = message.get("id")
        if not message_id:
            continue
        message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
        message_data = requests.get(message_url, headers=auth_headers, timeout=30).json()
        payload = message_data.get("payload", {})
        # Log the inline body when present (debug aid kept from the original).
        if "body" in payload and "data" in payload["body"]:
            body_content = base64.urlsafe_b64decode(
                payload["body"]["data"].encode("UTF-8")
            ).decode("UTF-8")
            logger.debug("Body Content: %s", body_content)
        # Collect every attachment part's base64 data.
        for part in payload.get("parts", []):
            if "body" in part and "attachmentId" in part["body"]:
                attachment_id = part["body"]["attachmentId"]
                attachment_url = (
                    f"https://www.googleapis.com/gmail/v1/users/me/messages/"
                    f"{message_id}/attachments/{attachment_id}"
                )
                attachment_data = requests.get(
                    attachment_url, headers=auth_headers, timeout=30
                ).json()
                b64_payload = attachment_data.get("data")
                filename = part.get("filename", "untitled.txt")
                if b64_payload:
                    data_new[filename] = b64_payload
                    # The original decoded and text-extracted here but discarded
                    # the result; that dead (and slow) work has been removed.
                    attachment_no += 1
    return {"attachment_count": attachment_no, "attachment_content": data_new}
async def send_chunked_data(websocket: WebSocket, filename: str, data: str):
    """Stream `data` over the websocket in 1 KiB chunks, then send a terminator frame."""
    chunk_len = 1024
    offset = 0
    while offset < len(data):
        await websocket.send_json({"filename": filename, "data_chunk": data[offset:offset + chunk_len]})
        # Brief pause between chunks so the client can keep up.
        await asyncio.sleep(0.4)
        offset += chunk_len
    await websocket.send_text("FinishedThisAttachment")
async def extract_text_from_pdf(pdf_data):
    """Extract and concatenate the text of every page of a PDF byte string.

    BUGFIX: PyPDF2's ``extract_text()`` may return None for pages without
    extractable text; the original concatenated it directly and would raise
    TypeError. None now contributes an empty string.
    """
    with io.BytesIO(pdf_data) as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # Iterate pages directly instead of indexing by range(len(...)).
        return "".join((page.extract_text() or "") for page in pdf_reader.pages)
async def extract_text_from_docx(docx_data):
    """Return all paragraph text from a .docx byte string, one paragraph per line."""
    document = Document(io.BytesIO(docx_data))
    # Each paragraph is followed by a newline, matching the original output shape.
    return "".join(paragraph.text + "\n" for paragraph in document.paragraphs)
async def extract_text_from_attachment(filename, data):
    """Dispatch attachment text extraction by file extension.

    Generalized to match extensions case-insensitively (e.g. 'SCAN.PDF' now
    reaches the PDF path). Unknown extensions return a sentinel string rather
    than raising.
    """
    lowered = filename.lower()
    if lowered.endswith('.pdf'):
        return await extract_text_from_pdf(data)
    if lowered.endswith('.docx'):
        return await extract_text_from_docx(data)
    # Add handling for other document types if needed
    return "Unsupported document type"
async def test_websocket(websocket: WebSocket):
    """WebSocket endpoint: receive a Gmail access token, find receipt emails
    with attachments, and stream each attachment's base64 data to the client.

    NOTE(review): the scraped source lost its indentation; the nesting below
    (helpers defined inside the endpoint, `event_generator` closing over
    `websocket`) is the reconstruction that makes the free `websocket`
    references and the trailing `await`s well-formed — confirm against the
    original file.
    """
    await websocket.accept()
    logger.info("Hi hi succefull in connecting !!")
    # First text frame is expected to carry the Gmail OAuth access token.
    data = await websocket.receive_text()
    logger.info("Received JSON data: %s", data)

    def get_messages(code: str):
        # Return all Gmail message stubs matching the receipt query,
        # de-duplicated by threadId (first message of each thread wins).
        logging.info("entered into the get_messages")
        access_token = code
        print("printing access_token")
        print(access_token)
        page_token = None
        messages = []
        jobs_query = "subject:receipt OR subject:receipts has:attachment"
        while True:
            # Construct Gmail API request with pageToken
            print("into the gmail")
            gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}"
            if page_token:
                gmail_url += f"&pageToken={page_token}"
            gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
            logging.info(f"{gmail_response}")
            print(gmail_response)
            gmail_data = gmail_response.json()
            # Check if there are messages in the response
            if "messages" in gmail_data:
                messages.extend(gmail_data["messages"])
            # Check if there are more pages
            if "nextPageToken" in gmail_data:
                page_token = gmail_data["nextPageToken"]
            else:
                break  # No more pages, exit the loop
        print("returning the messages")
        unique_thread_ids = set()
        filtered_data_list = []
        for entry in messages:
            thread_id = entry['threadId']
            if thread_id not in unique_thread_ids:
                unique_thread_ids.add(thread_id)
                filtered_data_list.append(entry)
        return filtered_data_list

    async def event_generator(code: str):
        # Fetch all matching messages, report the count to the client, then
        # stream every attachment's (truncated-logged) base64 data in chunks.
        logging.info("entered into the event_generator")
        access_token = code
        messages = get_messages(access_token)
        print(len(messages))
        await websocket.send_json({"total_messages": len(messages)})
        await websocket.send_text("CompletedSendingTotalMessagesLength")
        # NOTE(review): attachments / prev_data / batch_size / prev_filename are
        # assigned but never used below — presumably leftovers from an earlier
        # batching design.
        attachments = []
        prev_data = ""
        data_new = {}
        attachment_no = 0
        batch_size = 5
        prev_filename = None
        for i, message in enumerate(messages):
            print(i)
            logging.info(f"{i}")
            logging.info(f"{message}")
            print(message)
            if message:
                message_id = message.get("id")
                thread_id = message.get("threadId")
                print(message_id)
                if message_id:
                    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
                    message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
                    message_data = message_response.json()
                    # Check for parts in the message payload
                    if "payload" in message_data and "parts" in message_data["payload"]:
                        for part in message_data["payload"]["parts"]:
                            if "body" in part and "attachmentId" in part["body"]:
                                attachment_id = part["body"]["attachmentId"]
                                print(attachment_id)
                                attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
                                attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"})
                                attachment_data = attachment_response.json()
                                data = attachment_data.get("data", {})
                                filename = part.get("filename", "untitled.txt")
                                if data:
                                    # Only the first 10 chars are kept here (logging aid);
                                    # the full payload is streamed below.
                                    data_new[filename] = str(data[:10])
                                    attachment_content = base64.urlsafe_b64decode(data)
                                    logging.info(filename)
                                    extracted_text = await extract_text_from_attachment(filename, attachment_content)
                                    logging.info(extracted_text)
                                    await send_chunked_data(websocket, filename, data)
                                    attachment_no += 1
        await websocket.send_text("CompletedFetchingMessages")

    # Treat the received frame as the access token and run the pipeline.
    await event_generator(data)
    logging.info("Closing connection")
    await websocket.close()