Spaces:
Running
Running
import gradio as gr | |
import os | |
import logging | |
import gc | |
import psutil | |
from functools import wraps | |
import time | |
import threading | |
import json | |
from model.generate import generate_test_cases, get_generator, monitor_memory | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
# Thread-safe initialization | |
_init_lock = threading.Lock() | |
_initialized = False | |
def init_model(): | |
"""Initialize model on startup""" | |
try: | |
# Skip AI model loading in low memory environments | |
memory_mb = psutil.Process().memory_info().rss / 1024 / 1024 | |
if memory_mb > 200 or os.environ.get('HUGGINGFACE_SPACE'): | |
logger.info("β οΈ Skipping AI model loading due to memory constraints") | |
logger.info("π§ Using template-based generation mode") | |
return True | |
logger.info("π Initializing AI model...") | |
generator = get_generator() | |
model_info = generator.get_model_info() | |
logger.info(f"β Model initialized: {model_info['model_name']} | Memory: {model_info['memory_usage']}") | |
return True | |
except Exception as e: | |
logger.error(f"β Model initialization failed: {e}") | |
logger.info("π§ Falling back to template-based generation") | |
return False | |
def check_health(): | |
"""Check system health""" | |
try: | |
memory_mb = psutil.Process().memory_info().rss / 1024 / 1024 | |
return { | |
"status": "healthy" if memory_mb < 450 else "warning", | |
"memory_usage": f"{memory_mb:.1f}MB", | |
"memory_limit": "512MB" | |
} | |
except Exception: | |
return {"status": "unknown", "memory_usage": "unavailable"} | |
def smart_memory_monitor(func): | |
"""Enhanced memory monitoring with automatic cleanup""" | |
def wrapper(*args, **kwargs): | |
start_time = time.time() | |
try: | |
initial_memory = psutil.Process().memory_info().rss / 1024 / 1024 | |
logger.info(f"π {func.__name__} started | Memory: {initial_memory:.1f}MB") | |
if initial_memory > 400: | |
logger.warning("β οΈ High memory detected, forcing cleanup...") | |
gc.collect() | |
result = func(*args, **kwargs) | |
return result | |
except Exception as e: | |
logger.error(f"β Error in {func.__name__}: {str(e)}") | |
return { | |
"error": "Internal server error occurred", | |
"message": "Please try again or contact support" | |
} | |
finally: | |
final_memory = psutil.Process().memory_info().rss / 1024 / 1024 | |
execution_time = time.time() - start_time | |
logger.info(f"β {func.__name__} completed | Memory: {final_memory:.1f}MB | Time: {execution_time:.2f}s") | |
if final_memory > 450: | |
logger.warning("π§Ή High memory usage, forcing aggressive cleanup...") | |
gc.collect() | |
post_cleanup_memory = psutil.Process().memory_info().rss / 1024 / 1024 | |
logger.info(f"π§Ή Post-cleanup memory: {post_cleanup_memory:.1f}MB") | |
return wrapper | |
def ensure_initialized(): | |
"""Ensure model is initialized (thread-safe)""" | |
global _initialized | |
if not _initialized: | |
with _init_lock: | |
if not _initialized: | |
logger.info("π Gradio app starting up on Hugging Face Spaces...") | |
success = init_model() | |
if success: | |
logger.info("β Startup completed successfully") | |
else: | |
logger.warning("β οΈ Model initialization failed, using template mode") | |
_initialized = True | |
def read_uploaded_file(file_obj): | |
"""Read and extract text from uploaded file""" | |
if file_obj is None: | |
return "" | |
try: | |
file_path = file_obj.name | |
file_extension = os.path.splitext(file_path)[1].lower() | |
if file_extension in ['.txt', '.md']: | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
elif file_extension in ['.doc', '.docx']: | |
try: | |
import docx | |
doc = docx.Document(file_path) | |
content = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) | |
except ImportError: | |
logger.warning("python-docx not available, trying to read as text") | |
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
content = f.read() | |
elif file_extension == '.pdf': | |
try: | |
import PyPDF2 | |
with open(file_path, 'rb') as f: | |
reader = PyPDF2.PdfReader(f) | |
content = '' | |
for page in reader.pages: | |
content += page.extract_text() + '\n' | |
except ImportError: | |
logger.warning("PyPDF2 not available, cannot read PDF files") | |
return "β PDF support requires PyPDF2. Please install it or use text/Word files." | |
else: | |
# Try to read as plain text | |
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
content = f.read() | |
logger.info(f"π File read successfully: {len(content)} characters") | |
return content | |
except Exception as e: | |
logger.error(f"β Error reading file: {str(e)}") | |
return f"β Error reading file: {str(e)}" | |
def combine_inputs(prompt_text, uploaded_file): | |
"""Combine prompt text and uploaded file content""" | |
file_content = "" | |
if uploaded_file is not None: | |
file_content = read_uploaded_file(uploaded_file) | |
if file_content.startswith("β"): | |
return file_content # Return error message | |
# Combine both inputs | |
combined_text = "" | |
if prompt_text and prompt_text.strip(): | |
combined_text += "PROMPT:\n" + prompt_text.strip() + "\n\n" | |
if file_content and not file_content.startswith("β"): | |
combined_text += "DOCUMENT CONTENT:\n" + file_content.strip() | |
if not combined_text.strip(): | |
return "β Please provide either text input or upload a document." | |
return combined_text.strip() | |
# Initialize on startup | |
ensure_initialized() | |
def generate_test_cases_api(prompt_text, uploaded_file): | |
"""Main API function for generating test cases with dual input support""" | |
# Combine inputs | |
combined_input = combine_inputs(prompt_text, uploaded_file) | |
if combined_input.startswith("β"): | |
return { | |
"error": combined_input, | |
"test_cases": [], | |
"count": 0 | |
} | |
if len(combined_input) > 8000: | |
logger.warning(f"Input text truncated from {len(combined_input)} to 8000 characters") | |
combined_input = combined_input[:8000] | |
try: | |
logger.info(f"π― Generating test cases for combined input ({len(combined_input)} chars)") | |
test_cases = generate_test_cases(combined_input) | |
if not test_cases or len(test_cases) == 0: | |
logger.error("No test cases generated") | |
return { | |
"error": "Failed to generate test cases", | |
"test_cases": [], | |
"count": 0 | |
} | |
try: | |
generator = get_generator() | |
model_info = generator.get_model_info() | |
model_used = model_info.get("model_name", "Unknown Model") | |
generation_method = model_info.get("status", "unknown") | |
except Exception: | |
model_used = "Template-Based Generator" | |
generation_method = "template_mode" | |
if model_used == "Template-Based Generator": | |
model_algorithm = "Enhanced Rule-based Template" | |
model_reason = "Used enhanced rule-based generation with pattern matching and context analysis." | |
elif "distilgpt2" in model_used: | |
model_algorithm = "Transformer-based LM" | |
model_reason = "Used DistilGPT2 for balanced performance and memory efficiency." | |
elif "DialoGPT" in model_used: | |
model_algorithm = "Transformer-based LM" | |
model_reason = "Used DialoGPT-small as it fits within memory limits and handles conversational input well." | |
else: | |
model_algorithm = "Transformer-based LM" | |
model_reason = "Used available Hugging Face causal LM due to sufficient resources." | |
logger.info(f"β Successfully generated {len(test_cases)} test cases") | |
return { | |
"test_cases": test_cases, | |
"count": len(test_cases), | |
"model_used": model_used, | |
"generation_method": generation_method, | |
"model_algorithm": model_algorithm, | |
"model_reason": model_reason, | |
"input_source": "Combined (Prompt + Document)" if (prompt_text and uploaded_file) else | |
"Document Upload" if uploaded_file else "Text Prompt" | |
} | |
except Exception as e: | |
logger.error(f"β Test case generation failed: {str(e)}") | |
return { | |
"error": "Failed to generate test cases", | |
"message": "Please try again with different input", | |
"test_cases": [], | |
"count": 0 | |
} | |
def format_test_cases_output(result): | |
"""Format the test cases for display""" | |
if "error" in result: | |
return f"β Error: {result['error']}", "" | |
test_cases = result.get("test_cases", []) | |
if not test_cases: | |
return "No test cases generated", "" | |
# Format test cases for display | |
formatted_output = f"β Generated {result['count']} Test Cases\n\n" | |
formatted_output += f"π₯ Input Source: {result.get('input_source', 'Unknown')}\n" | |
formatted_output += f"π€ Model: {result['model_used']}\n" | |
formatted_output += f"π§ Algorithm: {result['model_algorithm']}\n" | |
formatted_output += f"π‘ Reason: {result['model_reason']}\n\n" | |
formatted_output += "=" * 60 + "\n" | |
formatted_output += "GENERATED TEST CASES\n" | |
formatted_output += "=" * 60 + "\n\n" | |
for i, tc in enumerate(test_cases, 1): | |
formatted_output += f"πΉ Test Case {i}:\n" | |
formatted_output += f" ID: {tc.get('id', f'TC_{i:03d}')}\n" | |
formatted_output += f" Title: {tc.get('title', 'N/A')}\n" | |
formatted_output += f" Priority: {tc.get('priority', 'Medium')}\n" | |
formatted_output += f" Category: {tc.get('category', 'Functional')}\n" | |
formatted_output += f" Description: {tc.get('description', 'N/A')}\n" | |
# Pre-conditions | |
preconditions = tc.get('preconditions', []) | |
if preconditions: | |
formatted_output += f" Pre-conditions:\n" | |
for j, precond in enumerate(preconditions, 1): | |
formatted_output += f" β’ {precond}\n" | |
# Test steps | |
steps = tc.get('steps', []) | |
if isinstance(steps, list) and steps: | |
formatted_output += f" Test Steps:\n" | |
for j, step in enumerate(steps, 1): | |
formatted_output += f" {j}. {step}\n" | |
else: | |
formatted_output += f" Test Steps: {steps if steps else 'N/A'}\n" | |
formatted_output += f" Expected Result: {tc.get('expected', 'N/A')}\n" | |
# Post-conditions | |
postconditions = tc.get('postconditions', []) | |
if postconditions: | |
formatted_output += f" Post-conditions:\n" | |
for postcond in postconditions: | |
formatted_output += f" β’ {postcond}\n" | |
formatted_output += f" Test Data: {tc.get('test_data', 'N/A')}\n" | |
formatted_output += "\n" + "-" * 40 + "\n\n" | |
# Return JSON for API access | |
json_output = json.dumps(result, indent=2) | |
return formatted_output, json_output | |
def gradio_generate_test_cases(prompt_text, uploaded_file): | |
"""Gradio interface function""" | |
result = generate_test_cases_api(prompt_text, uploaded_file) | |
return format_test_cases_output(result) | |
def get_system_status(): | |
"""Get system status information""" | |
health_data = check_health() | |
try: | |
generator = get_generator() | |
model_info = generator.get_model_info() | |
except Exception: | |
model_info = { | |
"model_name": "Enhanced Template-Based Generator", | |
"status": "template_mode", | |
"optimization": "memory_safe" | |
} | |
status_info = f""" | |
π₯ SYSTEM STATUS | |
================ | |
Status: {health_data["status"]} | |
Memory Usage: {health_data["memory_usage"]} | |
Memory Limit: 512MB | |
π€ MODEL INFORMATION | |
=================== | |
Model Name: {model_info["model_name"]} | |
Status: {model_info["status"]} | |
Optimization: {model_info.get("optimization", "standard")} | |
π APPLICATION INFO | |
================== | |
Version: 2.0.0-enhanced-input | |
Environment: Hugging Face Spaces | |
Backend: Gradio | |
Features: Text Input + File Upload | |
Supported Files: .txt, .md, .doc, .docx, .pdf | |
""" | |
return status_info | |
def get_model_info_detailed(): | |
"""Get detailed model information""" | |
try: | |
generator = get_generator() | |
info = generator.get_model_info() | |
health_data = check_health() | |
detailed_info = f""" | |
π§ DETAILED MODEL INFORMATION | |
============================ | |
Model Name: {info.get('model_name', 'N/A')} | |
Status: {info.get('status', 'N/A')} | |
Memory Usage: {info.get('memory_usage', 'N/A')} | |
Optimization Level: {info.get('optimization', 'N/A')} | |
π SYSTEM METRICS | |
================ | |
System Status: {health_data['status']} | |
Current Memory: {health_data['memory_usage']} | |
Memory Limit: {health_data.get('memory_limit', 'N/A')} | |
βοΈ CONFIGURATION | |
=============== | |
Environment: {"Hugging Face Spaces" if os.environ.get('SPACE_ID') else "Local"} | |
Backend: Gradio | |
Threading: Enabled | |
Memory Monitoring: Active | |
Input Methods: Text + File Upload | |
File Support: TXT, MD, DOC, DOCX, PDF | |
""" | |
return detailed_info | |
except Exception as e: | |
return f"β Error getting model info: {str(e)}" | |
# Create Gradio interface | |
with gr.Blocks(title="AI Test Case Generator - Enhanced", theme=gr.themes.Soft()) as app: | |
gr.Markdown(""" | |
# π§ͺ AI Test Case Generator - Enhanced Edition | |
Generate comprehensive test cases from Software Requirements Specification (SRS) documents using AI models. | |
**New Features:** | |
- π **Dual Input Support**: Text prompt AND/OR document upload | |
- π **File Upload**: Support for .txt, .md, .doc, .docx, .pdf files | |
- π― **Enhanced Test Cases**: More detailed and comprehensive test case generation | |
- π§ **Improved Templates**: Better rule-based fallback with pattern matching | |
- π **Better Formatting**: Enhanced output with priorities, categories, and conditions | |
""") | |
with gr.Tab("π§ͺ Generate Test Cases"): | |
gr.Markdown("### Choose your input method: Enter text directly, upload a document, or use both!") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
# Text input | |
srs_input = gr.Textbox( | |
label="π Text Input (SRS, Requirements, or Prompt)", | |
placeholder="Enter your requirements, user stories, or specific prompt here...\n\nExample:\n- The system shall provide user authentication with username and password\n- Users should be able to login, logout, and reset passwords\n- The system should validate input and display appropriate error messages\n- Performance requirement: Login should complete within 3 seconds", | |
lines=8, | |
max_lines=15 | |
) | |
# File upload | |
file_upload = gr.File( | |
label="π Upload Document (Optional)", | |
file_types=[".txt", ".md", ".doc", ".docx", ".pdf"], | |
type="filepath" | |
) | |
gr.Markdown(""" | |
**π‘ Tips:** | |
- Use **text input** for quick requirements or specific prompts | |
- Use **file upload** for complete SRS documents | |
- Use **both** to combine a specific prompt with a detailed document | |
- Supported formats: TXT, Markdown, Word (.doc/.docx), PDF | |
""") | |
generate_btn = gr.Button("π Generate Test Cases", variant="primary", size="lg") | |
with gr.Column(scale=3): | |
output_display = gr.Textbox( | |
label="π Generated Test Cases", | |
lines=25, | |
max_lines=35, | |
interactive=False | |
) | |
with gr.Row(): | |
json_output = gr.Textbox( | |
label="π JSON Output (for API use)", | |
lines=12, | |
max_lines=20, | |
interactive=False | |
) | |
with gr.Tab("π System Status"): | |
with gr.Column(): | |
status_display = gr.Textbox( | |
label="π₯ System Health & Status", | |
lines=18, | |
interactive=False | |
) | |
refresh_status_btn = gr.Button("π Refresh Status", variant="secondary") | |
with gr.Tab("π§ Model Information"): | |
with gr.Column(): | |
model_info_display = gr.Textbox( | |
label="π€ Detailed Model Information", | |
lines=22, | |
interactive=False | |
) | |
refresh_model_btn = gr.Button("π Refresh Model Info", variant="secondary") | |
with gr.Tab("π API Documentation"): | |
gr.Markdown(""" | |
## π Enhanced API Endpoints | |
This Gradio app supports both text input and file upload through API: | |
### Generate Test Cases (Text Only) | |
**Endpoint:** `/api/predict` | |
**Method:** POST | |
**Body:** | |
```json | |
{ | |
"data": ["Your SRS text here", null] | |
} | |
``` | |
### Generate Test Cases (With File) | |
**Endpoint:** `/api/predict` | |
**Method:** POST (multipart/form-data) | |
- Upload file and include text in the data array | |
**Response Format:** | |
```json | |
{ | |
"data": [ | |
"Formatted test cases output", | |
"JSON output with enhanced test cases" | |
] | |
} | |
``` | |
### Enhanced Test Case Structure | |
```json | |
{ | |
"test_cases": [ | |
{ | |
"id": "TC_001", | |
"title": "Test Case Title", | |
"priority": "High/Medium/Low", | |
"category": "Functional/Security/Performance/UI", | |
"description": "Detailed test description", | |
"preconditions": ["Pre-condition 1", "Pre-condition 2"], | |
"steps": ["Step 1", "Step 2", "Step 3"], | |
"expected": "Expected result", | |
"postconditions": ["Post-condition 1"], | |
"test_data": "Required test data" | |
} | |
], | |
"count": 5, | |
"model_used": "distilgpt2", | |
"model_algorithm": "Enhanced Rule-based Template", | |
"model_reason": "Detailed selection reasoning...", | |
"input_source": "Combined (Prompt + Document)" | |
} | |
``` | |
### Example Usage (Python with File): | |
```python | |
import requests | |
# Text only | |
response = requests.post( | |
"YOUR_SPACE_URL/api/predict", | |
json={"data": ["User login requirements...", None]} | |
) | |
# With file upload (requires multipart handling) | |
files = {'file': open('requirements.pdf', 'rb')} | |
data = {'data': json.dumps(["Additional prompt", "file_placeholder"])} | |
response = requests.post("YOUR_SPACE_URL/api/predict", files=files, data=data) | |
``` | |
## π Supported File Formats | |
- **Text Files**: .txt, .md | |
- **Word Documents**: .doc, .docx (requires python-docx) | |
- **PDF Files**: .pdf (requires PyPDF2) | |
- **Fallback**: Any text-readable format | |
## π― Enhanced Features | |
- **Dual Input**: Combine text prompts with document uploads | |
- **Better Test Cases**: Includes priorities, categories, pre/post-conditions | |
- **Smart Parsing**: Automatically detects requirement types and generates appropriate tests | |
- **Memory Optimized**: Handles large documents efficiently | |
""") | |
# Event handlers | |
generate_btn.click( | |
fn=gradio_generate_test_cases, | |
inputs=[srs_input, file_upload], | |
outputs=[output_display, json_output] | |
) | |
refresh_status_btn.click( | |
fn=get_system_status, | |
outputs=[status_display] | |
) | |
refresh_model_btn.click( | |
fn=get_model_info_detailed, | |
outputs=[model_info_display] | |
) | |
# Load initial status | |
app.load( | |
fn=get_system_status, | |
outputs=[status_display] | |
) | |
app.load( | |
fn=get_model_info_detailed, | |
outputs=[model_info_display] | |
) | |
# Launch the app | |
if __name__ == "__main__": | |
port = int(os.environ.get("PORT", 7860)) | |
logger.info(f"π Starting Enhanced Gradio app on port {port}") | |
logger.info(f"π₯οΈ Environment: {'Hugging Face Spaces' if os.environ.get('SPACE_ID') else 'Local'}") | |
logger.info("π Features: Text Input + File Upload Support") | |
app.launch( | |
server_name="0.0.0.0", | |
server_port=port, | |
share=False, | |
show_error=True | |
) |