# NOTE: "Spaces: Sleeping" below was Hugging Face Space status-page text
# captured when this file was scraped; kept as a comment so the module
# remains valid Python.
# Standard library
import re
import json
import time
import requests
import importlib.metadata
# Third-party UI framework
import gradio as gr
import os  # Needed for writing files
# Hugging Face Hub: repo creation, file upload, model listing, API helpers
from huggingface_hub import (
    create_repo, upload_file, list_models, constants
)
from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status
# Google Gemini SDK
from google import genai
# Import Content and Part types for structured input
from google.genai.types import Content, Part
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch
# --- USER INFO & MODEL LISTING --- | |
def show_profile(profile: gr.OAuthProfile | None) -> str:
    """Render a markdown login-status line for the given OAuth profile."""
    if profile is None:
        return "*Not logged in.*"
    return f"✅ Logged in as **{profile.username}**"
def list_private_models(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None
) -> str:
    """Return a text listing of the logged-in user's HF models.

    Gradio injects profile and oauth_token automatically when inputs=None
    and the function signature has these parameter types.
    """
    token_value = getattr(oauth_token, 'token', None) if oauth_token else None
    if not profile or not token_value:
        return "Please log in to see your models."
    try:
        entries = []
        for m in list_models(author=profile.username, token=oauth_token.token):
            visibility = 'private' if m.private else 'public'
            entries.append(f"{m.id} ({visibility})")
        if not entries:
            return "No models found."
        return "Models:\n\n" + "\n - ".join(entries)
    except Exception as e:
        # The Hub API call may fail (network, auth, rate limit); report as text.
        return f"Error listing models: {e}"
# --- UTILITIES --- | |
def get_sdk_version(sdk_choice: str) -> str:
    """Return the installed version of the chosen SDK, or "UNKNOWN".

    Any choice other than "gradio" is treated as streamlit.
    """
    package = "gradio" if sdk_choice == "gradio" else "streamlit"
    try:
        return importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        # Package not installed where this app runs.
        return "UNKNOWN"
def classify_errors(logs: str) -> str:
    """Classify the kinds of errors present in build/run logs.

    Returns a comma-separated, alphabetically sorted list drawn from
    {"syntax", "import", "runtime/generic"}, or "none" for clean logs.
    """
    errs = set()
    # Lower-case once for case-insensitive matching.
    logs_lower = logs.lower()
    # FIX: check the specific categories independently — the original elif
    # chain reported only one category even when the logs contained both a
    # SyntaxError and an ImportError.
    if "syntaxerror" in logs_lower:
        errs.add("syntax")
    if "importerror" in logs_lower or "modulenotfounderror" in logs_lower:
        errs.add("import")
    # Generic indicators only count when no specific category matched, since
    # "syntaxerror" / "importerror" themselves contain the substring "error".
    if not errs and ("traceback" in logs_lower or "exception" in logs_lower or "error" in logs_lower):
        errs.add("runtime/generic")
    # Sort for deterministic output (set iteration order is not guaranteed).
    return ", ".join(sorted(errs)) or "none"
# --- HF SPACE LOGGING --- | |
# Modified: Removed 'token' parameter | |
def _get_space_jwt(repo_id: str) -> str:
    """Fetch a JWT for reading Space logs using local HF credentials.

    build_hf_headers() resolves credentials from the HF_TOKEN env var or the
    local huggingface config where this app is running.
    """
    endpoint = f"{constants.ENDPOINT}/api/spaces/{repo_id}/jwt"
    response = get_session().get(endpoint, headers=build_hf_headers())
    # Raises HTTPError for bad responses (e.g. 404 if the repo doesn't exist).
    hf_raise_for_status(response)
    return response.json()["token"]
# Modified: Removed 'token' parameter | |
def fetch_logs(repo_id: str, level: str) -> str:
    """Fetch build or run logs from an HF Space.

    Parameters
    ----------
    repo_id: "<user>/<space>" identifier of the Space.
    level:   log stream to read, e.g. "build" or "run".

    Returns the joined log lines, or a human-readable error message on
    failure — this function never raises.
    """
    try:
        # Uses local HF credentials (HF_TOKEN env var / config.json) for the JWT.
        jwt = _get_space_jwt(repo_id)
        url = f"https://api.hf.space/v1/{repo_id}/logs/{level}"
        lines = []
        # The fetched JWT must still be passed to the logs API request headers.
        headers = build_hf_headers(token=jwt)
        # Stream with a timeout so a stalled Space can't hang the caller.
        with get_session().get(url, headers=headers, stream=True, timeout=10) as resp:
            hf_raise_for_status(resp)
            for raw in resp.iter_lines(decode_unicode=True, chunk_size=512):
                if raw is None:  # keep-alive or empty chunk
                    continue
                # Server-sent events: payload lines are prefixed "data: ".
                if raw.startswith("data: "):
                    try:
                        ev = json.loads(raw[len("data: "):])
                        ts, txt = ev.get("timestamp", "N/A"), ev.get("data", "")
                        lines.append(f"[{ts}] {txt}")
                    except json.JSONDecodeError:
                        lines.append(f"Error decoding log line: {raw}")
                    except Exception as e:
                        lines.append(f"Unexpected error processing log line: {raw} - {e}")
        return "\n".join(lines)
    except requests.exceptions.Timeout:
        return f"Error: Timeout fetching {level} logs."
    except requests.exceptions.RequestException as e:
        # FIX: requests.Response.__bool__ returns `self.ok`, so an error
        # response (401) is falsy and `if e.response and ...` never matched.
        # Compare against None explicitly to detect auth failures.
        if e.response is not None and e.response.status_code == 401:
            return f"Error fetching {level} logs: Authentication failed. Ensure HF_TOKEN env var is set on the Space or you are logged in via huggingface-cli where the app is running."
        return f"Error fetching {level} logs: {e}"
    except Exception as e:
        return f"An unexpected error occurred while fetching logs: {e}"
