# BitTransformerLM: tests/test_dashboard.py

import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import time

import pytest
import torch

import mcp_server as dash
from mcp_server import app
from bit_transformer import BitTransformerLM
from bit_transformer.bit_io import text_to_bits
from bit_transformer.dashboard_app import ModelManager, MetricDriftWarning
from bit_transformer.optimization import configure_optimizer


def test_exec_endpoint_removed():
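    """The arbitrary-code /exec endpoint must be disabled: POSTing to it returns 403 or 404."""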
    with app.test_client() as client:
        resp = client.post("/exec", json={"code": "print('OK')"})
        assert resp.status_code in (403, 404)


def test_status_endpoint(tmp_path):
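    """/init followed by /status should report the configured model hyperparameters."""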
    dash.manager = ModelManager(snapshot_dir=tmp_path)
    params = {"d_model": 16, "nhead": 2, "num_layers": 1, "dim_feedforward": 32, "max_seq_len": 8}
    with app.test_client() as client:
        client.post("/init", json=params)
        resp = client.get("/status")
        data = resp.get_json()
        assert data["d_model"] == 16 and data["num_layers"] == 1


def test_modelmanager_compression(tmp_path):
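    """With compression enabled, a ModelManager train step returns a float loss and a ratio in [0, 1]."""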
    mgr = ModelManager(snapshot_dir=tmp_path)
    mgr.model = BitTransformerLM(d_model=16, nhead=2, num_layers=1, dim_feedforward=32, max_seq_len=8)
    mgr.optimizer, mgr.scheduler = configure_optimizer(mgr.model, lr=1e-3, total_steps=1)
    mgr.set_compression(True)
    bits = torch.randint(0, 2, (1, 8), dtype=torch.long)
    loss, ratio = mgr.train_step(bits)
    assert isinstance(loss, float) and 0 <= ratio <= 1.0


def test_metric_drift_warning(tmp_path):
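    """Logging a telemetry value that jumps after several stable readings should raise MetricDriftWarning."""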
    mgr = ModelManager(snapshot_dir=tmp_path, drift_window=2, drift_threshold=0.1)
    tele = {
        "negentropy_logits": torch.tensor([0.0]),
        "lz_complexity_logits": torch.tensor([0.0]),
        "symbiosis_score": torch.tensor([0.0]),
    }
    for _ in range(4):
        mgr._log_metrics(tele)
    tele_drift = {
        "negentropy_logits": torch.tensor([1.0]),
        "lz_complexity_logits": torch.tensor([0.0]),
        "symbiosis_score": torch.tensor([0.0]),
    }
    with pytest.warns(MetricDriftWarning):
        mgr._log_metrics(tele_drift)


def test_dashboard_endpoints(tmp_path):
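    """End-to-end dashboard flow: /init, /train, polling /job/<id> until completion, then /infer."""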
    dash.manager = ModelManager(snapshot_dir=tmp_path)
    params = {"d_model": 16, "nhead": 2, "num_layers": 1, "dim_feedforward": 32, "max_seq_len": 8}
    with app.test_client() as client:
        resp = client.post("/init", json=params)
        assert resp.status_code == 200
        bits = torch.randint(0, 2, (1, 8), dtype=torch.long).tolist()
        train = client.post("/train", json={"bits": bits})
        assert train.status_code == 200
        job_id = train.get_json()["job_id"]
        for _ in range(20):
            job_resp = client.get(f"/job/{job_id}")
            data = job_resp.get_json()
            if data["status"] == "completed":
                assert "loss" in data["result"]
                break
            time.sleep(0.1)
        else:
            assert False, "training job did not complete"
        infer = client.post("/infer", json={"bits": bits})
        assert infer.status_code == 200 and "predicted" in infer.get_json()


def test_text_to_bits_and_dataset(tmp_path):
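    """/text_to_bits should match text_to_bits(), and /dataset should return bit rows of the requested length."""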
    dash.manager = ModelManager(snapshot_dir=tmp_path)
    with app.test_client() as client:
        resp = client.post("/text_to_bits", json={"text": "hi"})
        assert resp.status_code == 200
        assert resp.get_json()["bits"] == text_to_bits("hi")
        ds = client.get("/dataset?name=wikitext2&split=train&size=1&seq_len=8")
        data = ds.get_json()
        assert len(data["bits"]) == 1 and len(data["bits"][0]) == 8