Spaces:
Build error
Build error
| import os | |
| import subprocess | |
| from huggingface_hub import InferenceClient | |
| import gradio as gr | |
| import random | |
| import prompts | |
| import time | |
| from typing import List, Dict | |
| from gensim import corpora, models | |
| from gensim.summarization import summarize | |
| # Simulated agent and tool libraries | |
| AGENT_TYPES = [ | |
| "Task Executor", | |
| "Information Retriever", | |
| "Decision Maker", | |
| "Data Analyzer", | |
| ] | |
| TOOL_TYPES = [ | |
| "Web Scraper", | |
| "Database Connector", | |
| "API Caller", | |
| "File Handler", | |
| "Text Processor", | |
| ] | |
| # Initialize Hugging Face client | |
| client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") | |
| VERBOSE = False | |
| MAX_HISTORY = 100 | |
| MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
| # Import necessary prompts and functions from the existing code | |
| from prompts import ( | |
| ACTION_PROMPT, | |
| ADD_PROMPT, | |
| COMPRESS_HISTORY_PROMPT, | |
| LOG_PROMPT, | |
| LOG_RESPONSE, | |
| MODIFY_PROMPT, | |
| PREFIX, | |
| READ_PROMPT, | |
| TASK_PROMPT, | |
| UNDERSTAND_TEST_RESULTS_PROMPT, | |
| ) | |
| from .utils import parse_action, parse_file_content, read_python_module_structure | |
| from flask import Flask, request, jsonify | |
| class Agent: | |
| def __init__(self, name: str, agent_type: str, complexity: int): | |
| self.name = name | |
| self.type = agent_type | |
| self.complexity = complexity | |
| self.tools = [] | |
| def add_tool(self, tool): | |
| self.tools.append(tool) | |
| def __str__(self): | |
| return f"{self.name} ({self.type}) - Complexity: {self.complexity}" | |
| class Tool: | |
| def __init__(self, name: str, tool_type: str): | |
| self.name = name | |
| self.type = tool_type | |
| def __str__(self): | |
| return f"{self.name} ({self.type})" | |
| class Pypelyne: | |
| def __init__(self): | |
| self.agents: List[Agent] = [] | |
| self.tools: List[Tool] = [] | |
| self.history = "" | |
| self.task = None | |
| self.purpose = None | |
| self.directory = None | |
| def add_agent(self, agent: Agent): | |
| self.agents.append(agent) | |
| def add_tool(self, tool: Tool): | |
| self.tools.append(tool) | |
| def generate_chat_app(self): | |
| time.sleep(2) # Simulate processing time | |
| return f"Chat app generated with {len(self.agents)} agents and {len(self.tools)} tools." | |
| def run_gpt(self, prompt_template, stop_tokens, max_tokens, **prompt_kwargs): | |
| content = PREFIX.format( | |
| module_summary=read_python_module_structure(self.directory)[0], | |
| purpose=self.purpose, | |
| ) + prompt_template.format(**prompt_kwargs) | |
| if VERBOSE: | |
| print(LOG_PROMPT.format(content)) | |
| stream = client.text_generation( | |
| prompt=content, | |
| max_new_tokens=max_tokens, | |
| stop_sequences=stop_tokens if stop_tokens else None, | |
| do_sample=True, | |
| temperature=0.7, | |
| ) | |
| resp = "".join(token for token in stream) | |
| if VERBOSE: | |
| print(LOG_RESPONSE.format(resp)) | |
| return resp | |
| def compress_history(self): | |
| resp = self.run_gpt( | |
| COMPRESS_HISTORY_PROMPT, | |
| stop_tokens=["observation:", "task:", "action:", "thought:"], | |
| max_tokens=512, | |
| task=self.task, | |
| history=self.history, | |
| ) | |
| self.history = f"observation: {resp}\n" | |
| def run_action(self, action_name, action_input): | |
| if action_name == "COMPLETE": | |
| return "Task completed." | |
| if len(self.history.split("\n")) > MAX_HISTORY: | |
| if VERBOSE: | |
| print("COMPRESSING HISTORY") | |
| self.compress_history() | |
| action_funcs = { | |
| "MAIN": self.call_main, | |
| "UPDATE-TASK": self.call_set_task, | |
| "MODIFY-FILE": self.call_modify, | |
| "READ-FILE": self.call_read, | |
| "ADD-FILE": self.call_add, | |
| "TEST": self.call_test, | |
| } | |
| if action_name not in action_funcs: | |
| return f"Unknown action: {action_name}" | |
| print(f"RUN: {action_name} {action_input}") | |
| return action_funcs[action_name](action_input) | |
| def call_main(self, action_input): | |
| resp = self.run_gpt( | |
| ACTION_PROMPT, | |
| stop_tokens=["observation:", "task:"], | |
| max_tokens=256, | |
| task=self.task, | |
| history=self.history, | |
| ) | |
| lines = resp.strip().strip("\n").split("\n") | |
| for line in lines: | |
| if line == "": | |
| continue | |
| if line.startswith("thought: "): | |
| self.history += f"{line}\n" | |
| elif line.startswith("action: "): | |
| action_name, action_input = parse_action(line) | |
| self.history += f"{line}\n" | |
| return self.run_action(action_name, action_input) | |
| return "No valid action found." | |
| def call_set_task(self, action_input): | |
| self.task = self.run_gpt( | |
| TASK_PROMPT, | |
| stop_tokens=[], | |
| max_tokens=64, | |
| task=self.task, | |
| history=self.history, | |
| ).strip("\n") | |
| self.history += f"observation: task has been updated to: {self.task}\n" | |
| return f"Task updated: {self.task}" | |
| def call_modify(self, action_input): | |
| if not os.path.exists(action_input): | |
| self.history += "observation: file does not exist\n" | |
| return "File does not exist." | |
| content = read_python_module_structure(self.directory)[1] | |
| f_content = ( | |
| content[action_input] if content[action_input] else "< document is empty >" | |
| ) | |
| resp = self.run_gpt( | |
| MODIFY_PROMPT, | |
| stop_tokens=["action:", "thought:", "observation:"], | |
| max_tokens=2048, | |
| task=self.task, | |
| history=self.history, | |
| file_path=action_input, | |
| file_contents=f_content, | |
| ) | |
| new_contents, description = parse_file_content(resp) | |
| if new_contents is None: | |
| self.history += "observation: failed to modify file\n" | |
| return "Failed to modify file." | |
| with open(action_input, "w") as f: | |
| f.write(new_contents) | |
| self.history += f"observation: file successfully modified\n" | |
| self.history += f"observation: {description}\n" | |
| return f"File modified: {action_input}" | |
| def call_read(self, action_input): | |
| if not os.path.exists(action_input): | |
| self.history += "observation: file does not exist\n" | |
| return "File does not exist." | |
| content = read_python_module_structure(self.directory)[1] | |
| f_content = ( | |
| content[action_input] if content[action_input] else "< document is empty >" | |
| ) | |
| resp = self.run_gpt( | |
| READ_PROMPT, | |
| stop_tokens=[], | |
| max_tokens=256, | |
| task=self.task, | |
| history=self.history, | |
| file_path=action_input, | |
| file_contents=f_content, | |
| ).strip("\n") | |
| self.history += f"observation: {resp}\n" | |
| return f"File read: {action_input}" | |
| def call_add(self, action_input): | |
| d = os.path.dirname(action_input) | |
| if not d.startswith(self.directory): | |
| self.history += ( | |
| f"observation: files must be under directory {self.directory}\n" | |
| ) | |
| return f"Invalid directory: {d}" | |
| elif not action_input.endswith(".py"): | |
| self.history += "observation: can only write .py files\n" | |
| return "Only .py files are allowed." | |
| else: | |
| if d and not os.path.exists(d): | |
| os.makedirs(d) | |
| if not os.path.exists(action_input): | |
| resp = self.run_gpt( | |
| ADD_PROMPT, | |
| stop_tokens=["action:", "thought:", "observation:"], | |
| max_tokens=2048, | |
| task=self.task, | |
| history=self.history, | |
| file_path=action_input, | |
| ) | |
| new_contents, description = parse_file_content(resp) | |
| if new_contents is None: | |
| self.history += "observation: failed to write file\n" | |
| return "Failed to write file." | |
| with open(action_input, "w") as f: | |
| f.write(new_contents) | |
| self.history += "observation: file successfully written\n" | |
| self.history += f"observation: {description}\n" | |
| return f"File added: {action_input}" | |
| else: | |
| self.history += "observation: file already exists\n" | |
| return "File already exists." | |
| def call_test(self, action_input): | |
| result = subprocess.run( | |
| ["python", "-m", "pytest", "--collect-only", self.directory], | |
| capture_output=True, | |
| text=True, | |
| ) | |
| if result.returncode != 0: | |
| self.history += f"observation: there are no tests! Test should be written in a test folder under {self.directory}\n" | |
| return "No tests found." | |
| result = subprocess.run( | |
| ["python", "-m", "pytest", self.directory], capture_output=True, text=True | |
| ) | |
| if result.returncode == 0: | |
| self.history += "observation: tests pass\n" | |
| return "All tests passed." | |
| resp = self.run_gpt( | |
| UNDERSTAND_TEST_RESULTS_PROMPT, | |
| stop_tokens=[], | |
| max_tokens=256, | |
| task=self.task, | |
| history=self.history, | |
| stdout=result.stdout[:5000], | |
| stderr=result.stderr[:5000], | |
| ) | |
| self.history += f"observation: tests failed: {resp}\n" | |
| return f"Tests failed: {resp}" | |
| pypelyne = Pypelyne() | |
| def create_agent(name: str, agent_type: str, complexity: int) -> str: | |
| agent = Agent(name, agent_type, complexity) | |
| pypelyne.add_agent(agent) | |
| return f"Agent created: {agent}" | |
| def create_tool(name: str, tool_type: str) -> str: | |
| tool = Tool(name, tool_type) | |
| pypelyne.add_tool(tool) | |
| return f"Tool created: {tool}" | |
| def assign_tool(agent_name: str, tool_name: str) -> str: | |
| agent = next((a for a in pypelyne.agents if a.name == agent_name), None) | |
| tool = next((t for t in pypelyne.tools if t.name == tool_name), None) | |
| if agent and tool: | |
| agent.add_tool(tool) | |
| return f"Tool '{tool.name}' assigned to agent '{agent.name}'" | |
| else: | |
| return "Agent or tool not found." | |
| def generate_chat_app() -> str: | |
| return pypelyne.generate_chat_app() | |
| def list_agents() -> str: | |
| return ( | |
| "\n".join(str(agent) for agent in pypelyne.agents) or "No agents created yet." | |
| ) | |
| def list_tools() -> str: | |
| return "\n".join(str(tool) for tool in pypelyne.tools) or "No tools created yet." | |
| def chat_with_pypelyne(message: str) -> str: | |
| return pypelyne.run_action("MAIN", message) | |
| def set_purpose_and_directory(purpose: str, directory: str) -> str: | |
| pypelyne.purpose = purpose | |
| pypelyne.directory = directory | |
| return f"Purpose set to: {purpose}\nWorking directory set to: {directory}" | |
| with gr.Blocks() as app: | |
| gr.Markdown("# Welcome to Pypelyne") | |
| gr.Markdown("Create your custom pipeline with agents and tools, then chat with it!") | |
| with gr.Tab("Setup"): | |
| purpose_input = gr.Textbox(label="Set Purpose") | |
| directory_input = gr.Textbox(label="Set Working Directory") | |
| setup_btn = gr.Button("Set Purpose and Directory") | |
| setup_output = gr.Textbox(label="Setup Output") | |
| setup_btn.click( | |
| set_purpose_and_directory, | |
| inputs=[purpose_input, directory_input], | |
| outputs=setup_output, | |
| ) | |
| with gr.Tab("Create Agents"): | |
| agent_name = gr.Textbox(label="Agent Name") | |
| agent_type = gr.Dropdown(choices=AGENT_TYPES, label="Agent Type") | |
| agent_complexity = gr.Slider( | |
| minimum=1, maximum=10, step=1, label="Agent Complexity" | |
| ) | |
| create_agent_btn = gr.Button("Create Agent") | |
| agent_output = gr.Textbox(label="Output") | |
| create_agent_btn.click( | |
| create_agent, | |
| inputs=[agent_name, agent_type, agent_complexity], | |
| outputs=agent_output, | |
| ) | |
| with gr.Tab("Create Tools"): | |
| tool_name = gr.Textbox(label="Tool Name") | |
| tool_type = gr.Dropdown(choices=TOOL_TYPES, label="Tool Type") | |
| create_tool_btn = gr.Button("Create Tool") | |
| tool_output = gr.Textbox(label="Output") | |
| create_tool_btn.click( | |
| create_tool, inputs=[tool_name, tool_type], outputs=tool_output | |
| ) | |
| with gr.Tab("Assign Tools"): | |
| agent_select = gr.Dropdown(choices=[], label="Select Agent") | |
| tool_select = gr.Dropdown(choices=[], label="Select Tool") | |
| assign_tool_btn = gr.Button("Assign Tool") | |
| assign_output = gr.Textbox(label="Output") | |
| assign_tool_btn.click( | |
| assign_tool, inputs=[agent_select, tool_select], outputs=assign_output | |
| ) | |
| with gr.Tab("Generate Chat App"): | |
| generate_btn = gr.Button("Generate Chat App") | |
| generate_output = gr.Textbox(label="Output") | |
| generate_btn.click(generate_chat_app, outputs=generate_output) | |
| with gr.Tab("Chat with Pypelyne"): | |
| chat_input = gr.Textbox(label="Your Message") | |
| chat_output = gr.Textbox(label="Pypelyne's Response") | |
| chat_btn = gr.Button("Send") | |
| chat_btn.click(chat_with_pypelyne, inputs=chat_input, outputs=chat_output) | |
| with gr.Tab("View Pypelyne"): | |
| view_agents_btn = gr.Button("View Agents") | |
| view_tools_btn = gr.Button("View Tools") | |
| view_output = gr.Textbox(label="Pypelyne Components") | |
| view_agents_btn.click(list_agents, outputs=view_output) | |
| view_tools_btn.click(list_tools, outputs=view_output) | |
| def update_dropdowns(): | |
| return gr.Dropdown.update( | |
| choices=[agent.name for agent in pypelyne.agents] | |
| ), gr.Dropdown.update(choices=[tool.name for tool in pypelyne.tools]) | |
| create_agent_btn.click(update_dropdowns, outputs=[agent_select, tool_select]) | |
| create_tool_btn.click(update_dropdowns, outputs=[agent_select, tool_select]) | |
| if __name__ == "__main__": | |
| app.launch() | |
| app = Flask(__name__) | |
| def chat(): | |
| message = request.json["message"] | |
| response = chat_with_pypelyne(message) | |
| return jsonify({"response": response}) | |
| def get_agents(): | |
| agents = list_agents() | |
| return jsonify({"agents": agents}) | |
| def get_tools(): | |
| tools = list_tools() | |
| return jsonify({"tools": tools}) | |
| if __name__ == "__main__": | |
| app.run() |