|
import requests |
|
from bs4 import BeautifulSoup |
|
from fastapi import FastAPI, BackgroundTasks, HTTPException, Form |
|
from fastapi.responses import PlainTextResponse, HTMLResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from pydantic import BaseModel |
|
import re |
|
from concurrent.futures import ThreadPoolExecutor |
|
import urllib.parse |
|
import gradio as gr |
|
import fake_useragent as fake |
|
import random |
|
from dotenv import load_dotenv |
|
import os |
|
|
|
# Load settings from a .env file into the environment before the app is built.
load_dotenv()

# FastAPI application object that the route decorators in this module use.
app = FastAPI()
|
|
|
class Proxy(BaseModel):
    """Proxy address held as two separate string fields."""

    ip: str  # host part of the address
    port: str  # stored as text, not as an integer
|
|
|
class VisitRequest(BaseModel):
    """Request body of the ``/simulate_views`` endpoint.

    Every field is passed unchanged to ``simulate_views``.
    """

    url: str  # link from which the media id is extracted
    platform: str  # platform name used to pick the URL pattern and worker
    count: int  # number of tasks to submit
    delay: int  # forwarded as the ``delay`` argument
    parallel_processes: int  # forwarded as the thread-pool size
|
|
|
# Expose the local "static" directory under the /static URL prefix.
# NOTE(review): StaticFiles presumably validates the directory when it is
# constructed, so a missing "static" folder would fail at import — confirm.
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
def get_random_proxy():
    """Ask the pubproxy.com API for a single proxy.

    Returns:
        The proxy as an ``"ip:port"`` string, or ``None`` when the response
        has no ``data`` entries.

    Raises:
        requests.RequestException: If the request fails or times out.
        ValueError: If the response body cannot be decoded as JSON.
    """
    pubproxy_url = "http://pubproxy.com/api/proxy?limit=1"
    # A timeout keeps a stalled API call from blocking the caller forever.
    # Certificate verification is left at the library default instead of
    # being switched off.
    response = requests.get(pubproxy_url, timeout=10)
    entries = response.json().get("data")
    if not entries:
        return None
    first = entries[0]
    return f"{first['ip']}:{first['port']}"
|
|
|
@app.get("/get_proxies", response_class=PlainTextResponse)
def get_proxies():
    """Return proxies from pubproxy.com as plain text, one ``ip:port`` per line.

    The upstream API is queried with ``limit=5``. Any failure (network error,
    undecodable JSON, unexpected payload shape) is answered with a plain-text
    HTTP 500 whose body is the exception message.
    """
    try:
        pubproxy_url = "http://pubproxy.com/api/proxy?limit=5"
        # Bounded wait; TLS verification stays at the library default.
        response = requests.get(pubproxy_url, timeout=10)
        data = response.json()

        proxies = []
        for proxy_data in data["data"]:
            # Unpacking into exactly two names rejects entries that are not
            # of the form "host:port".
            ip, port = proxy_data["ipPort"].split(":")
            proxies.append(f"{ip}:{port}")

        return "\n".join(proxies)
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
|
|
|
@app.get("/rotate_ip", response_class=PlainTextResponse)
def rotate_ip():
    """Query pubproxy.com for one proxy while sending made-up client-IP headers.

    Returns the proxies from the response as plain text, one ``ip:port`` per
    line. Any exception is answered with a plain-text HTTP 500 whose body is
    the exception message.
    """
    try:
        # NOTE(review): this assumes the imported ``fake`` module offers an
        # ``ipv4()`` function. If it does not, the call raises and this
        # handler always answers 500 — confirm against the package API.
        random_ip = fake.ipv4()

        headers = {
            "X-Forwarded-For": random_ip,
            "Client-IP": random_ip,
            "X-Real-IP": random_ip,
        }

        test_url = "http://pubproxy.com/api/proxy?limit=1&type=http&https=true&speed=60"
        # Bounded wait; TLS verification stays at the library default.
        response = requests.get(test_url, headers=headers, timeout=10)
        data = response.json()

        proxies = []
        for proxy_data in data["data"]:
            # Two-name unpacking rejects entries that are not "host:port".
            ip, port = proxy_data["ipPort"].split(":")
            proxies.append(f"{ip}:{port}")

        return "\n".join(proxies)
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
|
|
|
# One compiled pattern per supported platform; group 1 captures the media id.
# The free-form captures stop at "#" as well, so a URL fragment is never
# returned as part of the id.
_VIDEO_ID_PATTERNS = {
    "instagram": re.compile(r"instagram\.com/reel/([^/?#]+)"),
    "tiktok": re.compile(r"tiktok\.com/@[^/]+/video/(\d+)"),
    "youtube": re.compile(r"youtube\.com/watch\?v=([^&#]+)"),
    "facebook": re.compile(r"facebook\.com/.*/videos/(\d+)"),
    "twitch": re.compile(r"twitch\.tv/videos/(\d+)"),
    "spotify": re.compile(r"spotify\.com/track/([^/?#]+)"),
}


