Spaces:
Runtime error
Runtime error
# database_functions.py
import json
import os
import random
import secrets
import string
from datetime import datetime
from urllib.parse import unquote, urlparse

import psycopg2
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()

# Get the database URL from the environment variables. It is parsed below as
# a URL of the form scheme://user:password@host:port/database.
url = os.getenv("DATABASE_URL")
if not url:
    raise ValueError("DATABASE_URL is not set in the environment variables")

parsed_url = urlparse(url)

# Extract connection parameters.
# urlparse() hands back the user, password and path exactly as written in the
# URL, i.e. still percent-encoded. Reserved characters in credentials (such as
# '@', ':' or '/') have to be encoded inside a URL, so they are decoded here
# before being passed to psycopg2 as keyword arguments.
# Missing components stay None so psycopg2 falls back to its defaults.
db_config = {
    'user': (
        unquote(parsed_url.username)
        if parsed_url.username is not None else None
    ),
    'password': (
        unquote(parsed_url.password)
        if parsed_url.password is not None else None
    ),
    'host': parsed_url.hostname,
    'port': parsed_url.port,
    'database': unquote(parsed_url.path.lstrip('/')),
}
def generate_password():
    """Return a freshly generated 8-character password.

    The student table has a password column, so a value is generated
    automatically instead of being supplied by the caller.

    Characters are drawn from ASCII letters, digits and punctuation using
    the ``secrets`` module, which is intended for security-sensitive values
    (the ``random`` module is not).

    Returns:
        str: an 8-character random string.
    """
    alphabet = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(alphabet) for _ in range(8))
def add_user_privacy(class_name, index_no):
    """Find or create a student identified only by class and index number.

    No personal data is stored for privacy: a student row is keyed by the
    (class, index_no) pair.

    Args:
        class_name: value matched against / stored in ``oc_students.class``.
        index_no: value matched against / stored in ``oc_students.index_no``.

    Returns:
        On success, a ``(user_id, message)`` tuple where ``message`` is
        ``"User already exists"`` or ``"User Created"``.
        On a database error, a single string starting with
        ``"Error adding user:"`` (shape kept for backward compatibility, so
        callers should check the type before unpacking).
    """
    connection = psycopg2.connect(**db_config)
    try:
        cursor = connection.cursor()
        try:
            # Check if a student with the same class and index number exists
            cursor.execute(
                "SELECT id FROM oc_students WHERE class = %s and index_no = %s",
                (class_name, index_no))
            existing_user = cursor.fetchone()
            if existing_user:
                return existing_user[0], "User already exists"

            # Only generate a password when a row is actually inserted.
            # NOTE(review): the generated value is stored as-is in the
            # hashPassword column (it is not hashed here) -- confirm intent.
            password = generate_password()
            cursor.execute(
                "INSERT INTO oc_students (index_no, class, hashPassword) VALUES (%s, %s, %s) RETURNING id",
                (index_no, class_name, password))
            user_id = cursor.fetchone()[0]  # ID of the newly inserted user
            connection.commit()  # without this, the insert is not persisted
            return user_id, "User Created"
        except psycopg2.Error as e:
            return "Error adding user:" + str(e)
        finally:
            cursor.close()
    finally:
        # Always release the connection; closing also discards any
        # uncommitted work left over from a failed insert.
        connection.close()
def add_submission(userid, transcribed_text, ai_responses, scores, feedback, questionNo):
    """Insert one submission row into ``oc_submissions``.

    The row is timestamped with the current local time, formatted as
    ``YYYY-MM-DD HH:MM:SS``.

    Args:
        userid: id of the student the submission belongs to.
        transcribed_text: stored in the ``Transcribed_text`` column.
        ai_responses: stored in the ``AI_conversation_responses`` column.
        scores: stored in the ``Scores`` column.
        feedback: stored in the ``Feedback`` column.
        questionNo: stored in the ``questionNo`` column.

    Returns:
        str: ``"Submission added"`` on success, or an empty string when a
        database error occurred (the error itself is printed). Previously the
        status was computed but never returned, so callers could not tell
        the two outcomes apart.
    """
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()
    dbMsg = ""
    try:
        current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cursor.execute(
            "INSERT INTO oc_submissions (userid, datetime, Transcribed_text, AI_conversation_responses, Scores, Feedback, questionNo) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)",
            (userid, current_datetime, transcribed_text, ai_responses, scores, feedback, questionNo))
        connection.commit()
        dbMsg = "Submission added"
    except psycopg2.Error as e:
        print("Error adding submission:", e)
    finally:
        # The connection is always established by the time we get here.
        cursor.close()
        connection.close()
        print("PostgreSQL connection is closed")
    return dbMsg
