Spaces:
Sleeping
Sleeping
| # gamestate.py | |
| import threading | |
| import time | |
| import os | |
| import json | |
| import pandas as pd | |
| class GameState: | |
| def __init__(self, save_dir="saved_worlds", csv_filename="world_state.csv"): | |
| os.makedirs(save_dir, exist_ok=True) | |
| self.csv_path = os.path.join(save_dir, csv_filename) | |
| self.lock = threading.Lock() | |
| self.world_state = [] # List of dicts representing game objects | |
| self.last_update_time = time.time() | |
| self.load_state() | |
| def load_state(self): | |
| """Load world state from CSV if available.""" | |
| if os.path.exists(self.csv_path): | |
| try: | |
| df = pd.read_csv(self.csv_path) | |
| self.world_state = df.to_dict(orient='records') | |
| except Exception as e: | |
| print(f"Error loading state from {self.csv_path}: {e}") | |
| else: | |
| self.world_state = [] | |
| def save_state(self): | |
| """Persist the current world state to CSV.""" | |
| with self.lock: | |
| try: | |
| df = pd.DataFrame(self.world_state) | |
| df.to_csv(self.csv_path, index=False) | |
| except Exception as e: | |
| print(f"Error saving state to {self.csv_path}: {e}") | |
| def get_state(self): | |
| """Return a deep copy of the current world state.""" | |
| with self.lock: | |
| return json.loads(json.dumps(self.world_state)) | |
| def update_state(self, new_objects): | |
| """ | |
| Merge new or updated objects into the world state. | |
| Each object must have a unique 'obj_id'. | |
| """ | |
| with self.lock: | |
| for obj in new_objects: | |
| found = False | |
| for existing in self.world_state: | |
| if existing.get('obj_id') == obj.get('obj_id'): | |
| existing.update(obj) | |
| found = True | |
| break | |
| if not found: | |
| self.world_state.append(obj) | |
| self.last_update_time = time.time() | |
| self.save_state() | |