def extract_video_id(url: str, platform: str) -> str:
    """Extract the media identifier from a platform URL.

    Args:
        url: The link to parse; percent-encoding is decoded before matching.
        platform: One of ``instagram``, ``tiktok``, ``youtube``,
            ``facebook``, ``twitch`` or ``spotify``.

    Returns:
        The identifier captured by the platform's URL pattern.

    Raises:
        ValueError: If the platform is unknown or the URL does not match the
            platform's pattern.
    """
    pattern = _VIDEO_ID_PATTERNS.get(platform)
    match = pattern.search(urllib.parse.unquote(url)) if pattern else None
    if match is None:
        raise ValueError("Invalid URL format")
    return match.group(1)
|
|
|
def instagram_login(username: str, password: str):
    """Log in to Instagram's web endpoint and return the session.

    Fetches the home page to read the ``csrf-token`` meta tag, then posts the
    credentials to the AJAX login URL with that token.

    Args:
        username: Account name sent as ``username``.
        password: Password embedded in the ``enc_password`` field.

    Returns:
        The ``requests.Session`` used for the login.

    Raises:
        HTTPException: 502 if the home page carries no CSRF token; 401 if the
            login reply is not HTTP 200 with a truthy ``authenticated`` flag.
        requests.RequestException: If a request fails or times out.
    """
    login_url = "https://www.instagram.com/accounts/login/ajax/"
    session = requests.Session()

    response = session.get(
        "https://www.instagram.com/",
        headers={"User-Agent": "Mozilla/5.0"},
        timeout=10,
    )
    soup = BeautifulSoup(response.text, "html.parser")
    token_tag = soup.find("meta", {"name": "csrf-token"})
    # A missing tag used to surface as a TypeError from subscripting None.
    if token_tag is None or not token_tag.get("content"):
        raise HTTPException(status_code=502, detail="CSRF token not found")
    csrf_token = token_tag["content"]

    login_data = {
        "username": username,
        "enc_password": f"#PWD_INSTAGRAM_BROWSER:0:&:{password}",
    }

    headers = {
        "User-Agent": "Mozilla/5.0",
        "X-CSRFToken": csrf_token,
        "X-Requested-With": "XMLHttpRequest",
    }

    response = session.post(login_url, data=login_data, headers=headers, timeout=10)

    authenticated = False
    if response.status_code == 200:
        try:
            authenticated = bool(response.json().get("authenticated"))
        except ValueError:
            # A non-JSON body is treated as a failed login, not a crash.
            authenticated = False

    if not authenticated:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
|
def tiktok_login(username: str, password: str):
    """Post the credentials to TikTok's login URL and return the session.

    NOTE(review): success is decided by the HTTP status alone; a 200 reply
    that shows a login error page would still count — confirm.

    Returns:
        The ``requests.Session`` used for the request.

    Raises:
        HTTPException: 401 when the reply status is not 200.
        requests.RequestException: If the request fails or times out.
    """
    login_url = "https://www.tiktok.com/login/"
    session = requests.Session()

    login_data = {
        "username": username,
        "password": password,
    }

    # Bounded wait so a stalled connection cannot hang the caller.
    response = session.post(login_url, data=login_data, timeout=10)

    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
