# Hhgggg / app.py
# Ffftdtd5dtft's picture
# Update app.py
# 0ca74de verified
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, BackgroundTasks, HTTPException, Form
from fastapi.responses import PlainTextResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import re
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
import gradio as gr
import fake_useragent as fake
import random
from dotenv import load_dotenv
import os
# Load variables from a local .env file into the process environment.
load_dotenv()
# Application object that the route decorators below register handlers on.
app = FastAPI()
class Proxy(BaseModel):
    """Address of a proxy server, split into host and port.

    Not referenced by any other code visible in this file.
    """

    # Host part of the proxy address.
    ip: str
    # Port part of the proxy address, kept as text.
    port: str
class VisitRequest(BaseModel):
    """Request body accepted by the ``/simulate_views`` endpoint."""

    # Target URL; the media identifier is extracted from it.
    url: str
    # Platform key, e.g. "instagram", "tiktok", "youtube", "facebook",
    # "twitch" or "spotify".
    platform: str
    # Number of requests to submit.
    count: int
    # Forwarded to simulate_views(), which currently does not use it.
    delay: int
    # Worker-thread count for the thread pool.
    parallel_processes: int
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
def get_random_proxy():
    """Fetch a single proxy address from the pubproxy.com API.

    Returns:
        The proxy as an ``"ip:port"`` string, or ``None`` when the API
        response contains no proxy entries.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            API answers with an HTTP error status.
        ValueError: If the response body is not valid JSON.
    """
    pubproxy_url = "http://pubproxy.com/api/proxy?limit=1"
    # A timeout keeps a stalled API call from blocking the caller forever.
    # ``verify=False`` was dropped: the URL is plain http, so there is no
    # certificate to verify.
    response = requests.get(pubproxy_url, timeout=10)
    response.raise_for_status()
    # Treat a missing or empty "data" key the same way: no proxy available.
    entries = response.json().get("data") or []
    if not entries:
        return None
    first = entries[0]
    return f"{first['ip']}:{first['port']}"
@app.get("/get_proxies", response_class=PlainTextResponse)
def get_proxies():
    """Return up to five proxies from pubproxy.com, one ``ip:port`` per line.

    Any exception raised while fetching or parsing is converted into a
    plain-text response with status 500 whose body is the exception text.
    """
    try:
        api_reply = requests.get("http://pubproxy.com/api/proxy?limit=5", verify=False)
        payload = api_reply.json()
        lines = []
        for entry in payload['data']:
            # Unpacking into exactly two names rejects malformed entries.
            host, port_number = entry['ipPort'].split(":")
            lines.append(f"{host}:{port_number}")
        return "\n".join(lines)
    except Exception as err:
        return PlainTextResponse(str(err), status_code=500)
@app.get("/rotate_ip", response_class=PlainTextResponse)
def rotate_ip():
    """Query pubproxy.com for one proxy and return it as ``ip:port`` text.

    The request carries the same generated address in three client-IP
    headers. Any exception is turned into a plain-text 500 response whose
    body is the exception text.
    """
    try:
        # NOTE(review): `fake` is the imported fake_useragent module; confirm
        # that it actually provides this call.
        random_ip = fake.ipv4()
        headers = {
            "X-Forwarded-For": random_ip,
            "Client-IP": random_ip,
            "X-Real-IP": random_ip
        }
        test_url = "http://pubproxy.com/api/proxy?limit=1&type=http&https=true&speed=60"
        response = requests.get(test_url, headers=headers, verify=False)
        data = response.json()
        proxies = []
        for proxy_data in data['data']:
            # Split and re-join; raises if the value is not exactly "ip:port".
            ip, port = proxy_data['ipPort'].split(":")
            proxy = f"{ip}:{port}"
            proxies.append(proxy)
        return "\n".join(proxies)
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
def extract_video_id(url: str, platform: str) -> str:
    """Extract the media identifier from a platform URL.

    The URL is percent-decoded first, then searched with the pattern that
    belongs to ``platform``.

    Args:
        url: Full URL of the media item; may be percent-encoded.
        platform: One of ``"instagram"``, ``"tiktok"``, ``"youtube"``,
            ``"facebook"``, ``"twitch"`` or ``"spotify"``.

    Returns:
        The identifier captured from the URL.

    Raises:
        ValueError: If the platform is unknown or the URL does not match
            the platform's expected shape.
    """
    url = urllib.parse.unquote(url)
    patterns = {
        "instagram": r"instagram\.com/reel/([^/?]+)",
        "tiktok": r"tiktok\.com/@[^/]+/video/(\d+)",
        # Accept ``v`` anywhere in the query string (first occurrence wins)
        # and stop at ``&`` or ``#`` so a fragment is not part of the id.
        "youtube": r"youtube\.com/watch\?(?:[^#&]+&)*?v=([^&#]+)",
        "facebook": r"facebook\.com/.*/videos/(\d+)",
        "twitch": r"twitch\.tv/videos/(\d+)",
        "spotify": r"spotify\.com/track/([^/?]+)",
    }
    pattern = patterns.get(platform)
    if pattern is None:
        raise ValueError("Invalid URL format")
    match = re.search(pattern, url)
    if match is None:
        raise ValueError("Invalid URL format")
    return match.group(1)
