HMP / agents /init.py
GitHub Action
Sync from GitHub with Git LFS
2304947
raw
history blame
9.01 kB
# agents/init.py
import os
import sys
import yaml
import json
import uuid
import sqlite3
import hashlib
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from datetime import datetime, UTC
from werkzeug.security import generate_password_hash
from tools.storage import Storage
from tools.identity import generate_did
from tools.crypto import generate_keypair
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.yml")
DB_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "agent_data.db")) # фиксированный путь
def load_config(path):
with open(path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def save_config(path, config):
with open(path, 'w', encoding='utf-8') as f:
yaml.dump(config, f, allow_unicode=True)
def init_identity(storage, config):
if not config.get("agent_id"):
did = generate_did()
pubkey, privkey = generate_keypair()
identity_id = did.split(":")[-1]
identity = {
"id": identity_id,
"name": config.get("agent_name", "Unnamed"),
"pubkey": pubkey,
"privkey": privkey,
"metadata": json.dumps({"role": config.get("agent_role", "core")}),
"created_at": datetime.now(UTC).isoformat(),
"updated_at": datetime.now(UTC).isoformat()
}
storage.add_identity(identity)
config["agent_id"] = did
config["identity_agent"] = identity_id
save_config(CONFIG_PATH, config)
print(f"[+] Создана личность: {identity_id}")
else:
print("[=] agent_id уже задан, пропускаем генерацию DiD.")
def init_user(storage, config):
user = config.get("default_user", {})
if not user.get("email"):
print("[-] Не указан email пользователя — пропуск.")
return
password = user.get("password")
if not password:
print("[-] Не указан пароль пользователя — пропуск.")
return
password_hash = generate_password_hash(password)
did = generate_did()
user_entry = {
"username": user.get("username", "user"),
"badges": user.get("badges", ""),
"mail": user["email"],
"password_hash": password_hash,
"did": did,
"ban": None,
"info": json.dumps({}),
"contacts": json.dumps([]),
"language": "ru,en",
"operator": 1
}
storage.add_user(user_entry)
print(f"[+] Пользователь {user['username']} добавлен.")
def init_llm_backends(storage, config):
backends = config.get("llm_backends", [])
storage.clear_llm_registry()
for backend in backends:
backend_id = str(uuid.uuid4())
desc = f"{backend.get('type', 'unknown')} model"
llm = {
"id": backend_id,
"name": backend["name"],
"endpoint": desc,
"metadata": json.dumps(backend),
"created_at": datetime.now(UTC).isoformat()
}
storage.add_llm(llm)
print(f"[+] Зарегистрирован LLM: {backend['name']}")
def init_config_table(storage, config):
exclude_keys = {"default_user", "llm_backends"}
flat_config = {k: v for k, v in config.items() if k not in exclude_keys}
for key, value in flat_config.items():
storage.set_config(key, json.dumps(value))
print("[+] Конфигурация сохранена в БД.")
def init_prompts_and_ethics():
folder = os.path.dirname(__file__)
prompt_files = [
("prompt.md", "full"),
("prompt-short.md", "short")
]
ethics_file = "ethics.yml"
with sqlite3.connect(DB_PATH) as conn:
cur = conn.cursor()
# Загружаем промпты
for fname, ptype in prompt_files:
fpath = os.path.join(folder, fname)
if not os.path.exists(fpath):
print(f"[-] Файл {fname} не найден, пропуск.")
continue
with open(fpath, "r", encoding="utf-8") as f:
content = f.read()
pid = hashlib.sha256(f"{fname}:{ptype}".encode()).hexdigest()
cur.execute("""
INSERT INTO system_prompts (id, name, type, version, source, content, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
content=excluded.content,
updated_at=excluded.updated_at
""", (pid, fname, ptype, "1.0", "local", content, datetime.now(UTC).isoformat()))
print(f"[+] Загружен промпт: {fname} ({ptype})")
# Загружаем ethics.yml
efpath = os.path.join(folder, ethics_file)
if os.path.exists(efpath):
with open(efpath, "r", encoding="utf-8") as f:
ethics_data = yaml.safe_load(f)
eid = ethics_data.get("id", "default_ethics")
cur.execute("""
INSERT INTO ethics_policies (
id, version, source,
sync_enabled, mesh_endpoint, consensus_threshold, check_interval,
model_type, model_weights_json, principles_json, evaluation_json,
violation_policy_json, audit_json, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
version=excluded.version,
source=excluded.source,
sync_enabled=excluded.sync_enabled,
mesh_endpoint=excluded.mesh_endpoint,
consensus_threshold=excluded.consensus_threshold,
check_interval=excluded.check_interval,
model_type=excluded.model_type,
model_weights_json=excluded.model_weights_json,
principles_json=excluded.principles_json,
evaluation_json=excluded.evaluation_json,
violation_policy_json=excluded.violation_policy_json,
audit_json=excluded.audit_json,
updated_at=excluded.updated_at
""", (
eid,
ethics_data.get("version"),
ethics_data.get("source", "local"),
ethics_data.get("sync", {}).get("enabled", False),
ethics_data.get("sync", {}).get("mesh_endpoint"),
ethics_data.get("sync", {}).get("consensus_threshold"),
ethics_data.get("sync", {}).get("check_interval"),
ethics_data.get("model", {}).get("type"),
json.dumps(ethics_data.get("model", {}).get("weights"), ensure_ascii=False),
json.dumps(ethics_data.get("principles"), ensure_ascii=False),
json.dumps(ethics_data.get("evaluation"), ensure_ascii=False),
json.dumps(ethics_data.get("violation_policy"), ensure_ascii=False),
json.dumps(ethics_data.get("audit"), ensure_ascii=False),
datetime.now(UTC).isoformat()
))
print(f"[+] Загружена этическая политика: {eid}")
else:
print(f"[-] Файл {ethics_file} не найден, пропуск.")
def ensure_directories():
for folder in ["logs", "scripts"]:
full_path = os.path.abspath(os.path.join(os.path.dirname(__file__), folder))
if not os.path.exists(full_path):
os.makedirs(full_path)
print(f"[+] Создан каталог: {full_path}")
else:
print(f"[=] Каталог уже существует: {full_path}")
def is_db_initialized(db_path):
if not os.path.exists(db_path):
return False
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='identity'")
return cursor.fetchone() is not None
except Exception:
return False
def ensure_db_initialized():
config = load_config(CONFIG_PATH)
if not is_db_initialized(DB_PATH):
print("[*] БД не инициализирована — выполняем инициализацию.")
try:
ensure_directories()
storage = Storage()
init_identity(storage, config)
init_user(storage, config)
init_llm_backends(storage, config)
init_config_table(storage, config)
save_config(CONFIG_PATH, config)
init_prompts_and_ethics()
except Exception as e:
print(f"[!] Ошибка при инициализации: {e}")
sys.exit(1)
else:
print("[=] БД уже инициализирована.")
return config
if __name__ == "__main__":
ensure_db_initialized()