|
def youtube_login(username: str, password: str):
    """Post the credentials to Google's ServiceLogin URL and return the session.

    NOTE(review): success is decided by the HTTP status alone; a 200 reply
    that shows a login error page would still count — confirm.

    Returns:
        The ``requests.Session`` used for the request.

    Raises:
        HTTPException: 401 when the reply status is not 200.
        requests.RequestException: If the request fails or times out.
    """
    login_url = "https://accounts.google.com/ServiceLogin"
    session = requests.Session()

    login_data = {
        "username": username,
        "password": password,
    }

    # Bounded wait so a stalled connection cannot hang the caller.
    response = session.post(login_url, data=login_data, timeout=10)

    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
|
def facebook_login(username: str, password: str):
    """Post the credentials to Facebook's login URL and return the session.

    The form fields are named ``email`` and ``pass``.

    NOTE(review): success is decided by the HTTP status alone; a 200 reply
    that shows a login error page would still count — confirm.

    Returns:
        The ``requests.Session`` used for the request.

    Raises:
        HTTPException: 401 when the reply status is not 200.
        requests.RequestException: If the request fails or times out.
    """
    login_url = "https://www.facebook.com/login"
    session = requests.Session()

    login_data = {
        "email": username,
        "pass": password,
    }

    # Bounded wait so a stalled connection cannot hang the caller.
    response = session.post(login_url, data=login_data, timeout=10)

    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
|
def twitch_login(username: str, password: str):
    """Post the credentials to Twitch's login URL and return the session.

    The form fields are named ``login`` and ``password``.

    NOTE(review): success is decided by the HTTP status alone; a 200 reply
    that shows a login error page would still count — confirm.

    Returns:
        The ``requests.Session`` used for the request.

    Raises:
        HTTPException: 401 when the reply status is not 200.
        requests.RequestException: If the request fails or times out.
    """
    login_url = "https://www.twitch.tv/login"
    session = requests.Session()

    login_data = {
        "login": username,
        "password": password,
    }

    # Bounded wait so a stalled connection cannot hang the caller.
    response = session.post(login_url, data=login_data, timeout=10)

    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
|
def spotify_login(username: str, password: str):
    """Post the credentials to Spotify's token URL and return the session.

    NOTE(review): only ``username`` and ``password`` are sent and only the
    HTTP status is checked; whether this endpoint accepts such a form is not
    visible here — confirm.

    Returns:
        The ``requests.Session`` used for the request.

    Raises:
        HTTPException: 401 when the reply status is not 200.
        requests.RequestException: If the request fails or times out.
    """
    login_url = "https://accounts.spotify.com/api/token"
    session = requests.Session()

    login_data = {
        "username": username,
        "password": password,
    }

    # Bounded wait so a stalled connection cannot hang the caller.
    response = session.post(login_url, data=login_data, timeout=10)

    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