def instagram_login(username: str, password: str):
    """Attempt a form login against Instagram and return the session.

    Loads the home page to read the ``csrf-token`` meta tag, then posts the
    credentials to the AJAX login URL with that token.

    Args:
        username: Account name sent as ``username``.
        password: Password embedded in the ``enc_password`` field.

    Returns:
        The ``requests.Session`` used for the login.

    Raises:
        HTTPException: 401 when the reply is not status 200 or its JSON
            does not report ``authenticated``.
    """
    http = requests.Session()
    landing = http.get("https://www.instagram.com/", headers={"User-Agent": "Mozilla/5.0"})
    page = BeautifulSoup(landing.text, "html.parser")
    token_tag = page.find("meta", {"name": "csrf-token"})
    csrf_token = token_tag["content"]
    form = {
        "username": username,
        "enc_password": f"#PWD_INSTAGRAM_BROWSER:0:&:{password}",
    }
    request_headers = {
        "User-Agent": "Mozilla/5.0",
        "X-CSRFToken": csrf_token,
        "X-Requested-With": "XMLHttpRequest",
    }
    reply = http.post(
        "https://www.instagram.com/accounts/login/ajax/",
        data=form,
        headers=request_headers,
    )
    # The JSON body is only consulted when the status is 200.
    if reply.status_code != 200 or not reply.json().get("authenticated"):
        raise HTTPException(status_code=401, detail="Authentication failed")
    return http