def check_iframe(url: str, timeout: int = 10) -> bool:
    """Return True iff `url` answers with HTTP 200 within `timeout` seconds."""
    # Public Spaces need no auth headers; a plain GET is enough here.
    try:
        return requests.get(url, timeout=timeout).status_code == 200
    except requests.exceptions.RequestException:
        # Timeout, DNS failure, connection error, ... all mean "not reachable".
        return False
# --- AGENT PROMPTS --- | |
# System prompt for the Orchestrator agent: reads the full project state and
# emits exactly one next-task name (PLANNING / CODING - ... / PUSHING /
# LOGGING / DEBUGGING / COMPLETE / FAILED).
SYSTEM_ORCHESTRATOR = {
    "role": "system",
    "content": (
        "You are **Orchestrator Agent**, the project manager. "
        "Your role is to guide the development process from user request to a deployed HF Space application. "
        "You will analyze the current project state (requirements, plan, files, logs, feedback, status, attempt_count) "
        "and decide the *single* next step/task for the team. "
        "Output *only* the name of the next task from the following list: "
        "'PLANNING', 'CODING - {task_description}', 'PUSHING', 'LOGGING', 'DEBUGGING', 'COMPLETE', 'FAILED'. "
        "If moving to 'CODING', briefly describe the specific area to focus on (e.g., 'CODING - Initial UI', 'CODING - Adding Data Loading', 'CODING - Fixing Import Errors'). "
        "Analyze the debug feedback and logs carefully to decide the appropriate coding task description."
        "If the debug feedback indicates 'All clear', transition to 'COMPLETE'."
        "If maximum attempts are reached or a critical error occurs, transition to 'FAILED'."
    )
}
# System prompt for the Architect agent: produces the high-level plan only
# (no code). Contains str.format() placeholders {sdk_choice}, {main_app_file}.
SYSTEM_ARCHITECT = {
    "role": "system",
    "content": (
        "You are **Architect Agent**, the lead planner. "
        "Given the user requirements and the current project state, your task is to devise or refine the high-level plan for the application. "
        "Outline the main features, suggest a logical structure, identify potential files (e.g., `app.py`, `utils.py`, `requirements.txt`), and key components needed. "
        "The target SDK is {sdk_choice}. The main application file should be `{main_app_file}`. "
        "Output the plan clearly, using bullet points or a simple numbered list. Do NOT write code. Focus only on the plan."
    )
}
# System prompt for the Code-Gen agent: emits full file contents as markdown
# code blocks, each preceded by its filename in backticks. Placeholders:
# {current_task}, {sdk_choice}, {main_app_file}. run_codegen() parses this
# exact output format.
SYSTEM_CODEGEN = {
    "role": "system",
    "content": (
        "You are **Code‑Gen Agent**, a proactive AI developer. "
        "Your sole responsibility is to author and correct code files based on the plan and the assigned task. "
        "You will receive the full project state, including the requirements, plan, existing files, and debug feedback. "
        "Based on the current task assigned by the Orchestrator ('{current_task}'), write or modify the necessary code *only* in the specified file(s). "
        "Output the *full content* of the updated file(s) in markdown code blocks. "
        "Each code block must be immediately preceded by the filename in backticks on its own line. "
        "Use this format exactly: `` `filename.ext` ``\\n```<language>\\ncode goes here\\n```\n"
        "Provide the *complete* modified code for any file you touch."
        "Only output the code blocks and their preceding filenames. Do not add extra commentary outside the code blocks."
        "Crucially, ensure the app uses the specified SDK ({sdk_choice}) and writes its main application logic to the file `{main_app_file}`."  # Reinforce main file
    )
}
# System prompt for the Debug agent: reviews logs + code against the plan and
# either says "All clear. Project appears complete." on its first line, or
# returns actionable feedback for the Code-Gen agent (no code blocks).
SYSTEM_DEBUG = {
    "role": "system",
    "content": (
        "You are **Debug Agent**, a meticulous code reviewer and tester. "
        "You have access to the full project state: requirements, plan, code files, build logs, and run logs. "
        "Your task is to analyze the logs and code in the context of the plan and requirements. "
        "Identify errors, potential issues, missing features based on the plan, and suggest concrete improvements or fixes for the Code-Gen agent. "
        "Pay close attention to the build and run logs for specific errors (SyntaxError, ImportError, runtime errors). "
        "Also check if the implemented features align with the plan."
        "If the application appears to be working based on the logs and iframe check, and seems to meet the plan's core requirements, state 'All clear. Project appears complete.' as the *first line* of your feedback."
        "Otherwise, provide actionable feedback, referencing file names and line numbers where possible. Format feedback clearly."
        "Example feedback:\n'Error in `app.py`: ModuleNotFoundError for 'missing_library'. Add 'missing_library' to `requirements.txt`.'\n'Issue: The plan required a download button, but it's missing in `app.py`.'\n'Suggestion: Check the loop in `utils.py`, it might cause an infinite loop based on run logs.' "
        "Do NOT write or suggest large code blocks directly in your feedback. Focus on *what* needs fixing/adding and *why*."
    )
}
# --- AGENT RUNNER HELPER --- | |
# MODIFIED AGAIN: Combine system prompt into user message for Gemini generate_content API | |
def run_agent(client, model_name, system_prompt_template, user_input_state, config):
    """Run a single agent interaction using the project state as input.

    Parameters
    ----------
    client: google-genai client exposing `models.generate_content`.
    model_name: Gemini model identifier string.
    system_prompt_template: dict with a "content" str.format() template.
    user_input_state: dict of state values; fills the template AND is
        JSON-serialized into the user message body.
    config: GenerateContentConfig passed through to the API call.

    Returns the agent's stripped text response, or a string starting with
    "ERROR:" on any failure (this function never raises).
    """
    try:
        # Fill template placeholders ({sdk_choice}, {current_task}, ...) from state.
        system_prompt = system_prompt_template["content"].format(**user_input_state)
    except KeyError as e:
        print(f"Error formatting system prompt: Missing key {e}. Prompt template: {system_prompt_template['content']}")
        return f"ERROR: Internal agent error - Missing key {e} for prompt formatting."
    user_message_content = "Project State:\n" + json.dumps(user_input_state, indent=2)
    model_to_use = model_name
    try:
        # Gemini generate_content has no separate system role here, so the
        # system prompt is prepended to a single 'user' message.
        messages = [
            Content(role="user", parts=[Part(text=system_prompt + "\n\n" + user_message_content)])
        ]
        response = client.models.generate_content(
            model=model_to_use,
            contents=messages,  # Pass the list of Content objects
            config=config
        )
        if not response.candidates or not response.candidates[0].content:
            print("Agent returned no candidates or empty content.")
            # Distinguish safety-filter blocks from plain empty responses.
            if response.prompt_feedback and response.prompt_feedback.block_reason:
                block_reason = response.prompt_feedback.block_reason
                print(f"Prompt was blocked. Reason: {block_reason}")
                return f"ERROR: Agent response blocked by safety filters. Reason: {block_reason.name}"
            return f"ERROR: Agent returned no response content."
        response_text = "".join([part.text for part in response.candidates[0].content.parts])
        # Log a truncated raw response for debugging.
        print(f"--- Raw Agent Response --- ({model_to_use})")
        print(response_text[:1000] + ('...' if len(response_text) > 1000 else ''))
        print("--------------------------")
        return response_text.strip()
    except Exception as e:
        print(f"Agent call failed: {e}")
        error_details = str(e)
        # Best effort: pull a richer error payload off an attached HTTP response.
        # FIX: narrowed the original bare `except:` clauses, which would also
        # have swallowed KeyboardInterrupt/SystemExit.
        if hasattr(e, 'response') and e.response is not None:
            try:
                error_details = json.dumps(e.response.json(), indent=2)
            except Exception:
                try:
                    error_details = e.response.text
                except Exception:
                    pass
        return f"ERROR: Agent failed - {error_details}"
