# Spaces: Building
import asyncio | |
from fastapi import FastAPI, Request, Form | |
from fastapi.responses import HTMLResponse, JSONResponse | |
from fastapi.staticfiles import StaticFiles | |
from playwright.async_api import async_playwright | |
import json | |
import os | |
from datetime import datetime | |
from pathlib import Path | |
app = FastAPI()

# File the extracted posts are saved to.
OUTPUT_FILE = "posts.json"
# Default number of posts to extract when a request does not specify one.
LIMIT = 5

# Directory served under /static (for external CSS or JS added later).
STATIC_DIR = "static"
# StaticFiles raises RuntimeError at construction time when its directory is
# missing, which would abort application start-up; create it if needed.
os.makedirs(STATIC_DIR, exist_ok=True)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
# Single-page front end: a credentials form plus a script that POSTs the form
# values as JSON to /extract and renders the returned posts.
# The post text and media URLs come from scraped third-party pages, so the
# script builds the result nodes with createElement/textContent instead of
# assigning interpolated strings to innerHTML (which would allow markup in a
# post to run as script in the user's browser).
HTML_PAGE = """
<!DOCTYPE html>
<html lang="ar" dir="rtl">
<head>
  <meta charset="UTF-8" />
  <title>استخراج منشورات فيسبوك - نورا</title>
  <style>
    body { font-family: Arial, sans-serif; margin: 30px; }
    input, button { padding: 10px; margin: 5px 0; width: 300px; }
    #results { margin-top: 20px; }
    .post { border-bottom: 1px solid #ddd; padding: 10px 0; }
    .media img, .media video { max-width: 100%; max-height: 200px; }
  </style>
</head>
<body>
  <h1>استخراج منشورات فيسبوك</h1>
  <form id="fbForm">
    <input type="email" id="email" placeholder="البريد الإلكتروني" required /><br/>
    <input type="password" id="password" placeholder="كلمة المرور" required /><br/>
    <input type="number" id="limit" placeholder="عدد المنشورات (افتراضي 5)" min="1" max="20" /><br/>
    <button type="submit">ابدأ الاستخراج</button>
  </form>
  <div id="status"></div>
  <div id="results"></div>
  <script>
    document.getElementById('fbForm').addEventListener('submit', async (e) => {
      e.preventDefault();
      const statusEl = document.getElementById('status');
      const resultsEl = document.getElementById('results');
      statusEl.textContent = 'جاري الاستخراج... الرجاء الانتظار';
      resultsEl.innerHTML = '';
      let email = document.getElementById('email').value;
      let password = document.getElementById('password').value;
      let limit = parseInt(document.getElementById('limit').value) || 5;
      try {
        let response = await fetch('/extract', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ email, password, limit })
        });
        let data = await response.json();
        if (data.error) {
          statusEl.textContent = 'خطأ: ' + data.error;
          return;
        }
        statusEl.textContent = 'تم الاستخراج بنجاح!';
        data.posts.forEach((post, i) => {
          const postDiv = document.createElement('div');
          postDiv.className = 'post';

          const para = document.createElement('p');
          const label = document.createElement('b');
          label.textContent = `المنشور ${i+1}:`;
          para.appendChild(label);
          para.appendChild(document.createTextNode(' ' + post.content));
          postDiv.appendChild(para);

          if (post.media_url) {
            const mediaDiv = document.createElement('div');
            mediaDiv.className = 'media';
            let media;
            if (post.media_url.endsWith('.mp4') || post.media_url.endsWith('.webm')) {
              media = document.createElement('video');
              media.controls = true;
            } else {
              media = document.createElement('img');
              media.alt = 'Media';
            }
            media.src = post.media_url;
            mediaDiv.appendChild(media);
            postDiv.appendChild(mediaDiv);
          }

          const stamp = document.createElement('small');
          stamp.textContent = `تم الاستخراج: ${post.extracted_at}`;
          postDiv.appendChild(stamp);

          resultsEl.appendChild(postDiv);
        });
      } catch (err) {
        statusEl.textContent = 'حدث خطأ غير متوقع.';
        console.error(err);
      }
    });
  </script>
</body>
</html>
"""
# Registered on "/" so the form is reachable; response_class=HTMLResponse makes
# the returned string go out as text/html rather than as a JSON-encoded string.
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the single-page extraction form.

    Returns:
        The HTML_PAGE markup as a string.
    """
    return HTML_PAGE
def _parse_limit(raw):
    """Return the requested post count as an int, or LIMIT when it is unusable."""
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        # e.g. an explicit JSON null or a non-numeric string.
        return LIMIT


async def _find_media_url(message_node):
    """Return the src of the first image, else first video, in the post's article.

    Returns None when the message node has no enclosing article element or
    the article contains neither an image nor a video with a src attribute.
    """
    handle = await message_node.evaluate_handle(
        "node => node.closest('div[role=article]')"
    )
    # evaluate_handle always yields a (truthy) JSHandle, even when closest()
    # evaluates to null; as_element() is None unless it wraps a DOM element.
    article = handle.as_element()
    if article is None:
        return None
    for selector in ("img[src]", "video[src]"):
        element = await article.query_selector(selector)
        if element:
            return await element.get_attribute("src")
    return None


async def _scrape_posts(page, email, password, limit):
    """Log in on `page` and collect posts; return None when the login fails."""
    await page.goto("https://www.facebook.com/")
    await page.fill('input[name="email"]', email)
    await page.fill('input[name="pass"]', password)
    await page.click('button[name="login"]')
    await page.wait_for_timeout(7000)
    # Still on a login URL, or the e-mail field is still present: not logged in.
    if "login" in page.url or await page.query_selector("input[name='email']"):
        return None

    await page.goto("https://www.facebook.com/me")
    await page.wait_for_timeout(5000)
    post_blocks = await page.query_selector_all(
        "div[role='feed'] div[data-ad-preview='message']"
    )

    posts = []
    # Only the first `limit` message blocks are examined; a non-positive
    # limit examines none.
    for post in post_blocks[:max(limit, 0)]:
        try:
            content = await post.inner_text()
            media_url = await _find_media_url(post)
        except Exception:
            # Best effort: a block that cannot be read is skipped. Catching
            # Exception (not a bare except) lets task cancellation propagate.
            continue
        posts.append({
            "content": content.strip(),
            "media_url": media_url,
            "extracted_at": datetime.now().isoformat()
        })
    return posts


@app.post("/extract")
async def extract_posts(data: dict):
    """Log in with the supplied credentials, scrape posts and save them.

    Args:
        data: Request body with "email", "password" and an optional "limit"
            (number of post blocks to examine; defaults to LIMIT, also used
            when the value cannot be converted to an int).

    Returns:
        {"posts": [...]} on success, where each post has "content",
        "media_url" and "extracted_at"; otherwise a JSONResponse whose body
        is {"error": <message>} for missing credentials, a failed login, or
        any exception raised while driving the browser.

    Side effects:
        Overwrites OUTPUT_FILE with the extracted posts as JSON on success.
    """
    email = data.get("email")
    password = data.get("password")
    limit = _parse_limit(data.get("limit", LIMIT))
    if not email or not password:
        return JSONResponse({"error": "يرجى إدخال البريد الإلكتروني وكلمة المرور."})

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                posts = await _scrape_posts(page, email, password, limit)
            finally:
                # Close the browser on every path, including errors mid-scrape.
                await browser.close()
    except Exception as e:
        return JSONResponse({"error": f"حدث خطأ أثناء التشغيل: {str(e)}"})

    if posts is None:
        return JSONResponse({"error": "فشل في تسجيل الدخول! تحقق من البيانات."})

    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(posts, f, ensure_ascii=False, indent=2)
    return {"posts": posts}