def tiktok_login(username: str, password: str):
    """Post credentials to the TikTok login URL and return the session.

    A reply with status 200 is the only success check performed.

    Raises:
        HTTPException: 401 when the reply status is not 200.
    """
    http = requests.Session()
    form = {"username": username, "password": password}
    reply = http.post("https://www.tiktok.com/login/", data=form)
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return http
def youtube_login(username: str, password: str):
    """Post credentials to the Google ServiceLogin URL and return the session.

    A reply with status 200 is the only success check performed.

    Raises:
        HTTPException: 401 when the reply status is not 200.
    """
    http = requests.Session()
    form = {"username": username, "password": password}
    reply = http.post("https://accounts.google.com/ServiceLogin", data=form)
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return http
def facebook_login(username: str, password: str):
    """Post credentials to the Facebook login URL and return the session.

    ``username`` is sent as the ``email`` field and ``password`` as ``pass``.
    A reply with status 200 is the only success check performed.

    Raises:
        HTTPException: 401 when the reply status is not 200.
    """
    http = requests.Session()
    form = {"email": username, "pass": password}
    reply = http.post("https://www.facebook.com/login", data=form)
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return http
def twitch_login(username: str, password: str):
    """Post credentials to the Twitch login URL and return the session.

    ``username`` is sent as the ``login`` field. A reply with status 200 is
    the only success check performed.

    Raises:
        HTTPException: 401 when the reply status is not 200.
    """
    http = requests.Session()
    form = {"login": username, "password": password}
    reply = http.post("https://www.twitch.tv/login", data=form)
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return http
def spotify_login(username: str, password: str):
    """Post credentials to the Spotify token URL and return the session.

    A reply with status 200 is the only success check performed.

    Raises:
        HTTPException: 401 when the reply status is not 200.
    """
    http = requests.Session()
    form = {"username": username, "password": password}
    reply = http.post("https://accounts.spotify.com/api/token", data=form)
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return http
def simulate_instagram_view(video_id: str, proxy: str) -> None:
    """Send one GET request for an Instagram reel page through ``proxy``.

    Args:
        video_id: Reel identifier interpolated into the request URL.
        proxy: ``host:port`` string used for both the ``http`` and ``https``
            proxy entries.

    Returns:
        None. The response is assigned but never inspected.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}"
    }
    # NOTE(review): the `fake.*` calls below run before the `try`, so any
    # error they raise propagates to the caller instead of being printed.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4
    }
    try:
        view_url = f"https://www.instagram.com/reel/{video_id}"
        response = requests.get(view_url, headers=headers, proxies=proxies)
    except Exception as e:
        # Request failures are reported on stdout and otherwise ignored.
        print(f"Error in simulate_instagram_view: {e}")
def simulate_tiktok_view(video_id: str, proxy: str) -> None:
    """Send one GET request for a TikTok video page through ``proxy``.

    Args:
        video_id: Video identifier interpolated into the request URL; the
            account segment of the URL is the fixed text ``@user``.
        proxy: ``host:port`` string used for both the ``http`` and ``https``
            proxy entries.

    Returns:
        None. The response is assigned but never inspected.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}"
    }
    # NOTE(review): the `fake.*` calls below run before the `try`, so any
    # error they raise propagates to the caller instead of being printed.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4
    }
    try:
        view_url = f"https://www.tiktok.com/@user/video/{video_id}"
        response = requests.get(view_url, headers=headers, proxies=proxies)
    except Exception as e:
        # Request failures are reported on stdout and otherwise ignored.
        print(f"Error in simulate_tiktok_view: {e}")
def simulate_youtube_view(video_id: str, proxy: str) -> None:
    """Send one GET request for a YouTube watch page through ``proxy``.

    Args:
        video_id: Video identifier placed in the ``v`` query parameter.
        proxy: ``host:port`` string used for both the ``http`` and ``https``
            proxy entries.

    Returns:
        None. The response is assigned but never inspected.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}"
    }
    # NOTE(review): the `fake.*` calls below run before the `try`, so any
    # error they raise propagates to the caller instead of being printed.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4
    }
    try:
        view_url = f"https://www.youtube.com/watch?v={video_id}"
        response = requests.get(view_url, headers=headers, proxies=proxies)
    except Exception as e:
        # Request failures are reported on stdout and otherwise ignored.
        print(f"Error in simulate_youtube_view: {e}")
def simulate_facebook_view(video_id: str, proxy: str) -> None:
    """Send one GET request for a Facebook URL built from ``video_id``.

    Args:
        video_id: Identifier appended directly after ``facebook.com/``.
        proxy: ``host:port`` string used for both the ``http`` and ``https``
            proxy entries.

    Returns:
        None. The response is assigned but never inspected.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}"
    }
    # NOTE(review): the `fake.*` calls below run before the `try`, so any
    # error they raise propagates to the caller instead of being printed.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4
    }
    try:
        view_url = f"https://www.facebook.com/{video_id}"
        response = requests.get(view_url, headers=headers, proxies=proxies)
    except Exception as e:
        # Request failures are reported on stdout and otherwise ignored.
        print(f"Error in simulate_facebook_view: {e}")