def get_submissions_by_date_and_class(from_date, to_date, class_name, display_ai_feedback):
    """Report the submissions of one class within an inclusive date range.

    Args:
        from_date: start date, passed to SQL ``TO_DATE(..., 'YYYY-MM-DD')``.
        to_date: end date in the same format. If the two are given in the
            wrong order they are swapped.
        class_name: class to filter on.
        display_ai_feedback: when falsy, the AI responses are replaced by
            NULL in the query and hidden in the report.

    Returns:
        The JSON string built by ``generate_report_as_json`` when rows are
        found. Otherwise a one-element list holding a placeholder dict whose
        ``"Email"`` field carries a "no data" or "error" message.
    """

    def _placeholder(message):
        # Single-row stand-in returned when there is nothing to report.
        return [{"Email": message, "Name": "", "Class": "", "Datetime": "",
                 "Transcribed Text": "", "AI Conversation Responses": ""}]

    db = psycopg2.connect(**db_config)
    cur = db.cursor()
    try:
        print(f"From Date: {from_date}")
        print(f"To Date: {to_date}")
        print(f"Class Name: {class_name}")

        # Make sure the range runs from the earlier to the later date.
        start, end = (to_date, from_date) if from_date > to_date else (from_date, to_date)

        query = """
            SELECT s.index_no, s.class, sub.datetime, sub.questionNo, sub.transcribed_text,
            CASE WHEN %s THEN sub.ai_conversation_responses ELSE NULL END AS ai_conversation_responses,
            sub.userid
            FROM oc_students AS s
            JOIN oc_submissions AS sub ON s.id = sub.userid
            WHERE TO_DATE(sub.datetime::text, 'YYYY-MM-DD') BETWEEN TO_DATE(%s, 'YYYY-MM-DD') AND TO_DATE(%s, 'YYYY-MM-DD')
            AND s.class = %s
            ORDER BY sub.userid, sub.questionNo, sub.datetime DESC
        """
        params = (display_ai_feedback, start, end, class_name)
        cur.execute(query, params)
        rows = cur.fetchall()

        if not rows:
            return _placeholder("No data found for the selected date range and class")
        return generate_report_as_json(rows, display_ai_feedback)
    except Exception as e:
        print(f"An error occurred: {e}")
        return _placeholder("Error occurred while fetching data")
    finally:
        cur.close()
        db.close()
def generate_report_as_json(results, display_ai_feedback, required_questions=(1, 2, 3)):
    """Group submission rows per student and serialise them as JSON.

    Each row is read by position: index 0 is the index number, 1 the class,
    2 the submission datetime (or a falsy value), 3 the question number,
    4 the student response, 5 the AI feedback and 6 the user id.

    Every student gets one entry with all of their submissions. Any question
    number from ``required_questions`` the student has not answered is added
    as an "NA" placeholder, and the student's questions are then sorted by
    question number (submissions for the same question keep their incoming
    order).

    Args:
        results: iterable of row tuples as described above; may be empty or
            ``None``.
        display_ai_feedback: when falsy, every "AI Feedback" field is
            reported as ``"Not displayed"``.
        required_questions: question numbers every student is expected to
            have an entry for. Defaults to ``(1, 2, 3)``.

    Returns:
        str: a JSON array (indented by 4 spaces) with one object per student,
        in order of first appearance in ``results``.
    """
    # Dicts keep insertion order, so `students` doubles as an ordered set of
    # (user_id, index_no, class) while giving O(1) duplicate detection.
    students = {}
    answers_by_user = {}
    for row in results or ():
        user_id = row[6]
        students.setdefault((user_id, row[0], row[1]), None)
        answers_by_user.setdefault(user_id, []).append({
            "Question": row[3],
            "Datetime": row[2].strftime("%Y-%m-%d %H:%M:%S") if row[2] else "",
            "Student Response": row[4],
            "AI Feedback": row[5] if display_ai_feedback else "Not displayed",
        })

    placeholder_feedback = "NA" if display_ai_feedback else "Not displayed"
    report_data = []
    for user_id, index_no, class_ in students:
        # Copy so the per-user list in the map is never mutated.
        questions = list(answers_by_user.get(user_id, ()))
        answered = [entry["Question"] for entry in questions]

        # Add NA entries for unanswered questions
        questions.extend(
            {
                "Question": number,
                "Datetime": "NA",
                "Student Response": "NA",
                "AI Feedback": placeholder_feedback,
            }
            for number in required_questions
            if number not in answered
        )

        # list.sort is stable, so same-question entries keep their order.
        questions.sort(key=lambda entry: entry["Question"])
        report_data.append({
            "Index No": index_no,
            "Class": class_,
            "Questions": questions,
        })

    return json.dumps(report_data, indent=4)
def getUniqueSubmitDate():
    """Return the most recent distinct submission dates, newest first.

    At most 14 dates are returned, each converted to a string.

    Returns:
        list: date strings on success; on any error, a one-element list
        holding ``{"Error": "Error occurred while fetching data"}``.
    """
    db = psycopg2.connect(**db_config)
    cur = db.cursor()
    try:
        # Distinct calendar dates that have at least one submission.
        cur.execute("""
            SELECT DISTINCT DATE(datetime) AS unique_date
            FROM public.oc_submissions
            ORDER BY unique_date desc
            LIMIT 14;
        """)
        unique_dates = []
        for record in cur.fetchall():
            unique_dates.append(str(record[0]))
        return unique_dates
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Error": "Error occurred while fetching data"}]
    finally:
        cur.close()
        db.close()
def getUniqueClass():
    """Return the distinct classes that have at least one submission.

    Classes are sorted by the database and converted to strings.

    Returns:
        list: class names on success; on any error, a one-element list
        holding ``{"Error": "Error occurred while fetching data"}``.
    """
    db = psycopg2.connect(**db_config)
    cur = db.cursor()
    try:
        # The join keeps only classes whose students have submitted something.
        cur.execute("""
            SELECT DISTINCT s.class
            FROM oc_students AS s
            JOIN oc_submissions AS sub ON s.id = sub.userid
            ORDER BY s.class
        """)
        class_names = []
        for record in cur.fetchall():
            class_names.append(str(record[0]))
        return class_names
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Error": "Error occurred while fetching data"}]
    finally:
        cur.close()
        db.close()