Spaces:
Running
Running
Refactor summarization and email sending logic; improve error handling and environment variable checks
f50b29d
| import os | |
| from datetime import datetime | |
| from email import encoders | |
| from email.mime.base import MIMEBase | |
| from pytz import timezone | |
| import sib_api_v3_sdk | |
| from sib_api_v3_sdk.rest import ApiException | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file (when one exists) into the process
# environment before any configuration is read from it.
load_dotenv()

# API key used to authenticate against the transactional email service.
# This is None when the MAIL_API environment variable is not defined.
MAIL_API_KEY = os.environ.get("MAIL_API")
def get_current_time():
    """Return the current time in the Asia/Kolkata zone as a formatted string.

    The current instant is taken as a timezone-aware UTC datetime and then
    converted to Asia/Kolkata. The result is rendered as
    ``DD-MM-YYYY HH:MM:SS`` followed by the zone name and UTC offset
    (``%Z%z``).
    """
    kolkata_zone = timezone('Asia/Kolkata')
    utc_instant = datetime.now(timezone('UTC'))
    return utc_instant.astimezone(kolkata_zone).strftime("%d-%m-%Y %H:%M:%S %Z%z")
def mail_body(generation_details):
    """Compose the plain-text body of the blog-batch notification email.

    Args:
        generation_details: Description of the generated/posted blogs; it is
            interpolated on its own line between the timestamp and the
            sign-off.

    Returns:
        The message text, one item per line, with no leading or trailing
        whitespace.
    """
    # NOTE(review): line breaks follow the template as it appears in this
    # file; confirm spacing against a rendered email if layout matters.
    message_lines = [
        "Hello,",
        "These are the details of the Blogs Posted at ReXplore: Science @ Fingertips.",
        f"Date & Time: {get_current_time()}",
        f"{generation_details}",
        "Regards,",
        "Nayan Kasturi (Raanna),",
        "Developer & Maintainer,",
        "ReXplore.",
    ]
    return "\n".join(message_lines)
def create_attachment(content, filename):
    """Wrap raw content in a base64-encoded ``application/octet-stream`` MIME part.

    Args:
        content: Payload to attach (the caller in this file passes UTF-8
            encoded bytes).
        filename: Name advertised to the recipient in the
            ``Content-Disposition`` header.

    Returns:
        A ``MIMEBase`` part whose payload is the base64 text of ``content``
        and whose ``Content-Transfer-Encoding`` header is ``base64``.
    """
    attachment = MIMEBase('application', 'octet-stream')
    attachment.set_payload(content)
    # Replaces the payload with its base64 text and sets the
    # Content-Transfer-Encoding header accordingly.
    encoders.encode_base64(attachment)
    # Pass the filename as a header parameter instead of interpolating it into
    # the header string: add_header() quotes/escapes special characters and
    # RFC 2231-encodes non-ASCII names, so a filename containing '"' or ';'
    # cannot corrupt the header or smuggle in extra parameters.
    attachment.add_header('Content-Disposition', 'attachment', filename=filename)
    return attachment
def send_email(generation_details):
    """Send the blog-batch notification through the Sendinblue transactional API.

    The message body is built by ``mail_body`` and sent as plain text; the
    same text is also attached, base64-encoded, as ``data.txt``.

    Args:
        generation_details: Description of the generated/posted blogs to
            include in the message.

    Returns:
        True when the API call completes without raising; False when the
        ``MAIL_API`` key is not configured or the API raises ``ApiException``.
    """
    if not MAIL_API_KEY:
        # Without a key the request can only be rejected by the service, so
        # fail fast with an actionable message instead of making the call.
        print("Failed to send email:")
        print("MAIL_API environment variable is not set.")
        return False

    configuration = sib_api_v3_sdk.Configuration()
    configuration.api_key['api-key'] = MAIL_API_KEY
    api_client = sib_api_v3_sdk.ApiClient(configuration)
    transactional_api = sib_api_v3_sdk.TransactionalEmailsApi(api_client)

    email_content = mail_body(generation_details)
    # The attachment carries the same text as the body; create_attachment
    # base64-encodes it, which is the form placed in the request below.
    attachment_obj = create_attachment(email_content.encode('utf-8'), "data.txt")
    subject = f"New Blog Batch Published to ReXplore at {get_current_time()}"

    sender = {
        "name": "Project Gatekeeper",
        "email": "[email protected]",
    }
    reply_to = {
        "name": "Project Gatekeeper",
        "email": "[email protected]",
    }
    recipients = [{"email": "[email protected]"}]
    attachments = [{
        "content": attachment_obj.get_payload(),
        "name": attachment_obj.get_filename(),
    }]

    email = sib_api_v3_sdk.SendSmtpEmail(
        to=recipients,
        reply_to=reply_to,
        attachment=attachments,
        text_content=email_content,
        sender=sender,
        subject=subject,
    )

    try:
        transactional_api.send_transac_email(email)
        print("Email Sent")
        return True
    except ApiException as e:
        print("Failed to send email:")
        print(f"Exception when calling SMTPApi->send_transac_email: {e}")
        return False
if __name__ == "__main__":
    # Manual smoke test: send one notification with placeholder details and
    # report the outcome based on send_email's boolean result.
    sample_details = "Example: 5 blogs generated and posted successfully."
    delivered = send_email(sample_details)
    print("Email sent successfully." if delivered else "Email sending failed.")