def simulate_twitch_view(video_id: str, proxy: str) -> None:
    """Send one GET request for a Twitch video page through ``proxy``.

    Args:
        video_id: Video identifier interpolated into the request URL.
        proxy: ``host:port`` string used for both the ``http`` and ``https``
            proxy entries.

    Returns:
        None. The response is assigned but never inspected.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}"
    }
    # NOTE(review): the `fake.*` calls below run before the `try`, so any
    # error they raise propagates to the caller instead of being printed.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4
    }
    try:
        view_url = f"https://www.twitch.tv/videos/{video_id}"
        response = requests.get(view_url, headers=headers, proxies=proxies)
    except Exception as e:
        # Request failures are reported on stdout and otherwise ignored.
        print(f"Error in simulate_twitch_view: {e}")
def simulate_spotify_view(video_id: str, proxy: str) -> None:
    """Send one GET request for a Spotify track page through ``proxy``.

    Args:
        video_id: Track identifier interpolated into the request URL.
        proxy: ``host:port`` string used for both the ``http`` and ``https``
            proxy entries.

    Returns:
        None. The response is assigned but never inspected.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}"
    }
    # NOTE(review): the `fake.*` calls below run before the `try`, so any
    # error they raise propagates to the caller instead of being printed.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4
    }
    try:
        view_url = f"https://open.spotify.com/track/{video_id}"
        response = requests.get(view_url, headers=headers, proxies=proxies)
    except Exception as e:
        # Request failures are reported on stdout and otherwise ignored.
        print(f"Error in simulate_spotify_view: {e}")
def simulate_views(url: str, platform: str, count: int, delay: int, parallel_processes: int):
    """Submit ``count`` per-platform request jobs to a thread pool.

    The media id is extracted from ``url``, one proxy is fetched and reused
    for every job, and the pool is drained before the function returns.

    Args:
        url: URL of the media item.
        platform: Platform key selecting the per-platform helper.
        count: Number of jobs to submit.
        delay: Accepted but not used by this function.
        parallel_processes: ``max_workers`` for the thread pool.

    Returns:
        None. Prints a message and returns early when no proxy is available.

    Raises:
        ValueError: Propagated from ``extract_video_id`` for a bad URL or
            unknown platform.
    """
    video_id = extract_video_id(url, platform)
    proxy = get_random_proxy()
    if not proxy:
        print("No proxy available.")
        return

    workers = {
        "instagram": simulate_instagram_view,
        "tiktok": simulate_tiktok_view,
        "youtube": simulate_youtube_view,
        "facebook": simulate_facebook_view,
        "twitch": simulate_twitch_view,
        "spotify": simulate_spotify_view,
    }
    worker = workers.get(platform)

    with ThreadPoolExecutor(max_workers=parallel_processes) as executor:
        for _ in range(count):
            # An unrecognised platform submits nothing, as before.
            if worker is not None:
                executor.submit(worker, video_id, proxy)
@app.post("/simulate_views")
async def simulate_views_endpoint(request: VisitRequest):
    """HTTP wrapper around ``simulate_views``.

    Returns a plain-text confirmation once ``simulate_views`` has returned,
    or a plain-text 500 response carrying the exception text if it raised.
    """
    try:
        simulate_views(
            request.url,
            request.platform,
            request.count,
            request.delay,
            request.parallel_processes,
        )
    except Exception as err:
        return PlainTextResponse(str(err), status_code=500)
    return PlainTextResponse("Views simulation started")
def authenticate(username: str, password: str, platform: str):
    """Dispatch to the login helper that matches ``platform``.

    Args:
        username: Account name passed to the helper.
        password: Password passed to the helper.
        platform: Platform key selecting the helper.

    Returns:
        Whatever the selected login helper returns.

    Raises:
        HTTPException: 400 for an unknown platform; the helpers raise 401
            on a failed login.
    """
    login_helpers = {
        "instagram": instagram_login,
        "tiktok": tiktok_login,
        "youtube": youtube_login,
        "facebook": facebook_login,
        "twitch": twitch_login,
        "spotify": spotify_login,
    }
    helper = login_helpers.get(platform)
    if helper is None:
        raise HTTPException(status_code=400, detail="Invalid platform")
    return helper(username, password)
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...), platform: str = Form(...)):
    """Authenticate the submitted form credentials on ``platform``.

    Returns:
        A plain-text confirmation naming the platform.

    Raises:
        HTTPException: 400 for an unknown platform or 401 for a failed
            login, as raised by ``authenticate``.
    """
    # HTTPException is deliberately left uncaught. The framework builds the
    # error response (status code and detail) only from a *raised*
    # HTTPException; the previous `return e` handed the exception object
    # back as an ordinary response body.
    authenticate(username, password, platform)
    return PlainTextResponse(f"Authenticated on {platform}")
