Hhgggg / aphhp.py
Ffftdtd5dtft's picture
Rename app.py to aphhp.py
9e223cb verified
from fastapi import FastAPI, BackgroundTasks, HTTPException, Form
from fastapi.responses import PlainTextResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import requests
from requests.exceptions import ProxyError, ConnectTimeout
from faker import Faker
import re
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
import time
import gradio as gr
app = FastAPI()
fake = Faker()
class Proxy(BaseModel):
ip: str
port: str
class VisitRequest(BaseModel):
url: str
platform: str
count: int
delay: int
parallel_processes: int
app.mount("/static", StaticFiles(directory="static"), name="static")
def get_random_proxy():
pubproxy_url = "http://pubproxy.com/api/proxy?limit=1"
response = requests.get(pubproxy_url, verify=False)
data = response.json()
if data['data']:
proxy_data = data['data'][0]
return f"{proxy_data['ip']}:{proxy_data['port']}"
return None
@app.get("/get_proxies", response_class=PlainTextResponse)
def get_proxies():
try:
proxies = []
pubproxy_url = "http://pubproxy.com/api/proxy?limit=5"
response = requests.get(pubproxy_url, verify=False)
data = response.json()
for proxy_data in data['data']:
ip, port = proxy_data['ipPort'].split(":")
proxy = f"{ip}:{port}"
proxies.append(proxy)
return "\n".join(proxies)
except Exception as e:
return PlainTextResponse(str(e), status_code=500)
@app.get("/rotate_ip", response_class=PlainTextResponse)
def rotate_ip():
try:
random_ip = fake.ipv4()
headers = {
"X-Forwarded-For": random_ip,
"Client-IP": random_ip,
"X-Real-IP": random_ip
}
test_url = "http://pubproxy.com/api/proxy?limit=1&type=http&https=true&speed=60"
response = requests.get(test_url, headers=headers, verify=False)
data = response.json()
proxies = []
for proxy_data in data['data']:
ip, port = proxy_data['ipPort'].split(":")
proxy = f"{ip}:{port}"
proxies.append(proxy)
return "\n".join(proxies)
except ProxyError as e:
return PlainTextResponse(f"ProxyError: {str(e)}", status_code=500)
except ConnectTimeout as e:
return PlainTextResponse(f"ConnectTimeoutError: {str(e)}", status_code=500)
except Exception as e:
return PlainTextResponse(str(e), status_code=500)
def extract_video_id(url: str, platform: str) -> str:
url = urllib.parse.unquote(url) # Decode the URL
if platform == "instagram":
match = re.search(r"instagram\.com/reel/([^/?]+)", url)
elif platform == "tiktok":
match = re.search(r"tiktok\.com/@[^/]+/video/(\d+)", url)
elif platform == "youtube":
match = re.search(r"youtube\.com/watch\?v=([^&]+)", url)
elif platform == "facebook":
match = re.search(r"facebook\.com/.*/videos/(\d+)", url)
elif platform == "twitch":
match = re.search(r"twitch\.tv/videos/(\d+)", url)
elif platform == "spotify":
match = re.search(r"spotify\.com/track/([^/?]+)", url)
else:
match = None
if match:
return match.group(1)
else:
raise ValueError("Invalid URL format")
def simulate_view(video_id: str, platform: str, proxy: str, delay: int):
proxies = {
"http": f"http://{proxy}",
"https": f"http://{proxy}"
}
fake_ipv4 = fake.ipv4()
headers = {
"User-Agent": fake.user_agent(),
"X-Forwarded-For": fake_ipv4,
"Client-IP": fake_ipv4,
"X-Real-IP": fake_ipv4
}
try:
if platform == "instagram":
view_url = f"http://www.instagram.com/p/{video_id}?autoplay=true"
elif platform == "tiktok":
view_url = f"http://www.tiktok.com/@{video_id}?autoplay=true"
elif platform == "youtube":
view_url = f"http://www.youtube.com/watch?v={video_id}&autoplay=1"
elif platform == "facebook":
view_url = f"http://www.facebook.com/watch/?v={video_id}&autoplay=1"
elif platform == "twitch":
view_url = f"http://www.twitch.tv/videos/{video_id}?autoplay=true"
elif platform == "spotify":
view_url = f"http://open.spotify.com/track/{video_id}?autoplay=true"
else:
raise ValueError("Invalid platform")
response = requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
time.sleep(delay)
except Exception as e:
print(f"Error in simulate_view: {e}")
def perform_views(url: str, platform: str, count: int, delay: int, parallel_processes: int):
video_id = extract_video_id(url, platform)
def task():
try:
proxy_response = requests.get("http://0.0.0.0:8000/rotate_ip")
if proxy_response.status_code == 200:
proxy = proxy_response.text.strip()
if proxy:
simulate_view(video_id, platform, proxy, delay)
except Exception as e:
print(f"Error in perform_views: {e}")
with ThreadPoolExecutor(max_workers=parallel_processes) as executor:
for _ in range(count):
executor.submit(task)
@app.post("/increase_views", response_class=PlainTextResponse)
def increase_views(
url: str = Form(...),
platform: str = Form(...),
count: int = Form(...),
delay: int = Form(...),
parallel_processes: int = Form(...),
background_tasks: BackgroundTasks = BackgroundTasks()
):
if platform not in ["instagram", "tiktok", "youtube", "facebook", "twitch", "spotify"]:
raise HTTPException(status_code=400, detail="Invalid platform. Choose 'instagram', 'tiktok', 'youtube', 'facebook', 'twitch', or 'spotify'.")
background_tasks.add_task(perform_views, url, platform, count, delay, parallel_processes)
return PlainTextResponse(f"Started {count} view tasks for {platform} at {url} with {delay} seconds delay and {parallel_processes} parallel processes.")
def increase_views_gradio(url, platform, count, delay, parallel_processes):
return increase_views(url, platform, count, delay, parallel_processes, background_tasks=BackgroundTasks())
iface = gr.Interface(
fn=increase_views_gradio,
inputs=[
gr.Textbox(label="URL"),
gr.Dropdown(["instagram", "tiktok", "youtube", "facebook", "twitch", "spotify"], label="Platform"),
gr.Number(label="View Count", minimum=1),
gr.Number(label="Delay (seconds)", minimum=1),
gr.Number(label="Parallel Processes", minimum=1),
],
outputs="text",
title="Increase Views",
description="Increase views on your favorite social media platforms.",
)
@app.get("/form", response_class=HTMLResponse)
def form():
return iface.render()
@app.get("/", response_class=HTMLResponse)
def read_root():
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Main Page</title>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.5.2/css/bootstrap.min.css">
<style>
body {
background-color: #f8f9fa;
}
.container {
margin-top: 50px;
text-align: center;
}
h1 {
color: #343a40;
}
.btn {
margin: 10px;
}
</style>
</head>
<body>
<div class="container">
<h1>Welcome</h1>
<p>Choose an action:</p>
<a href="/rotate_ip"><button class="btn btn-primary">Rotate IP</button></a>
<a href="/form"><button class="btn btn-primary">Increase Views</button></a>
</div>
</body>
</html>
"""
return HTMLResponse(content=html_content)
if __name__ == "__main__":
import uvicorn
iface.launch(share=True)