|
def simulate_instagram_view(video_id: str, proxy: str):
    """Send one GET for an Instagram reel page through *proxy*.

    Args:
        video_id: Reel identifier placed into the page URL.
        proxy: ``host:port`` of an HTTP proxy, used for both URL schemes.

    Request errors are printed rather than raised; nothing is returned.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}",
    }
    # NOTE(review): assumes the imported ``fake`` module provides ``ipv4()``
    # and ``user_agent()``. These calls sit outside the try block, so a
    # failure here propagates to the caller — confirm against the package API.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4,
    }
    try:
        view_url = f"https://www.instagram.com/reel/{video_id}"
        # The response is not inspected. The timeout stops an unresponsive
        # proxy from occupying the calling thread indefinitely.
        requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
    except Exception as e:
        print(f"Error in simulate_instagram_view: {e}")
|
|
|
def simulate_tiktok_view(video_id: str, proxy: str):
    """Send one GET for a TikTok video page through *proxy*.

    The URL always uses the literal account segment ``@user``.

    Args:
        video_id: Video identifier placed into the page URL.
        proxy: ``host:port`` of an HTTP proxy, used for both URL schemes.

    Request errors are printed rather than raised; nothing is returned.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}",
    }
    # NOTE(review): assumes the imported ``fake`` module provides ``ipv4()``
    # and ``user_agent()``. These calls sit outside the try block, so a
    # failure here propagates to the caller — confirm against the package API.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4,
    }
    try:
        view_url = f"https://www.tiktok.com/@user/video/{video_id}"
        # The response is not inspected. The timeout stops an unresponsive
        # proxy from occupying the calling thread indefinitely.
        requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
    except Exception as e:
        print(f"Error in simulate_tiktok_view: {e}")
|
|
|
def simulate_youtube_view(video_id: str, proxy: str):
    """Send one GET for a YouTube watch page through *proxy*.

    Args:
        video_id: Value placed into the ``v`` query parameter.
        proxy: ``host:port`` of an HTTP proxy, used for both URL schemes.

    Request errors are printed rather than raised; nothing is returned.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}",
    }
    # NOTE(review): assumes the imported ``fake`` module provides ``ipv4()``
    # and ``user_agent()``. These calls sit outside the try block, so a
    # failure here propagates to the caller — confirm against the package API.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4,
    }
    try:
        view_url = f"https://www.youtube.com/watch?v={video_id}"
        # The response is not inspected. The timeout stops an unresponsive
        # proxy from occupying the calling thread indefinitely.
        requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
    except Exception as e:
        print(f"Error in simulate_youtube_view: {e}")
|
|
|
def simulate_facebook_view(video_id: str, proxy: str):
    """Send one GET for ``https://www.facebook.com/<video_id>`` through *proxy*.

    Args:
        video_id: Identifier appended directly after the site root.
        proxy: ``host:port`` of an HTTP proxy, used for both URL schemes.

    Request errors are printed rather than raised; nothing is returned.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}",
    }
    # NOTE(review): assumes the imported ``fake`` module provides ``ipv4()``
    # and ``user_agent()``. These calls sit outside the try block, so a
    # failure here propagates to the caller — confirm against the package API.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4,
    }
    try:
        view_url = f"https://www.facebook.com/{video_id}"
        # The response is not inspected. The timeout stops an unresponsive
        # proxy from occupying the calling thread indefinitely.
        requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
    except Exception as e:
        print(f"Error in simulate_facebook_view: {e}")
