import gradio as gr from huggingface_hub import HfApi, whoami from config import howManyModelsToUse,num_models,max_images,inference_timeout,MAX_SEED,thePrompt,preSetPrompt,negPreSetPrompt from all_models import models import asyncio import os import logging from fastapi import HTTPException import http.client import json import ssl import pandas as pd import re from datetime import datetime from threading import RLock lock = RLock() HF_TOKEN = os.environ.get("ohgoddamn") if os.environ.get("ohgoddamn") else None # If private or gated models aren't used, ENV setting is unnecessary. stop_event = asyncio.Event() default_models = models[:howManyModelsToUse] api = HfApi() user_info = whoami(token=HF_TOKEN) username = user_info["name"] print(f"{username}") print(f"{username}") print(f"{username}") print(f"{username}") from handle_models import load_fn,infer,gen_fn from externalmod import gr_Interface_load, save_image, randomize_seed def extend_choices(choices): return choices[:num_models] + (num_models - len(choices[:num_models])) * ['NA'] def update_imgbox(choices): choices_plus = extend_choices(choices[:num_models]) return [gr.Image(None, label=m, visible=(m!='NA')) for m in choices_plus] def random_choices(): import random random.seed() return random.choices(models, k=num_models) load_fn(models,HF_TOKEN) #from huggingface_hub import InferenceClient #client = InferenceClient( provider="hf-inference", # api_key="hf_xxxxxxxxxxxxxxxxxxxxxxxx",) #image = client.text_to_image( "Astronaut riding a horse", model="charliebaby2023/cybrpny",) # === CONFIG === host = "api-inference.huggingface.co" endpoint = "/models/charliebaby2023/cybrpny" token = HF_TOKEN prompt = "a futuristic city on Mars at sunset" # === REQUEST SETUP === body = json.dumps({ "inputs": prompt }) headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "User-Agent": "PythonRawClient/1.0" } # === CONNECTION === context = ssl.create_default_context() conn = http.client.HTTPSConnection(host, context=context) # === RAW REQUEST === print("๐Ÿ”ธ REQUEST LINE:") print(f"POST {endpoint} HTTP/1.1") print(f"Host: {host}") for key, value in headers.items(): print(f"{key}: {value}") print(f"\n{body}\n") # Send request conn.request("POST", endpoint, body=body, headers=headers) # === RAW RESPONSE === response = conn.getresponse() print("๐Ÿ”น STATUS:", response.status, response.reason) print("๐Ÿ”น RESPONSE HEADERS:") for hdr in response.getheaders(): print(f"{hdr[0]}: {hdr[1]}") print("\n๐Ÿ”น RESPONSE BODY (raw):") raw = response.read() try: print(raw.decode("utf-8")[:1000]) # print first 1k chars except UnicodeDecodeError: print("[binary data]") def query_model(model_name,prompt): logs = [] img_out = None host = "api-inference.huggingface.co" endpoint = f"/models/{model_name}" # Prepare request body = json.dumps({"inputs": prompt}) headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "User-Agent": "PythonRawClient/1.0" } # Connect context = ssl.create_default_context() conn = http.client.HTTPSConnection(host, context=context) logs.append(f"๐Ÿ“ค POST {endpoint}") logs.append(f"Headers: {headers}") logs.append(f"Body: {body}\n") try: conn.request("POST", endpoint, body=body, headers=headers) response = conn.getresponse() logs.append(f"๐Ÿ“ฅ Status: {response.status} {response.reason}") logs.append("Headers:") for k, v in response.getheaders(): logs.append(f"{k}: {v}") raw = response.read() try: text = raw.decode("utf-8") result = json.loads(text) logs.append("\nBody:\n" + text[:1000]) except: result = raw logs.append("\nโš ๏ธ Binary response.") # === HANDLE RESPONSE === def show(img_bytes): try: img = Image.open(BytesIO(img_bytes)) return img except Exception as e: logs.append(f"โŒ Failed to open image: {e}") return None if isinstance(result, dict): if "image" in result: logs.append("๐Ÿง  Found base64 image in 'image'") return show(base64.b64decode(result["image"])), "\n".join(logs) elif "url" in result or "image_url" in result: url = result.get("url") or result.get("image_url") logs.append(f"๐ŸŒ Found image URL: {url}") r = requests.get(url) return show(r.content), "\n".join(logs) else: logs.append("โš ๏ธ No image found in response.") return None, "\n".join(logs) elif isinstance(result, bytes): logs.append("๐Ÿงพ Raw image bytes returned.") return show(result), "\n".join(logs) else: logs.append("โŒ Unknown response format.") return None, "\n".join(logs) except Exception as e: logs.append(f"๐Ÿ’ฅ Exception: {e}") return None, "\n".join(logs) # === GRADIO UI === def query_model2(model_name, prompt): logs = [] img_out = None try: model = gr.Interface.load(f"models/{model_name}", token=HF_TOKEN) logs.append(f"Prompt: {prompt}") response = model.predict(prompt) logs.append(f"Model response: {response}") def get_image_from_response(response): if isinstance(response, dict): if "image" in response: img_data = base64.b64decode(response["image"]) img = Image.open(BytesIO(img_data)) return img elif "url" in response or "image_url" in response: url = response.get("url") or response.get("image_url") img_data = requests.get(url).content img = Image.open(BytesIO(img_data)) return img elif isinstance(response, bytes): img = Image.open(BytesIO(response)) return img return None img_out = get_image_from_response(response) except Exception as e: logs.append(f"Error: {e}") response = None return img_out, "\n".join(logs) #print(f"Time launched: {hms()}") pattern = r'HTTP/1\.1" (\d{3}) \d+' class ErrorCodeLogHandler(logging.Handler): def __init__(self): super().__init__() self.last_error_code = None # Store the last error code #printed self.model_name_pattern = r'Model\s+(\S+)' # Pattern to extract model name (adjust this regex to your needs) def emit(self, record): log_message = self.format(record) # Get the log message from the record error_code = self.extract_error_code(log_message) # Extract error code model_name = self.extract_model_name(log_message) # Extract model name if error_code and error_code != self.last_error_code: #print(f'Error code: {error_code} | Model: {model_name}') # #print both error code and model name self.last_error_code = error_code # Update the last #printed error code def extract_error_code(self, log_message): match = re.search(pattern, log_message) if match: return match.group(1) # Return the current error code return None # Return None if no match is found def extract_model_name(self, log_message): match = re.search(self.model_name_pattern, log_message) if match: return match.group(1) # Return the model name or identifier return "Unknown model" # Return a default value if no model name is found def debugon(): print(f"DEBUGGING MODE : ON ") logging.basicConfig(level=logging.DEBUG, format='%(message)s') error_handler = ErrorCodeLogHandler() print(f"{error_handler}") logging.getLogger().addHandler(error_handler) def debugoff(): print(f"DEBUGGING MODE : OFF ") logging.basicConfig(level=logging.WARNING, format='%(message)s') error_handler = ErrorCodeLogHandler() print(f"{error_handler}") logging.getLogger().addHandler(error_handler) def handle_debug_mode(selected_option): if selected_option == "debug on": debugon() else: debugoff() def stop_all_tasks(): print("Stopping...") stop_event.set() with gr.Blocks(fill_width=True) as demo: with gr.Tab(label="DEBUG"): with gr.Row(): radio = gr.Radio(["debug on", "debug off"], value="debug off", label=" Debug mode: activated in output log", interactive=True) radio.change(handle_debug_mode, radio, None) with gr.Tab(str(num_models) + ' Models'): with gr.Column(scale=2): with gr.Group(): txt_input = gr.Textbox(label='Your prompt:', value=preSetPrompt, lines=3, autofocus=1) neg_input = gr.Textbox(label='Negative prompt:', value=negPreSetPrompt, lines=1) timeout = gr.Slider(label="Timeout (seconds)", minimum=5, maximum=300, value=120, step=1) with gr.Accordion("Advanced", open=False, visible=True): with gr.Row(): width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=1216, step=32, value=0) height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=1216, step=32, value=0) with gr.Row(): steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0) cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0) seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1) seed_rand = gr.Button("Randomize Seed ๐ŸŽฒ", size="sm", variant="secondary") seed_rand.click(randomize_seed, None, [seed], queue=False) with gr.Row(): gen_button = gr.Button(f'Generate up to {int(num_models)} images', variant='primary', scale=3, elem_classes=["butt"]) random_button = gr.Button(f'Randomize Models', variant='secondary', scale=1) with gr.Column(scale=1): with gr.Group(): with gr.Row(): output = [gr.Image(label=m, show_download_button=True, elem_classes=["image-monitor"], interactive=False, width=112, height=112, show_share_button=False, format="png", visible=True) for m in default_models] current_models = [gr.Textbox(m, visible=False) for m in default_models] #for m, o in zip(current_models, output): # gen_event = gr.on(triggers=[gen_button.click, txt_input.submit], fn=gen_fn, # inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed], outputs=[o], # concurrency_limit=None, queue=False) for m, o in zip(current_models, output): gen_button.click( fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed, timeout], outputs=[o],queue=False) #concurrency_limit=None, txt_input.submit( fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed, timeout], outputs=[o],queue=False) with gr.Column(scale=4): with gr.Accordion('Model selection'): model_choice = gr.CheckboxGroup(models, label = f'Choose up to {int(num_models)} different models from the {len(models)} available!', value=models, interactive=True) model_choice.change(update_imgbox, model_choice, output) model_choice.change(extend_choices, model_choice, current_models) random_button.click(random_choices, None, model_choice) stop_button = gr.Button("Stop ๐Ÿ›‘", variant="stop") stop_button.click( fn=stop_all_tasks, inputs=[], outputs=[] ) demo.launch(show_api=True, max_threads=400) ''' with gr.Blocks(fill_width=True) as demo: with gr.Row(): gr.Markdown(f"# ({username}) you are logged in") #model_selector = gr.CheckboxGroup(choices=model_ids,value=model_ids, label="your models", interactive=True, ) #output_box = gr.Textbox(lines=10, label="Selected Models") #model_selector.change(fn=handle_model_selection, inputs=model_selector, outputs=output_box) source_selector = gr.CheckboxGroup(choices=source_choices, label="Model Source", value=["Combined"], interactive=True) output = gr.Textbox(label="Selected Model Summary") with gr.Tab(str(num_models) + ' Models'): with gr.Column(scale=2): with gr.Group(): txt_input = gr.Textbox(label='Your prompt:', value=preSetPrompt, lines=3, autofocus=1) with gr.Accordion("Advanced", open=False, visible=True): with gr.Row(): neg_input = gr.Textbox(label='Negative prompt:', value=negPreSetPrompt, lines=1) with gr.Row(): width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=1216, step=32, value=0) height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=1216, step=32, value=0) with gr.Row(): steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0) cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0) seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1) seed_rand = gr.Button("Randomize Seed ๐ŸŽฒ", size="sm", variant="secondary") seed_rand.click(randomize_seed, None, [seed], queue=False) with gr.Row(): gen_button = gr.Button(f'Generate up to {int(num_models)} images', variant='primary', scale=3, elem_classes=["butt"]) random_button = gr.Button(f'Randomize Models', variant='secondary', scale=1) with gr.Column(scale=1): with gr.Group(): with gr.Row(): output = [gr.Image(label=m, show_download_button=True, elem_classes=["image-monitor"], interactive=False, width=112, height=112, show_share_button=False, format="png", visible=True) for m in default_models] current_models = [gr.Textbox(m, visible=False) for m in default_models] for m, o in zip(current_models, output): gen_event = gr.on(triggers=[gen_button.click, txt_input.submit], fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed], outputs=[o], concurrency_limit=None, queue=False) with gr.Column(scale=4): with gr.Accordion('Model selection'): #model_choice = gr.CheckboxGroup(models, label = f'Choose up to {int(num_models)} different models from the {len(models)} available!', value=default_models, interactive=True) #model_choice.change(update_imgbox, model_choice, output) #model_choice.change(extend_choices, model_choice, current_models) model_choice = gr.CheckboxGroup(choices=combined_models, label="Models", value=combined_models[:20], interactive=True) source_selector.change(update_model_choice, source_selector, model_choice) model_choice.change(handle_model_selection, model_choice, output) model_choice.change(update_imgbox, model_choice, output) model_choice.change(extend_choices, model_choice, current_models) random_button.click(random_choices, None, model_choice) ''' ''' # --- Step 2: Fetch user's Spaces spaces = list(api.list_spaces(author=username, token=HF_TOKEN)) space_df = pd.DataFrame([{"Space Name": f"{space.id.split('/')[-1]}", "Last Modified": space.lastModified,} for space in spaces]) def load_space_files(evt: gr.SelectData): clicked_html = evt.value space_id = clicked_html.split("data-space='")[1].split("'")[0] files = api.list_repo_files(repo_id=space_id, repo_type="space", token=HF_TOKEN) file_df = pd.DataFrame([{ "File": f"{file}" } for file in files]) return file_df # --- Step 4: Build Gradio interface gr.Markdown(f"# Hugging Face Spaces for `{username}`") with gr.Row(): left_df = gr.Dataframe(value=space_df, label="Your Spaces (click a name)", interactive=False, datatype="str", max_rows=len(space_df), wrap=True ) right_df = gr.Dataframe( value=pd.DataFrame(columns=["File"]), label="Files in Selected Space", interactive=False, wrap=True ) left_df.select(fn=load_space_files, outputs=right_df) '''