HMP / hf_repo /agents /_old /mcp_server.py
GitHub Action
Sync from GitHub with Git LFS
376ee15
raw
history blame
10.6 kB
# agents/mcp_server.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from flask import Flask, request, jsonify
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
from models import GraphExport
from storage import Storage
from tools.concept_store import ConceptStore
from tools.notebook_store import NotebookStore
import random
app = FastAPI(title="HMP MCP-Agent API", version="0.1")
# Добавляем CORS (полезно, если API вызывается с веб-клиента)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Можем позже ограничить, если потребуется
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Инициализация хранилищ
concept_store = ConceptStore()
notebook_store = NotebookStore()
db = Storage()
# === Модели запроса/ответа ===
class EntryInput(BaseModel):
text: str
tags: Optional[List[str]] = []
timestamp: Optional[str] = None
class EntryOutput(BaseModel):
id: int
text: str
tags: List[str]
timestamp: str
class EntryListOutput(BaseModel):
entries: List[EntryOutput]
class ConceptInput(BaseModel):
name: str
description: Optional[str] = None
class ConceptOutput(BaseModel):
concept_id: int
class LinkInput(BaseModel):
source_id: int
target_id: int
relation: str
class LinkOutput(BaseModel):
link_id: int
class Node(BaseModel):
id: str
label: str
tags: List[str] = []
class Edge(BaseModel):
source: str
target: str
relation: str
class GraphImportData(BaseModel):
nodes: List[Node] = []
edges: List[Edge] = []
class GraphExpansionOutput(BaseModel):
links: List[Edge]
class Concept(BaseModel):
concept_id: int
name: str
description: Optional[str] = None
class ConceptQueryOutput(BaseModel):
matches: List[Concept]
class DiaryEntry(BaseModel):
id: int
text: str
tags: List[str]
timestamp: str
class DiaryExport(BaseModel):
entries: List[DiaryEntry]
class ConceptExport(BaseModel):
id: int
name: str
description: Optional[str] = None
class LinkExport(BaseModel):
id: int
source_id: int
target_id: int
relation: str
class GraphExport(BaseModel):
concepts: List[ConceptExport]
links: List[LinkExport]
class ConceptUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
# === Эндпойнты ===
@app.get("/status")
def status():
return {
"status": "ok",
"agent": "HMP-MCP",
"timestamp": datetime.utcnow().isoformat()
}
@app.post("/write_entry", response_model=dict)
def write_entry(entry: EntryInput):
db.write_entry(entry.text, entry.tags)
return {"result": "entry saved"}
@app.get("/read_entries", response_model=EntryListOutput)
def read_entries(limit: int = 5, tag: Optional[str] = None):
raw = db.read_entries(limit=limit, tag_filter=tag)
return {
"entries": [
{
"id": r[0],
"text": r[1],
"tags": r[2].split(",") if r[2] else [],
"timestamp": r[3]
} for r in raw
]
}
@app.get("/")
def root():
return {"message": "HMP MCP-Agent API is running"}
@app.post("/add_concept", response_model=ConceptOutput)
def add_concept(concept: ConceptInput):
cid = db.add_concept(concept.name, concept.description)
return {"concept_id": cid}
@app.post("/add_link", response_model=LinkOutput)
def add_link(link: LinkInput):
link_id = db.add_link(link.source_id, link.target_id, link.relation)
return {"link_id": link_id}
@app.get("/expand_graph", response_model=GraphExpansionOutput)
def expand_graph(start_id: int, depth: int = 1):
raw_links = db.expand_graph(start_id, depth)
edges = [{"source_id": s, "target_id": t, "relation": r} for s, t, r in raw_links]
return {"links": edges}
@app.get("/query_concept", response_model=ConceptQueryOutput)
def query_concept(name: str):
results = db.query_concept(name)
return {
"matches": [
{"concept_id": row[0], "name": row[1], "description": row[2]}
for row in results
]
}
@app.get("/list_concepts", response_model=List[Concept])
def list_concepts():
rows = db.list_concepts()
return [
{"concept_id": row[0], "name": row[1], "description": row[2]}
for row in rows
]
@app.get("/list_links", response_model=List[Edge])
def list_links():
rows = db.list_links()
return [
{"source_id": row[1], "target_id": row[2], "relation": row[3]}
for row in rows
]
@app.delete("/delete_concept/{concept_id}")
def delete_concept(concept_id: int):
db.delete_concept(concept_id)
return {"result": f"concept {concept_id} deleted"}
@app.delete("/delete_link/{link_id}")
def delete_link(link_id: int):
db.delete_link(link_id)
return {"result": f"link {link_id} deleted"}
@app.delete("/delete_entry/{entry_id}")
def delete_entry(entry_id: int):
db.delete_entry(entry_id)
return {"result": f"entry {entry_id} deleted"}
@app.get("/export_diary", response_model=DiaryExport)
def export_diary():
rows = db.export_diary()
return {
"entries": [
{
"id": r[0],
"text": r[1],
"tags": r[2].split(",") if r[2] else [],
"timestamp": r[3]
}
for r in rows
]
}
@app.get("/export_graph", response_model=GraphExport)
def export_graph():
return concept_store.export_as_json()
@app.put("/update_concept/{concept_id}")
def update_concept(concept_id: int, update: ConceptUpdate):
db.update_concept(concept_id, update.name, update.description)
return {"result": f"concept {concept_id} updated"}
@app.get("/tag_stats", response_model=dict)
def tag_stats():
return db.get_tag_stats()
@app.get("/search_links", response_model=List[LinkExport])
def search_links(relation: str):
rows = db.search_links_by_relation(relation)
return [
{
"id": row[0],
"source_id": row[1],
"target_id": row[2],
"relation": row[3]
}
for row in rows
]
@app.get("/search_concepts", response_model=List[Concept])
def search_concepts(query: str):
results = db.search_concepts(query)
return [
{"concept_id": row[0], "name": row[1], "description": row[2]}
for row in results
]
@app.post("/merge_concepts", response_model=dict)
def merge_concepts(source_id: int, target_id: int):
db.merge_concepts(source_id, target_id)
return {"result": f"concept {source_id} merged into {target_id}"}
@app.post("/relate_concepts", response_model=LinkOutput)
def relate_concepts(source_name: str, target_name: str, relation: str):
sid = db.find_concept_id_by_name(source_name)
tid = db.find_concept_id_by_name(target_name)
if sid is None or tid is None:
raise HTTPException(status_code=404, detail="Concept not found")
link_id = db.add_link(sid, tid, relation)
return {"link_id": link_id}
@app.get("/tag_cloud", response_model=dict)
def tag_cloud():
return db.get_tag_stats()
@app.get("/get_concept/{concept_id}")
def get_concept(concept_id: str):
concept = concept_store.get(concept_id)
if concept:
return concept
raise HTTPException(status_code=404, detail="Concept not found")
@app.get("/get_entry/{entry_id}")
def get_entry(entry_id: str):
entry = notebook_store.get(entry_id)
if entry:
return entry
raise HTTPException(status_code=404, detail="Entry not found")
@app.post("/search_entries")
def search_entries(query: str):
results = notebook_store.search(query)
return results
@app.post("/import_graph")
def import_graph(graph_data: GraphImportData):
concept_store.import_from_json(graph_data.dict())
print(f"[INFO] Imported {len(graph_data.nodes)} nodes, {len(graph_data.edges)} edges")
return {"status": "ok"}
# === Notebook API ===
@app.post("/notebook/add")
async def add_note(req: Request):
data = await req.json()
text = data.get("text", "").strip()
if not text:
return {"status": "error", "message": "Empty text"}
notebook.add_note(text, source="user")
return {"status": "ok", "message": "Note added"}
@app.get("/notebook/next")
def get_next_note():
note = notebook.get_first_unread_note()
if note:
note_id, text, source, timestamp, tags = note
return {
"id": note_id,
"text": text,
"source": source,
"timestamp": timestamp,
"tags": tags
}
return {"status": "empty", "message": "No unread notes"}
@app.post("/notebook/mark_read")
async def mark_note_read(req: Request):
data = await req.json()
note_id = data.get("id")
if note_id is not None:
notebook.mark_note_as_read(note_id)
return {"status": "ok"}
return {"status": "error", "message": "Missing note id"}
# === ✨ Дополнительные эндпоинты для заметок ===
@app.route("/notes/latest", methods=["GET"])
def get_latest_notes():
"""Вернуть последние N заметок (по умолчанию 10)."""
count = int(request.args.get("count", 10))
notes = storage.diary[-count:]
return jsonify([note.to_dict() for note in notes])
@app.route("/notes/random", methods=["GET"])
def get_random_note():
"""Вернуть случайную заметку из дневника."""
if not storage.diary:
return jsonify({})
note = random.choice(storage.diary)
return jsonify(note.to_dict())
@app.route("/notes/set_tags", methods=["POST"])
def set_tags():
"""Обновить теги у заметки по ID."""
data = request.json
note_id = data.get("id")
tags = data.get("tags", [])
for note in storage.diary:
if note.id == note_id:
note.tags = tags
return jsonify({"status": "ok"})
return jsonify({"error": "not found"}), 404
@app.route("/notes/by_tag", methods=["GET"])
def get_notes_by_tag():
tag = request.args.get("tag")
result = [note.to_dict() for note in storage.diary if tag in note.tags]
return jsonify(result)
# === Run ===
if __name__ == "__main__":
uvicorn.run("mcp_server:app", host="0.0.0.0", port=8080, reload=True)
# === Shutdown ===
@app.on_event("shutdown")
def shutdown():
db.close()