Ghhvv / app.py
Uhhy's picture
Create app.py
c92aac9 verified
raw
history blame
11.3 kB
import requests
from bs4 import BeautifulSoup
from httpx import Client, HTTPError
from pydantic import BaseModel
import re
import urllib.parse
import fake_useragent as fake
import random
from dotenv import load_dotenv
import os
from faker import Faker
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import time
from webdriver_manager.chrome import ChromeDriverManager
from tqdm import tqdm
import threading
import gradio as gr
load_dotenv()
class Proxy(BaseModel):
ip: str
port: str
type: str
class VisitRequest(BaseModel):
url: str
count: int
delay: int
parallel_processes: int
def get_random_proxy():
try:
response = requests.get("https://uhhy-fsfsfs.hf.space/valid")
response.raise_for_status()
proxies = response.text.splitlines()
if proxies:
return random.choice(proxies)
else:
return None
except Exception as e:
print(f"Error getting proxy: {e}")
return None
def get_proxies():
try:
proxies = []
for _ in range(5):
proxy = get_random_proxy()
if proxy:
proxies.append(proxy)
return "\n".join(proxies)
except Exception as e:
return str(e)
def rotate_ip():
try:
fake = Faker()
random_ip = fake.ipv4()
headers = {
"X-Forwarded-For": random_ip,
"Client-IP": random_ip,
"X-Real-IP": random_ip
}
proxy = get_random_proxy()
if not proxy:
return "No proxy available."
return proxy
except Exception as e:
return str(e)
def extract_video_id(url: str, platform: str) -> str:
url = urllib.parse.unquote(url)
if platform == "instagram":
match = re.search(r"instagram\.com/reel/([^/?]+)", url)
elif platform == "tiktok":
match = re.search(r"tiktok\.com/@[^/]+/video/(\d+)", url)
elif platform == "youtube":
match = re.search(r"youtube\.com/watch\?v=([^&]+)", url)
elif platform == "facebook":
match = re.search(r"facebook\.com/.*/videos/(\d+)", url)
elif platform == "twitch":
match = re.search(r"twitch\.tv/videos/(\d+)", url)
elif platform == "spotify":
match = re.search(r"spotify\.com/track/([^/?]+)", url)
else:
match = None
if match:
return match.group(1)
else:
return None
def instagram_login(username: str, password: str):
login_url = "https://www.instagram.com/accounts/login/ajax/"
session = requests.Session()
response = session.get("https://www.instagram.com/", headers={"User-Agent": "Mozilla/5.0"})
soup = BeautifulSoup(response.text, "html.parser")
csrf_token = soup.find("meta", {"name": "csrf-token"})["content"]
login_data = {
"username": username,
"enc_password": f"#PWD_INSTAGRAM_BROWSER:0:&:{password}"
}
headers = {
"User-Agent": "Mozilla/5.0",
"X-CSRFToken": csrf_token,
"X-Requested-With": "XMLHttpRequest"
}
response = session.post(login_url, data=login_data, headers=headers)
if response.status_code == 200 and response.json().get("authenticated"):
return session
else:
raise HTTPError("Authentication failed")
def tiktok_login(username: str, password: str):
login_url = "https://www.tiktok.com/login/"
session = requests.Session()
login_data = {
"username": username,
"password": password
}
response = session.post(login_url, data=login_data)
if response.status_code == 200:
return session
else:
raise HTTPError("Authentication failed")
def youtube_login(username: str, password: str):
login_url = "https://accounts.google.com/ServiceLogin"
session = requests.Session()
login_data = {
"username": username,
"password": password
}
response = session.post(login_url, data=login_data)
if response.status_code == 200:
return session
else:
raise HTTPError("Authentication failed")
def facebook_login(username: str, password: str):
login_url = "https://www.facebook.com/login"
session = requests.Session()
login_data = {
"email": username,
"pass": password
}
response = session.post(login_url, data=login_data)
if response.status_code == 200:
return session
else:
raise HTTPError("Authentication failed")
def twitch_login(username: str, password: str):
login_url = "https://www.twitch.tv/login"
session = requests.Session()
login_data = {
"login": username,
"password": password
}
response = session.post(login_url, data=login_data)
if response.status_code == 200:
return session
else:
raise HTTPError("Authentication failed")
def spotify_login(username: str, password: str):
login_url = "https://accounts.spotify.com/api/token"
session = requests.Session()
login_data = {
"username": username,
"password": password
}
response = session.post(login_url, data=login_data)
if response.status_code == 200:
return session
else:
raise HTTPError("Authentication failed")
def simulate_view(url: str, proxy: str, session: Client, delay: int):
webdriver.DesiredCapabilities.CHROME['proxy'] = {
"httpProxy": proxy,
"ftpProxy": proxy,
"sslProxy": proxy,
"proxyType": "MANUAL",
}
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument("--disable-popup-blocking")
options.add_argument("--disable-infobars")
options.add_argument("--disable-web-security")
options.add_argument("--ignore-certificate-errors")
options.add_argument("--disable-notifications")
options.add_argument("--disable-extensions")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--mute-audio")
options.add_argument('--ignore-ssl-errors=yes')
options.add_argument('--ignore-certificate-errors')
driver = webdriver.Chrome(options=options)
try:
fake = Faker()
fake_ipv4 = fake.ipv4()
headers = {
"User-Agent": fake.user_agent(),
"X-Forwarded-For": fake_ipv4,
"Client-IP": fake_ipv4,
"X-Real-IP": fake_ipv4
}
driver.get(url)
try:
not_now = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, 'mt3GC')))
a = not_now.find_elements(By.TAG_NAME, "button")[1]
actions = ActionChains(driver)
actions.click(a)
actions.perform()
except:
pass
time.sleep(delay)
return True
except Exception as e:
print(f"Error simulating view: {e}")
return False
finally:
driver.quit()
def simulate_views_background(url: str, count: int, delay: int, session: Client = None):
proxy = get_random_proxy()
if not proxy:
print("No proxy available.")
return
successful_views = 0
failed_views = 0
for i in tqdm(range(count), desc=f"Simulating views for {url}"):
try:
success = simulate_view(url, proxy, session, delay)
if success:
successful_views += 1
else:
failed_views += 1
remaining_views = count - i - 1
print(f"Successful: {successful_views}, Failed: {failed_views}, Remaining: {remaining_views}, Time Remaining: {((count - i - 1) * delay) / 60:.2f} minutes")
except Exception as e:
failed_views += 1
remaining_views = count - i - 1
print(f"Successful: {successful_views}, Failed: {failed_views}, Remaining: {remaining_views}, Time Remaining: {((count - i - 1) * delay) / 60:.2f} minutes")
def simulate_views_endpoint(request: VisitRequest):
try:
session = None
threading.Thread(
target=simulate_views_background,
args=(
request.url,
request.count,
request.delay,
session,
)
).start()
return "Views simulation started in the background."
except Exception as e:
return str(e)
def simulate(urls: str, count: int, delay: int, parallel_processes: int):
try:
session = None
for url in urls.split("\n"):
for _ in range(parallel_processes):
threading.Thread(
target=simulate_views_background,
args=(
url,
count,
delay,
session
)
).start()
return "Simulations started in the background."
except Exception as e:
return str(e)
def rand(min, max):
return random.randint(min, max)
with gr.Blocks(css="""
body {
background-color: #f0f0f0;
font-family: sans-serif;
}
.container {
background-color: #fff;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
text-align: center;
animation: fadeIn 1s ease-in-out;
}
h1 {
color: #333;
margin-bottom: 20px;
text-shadow: 0 1px 2px rgba(0, 0, 0, 0.1);
}
form {
display: flex;
flex-direction: column;
width: 350px;
}
label {
margin-bottom: 5px;
font-weight: bold;
}
input[type="text"],
textarea,
select,
input[type="number"] {
padding: 10px;
margin-bottom: 15px;
border: 1px solid #ddd;
border-radius: 5px;
}
input[type="submit"] {
background-color: #007bff;
color: #fff;
border: none;
padding: 12px 20px;
border-radius: 5px;
cursor: pointer;
transition: background-color 0.3s;
}
input[type="submit"]:hover {
background-color: #0056b3;
}
@keyframes fadeIn {
from {
opacity: 0;
}
to {
opacity: 1;
}
}
""") as interface:
urls = gr.Textbox(label="Website URLs or IP addresses (one per line)")
count = gr.Number(label="Number of Views", value=1)
delay = gr.Number(label="Delay (seconds)", value=1)
parallel_processes = gr.Number(label="Parallel Processes", value=1)
simulate_btn = gr.Button("Simulate")
output = gr.Textbox(label="Output")
simulate_btn.click(
fn=simulate,
inputs=[urls, count, delay, parallel_processes],
outputs=output,
)
interface.launch(server_name="0.0.0.0", server_port=7860, share=True)