|
|
|
def simulate_twitch_view(video_id: str, proxy: str):
    """Send one GET for a Twitch video page through *proxy*.

    Args:
        video_id: Video identifier placed into the page URL.
        proxy: ``host:port`` of an HTTP proxy, used for both URL schemes.

    Request errors are printed rather than raised; nothing is returned.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}",
    }
    # NOTE(review): assumes the imported ``fake`` module provides ``ipv4()``
    # and ``user_agent()``. These calls sit outside the try block, so a
    # failure here propagates to the caller — confirm against the package API.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4,
    }
    try:
        view_url = f"https://www.twitch.tv/videos/{video_id}"
        # The response is not inspected. The timeout stops an unresponsive
        # proxy from occupying the calling thread indefinitely.
        requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
    except Exception as e:
        print(f"Error in simulate_twitch_view: {e}")
|
|
|
def simulate_spotify_view(video_id: str, proxy: str):
    """Send one GET for a Spotify track page through *proxy*.

    Args:
        video_id: Track identifier placed into the page URL.
        proxy: ``host:port`` of an HTTP proxy, used for both URL schemes.

    Request errors are printed rather than raised; nothing is returned.
    """
    proxies = {
        "http": f"http://{proxy}",
        "https": f"http://{proxy}",
    }
    # NOTE(review): assumes the imported ``fake`` module provides ``ipv4()``
    # and ``user_agent()``. These calls sit outside the try block, so a
    # failure here propagates to the caller — confirm against the package API.
    fake_ipv4 = fake.ipv4()
    headers = {
        "User-Agent": fake.user_agent(),
        "X-Forwarded-For": fake_ipv4,
        "Client-IP": fake_ipv4,
        "X-Real-IP": fake_ipv4,
    }
    try:
        view_url = f"https://open.spotify.com/track/{video_id}"
        # The response is not inspected. The timeout stops an unresponsive
        # proxy from occupying the calling thread indefinitely.
        requests.get(view_url, headers=headers, proxies=proxies, timeout=10)
    except Exception as e:
        print(f"Error in simulate_spotify_view: {e}")
|
|
|
def simulate_views(url: str, platform: str, count: int, delay: int, parallel_processes: int):
    """Submit *count* page requests for one media item to a thread pool.

    Args:
        url: Link from which the media id is extracted.
        platform: Platform name; selects the URL pattern and the worker.
        count: Number of tasks submitted.
        delay: Accepted but never read by this function.
        parallel_processes: ``max_workers`` of the thread pool.

    Raises:
        ValueError: From ``extract_video_id`` for an unknown platform or a
            URL that does not match.

    One proxy is fetched up front and shared by every task. If none is
    available a message is printed and nothing is submitted. Leaving the
    ``with`` block waits for all submitted tasks, so this call returns only
    after they have finished.

    NOTE(review): automated view generation with forged client-IP headers is
    likely to breach the target platforms' terms of service — confirm this
    use is authorised before deploying.
    """
    video_id = extract_video_id(url, platform)
    proxy = get_random_proxy()

    if not proxy:
        print("No proxy available.")
        return

    # Resolve the worker once instead of re-testing the platform per task.
    workers = {
        "instagram": simulate_instagram_view,
        "tiktok": simulate_tiktok_view,
        "youtube": simulate_youtube_view,
        "facebook": simulate_facebook_view,
        "twitch": simulate_twitch_view,
        "spotify": simulate_spotify_view,
    }
    worker = workers.get(platform)
    if worker is None:
        # Not expected in practice: extract_video_id has already rejected
        # any platform name outside this table.
        return

    with ThreadPoolExecutor(max_workers=parallel_processes) as executor:
        for _ in range(count):
            executor.submit(worker, video_id, proxy)
|
|
|
@app.post("/simulate_views")
async def simulate_views_endpoint(request: VisitRequest):
    """Run ``simulate_views`` with the fields of the posted ``VisitRequest``.

    Returns a plain-text confirmation, or a plain-text HTTP 500 carrying the
    exception message if the call raises.

    NOTE(review): ``simulate_views`` is called synchronously inside this
    coroutine, so the response is produced only after it returns, even
    though the message says "started". Nothing else on the event loop can
    run in the meantime.
    """
    try:
        simulate_views(
            url=request.url,
            platform=request.platform,
            count=request.count,
            delay=request.delay,
            parallel_processes=request.parallel_processes,
        )
        return PlainTextResponse("Views simulation started")
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
|
|
|
def authenticate(username: str, password: str, platform: str):
    """Dispatch to the login function of *platform* and return its result.

    Args:
        username: Account name passed to the login function.
        password: Password passed to the login function.
        platform: One of ``instagram``, ``tiktok``, ``youtube``,
            ``facebook``, ``twitch`` or ``spotify``.

    Raises:
        HTTPException: 400 for any other platform name; whatever the selected
            login function raises otherwise.
    """
    login_handlers = {
        "instagram": instagram_login,
        "tiktok": tiktok_login,
        "youtube": youtube_login,
        "facebook": facebook_login,
        "twitch": twitch_login,
        "spotify": spotify_login,
    }
    handler = login_handlers.get(platform)
    if handler is None:
        raise HTTPException(status_code=400, detail="Invalid platform")
    return handler(username, password)
|
|
|
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...), platform: str = Form(...)):
    """Authenticate the submitted form credentials against *platform*.

    Returns a plain-text confirmation on success.

    Raises:
        HTTPException: Propagated from ``authenticate`` so the framework can
            build the error response from its status code and detail.
    """
    # The exception used to be caught and *returned*, which hands the
    # exception object back as an ordinary return value instead of raising
    # it; letting it propagate keeps its status code.
    authenticate(username, password, platform)
    return PlainTextResponse(f"Authenticated on {platform}")
|
|
|
def gradio_interface(url: str, platform: str, count: int, delay: int, parallel_processes: int):
    """Call ``simulate_views`` and report the outcome as a string.

    Returns:
        ``"Simulation started successfully."`` when the call returns, or
        ``"Error: <message>"`` when it raises.
    """
    try:
        simulate_views(url, platform, count, delay, parallel_processes)
    except Exception as e:
        return f"Error: {e}"
    return "Simulation started successfully."
|
|
|
# Build the Gradio form around ``gradio_interface`` and launch it.
# NOTE(review): ``launch()`` runs at import time, before the definitions that
# follow it in this file are executed. If the call blocks (believed to be the
# default when run as a script — confirm), those later definitions are never
# reached while the UI is up.
gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="URL"),
        gr.Dropdown(
            choices=["instagram", "tiktok", "youtube", "facebook", "twitch", "spotify"],
            label="Platform",
        ),
        gr.Slider(minimum=1, maximum=1000, label="Count"),
        gr.Slider(minimum=1, maximum=60, label="Delay (seconds)"),
        gr.Slider(minimum=1, maximum=10, label="Parallel Processes"),
    ],
    outputs=gr.Textbox(label="Output"),
).launch()
|
|
|
def get_credentials():
    """Read ``platform=user|password`` entries from the local ``.env`` file.

    Blank lines, lines starting with ``#`` and lines that lack either the
    ``=`` or the ``|`` separator are skipped. Only the first ``=`` and the
    first ``|`` split the line, so a password may itself contain ``|``.

    Returns:
        A list of ``(platform, user, password)`` tuples with surrounding
        whitespace stripped. The list is empty when ``.env`` does not exist.
    """
    credentials = []
    try:
        # Read as UTF-8 explicitly rather than relying on the locale default.
        with open(".env", "r", encoding="utf-8") as file:
            for raw_line in file:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                platform, eq_sep, details = line.partition("=")
                if not eq_sep:
                    continue
                user, bar_sep, password = details.partition("|")
                if not bar_sep:
                    continue
                credentials.append((platform.strip(), user.strip(), password.strip()))
    except FileNotFoundError:
        # A missing file means "no credentials", not a server error.
        return []
    return credentials
|
|
|
def get_random_credentials():
    """Return one randomly chosen ``(platform, user, password)`` tuple.

    Raises:
        HTTPException: 404 when ``get_credentials`` yields no entries.
    """
    credentials = get_credentials()
    if not credentials:
        raise HTTPException(status_code=404, detail="No credentials found")
    return random.choice(credentials)
|
|
|
@app.get("/random_login")
async def random_login(platform: str):
    """Log in to *platform* with the first matching stored credential.

    Despite the name, the entry is not chosen at random: the first tuple from
    ``get_credentials`` whose platform equals *platform* is used.

    Returns a plain-text confirmation naming the platform and user.

    Raises:
        HTTPException: 404 when no stored entry matches *platform*; whatever
            ``authenticate`` raises otherwise.
    """
    platform_cred = next(
        (cred for cred in get_credentials() if cred[0] == platform),
        None,
    )
    if platform_cred is None:
        raise HTTPException(status_code=404, detail="Platform not found")

    _, username, password = platform_cred
    # HTTPException is left to propagate; it used to be caught and returned,
    # which hands the exception object back as an ordinary return value
    # instead of raising it.
    authenticate(username, password, platform)
    return PlainTextResponse(f"Authenticated on {platform} with user {username}")
|
|