def gradio_interface(url: str, platform: str, count: int, delay: int, parallel_processes: int):
    """Callback for the Gradio form: run ``simulate_views`` and report status.

    Returns:
        A success message, or ``"Error: ..."`` with the exception text when
        ``simulate_views`` raises.
    """
    try:
        simulate_views(url, platform, count, delay, parallel_processes)
    except Exception as err:
        return f"Error: {err}"
    else:
        return "Simulation started successfully."
# Build the Gradio form around gradio_interface() and launch it as soon as
# this module is executed.
# NOTE(review): launch() runs at import time, and everything defined below
# this statement (get_credentials, get_random_credentials, the /random_login
# route) is only created after launch() returns. Confirm whether launch()
# blocks in the deployment environment.
gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="URL"),
        gr.Dropdown(choices=["instagram", "tiktok", "youtube", "facebook", "twitch", "spotify"], label="Platform"),
        gr.Slider(minimum=1, maximum=1000, label="Count"),
        gr.Slider(minimum=1, maximum=60, label="Delay (seconds)"),
        gr.Slider(minimum=1, maximum=10, label="Parallel Processes")
    ],
    outputs=gr.Textbox(label="Output")
).launch()
def get_credentials():
    """Parse ``PLATFORM=user|password`` entries from the local ``.env`` file.

    Lines are split on the first ``=`` and the value on the first ``|``, so
    a password may itself contain ``|``. Blank lines, ``#`` comment lines
    and lines lacking either separator are skipped.

    Returns:
        A list of ``(platform, user, password)`` tuples with surrounding
        whitespace stripped, in file order. The list is empty when the file
        does not exist or holds no matching lines.
    """
    credentials = []
    try:
        with open(".env", "r", encoding="utf-8") as file:
            for raw_line in file:
                line = raw_line.strip()
                # Comment lines may contain "=" and "|" but are not entries.
                if not line or line.startswith("#"):
                    continue
                platform, equals, details = line.partition("=")
                user, pipe, password = details.partition("|")
                if not equals or not pipe:
                    continue
                credentials.append((platform.strip(), user.strip(), password.strip()))
    except FileNotFoundError:
        # No .env file simply means there are no stored credentials.
        return []
    return credentials
def get_random_credentials():
    """Return one randomly chosen entry from ``get_credentials()``.

    Raises:
        HTTPException: 404 when no credentials are available.
    """
    pool = get_credentials()
    if not pool:
        raise HTTPException(status_code=404, detail="No credentials found")
    return random.choice(pool)
@app.get("/random_login")
async def random_login(platform: str):
    """Log in on ``platform`` with the first stored credential for it.

    Despite the name, no random choice is made: the first entry returned by
    ``get_credentials()`` whose platform matches is used.

    Returns:
        A plain-text confirmation naming the platform and user.

    Raises:
        HTTPException: 404 when no credential is stored for ``platform``;
            400 or 401 as raised by ``authenticate``.
    """
    stored = next((entry for entry in get_credentials() if entry[0] == platform), None)
    if stored is None:
        raise HTTPException(status_code=404, detail="Platform not found")
    _, username, password = stored
    # HTTPException is deliberately left uncaught. The framework builds the
    # error response only from a *raised* HTTPException; the previous
    # `return e` handed the exception object back as an ordinary body.
    authenticate(username, password, platform)
    return PlainTextResponse(f"Authenticated on {platform} with user {username}")