"""Watch source files; on change, run the test suite and restart a command."""

import argparse
import subprocess
import sys
import time
from pathlib import Path

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class RestartOnChange(FileSystemEventHandler):
    """Restart a subprocess when watched ``.py`` files change.

    The command is launched as soon as the handler is constructed. Every
    qualifying file-system event runs ``pytest -q`` and then restarts the
    command, regardless of the test outcome.
    """

    def __init__(self, command: list[str], watch_paths: list[str]) -> None:
        """Store the configuration and start the command for the first time.

        Args:
            command: Argument vector passed to ``subprocess.Popen``.
            watch_paths: Files or directories whose changes trigger a restart.
                They are resolved to absolute paths once, here.
        """
        super().__init__()
        self.command = command
        self.watch_paths = [Path(p).resolve() for p in watch_paths]
        self.process: subprocess.Popen | None = None
        self.restart()

    def stop(self) -> None:
        """Terminate the managed subprocess if it is still running.

        Sends ``terminate()`` first and waits up to five seconds; if the
        process has not exited by then it is killed and reaped.
        """
        process = self.process
        if process is None or process.poll() is not None:
            return
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def restart(self) -> None:
        """Stop the current subprocess (if any) and launch a fresh one."""
        self.stop()
        self.process = subprocess.Popen(self.command)

    def _is_watched(self, path: Path) -> bool:
        """Return whether *path* is one of the watch paths or lies beneath one."""
        # Compare whole path components rather than string prefixes, so that a
        # sibling such as "pkg_old/x.py" does not match the watch root "pkg".
        return any(path.is_relative_to(root) for root in self.watch_paths)

    def on_any_event(self, event) -> None:  # pragma: no cover - runtime utility
        """Run the tests and restart the command when a watched ``.py`` file changes.

        Directory events and files without a ``.py`` suffix are ignored.
        """
        if event.is_directory:
            return
        # The event path may be relative to the process's working directory
        # (when the watch was scheduled with a relative path); resolve it so
        # it is comparable with the resolved entries in ``self.watch_paths``.
        path = Path(event.src_path).resolve()
        if path.suffix != ".py":
            return
        if self._is_watched(path):
            print(f"[watcher] {path} changed, running tests...")
            # The test result is intentionally not checked: the process is
            # restarted whether or not the tests pass.
            subprocess.run([sys.executable, "-m", "pytest", "-q"])
            print("[watcher] restarting process...")
            self.restart()


def main() -> None:  # pragma: no cover - CLI entry
    """Parse CLI arguments, start watching, and block until interrupted."""
    parser = argparse.ArgumentParser(
        description="Watch files and restart a command on changes",
    )
    parser.add_argument(
        "--command",
        nargs="+",
        default=[sys.executable, "mcp_server.py"],
        help="Command to run",
    )
    parser.add_argument(
        "--paths",
        nargs="+",
        default=["bit_transformer", "mcp_server.py"],
        help="Paths to watch for changes",
    )
    args = parser.parse_args()

    observer = Observer()
    handler = RestartOnChange(args.command, args.paths)
    for p in args.paths:
        observer.schedule(handler, p, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop and join the observer first so that no in-flight event callback
        # can relaunch the command after it has been shut down below.
        observer.stop()
        observer.join()
        handler.stop()


if __name__ == "__main__":  # pragma: no cover - CLI entry
    main()