# --- AGENT FUNCTIONS (called by Orchestrator) --- | |
# These functions now expect only the response text from run_agent | |
def run_planner(client, project_state, config):
    """Invoke the Architect agent to produce/refresh the project plan.

    Mutates project_state in place; returns True on success, False on error.
    """
    print("Orchestrator: Running Planner Agent...")
    # The planner only needs a subset of the full project state.
    planner_state = {
        key: project_state[key]
        for key in ("requirements", "sdk_choice", "main_app_file", "files")
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",
        system_prompt_template=SYSTEM_ARCHITECT,
        user_input_state=planner_state,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        return False
    project_state['plan'] = response_text
    print("Orchestrator: Planner Output Received.")
    project_state['status_message'] = "Planning complete."
    project_state['chat_history'].append(
        {"role": "assistant", "content": f"**Plan:**\n{project_state['plan']}"}
    )
    return True
# Modified: Implemented more flexible code block parsing and logging | |
def run_codegen(client, project_state, config):
    """Invoke the Code-Gen agent and merge the generated files into state.

    Parses the agent response for code blocks in either supported format:
      1. ```lang filename\\ncode```
      2. `filename`\\n```lang\\ncode```
    Python files are syntax-checked with compile(). Mutates project_state in
    place; returns True iff at least one valid file was extracted and no
    syntax errors were found.
    """
    print(f"Orchestrator: Running Code-Gen Agent for task: {project_state['current_task']}...")
    input_state_for_codegen = {
        "current_task": project_state['current_task'],
        "requirements": project_state['requirements'],
        "plan": project_state['plan'],
        "files": project_state['files'],
        "feedback": project_state['feedback'] or 'None',
        "sdk_choice": project_state['sdk_choice'],
        "main_app_file": project_state['main_app_file']
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",
        system_prompt_template=SYSTEM_CODEGEN,
        user_input_state=input_state_for_codegen,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        project_state['feedback'] = project_state.get('feedback', '') + "\n\n" + response_text
        project_state['chat_history'].append({"role": "assistant", "content": response_text})
        return False
    files_updated = {}
    syntax_errors = []
    # Regex with named groups: Case 1 is ```lang filename\ncode```; Case 2 is
    # `filename`\n```lang\ncode```. (Group renamed code->code2 for symmetry.)
    pattern = re.compile(
        r"```(?:\w+)?\s*(?P<fname1>[\w\-/]+?\.\w+)\s*\n(?P<code1>[\s\S]+?)```"
        r"|`(?P<fname2>[\w\-/]+?\.\w+)`\s*\n```(?:\w*)\n(?P<code2>[\s\S]+?)```",
        re.DOTALL  # Allow . to match newlines
    )
    match_found = False
    extracted_blocks = []
    for m in pattern.finditer(response_text):
        match_found = True
        filename = m.group('fname1') or m.group('fname2')
        code_content = m.group('code1') if m.group('code1') is not None else m.group('code2')
        extracted_blocks.append((filename, code_content, m.start()))
    # Process extracted blocks
    for filename, code_content, start_pos in extracted_blocks:
        if not filename:
            syntax_errors.append(f"Code block found near position {start_pos} without a clearly parsed filename.")
            continue
        if code_content is None:
            # FIX: report the actual filename instead of the literal `(unknown)`.
            syntax_errors.append(f"Empty code content parsed for file `{filename}` near position {start_pos}.")
            continue
        files_updated[filename] = code_content.strip()
        # Quick syntax check for Python files before accepting the batch.
        if filename.endswith('.py'):
            try:
                compile(code_content, filename, "exec")
            except SyntaxError as e:
                # FIX: include the filename so the Debug agent knows which
                # generated file is broken (was the literal `(unknown)`).
                syntax_errors.append(f"Syntax Error in generated `{filename}` near position {start_pos}: {e}")
                print(f"Syntax Error in generated `{filename}`: {e}")
            except Exception as e:
                syntax_errors.append(f"Unexpected error during syntax check for `{filename}` near position {start_pos}: {e}")
                print(f"Unexpected error during syntax check for `{filename}`: {e}")
    # Handle cases where no code blocks matched the pattern at all.
    if not match_found:
        print("Code-Gen Agent did not output any code blocks matching the expected format.")
        parse_error_msg = "ERROR: Code-Gen Agent failed to output code blocks in the expected `filename`\\n```code``` or ```lang filename\\ncode``` format."
        project_state['status_message'] = parse_error_msg
        project_state['feedback'] = project_state.get('feedback', '') + "\n\n" + parse_error_msg + "\nRaw Agent Response (no matching blocks detected):\n" + response_text[:2000] + "..."
        project_state['chat_history'].append({"role": "assistant", "content": parse_error_msg + "\nSee Debug Feedback for raw response."})
        return False  # Indicate failure
    # Blocks were found, but none yielded a usable file (and no syntax errors).
    if not files_updated and not syntax_errors:
        print("Code-Gen Agent outputted text with blocks, but no valid files were parsed.")
        parse_error_msg = "ERROR: Code-Gen Agent outputted text with blocks, but no valid filenames were parsed from them."
        project_state['status_message'] = parse_error_msg
        project_state['feedback'] = project_state.get('feedback', '') + "\n\n" + parse_error_msg + "\nRaw Agent Response (blocks parsed, no filenames):\n" + response_text[:2000] + "..."
        project_state['chat_history'].append({"role": "assistant", "content": parse_error_msg + "\nSee Debug Feedback for raw response."})
        return False
    if syntax_errors:
        # Surface syntax problems to the Debug agent and fail this step.
        syntax_error_msg = "ERROR: Code-Gen Agent introduced syntax errors."
        project_state['feedback'] = syntax_error_msg + "\n" + "\n".join(syntax_errors) + "\n\n" + project_state.get('feedback', '')
        project_state['status_message'] = syntax_error_msg + " Debugging needed."
        project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
        project_state['chat_history'].append({"role": "assistant", "content": "Details:\n" + "\n".join(syntax_errors)})
        return False  # Indicate failure due to syntax errors
    project_state['files'].update(files_updated)
    print(f"Orchestrator: Code-Gen Agent updated files: {list(files_updated.keys())}")
    # Show a snippet of each generated/updated file in chat for visibility.
    code_summary = "\n".join([f"`{fn}`:\n```python\n{code[:500]}{'...' if len(code) > 500 else ''}\n```" for fn, code in files_updated.items()])
    project_state['chat_history'].append({"role": "assistant", "content": f"**Code Generated/Updated:**\n\n{code_summary}"})
    project_state['status_message'] = f"Code generated/updated: {list(files_updated.keys())}"
    return True
def run_debugger(client, project_state, config):
    """Invoke the Debug agent on logs + files; store its feedback in state.

    Mutates project_state in place; returns True on success, False on error.
    """
    print("Orchestrator: Running Debug Agent...")
    logs = project_state['logs']
    debugger_state = {
        "requirements": project_state['requirements'],
        "plan": project_state['plan'],
        "files": project_state['files'],
        "build_logs": logs.get('build', 'No build logs.'),
        "run_logs": logs.get('run', 'No run logs.'),
        "iframe_status": 'Responding OK' if project_state.get('iframe_ok', False) else 'Not responding or check failed.',
        "error_types_found": classify_errors(logs.get('build', '') + '\n' + logs.get('run', ''))
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",
        system_prompt_template=SYSTEM_DEBUG,
        user_input_state=debugger_state,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        project_state['feedback'] = project_state.get('feedback', '') + "\n\n" + response_text
        project_state['chat_history'].append({"role": "assistant", "content": response_text})
        return False
    project_state['feedback'] = response_text
    print("Orchestrator: Debug Agent Feedback Received.")
    project_state['status_message'] = "Debug feedback generated."
    project_state['chat_history'].append(
        {"role": "assistant", "content": f"**Debug Feedback:**\n{project_state['feedback']}"}
    )
    return True
# --- MAIN ORCHESTRATION LOGIC --- | |
def orchestrate_development(client, project_state, config, oauth_token_token):
    """Manages the overall development workflow.

    Drives a state machine over project_state['current_task']:
    PLANNING -> CODING -> PUSHING -> LOGGING -> DEBUGGING -> (CODING | FINISHED),
    bounded by a 7-attempt budget. Mutates project_state in place and returns
    a tuple (chat_history, build_logs, run_logs, preview_html, status_message)
    for the Gradio UI. `oauth_token_token` is the raw HF token string used
    only for create_repo/upload_file pushes.
    """
    # Bootstrap: move a fresh project straight into planning.
    if project_state['current_task'] == 'START':
        project_state['current_task'] = 'PLANNING'
        project_state['status_message'] = "Starting project: Initializing and moving to Planning."
        project_state['chat_history'].append({"role": "assistant", "content": "Project initialized. Starting development team."})
    # Main loop: one task transition per iteration, capped by attempt budget.
    while project_state['status'] == 'In Progress' and project_state['attempt_count'] < 7:
        print(f"\n--- Attempt {project_state['attempt_count'] + 1} ---")
        print(f"Current Task: {project_state['current_task']}")
        current_task = project_state['current_task']
        task_message = f"➡️ Task: {current_task}"
        # Avoid posting the same task banner twice in a row.
        if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != task_message.strip():
            project_state['chat_history'].append({"role": "assistant", "content": task_message})
        step_successful = True
        if current_task == 'PLANNING':
            step_successful = run_planner(client, project_state, config)
            if step_successful:
                project_state['current_task'] = 'CODING - Initial Implementation'
                # Only post the plan once across repeated planning runs.
                if project_state['plan'] and not any("**Plan:**" in msg.get('content', '') for msg in project_state['chat_history']):
                    project_state['chat_history'].append({"role": "assistant", "content": f"**Plan:**\n{project_state['plan']}"})
            else:
                project_state['current_task'] = 'FAILED'
        elif current_task.startswith('CODING'):
            # Seed stub files so the Code-Gen agent always has something to edit.
            if project_state['main_app_file'] not in project_state['files']:
                print(f"Adding initial stub for {project_state['main_app_file']}...")
                project_state['files'][project_state['main_app_file']] = f"# Initial {project_state['sdk_choice']} app file\n"
                if project_state['sdk_choice'] == 'gradio':
                    project_state['files'][project_state['main_app_file']] += "import gradio as gr\n\n# Define a simple interface\n# For example: gr.Interface(...).launch()\n"
                elif project_state['sdk_choice'] == 'streamlit':
                    project_state['files'][project_state['main_app_file']] += "import streamlit as st\n\n# Your Streamlit app starts here\n# For example: st.write('Hello, world!')\n"
            if 'requirements.txt' not in project_state['files']:
                print("Adding initial requirements.txt stub...")
                # FIX: Specify gradio>=4.0.0 to get gr.Interval/gr.Timer
                req_content = "pandas\n" + ("streamlit\n" if project_state['sdk_choice']=="streamlit" else "gradio>=4.0.0\n") + "google-generativeai\nhuggingface-hub\n"
                project_state['files']['requirements.txt'] = req_content
            if 'README.md' not in project_state['files']:
                print("Adding initial README.md stub...")
                # HF Spaces reads the YAML front-matter (sdk, app_file, ...)
                # from README.md, so it must start at column 0.
                readme_content = f"""---
title: {project_state['repo_id']}
emoji: 🐢
sdk: {project_state['sdk_choice']}
sdk_version: {project_state['sdk_version']}
app_file: {project_state['main_app_file']}
pinned: false
---
# {project_state['repo_id']}
This is an auto-generated HF Space.
**Requirements:** {project_state['requirements']}
**Plan:**
{project_state['plan']}
"""
                project_state['files']['README.md'] = readme_content
            step_successful = run_codegen(client, project_state, config)
            if step_successful:
                project_state['current_task'] = 'PUSHING'
            else:
                print("Code-Gen step failed. Moving to Debugging.")
                project_state['current_task'] = 'DEBUGGING'
        elif current_task == 'PUSHING':
            try:
                create_repo(repo_id=project_state['repo_id'], token=oauth_token_token,  # Token is needed for pushing!
                            exist_ok=True, repo_type="space", space_sdk=project_state['sdk_choice'])
                # Drop entries with empty/blank filenames before uploading.
                files_to_push = {
                    fn: content
                    for fn, content in project_state['files'].items()
                    if fn and fn.strip()
                }
                print(f"Attempting to push {len(files_to_push)} valid files to {project_state['repo_id']}...")
                # Write each file locally, upload it, then remove the local copy.
                for fn, content in files_to_push.items():
                    dirpath = os.path.dirname(fn)
                    if dirpath:
                        os.makedirs(dirpath, exist_ok=True)
                    filepath = os.path.join(os.getcwd(), fn)
                    with open(filepath, "w") as f:
                        f.write(content)
                    upload_file(
                        path_or_fileobj=filepath, path_in_repo=fn,
                        repo_id=project_state['repo_id'], token=oauth_token_token,  # Token is needed for pushing!
                        repo_type="space"
                    )
                    os.remove(filepath)
                print(f"Pushed {len(files_to_push)} files to {project_state['repo_id']}")
                project_state['status_message'] = f"Pushed code to HF Space **{project_state['repo_id']}**. Waiting for build..."
                project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
                project_state['current_task'] = 'LOGGING'
            except Exception as e:
                # Push failure is terminal: without a deployed Space there is
                # nothing to log or debug.
                step_successful = False
                project_state['status'] = 'Failed'
                project_state['status_message'] = f"ERROR: Failed to push to HF Space: {e}"
                project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
                print(project_state['status_message'])
                project_state['current_task'] = 'FINISHED'
        elif current_task == 'LOGGING':
            # Give the Space a moment to start building before polling.
            time.sleep(5)
            wait_time = 5
            max_log_wait = 150
            elapsed_log_wait = 0
            logs_fetched = False
            iframe_checked = False
            status_logging_message = "Fetching logs and checking iframe..."
            if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != status_logging_message.strip():
                project_state['chat_history'].append({"role": "assistant", "content": status_logging_message})
            project_state['status_message'] = status_logging_message
            print("Starting orchestration-based log/iframe check loop...")
            # Poll logs/iframe with growing backoff (capped at 20s) until
            # something useful shows up or the budget runs out.
            while elapsed_log_wait < max_log_wait:
                try:
                    # fetch_logs uses local HF credentials (HF_TOKEN env var).
                    current_build_logs = fetch_logs(project_state['repo_id'], "build")
                    current_run_logs = fetch_logs(project_state['repo_id'], "run")
                    current_iframe_ok = check_iframe(project_state['iframe_url'])
                    project_state['logs']['build'] = current_build_logs
                    project_state['logs']['run'] = current_run_logs
                    project_state['iframe_ok'] = current_iframe_ok
                    logs_fetched = True
                    iframe_checked = True  # If we checked, record it
                    print(f"Orchestration Log/Iframe check at {elapsed_log_wait}s. Build logs len: {len(current_build_logs)}, Run logs len: {len(current_run_logs)}, Iframe OK: {current_iframe_ok}")
                    # Stop polling on any evidence of progress or failure:
                    # live iframe, error markers, near-timeout, build activity,
                    # or any run-log output.
                    if project_state['iframe_ok'] or \
                       "ERROR" in current_build_logs.upper() or "FATAL" in current_build_logs.upper() or \
                       elapsed_log_wait >= max_log_wait - wait_time or \
                       ("Building" in current_build_logs or len(current_build_logs) > 100) or \
                       len(current_run_logs) > 0:
                        print("Proceeding to Debugging based on logs/iframe status.")
                        break
                    else:
                        print(f"Logs or iframe not ready. Waiting {wait_time}s...")
                        time.sleep(wait_time)
                        elapsed_log_wait += wait_time
                        wait_time = min(wait_time * 1.5, 20)
                except Exception as e:
                    # Transient fetch errors: wait and retry within the budget.
                    print(f"Error during orchestration-based log fetching or iframe check: {e}. Will retry.")
                    time.sleep(wait_time)
                    elapsed_log_wait += wait_time
                    wait_time = min(wait_time * 1.5, 20)
            if logs_fetched or iframe_checked:
                project_state['status_message'] = "Logs fetched and iframe checked (or timeout reached)."
            else:
                project_state['status_message'] = "Warning: Could not fetch logs or check iframe status within timeout."
            project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
            project_state['current_task'] = 'DEBUGGING'
        elif current_task == 'DEBUGGING':
            step_successful = run_debugger(client, project_state, config)
            if step_successful:
                feedback = project_state['feedback']
                iframe_ok = project_state.get('iframe_ok', False)
                error_types = classify_errors(project_state['logs'].get('build', '') + '\n' + project_state['logs'].get('run', ''))
                print(f"Debug Analysis - Feedback: {feedback[:100]}... | Iframe OK: {iframe_ok} | Errors: {error_types}")
                # Complete when the Debug agent says so, or when heuristics
                # (iframe up, no errors, non-trivial run logs) all agree.
                is_complete = ("All clear. Project appears complete." in feedback) or \
                              (iframe_ok and error_types == "none" and "ERROR" not in feedback.upper() and len(project_state['logs'].get('run', '')) > 10)
                if is_complete:
                    project_state['status'] = 'Complete'
                    project_state['current_task'] = 'FINISHED'
                    project_state['status_message'] = "Debug Agent reports clear. Project appears complete."
                elif project_state['attempt_count'] >= 6:
                    project_state['status'] = 'Failed'
                    project_state['current_task'] = 'FINISHED'
                    project_state['status_message'] = f"Max attempts ({project_state['attempt_count']+1}/7) reached after debugging. Project failed."
                else:
                    # Debug found issues: burn one attempt and go back to coding
                    # after a short linear backoff (capped at 30s).
                    project_state['current_task'] = 'CODING - Addressing Feedback'
                    project_state['status_message'] = "Debug Agent found issues. Returning to Coding phase to address feedback."
                    project_state['attempt_count'] += 1
                    backoff_wait = min(project_state['attempt_count'] * 5, 30)
                    print(f"Waiting {backoff_wait} seconds before next coding attempt...")
                    time.sleep(backoff_wait)
            else:
                project_state['status'] = 'Failed'
                project_state['current_task'] = 'FINISHED'
        elif current_task == 'FINISHED':
            # Terminal marker; the while condition ends the loop.
            pass
        else:
            # Unknown task name: hard-fail rather than loop forever.
            step_successful = False
            project_state['status'] = 'Failed'
            project_state['status_message'] = f"ERROR: Orchestrator entered an unknown task state: {current_task}"
            project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
            print(project_state['status_message'])
            project_state['current_task'] = 'FINISHED'
        # A failed step that did not mark the project Failed is routed to
        # Debugging, or Failed once the attempt budget is exhausted.
        if not step_successful and project_state['status'] == 'In Progress':
            print(f"Orchestration step '{current_task}' failed, but status is still 'In Progress'. Transitioning to DEBUGGING or FAILED.")
            if project_state['attempt_count'] >= 6:
                project_state['status'] = 'Failed'
                project_state['status_message'] = project_state.get('status_message', f'An unexpected error caused task failure: {current_task}')
                project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
                project_state['current_task'] = 'FINISHED'
            else:
                project_state['current_task'] = 'DEBUGGING'
    # Loop exit without a terminal status means something went wrong.
    if project_state['status'] == 'In Progress':
        project_state['status'] = 'Failed'
        project_state['status_message'] = project_state.get('status_message', 'Orchestration loop exited unexpectedly.')
    # Post the final outcome (and a success/failure banner) exactly once.
    final_outcome_message = f"**Project Outcome:** {project_state['status']} - {project_state['status_message']}"
    if not project_state['chat_history'] or not project_state['chat_history'][-1].get('content', '').strip().startswith("**Project Outcome:"):
        project_state['chat_history'].append({"role": "assistant", "content": final_outcome_message})
    if project_state['status'] == 'Complete':
        completion_message = f"✅ Application deployed successfully (likely)! Check the preview above: [https://huggingface.co/spaces/{project_state['repo_id']}](https://huggingface.co/spaces/{project_state['repo_id']})"
        if not project_state['chat_history'] or not project_state['chat_history'][-1].get('content', '').strip().startswith("✅ Application deployed successfully"):
            project_state['chat_history'].append({"role": "assistant", "content": completion_message})
    elif project_state['status'] == 'Failed':
        failure_message = "❌ Project failed to complete. Review logs and feedback for details."
        if not project_state['chat_history'] or not project_state['chat_history'][-1].get('content', '').strip().startswith("❌ Project failed to complete."):
            project_state['chat_history'].append({"role": "assistant", "content": failure_message})
    # UI payload: chat history, both log panes, preview iframe HTML, status line.
    return (
        project_state['chat_history'],
        project_state['logs'].get('build', 'No build logs.'),
        project_state['logs'].get('run', 'No run logs.'),
        (f'<iframe src="{project_state["iframe_url"]}" width="100%" height="500px" allow="accelerometer; ambient-light-sensor; autoplay; camera; gyroscope; hid; fullscreen; illustration; xr-spatial-tracking; sync-xhr;" sandbox="allow-forms allow-modals allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts allow-storage-access-by-user-activation allow-top-navigation-by-user-activation" frameborder="0"></iframe>'
         + ("" if project_state.get('iframe_ok') else "<p style='color:red;'>⚠️ iframe not responding or check failed.</p>")),
        project_state['status_message']
    )
# --- Create a unified updater function (for the poller) --- | |
# Place this function *before* the gr.Blocks() definition | |
def _update_logs_and_preview(profile_token_state, space_name): | |
"""Fetches logs and checks iframe status for the poller.""" | |
profile, token = profile_token_state # profile_token_state is the state from the login_btn component | |
if not profile or not token or not token.token: | |
return "Login required", "Login required", "<p>Please log in.</p>" | |
if not space_name or not space_name.strip(): | |
return "Enter Space name", "Enter Space name", "<p>Enter a Space name.</p>" | |
clean = space_name.strip() | |
rid = f"{profile.username}/{clean}" | |
url = f"https://huggingface.co/spaces/{profile.username}/{clean}" | |
# Fetch logs and check iframe status using the revised functions | |
try: | |
# Modified: Call fetch_logs without the token argument (uses HF_TOKEN env var) | |
b = fetch_logs(rid, "build") | |
except Exception as e: | |
b = f"Error fetching build logs: {e}" | |
print(f"Poller error fetching build logs for {rid}: {e}") | |
try: | |
# Modified: Call fetch_logs without the token argument (uses HF_TOKEN env var) | |
r = fetch_logs(rid, "run") | |
except Exception as e: | |
r = f"Error fetching run logs: {e}" | |
print(f"Poller error fetching run logs for {rid}: {e}") | |
try: | |
# check_iframe does not need authentication for public spaces | |
ok = check_iframe(url) | |
except Exception as e: | |
ok = False | |
print(f"Poller error checking iframe {url}: {e}") | |
preview_html = ( | |
f'<iframe src="{url}" width="100%" height="500px" allow="accelerometer; ambient-light-sensor; autoplay; camera; gyroscope; hid; fullscreen; illustration; xr-spatial-tracking; sync-xhr;" sandbox="allow-forms allow-modals allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts allow-storage-access-by-user-activation allow-top-navigation-by-user-activation" frameborder="0"></iframe>' | |
if ok else | |
f"<p style='color:red;'>⚠️ iframe not responding yet ({url}). Make sure the Space name is correct and wait for the build to complete. Check build logs for errors.</p>" | |
) | |
return b, r, preview_html | |
# --- MAIN HANDLER (Called by Gradio) ---
def handle_user_message(
    history,
    user_input: str,
    sdk_choice: str,
    gemini_api_key: str,
    space_name: str,
    grounding_enabled: bool,
    temperature: float,
    max_output_tokens: int,
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None  # Gradio auto-injects; token is forwarded for pushing files.
):
    """Validate inputs, build the initial project state, and run the dev team.

    Args:
        history: Chat history as a list of {"role", "content"} messages.
        user_input: The application requirements typed by the user.
        sdk_choice: "gradio" or "streamlit".
        gemini_api_key: User-supplied Gemini API key.
        space_name: Bare Space name (becomes ``username/space_name``).
        grounding_enabled: Whether agents may use Google Search grounding.
        temperature: Sampling temperature for the agents.
        max_output_tokens: Max response length for the agents.
        profile / oauth_token: Injected by Gradio's OAuth integration.

    Returns:
        5-tuple matching the Gradio outputs:
        (chat history, build logs, run logs, preview HTML, status message).
    """

    def _notify(msg: str) -> None:
        # Append an assistant notice unless it duplicates the last entry —
        # the handler can re-fire on both the button click and textbox submit.
        if not history or history[-1].get("content") != msg:
            history.append({"role": "assistant", "content": msg})

    # Record the user's turn exactly once (guard against double submission).
    if not history or history[-1].get("role") != "user" or history[-1].get("content") != user_input:
        history.append({"role": "user", "content": user_input})

    # --- Guard clauses: each failure short-circuits with an inline notice. ---
    if not space_name or not space_name.strip():
        _notify("⚠️ Please enter a Space name.")
        return history, "", "", "<p>Enter a Space name.</p>", "⚠️ Please enter a Space name."
    if not profile or not oauth_token or not oauth_token.token:
        _notify("⚠️ Please log in first via the Hugging Face button.")
        return history, "", "", "<p>Please log in.</p>", "Login required."
    if not gemini_api_key:
        _notify("⚠️ Please provide your Gemini API Key.")
        return history, "", "", "<p>Please provide API Key.</p>", "API Key required."
    if not user_input or user_input.strip() == "":
        _notify("Please enter requirements for the application.")
        return history, "", "", "<p>Enter requirements.</p>", "Waiting for prompt."

    client = genai.Client(api_key=gemini_api_key)
    sdk_version = get_sdk_version(sdk_choice)
    # Streamlit Spaces conventionally use streamlit_app.py as the entry file.
    code_fn = "app.py" if sdk_choice == "gradio" else "streamlit_app.py"

    clean_space_name = space_name.strip()
    repo_id = f"{profile.username}/{clean_space_name}"
    iframe_url = f"https://huggingface.co/spaces/{profile.username}/{clean_space_name}"

    # Shared mutable state threaded through the agent orchestration loop.
    project_state = {
        'requirements': user_input,
        'plan': '',
        'files': {},
        'logs': {'build': '', 'run': ''},  # Filled in during orchestration.
        'feedback': '',
        'current_task': 'START',
        'status': 'In Progress',
        'status_message': 'Initializing...',
        'attempt_count': 0,
        'sdk_choice': sdk_choice,
        'sdk_version': sdk_version,
        'repo_id': repo_id,
        'iframe_url': iframe_url,
        'main_app_file': code_fn,
        'chat_history': history[:],  # Copy so orchestration can append freely.
    }

    cfg = GenerateContentConfig(
        tools=[Tool(google_search=GoogleSearch())] if grounding_enabled else [],
        response_modalities=["TEXT"],
        temperature=temperature,
        max_output_tokens=int(max_output_tokens),
    )

    # The OAuth token is only needed here for pushing files to the Space;
    # orchestrate_development returns the same 5-tuple this handler exposes.
    return orchestrate_development(client, project_state, cfg, oauth_token.token)
# --- SIMPLE UI WITH HIGHER MAX TOKENS & STATUS DISPLAY ---
# Top-level UI wiring. Both event handlers used below
# (handle_user_message, _update_logs_and_preview) are defined above this block.
with gr.Blocks(title="HF Space Auto‑Builder (Team AI)") as demo:
    gr.Markdown("## 🐢 HF Space Auto‑Builder (Team AI)\nUse AI agents to build and deploy a simple Gradio or Streamlit app on a Hugging Face Space.")
    gr.Markdown("1) Log in with Hugging Face. 2) Enter your Gemini API Key. 3) Enter a Space name. 4) Provide app requirements. 5) Click 'Start Development Team' and watch the process.")
    with gr.Row():
        with gr.Column(scale=1):
            # --- LOGIN BUTTON / PROFILE & MODEL LISTING ---
            login_btn = gr.LoginButton(variant="huggingface", size="lg")
            status_md = gr.Markdown("*Not logged in.*")
            models_md = gr.Markdown()
            # On page load, populate profile/model info. show_profile and
            # list_private_models take inputs=None so Gradio auto-injects
            # gr.OAuthProfile / gr.OAuthToken from their type annotations.
            demo.load(show_profile, inputs=None, outputs=status_md, api_name="load_profile")
            demo.load(list_private_models, inputs=None, outputs=models_md, api_name="load_models")
            # Refresh the same displays after an explicit login click.
            login_btn.click(
                fn=show_profile,
                inputs=None,
                outputs=status_md,
                api_name="login_profile"
            )
            login_btn.click(
                fn=list_private_models,
                inputs=None,
                outputs=models_md,
                api_name="login_models"
            )
            # --- END LOGIN FIX ---
            gr.Markdown("---")
            # Build configuration controls (left column).
            sdk_choice = gr.Radio(["gradio","streamlit"], value="gradio", label="SDK", info="Choose the framework for your app.")
            api_key = gr.Textbox(label="Gemini API Key", type="password", info="Get one from Google AI Studio.")
            space_name = gr.Textbox(
                label="Space name",
                placeholder="e.g. test, example, my-cool-space",
                info="Becomes username/this-name on HF"
            )
            grounding = gr.Checkbox(label="Enable Google Search (Grounding)", value=False, info="Allow agents to use Google Search.")
            temp = gr.Slider(0,1,value=0.2, label="Temperature", info="Creativity of agents. Lower is more focused.")
            max_tokens = gr.Number(value=4096, label="Max Output Tokens", minimum=1000, info="Max length of agent responses (code, feedback, etc.). Recommend 4096+.")
        with gr.Column(scale=2):
            # Main interaction area: status, chat, prompt, logs, and preview.
            project_status_md = gr.Markdown("Waiting for prompt...")
            chatbot = gr.Chatbot(type="messages", label="Team Communication & Status", show_copy_button=True)
            user_in = gr.Textbox(placeholder="Describe the application you want to build...", label="Application Requirements", lines=3)
            send_btn = gr.Button("🚀 Start Development Team")
            with gr.Accordion("Logs", open=False):
                build_box = gr.Textbox(label="Build logs", lines=10, interactive=False, max_lines=20)
                run_box = gr.Textbox(label="Run logs", lines=10, interactive=False, max_lines=20)
                # Manual refresh button REMOVED
            with gr.Accordion("App Preview", open=True):
                preview = gr.HTML("<p>App preview will load here when available.</p>")
    # --- Hidden poller (using Timer) ---
    # Ticks every 2 seconds to refresh logs/preview. render=False keeps the
    # timer out of the visible layout.
    log_poller = gr.Timer(value=2, active=True, render=False)
    # --- End hidden poller ---
    # handle_user_message is defined ABOVE this block.
    # NOTE: profile/oauth_token are NOT in the inputs list — Gradio injects
    # them from the handler's gr.OAuthProfile / gr.OAuthToken annotations.
    send_btn.click(
        fn=handle_user_message,
        inputs=[
            chatbot,
            user_in,
            sdk_choice,
            api_key,
            space_name,
            grounding,
            temp,
            max_tokens,
        ],
        outputs=[chatbot, build_box, run_box, preview, project_status_md]
    )
    # Same handler on Enter in the requirements textbox; inputs/outputs must
    # stay in sync with the click wiring above.
    user_in.submit(
        fn=handle_user_message,
        inputs=[
            chatbot,
            user_in,
            sdk_choice,
            api_key,
            space_name,
            grounding,
            temp,
            max_tokens,
        ],
        outputs=[chatbot, build_box, run_box, preview, project_status_md]
    )
    # Manual refresh handler REMOVED
    # --- Wire the poller to that function ---
    # _update_logs_and_preview is defined ABOVE this block
    # NOTE(review): login_btn is passed as an input and the handler unpacks it
    # as a (profile, token) tuple — presumably the LoginButton's state value;
    # confirm against the installed Gradio version's LoginButton semantics.
    log_poller.tick(
        fn=_update_logs_and_preview,
        inputs=[login_btn, space_name], # Pass login button state (for profile/token) and the space_name textbox
        outputs=[build_box, run_box, preview] # Update the log textareas and the preview iframe
    )
    # --- End wire poller ---

# Clean up files created during the process when the app stops (optional)
# demo.on_event("close", lambda: [os.remove(f) for f in os.listdir() if os.path.isfile(f) and (f.endswith(".py") or f.endswith(".txt") or f.endswith(".md"))])

# Bind to all interfaces on port 7860 — the standard HF Spaces entry point.
demo.launch(server_name="0.0.0.0", server_port=7860)