# Kimi-Dev-72B: kimi_dev/serve/templates.py
import os
import re
import json
import subprocess
import ast
import difflib
def show_project_structure(structure, spacing=0) -> str:
"""pprint the project structure"""
pp_string = ''
for key, value in structure.items():
        if '.' in key and '.py' not in key:
            continue  # skip non-Python files
        if key.startswith('test'):
            continue  # skip test files as well
if '.' in key:
pp_string += ' ' * spacing + str(key) + '\n'
else:
pp_string += ' ' * spacing + str(key) + '/' + '\n'
if 'text' not in value:
pp_string += show_project_structure(value, spacing + 4)
return pp_string
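# Example (illustrative sketch; the nested dict below is hypothetical but mirrors the shape
# produced by create_structure further down in this module):
#
#     example_structure = {"pkg": {"core.py": {"classes": [], "functions": [], "text": []}}}
#     print(show_project_structure(example_structure))
#     # pkg/
#     #     core.py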
def clone_github_repo(github_url, local_path, commit_hash=None):
"""Clone GitHub repository to local path and optionally checkout specific commit"""
try:
subprocess.run(['git', 'clone', github_url, local_path], check=True)
print(f"Successfully cloned repository to: {local_path}")
# If commit hash is provided, checkout to that specific commit
if commit_hash:
subprocess.run(['git', 'checkout', commit_hash], cwd=local_path, check=True)
print(f"Successfully checked out to commit: {commit_hash}")
except subprocess.CalledProcessError as e:
print(f"Warning: Repository cloning or checkout may have failed: {e}")
def parse_python_file(file_path, file_content=None):
"""Parse a Python file to extract class and function definitions with their line numbers.
:param file_path: Path to the Python file.
:return: Class names, function names, and file contents
"""
if file_content is None:
try:
with open(file_path, "r") as file:
file_content = file.read()
parsed_data = ast.parse(file_content)
except Exception as e: # Catch all types of exceptions
print(f"Error in file {file_path}: {e}")
return [], [], ""
else:
try:
parsed_data = ast.parse(file_content)
except Exception as e: # Catch all types of exceptions
print(f"Error in file {file_path}: {e}")
return [], [], ""
class_info = []
function_names = []
class_methods = set()
for node in ast.walk(parsed_data):
if isinstance(node, ast.ClassDef):
methods = []
for n in node.body:
if isinstance(n, ast.FunctionDef):
methods.append(
{
"name": n.name,
"start_line": n.lineno,
"end_line": n.end_lineno,
"text": file_content.splitlines()[
n.lineno - 1 : n.end_lineno
],
}
)
class_methods.add(n.name)
class_info.append(
{
"name": node.name,
"start_line": node.lineno,
"end_line": node.end_lineno,
"text": file_content.splitlines()[
node.lineno - 1 : node.end_lineno
],
"methods": methods,
}
)
        elif isinstance(node, ast.FunctionDef):  # ast.AsyncFunctionDef is not a FunctionDef subclass, so async defs are not collected
if node.name not in class_methods:
function_names.append(
{
"name": node.name,
"start_line": node.lineno,
"end_line": node.end_lineno,
"text": file_content.splitlines()[
node.lineno - 1 : node.end_lineno
],
}
)
return class_info, function_names, file_content.splitlines()
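# Example usage (illustrative; the path is hypothetical). Each class entry carries "name",
# "start_line", "end_line", "text", and "methods"; function entries cover top-level functions only.
#
#     class_info, function_names, file_lines = parse_python_file("/tmp/repo/pkg/core.py")
#     for clazz in class_info:
#         print(clazz["name"], clazz["start_line"], clazz["end_line"])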
def create_structure(directory_path):
"""Create the structure of the repository directory by parsing Python files.
:param directory_path: Path to the repository directory.
:return: A dictionary representing the structure.
"""
structure = {}
    for root, dirs, files in os.walk(directory_path):
        # Prune hidden and commonly ignored directories so os.walk does not descend into them
        dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'node_modules')]
        relative_root = os.path.relpath(root, directory_path)
        # Build the current directory position in the structure, creating one nesting
        # level per path component
        curr_struct = structure
        if relative_root != ".":
            for part in relative_root.split(os.sep):
                if part not in curr_struct:
                    curr_struct[part] = {}
                curr_struct = curr_struct[part]
        # Create empty dictionary entries for the remaining subdirectories
        for dir_name in dirs:
            if dir_name not in curr_struct:
                curr_struct[dir_name] = {}
# Process all files in the current directory
for file_name in files:
# Skip hidden files and compiled files
if file_name.startswith('.') or file_name.endswith('.pyc'):
continue
file_path = os.path.join(root, file_name)
if file_name.endswith(".py"):
# Python files: parse class and function information
try:
class_info, function_names, file_lines = parse_python_file(file_path)
curr_struct[file_name] = {
"classes": class_info,
"functions": function_names,
"text": file_lines,
}
except Exception as e:
print(f"Failed to parse Python file {file_path}: {e}")
curr_struct[file_name] = {"text": []}
else:
code_extensions = ['.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.c', '.h', '.hpp',
'.cs', '.php', '.rb', '.go', '.rs', '.swift', '.kt', '.scala',
'.sh', '.bat', '.ps1', '.sql', '.html', '.css', '.scss', '.less',
'.json', '.xml', '.yaml', '.yml', '.toml', '.ini', '.cfg', '.conf',
'.md', '.txt', '.rst', '.tex', '.r', '.R', '.m', '.pl', '.lua']
if any(file_name.endswith(ext) for ext in code_extensions):
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
file_content = f.read()
curr_struct[file_name] = {"text": file_content.splitlines()}
except Exception as e:
print(f"Failed to read file {file_path}: {e}")
curr_struct[file_name] = {"text": []}
else:
curr_struct[file_name] = {"text": []}
return structure
def build_repo_structure(root_path):
"""Build repository structure using improved parsing method"""
return create_structure(root_path)
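# Example usage (illustrative; the repository path is hypothetical):
#
#     structure = build_repo_structure("/tmp/repo")
#     print(show_project_structure(structure))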
def get_loc_prompt(issue_text, repo_structure):
    """Build the file-localization prompt from an issue description and a repository structure string."""
    obtain_relevant_files_prompt = """
Please look through the following GitHub problem description and Repository structure and provide a list of files that one would need to edit to fix the problem.
### GitHub Problem Description ###
{problem_statement}
###
### Repository Structure ###
{structure}
###
Please only provide the full path and return at most 5 files.
The returned files should be separated by new lines ordered by most to least important and wrapped with ```
For example:
```
file1.py
file2.py
```
"""
    prompt_content = obtain_relevant_files_prompt.format(problem_statement=issue_text, structure=repo_structure)
return prompt_content
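# Example usage (illustrative; the issue text and repository path are placeholders):
#
#     structure_str = show_project_structure(build_repo_structure("/tmp/repo"))
#     loc_prompt = get_loc_prompt("Crash when parsing empty config files", structure_str)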
def get_repair_prompt(issue_text, file_content):
    """Build the SEARCH/REPLACE repair prompt from an issue description and relevant file content."""
    repair_prompt_combine_topn_cot_diff = """
We are currently solving the following issue within our repository. Here is the issue text:
--- BEGIN ISSUE ---
{problem_statement}
--- END ISSUE ---
Below are some code segments, each from a relevant file. One or more of these files may contain bugs.
--- BEGIN FILE ---
```
{content}
```
--- END FILE ---
Please first localize the bug based on the issue statement, and then generate *SEARCH/REPLACE* edits to fix the issue.
Every *SEARCH/REPLACE* edit must use this format:
1. The file path
2. The start of search block: <<<<<<< SEARCH
3. A contiguous chunk of lines to search for in the existing source code
4. The dividing line: =======
5. The lines to replace into the source code
6. The end of the replace block: >>>>>>> REPLACE
Here is an example:
```python
### mathweb/flask/app.py
<<<<<<< SEARCH
from flask import Flask
=======
import math
from flask import Flask
>>>>>>> REPLACE
```
Please note that the *SEARCH/REPLACE* edit REQUIRES PROPER INDENTATION. If you would like to add the line ' print(x)', you must fully write that out, with all those spaces before the code!
Wrap the *SEARCH/REPLACE* edit in blocks ```python...```.
"""
    prompt_content = repair_prompt_combine_topn_cot_diff.format(problem_statement=issue_text, content=file_content.rstrip())
return prompt_content
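# Example usage (illustrative; the issue text, repository path, and file path are placeholders;
# the content string would typically come from get_repo_files, defined below):
#
#     structure = build_repo_structure("/tmp/repo")
#     file_contents = get_repo_files(structure, ["pkg/core.py"])
#     repair_prompt = get_repair_prompt("Crash when parsing empty config files",
#                                       "### pkg/core.py\n" + file_contents["pkg/core.py"])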
def get_repo_files(structure, filepaths: list[str]):
    """Return a dict mapping each requested file path to its content (joined lines) when found in the structure."""
files, classes, functions = get_full_file_paths_and_classes_and_functions(structure)
file_contents = dict()
for filepath in filepaths:
content = None
for file_content in files:
if file_content[0] == filepath:
content = '\n'.join(file_content[1])
file_contents[filepath] = content
break
# assert content is not None, "file not found"
return file_contents
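# Example usage (illustrative; paths are hypothetical). Requested paths that are not present in
# the structure are simply omitted from the returned dict.
#
#     structure = build_repo_structure("/tmp/repo")
#     file_contents = get_repo_files(structure, ["pkg/core.py", "pkg/utils.py"])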
def correct_file_path_in_structure(file_name, structure):
"""
Search for the correct file path in the structure, mainly checking first-level subdirectories
Args:
file_name (str): File name to search for
structure (dict): Repository structure
Returns:
str: Correct file path if found, otherwise returns original file_name
"""
# Search in current directory
file_contents = get_repo_files(structure, [file_name])
if file_contents != {}:
return file_name
# Only check first-level subdirectories
for sub_dir in structure.keys():
if isinstance(structure[sub_dir], dict):
file_contents = get_repo_files(structure[sub_dir], [file_name])
if file_contents != {}:
return f'{sub_dir}/{file_name}'
return file_name
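# Example usage (illustrative; names are hypothetical). If "core.py" only exists inside the
# first-level subdirectory "pkg", this returns "pkg/core.py"; if it cannot be found at either
# level, the input is returned unchanged.
#
#     resolved_path = correct_file_path_in_structure("core.py", structure)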
def get_full_file_paths_and_classes_and_functions(structure, current_path=''):
"""
Recursively retrieve all file paths, classes, and functions within a directory structure.
Arguments:
structure -- a dictionary representing the directory structure
current_path -- the path accumulated so far, used during recursion (default="")
Returns:
A tuple containing:
- files: list of full file paths
- classes: list of class details with file paths
- functions: list of function details with file paths
"""
files = []
classes = []
functions = []
for name, content in structure.items():
if isinstance(content, dict):
if (
(
'functions' not in content.keys()
and 'classes' not in content.keys()
and 'text' not in content.keys()
)
or not len(content.keys()) == 3
or (
isinstance(content.get('text', []), dict)
or isinstance(content.get('functions', []), dict)
or isinstance(content.get('classes', []), dict)
)
):
                # The extra `or` clauses guard against the case where 'functions' and 'classes'
                # entries are somehow part of the directory structure itself.
next_path = f'{current_path}/{name}' if current_path else name
(
sub_files,
sub_classes,
sub_functions,
) = get_full_file_paths_and_classes_and_functions(content, next_path)
files.extend(sub_files)
classes.extend(sub_classes)
functions.extend(sub_functions)
else:
next_path = f'{current_path}/{name}' if current_path else name
files.append((next_path, content.get('text', [])))
if content.get('text', []) == []:
continue
if 'classes' in content:
for clazz in content['classes']:
classes.append(
{
'file': next_path,
'name': clazz['name'],
'start_line': clazz['start_line'],
'end_line': clazz['end_line'],
'methods': [
{
'name': method['name'],
'start_line': method['start_line'],
'end_line': method['end_line'],
}
for method in clazz.get('methods', [])
],
},
)
if 'functions' in content:
for function in content['functions']:
try:
function['file'] = next_path
except TypeError:
continue
functions.append(function)
else:
next_path = f'{current_path}/{name}' if current_path else name
files.append(next_path)
return files, classes, functions
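# Example usage (illustrative). Each entry in files is normally a (path, lines) tuple, so the
# path is the first element:
#
#     files, classes, functions = get_full_file_paths_and_classes_and_functions(structure)
#     all_paths = [file_entry[0] for file_entry in files]  # e.g. "pkg/core.py"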
def post_process(response: str) -> str:
    """Strip reasoning-trace delimiters from a model response and extract fenced code blocks."""
    content = response
if "◁/think▷" in content:
content = content.replace("◁think▷", "")
parts = content.split("◁/think▷")
content = parts[-1]
# Extract content between triple backticks (```)
matches = re.findall(r"```.*?```", content, re.DOTALL)
if matches:
matches = [item.replace("```","") for item in matches]
return "\n".join(matches) # Return all matched code blocks joined by new lines
return content # If no match, return the full response
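# Example (illustrative; the response string is a placeholder). Reasoning wrapped in the
# ◁think▷ / ◁/think▷ markers is dropped and only the fenced block contents are kept:
#
#     raw = "◁think▷reasoning...◁/think▷```\npkg/core.py\npkg/utils.py\n```"
#     result = post_process(raw)  # -> "\npkg/core.py\npkg/utils.py\n"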
def correct_file_paths(model_found_files, files, similarity_threshold=0.8):
    """Map model-suggested file paths onto actual repository paths, falling back to fuzzy matching."""
    found_files = []
all_file_paths = [file_content[0] for file_content in files]
if model_found_files:
for model_file in model_found_files:
match_found = False
for file_path in all_file_paths:
if model_file == file_path:
found_files.append(file_path)
match_found = True
break
elif file_path.endswith(model_file):
found_files.append(file_path)
match_found = True
break
elif os.path.basename(file_path) == os.path.basename(model_file):
found_files.append(file_path)
match_found = True
break
if not match_found:
close_matches = difflib.get_close_matches(model_file, all_file_paths, n=1, cutoff=similarity_threshold)
if close_matches:
found_files.append(close_matches[0])
return found_files
else:
return []
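# Example usage (illustrative; names are hypothetical). Exact, suffix, and basename matches are
# tried first, then a difflib fuzzy match against all known paths:
#
#     files, _, _ = get_full_file_paths_and_classes_and_functions(structure)
#     resolved = correct_file_paths(["core.py", "pkg/utls.py"], files)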