Autonomous-AI / scheduler.py
Leonydis137's picture
Upload scheduler.py
b96144f verified
raw
history blame
981 Bytes
import time
import threading
class TaskScheduler:
    """Run registered callables periodically on a background daemon thread.

    Every task is stored in ``self.tasks`` as a dict with the keys ``name``,
    ``func``, ``interval`` (seconds) and ``last_run`` (the ``time.time()``
    value recorded for its most recent run, ``0`` before the first run).
    The loop scans the task list about once per second, so intervals are
    honoured with roughly one-second resolution.
    """

    # Seconds the loop idles between two scans of the task list.
    _TICK_SECONDS = 1

    def __init__(self):
        """Create an idle scheduler with no tasks."""
        self.tasks = []
        self.running = False
        # Thread created by the most recent start(), if any.
        self._thread = None
        # Pure wake-up hint so stop() can cut the idle wait short; the
        # authoritative on/off state is still ``self.running``.
        self._wake = threading.Event()
        # Incremented by every effective start() so that a loop left over
        # from an earlier start()/stop() cycle notices it has been replaced.
        self._generation = 0

    def add_task(self, name, func, interval=10):
        """Register ``func`` to be called every ``interval`` seconds.

        Args:
            name: Label used in the scheduler's printed messages.
            func: Zero-argument callable; its return value is printed.
            interval: Minimum number of seconds between two runs.

        ``last_run`` starts at ``0``, so the task is due on the first scan.
        """
        task = {"name": name, "func": func, "interval": interval, "last_run": 0}
        self.tasks.append(task)

    def start(self):
        """Start the background loop; do nothing if it is already running."""
        worker = self._thread
        if self.running and worker is not None and worker.is_alive():
            # A second worker would execute every task twice per interval.
            return
        # Retire any loop that is still winding down after an earlier stop()
        # so it cannot resume next to the worker created below.
        self._generation += 1
        self.running = True
        self._thread = threading.Thread(target=self.run_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the loop to exit and wake it if it is idling between scans."""
        self.running = False
        self._wake.set()

    def _is_active(self, generation):
        """Return True while the loop entered as ``generation`` may continue."""
        return self.running and generation == self._generation

    def run_loop(self):
        """Run due tasks until ``running`` is cleared or the loop is replaced.

        This is the thread target used by :meth:`start`; it can also be
        called directly (after setting ``running = True``) to run the
        scheduler in the calling thread.  Exceptions raised by a task are
        printed and do not stop the loop.
        """
        generation = self._generation
        while self._is_active(generation):
            # Iterate over a snapshot so tasks added or removed while the
            # scan is in progress cannot make it skip entries.
            for task in tuple(self.tasks):
                if not self._is_active(generation):
                    break
                # Sample the clock per task so a slow earlier task does not
                # give later tasks a stale timestamp.
                now = time.time()
                elapsed = now - task["last_run"]
                # A negative elapsed time means the wall clock was stepped
                # backwards; treat the task as due instead of stalling it
                # until the clock catches up again.
                if elapsed < 0 or elapsed >= task["interval"]:
                    try:
                        result = task["func"]()
                        print(f"[Scheduler] Ran task '{task['name']}': {result}")
                    except Exception as e:
                        # Best-effort boundary: one failing task must not
                        # take the whole scheduler down.
                        print(f"[Scheduler Error] {task['name']}: {e}")
                    task["last_run"] = now
            # Interruptible replacement for time.sleep(): returns early when
            # stop() sets the event.  Clear it afterwards so a stale wake-up
            # can never turn the loop into a busy spin.
            if self._wake.wait(self._TICK_SECONDS):
                self._wake.clear()