import os, sys; sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) import torch import mcp_server as dash from mcp_server import app from bit_transformer.dashboard_app import ModelManager, MetricDriftWarning from bit_transformer import BitTransformerLM from bit_transformer.optimization import configure_optimizer from bit_transformer.bit_io import text_to_bits import time def test_exec_endpoint_removed(): with app.test_client() as client: resp = client.post("/exec", json={"code": "print('OK')"}) assert resp.status_code in (403, 404) def test_status_endpoint(tmp_path): dash.manager = ModelManager(snapshot_dir=tmp_path) params = {"d_model": 16, "nhead": 2, "num_layers": 1, "dim_feedforward": 32, "max_seq_len": 8} with app.test_client() as client: client.post("/init", json=params) resp = client.get("/status") data = resp.get_json() assert data["d_model"] == 16 and data["num_layers"] == 1 def test_modelmanager_compression(tmp_path): mgr = ModelManager(snapshot_dir=tmp_path) mgr.model = BitTransformerLM(d_model=16, nhead=2, num_layers=1, dim_feedforward=32, max_seq_len=8) mgr.optimizer, mgr.scheduler = configure_optimizer(mgr.model, lr=1e-3, total_steps=1) mgr.set_compression(True) bits = torch.randint(0, 2, (1, 8), dtype=torch.long) loss, ratio = mgr.train_step(bits) assert isinstance(loss, float) and 0 <= ratio <= 1.0 def test_metric_drift_warning(tmp_path): mgr = ModelManager(snapshot_dir=tmp_path, drift_window=2, drift_threshold=0.1) tele = { "negentropy_logits": torch.tensor([0.0]), "lz_complexity_logits": torch.tensor([0.0]), "symbiosis_score": torch.tensor([0.0]), } for _ in range(4): mgr._log_metrics(tele) tele_drift = { "negentropy_logits": torch.tensor([1.0]), "lz_complexity_logits": torch.tensor([0.0]), "symbiosis_score": torch.tensor([0.0]), } import pytest with pytest.warns(MetricDriftWarning): mgr._log_metrics(tele_drift) def test_dashboard_endpoints(tmp_path): dash.manager = ModelManager(snapshot_dir=tmp_path) params = {"d_model": 16, "nhead": 2, "num_layers": 1, "dim_feedforward": 32, "max_seq_len": 8} with app.test_client() as client: resp = client.post("/init", json=params) assert resp.status_code == 200 bits = torch.randint(0, 2, (1, 8), dtype=torch.long).tolist() train = client.post("/train", json={"bits": bits}) assert train.status_code == 200 job_id = train.get_json()["job_id"] for _ in range(20): job_resp = client.get(f"/job/{job_id}") data = job_resp.get_json() if data["status"] == "completed": assert "loss" in data["result"] break time.sleep(0.1) else: assert False, "training job did not complete" infer = client.post("/infer", json={"bits": bits}) assert infer.status_code == 200 and "predicted" in infer.get_json() def test_text_to_bits_and_dataset(tmp_path): dash.manager = ModelManager(snapshot_dir=tmp_path) with app.test_client() as client: resp = client.post("/text_to_bits", json={"text": "hi"}) assert resp.status_code == 200 assert resp.get_json()["bits"] == text_to_bits("hi") ds = client.get("/dataset?name=wikitext2&split=train&size=1&seq_len=8") data = ds.get_json() assert len(data["bits"]) == 1 and len(data["bits"][0]) == 8