Spaces:
Building
Building
Andy Lee
committed on
Commit
·
14dc369
1
Parent(s):
9f00612
Revert "feat: force model to react with lat and lon for guessing"
Browse files
This reverts commit 04ae29adceacd858775e0bd5e299a8556d12172f.
- geo_bot.py +58 -118
geo_bot.py
CHANGED
@@ -3,7 +3,6 @@ import json
|
|
3 |
import re
|
4 |
from io import BytesIO
|
5 |
from typing import Tuple, List, Optional, Dict, Any, Type
|
6 |
-
import time
|
7 |
|
8 |
from PIL import Image
|
9 |
from langchain_core.messages import HumanMessage, BaseMessage
|
@@ -38,10 +37,7 @@ AGENT_PROMPT_TEMPLATE = """
|
|
38 |
|
39 |
4. **Be Decisive:** A unique, definitive clue (full address, rare town name, etc.) → `GUESS` immediately.
|
40 |
|
41 |
-
5. **Final-Step Rule**
|
42 |
-
- If **Remaining Steps = 1**, you **MUST** `GUESS` with coordinates.
|
43 |
-
- **NO EXCEPTIONS**: Even with limited clues, provide your best estimate.
|
44 |
-
- **ALWAYS provide lat/lon numbers** - educated guesses are mandatory.
|
45 |
|
46 |
────────────────────────────────
|
47 |
**Context & Task:**
|
@@ -140,33 +136,21 @@ class GeoBot:
|
|
140 |
)
|
141 |
]
|
142 |
|
143 |
-
def _parse_agent_response(
|
144 |
-
self, response: BaseMessage, verbose: bool = False
|
145 |
-
) -> Optional[Dict[str, Any]]:
|
146 |
"""
|
147 |
-
Robustly parses JSON from the LLM response
|
148 |
"""
|
149 |
try:
|
150 |
assert isinstance(response.content, str), "Response content is not a string"
|
151 |
content = response.content.strip()
|
152 |
-
if verbose:
|
153 |
-
print(f"Raw AI response: {content[:200]}...") # Show first 200 chars
|
154 |
-
|
155 |
match = re.search(r"```json\s*(\{.*?\})\s*```", content, re.DOTALL)
|
156 |
if match:
|
157 |
json_str = match.group(1)
|
158 |
-
print(f"Extracted JSON: {json_str}")
|
159 |
else:
|
160 |
json_str = content
|
161 |
-
|
162 |
-
|
163 |
-
parsed = json.loads(json_str)
|
164 |
-
print(f"Successfully parsed JSON: {parsed}")
|
165 |
-
return parsed
|
166 |
-
|
167 |
except (json.JSONDecodeError, AttributeError) as e:
|
168 |
-
print(f"
|
169 |
-
print(f"Full response was:\n{response.content}")
|
170 |
return None
|
171 |
|
172 |
def init_history(self) -> List[Dict[str, Any]]:
|
@@ -238,8 +222,7 @@ class GeoBot:
|
|
238 |
prompt, image_b64_for_prompt[-1:]
|
239 |
)
|
240 |
response = self.model.invoke(message)
|
241 |
-
|
242 |
-
decision = self._parse_agent_response(response, verbose)
|
243 |
except Exception as e:
|
244 |
print(f"Error during model invocation: {e}")
|
245 |
decision = None
|
@@ -276,7 +259,15 @@ class GeoBot:
|
|
276 |
self, max_steps: int = 10, step_callback=None
|
277 |
) -> Optional[Tuple[float, float]]:
|
278 |
"""
|
279 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
280 |
"""
|
281 |
history = self.init_history()
|
282 |
|
@@ -284,24 +275,14 @@ class GeoBot:
|
|
284 |
step_num = max_steps - step + 1
|
285 |
print(f"\n--- Step {step_num}/{max_steps} ---")
|
286 |
|
287 |
-
#
|
288 |
-
|
289 |
-
|
290 |
-
try:
|
291 |
-
self.controller.setup_clean_environment()
|
292 |
-
self.controller.label_arrows_on_screen()
|
293 |
-
screenshot_bytes = self.controller.take_street_view_screenshot()
|
294 |
-
if screenshot_bytes:
|
295 |
-
break
|
296 |
-
print(f"Screenshot retry {retry + 1}/3")
|
297 |
-
except Exception as e:
|
298 |
-
print(f"Error in step {step_num}, retry {retry + 1}: {e}")
|
299 |
-
if retry < 2:
|
300 |
-
time.sleep(2)
|
301 |
|
|
|
302 |
if not screenshot_bytes:
|
303 |
-
print("Failed to
|
304 |
-
return
|
305 |
|
306 |
current_screenshot_b64 = self.pil_to_base64(
|
307 |
image=Image.open(BytesIO(screenshot_bytes))
|
@@ -309,28 +290,36 @@ class GeoBot:
|
|
309 |
available_actions = self.controller.get_available_actions()
|
310 |
print(f"Available actions: {available_actions}")
|
311 |
|
312 |
-
#
|
313 |
-
if step == 1: # Final step
|
314 |
-
|
315 |
-
|
316 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
317 |
else:
|
|
|
318 |
decision = self.execute_agent_step(
|
319 |
history, step, current_screenshot_b64, available_actions
|
320 |
)
|
321 |
|
322 |
-
|
323 |
-
|
324 |
-
decision = {
|
325 |
-
"reasoning": "AI decision failed",
|
326 |
-
"action_details": {
|
327 |
-
"action": "GUESS" if step == 1 else "PAN_RIGHT",
|
328 |
-
"lat": -1.0,
|
329 |
-
"lon": -1.0,
|
330 |
-
},
|
331 |
-
}
|
332 |
-
|
333 |
-
# UI callback
|
334 |
step_info = {
|
335 |
"step_num": step_num,
|
336 |
"max_steps": max_steps,
|
@@ -341,7 +330,7 @@ class GeoBot:
|
|
341 |
"is_final_step": step == 1,
|
342 |
"reasoning": decision.get("reasoning", "N/A"),
|
343 |
"action_details": decision.get("action_details", {"action": "N/A"}),
|
344 |
-
"history": history.copy(),
|
345 |
}
|
346 |
|
347 |
action_details = decision.get("action_details", {})
|
@@ -349,78 +338,29 @@ class GeoBot:
|
|
349 |
print(f"AI Reasoning: {decision.get('reasoning', 'N/A')}")
|
350 |
print(f"AI Action: {action}")
|
351 |
|
|
|
352 |
if step_callback:
|
353 |
try:
|
354 |
step_callback(step_info)
|
355 |
except Exception as e:
|
356 |
-
print(f"UI callback
|
357 |
|
358 |
-
# Add to history
|
359 |
self.add_step_to_history(history, current_screenshot_b64, decision)
|
360 |
|
361 |
# Execute action
|
362 |
if action == "GUESS":
|
363 |
-
lat = action_details.get("lat",
|
364 |
-
|
365 |
-
|
366 |
-
|
367 |
-
|
368 |
-
|
369 |
-
lat_f, lon_f = float(lat), float(lon)
|
370 |
-
if -90 <= lat_f <= 90 and -180 <= lon_f <= 180:
|
371 |
-
return lat_f, lon_f
|
372 |
-
except (ValueError, TypeError):
|
373 |
-
pass
|
374 |
-
|
375 |
-
print("Invalid coordinates, returning error values")
|
376 |
-
return -1.0, -1.0
|
377 |
else:
|
378 |
self.execute_action(action)
|
379 |
|
380 |
-
print("Max steps reached
|
381 |
-
return
|
382 |
-
|
383 |
-
def _get_final_guess(self, history, screenshot_b64, available_actions):
|
384 |
-
"""Get final guess from AI with simple retry."""
|
385 |
-
for retry in range(2):
|
386 |
-
try:
|
387 |
-
# If retry > 0, use a force prompt to ensure the AI returns a GUESS with coordinates.
|
388 |
-
if retry > 0:
|
389 |
-
history_text = self.generate_history_text(history)
|
390 |
-
force_prompt = f"""**FINAL STEP - MANDATORY GUESS**
|
391 |
-
You MUST return GUESS with coordinates. No other action allowed.
|
392 |
-
Remaining Steps: 1
|
393 |
-
Journey history: {history_text}
|
394 |
-
Provide your best lat/lon estimate based on all observed clues.
|
395 |
-
**MANDATORY JSON Format:**
|
396 |
-
{{"reasoning": "your analysis", "action_details": {{"action": "GUESS", "lat": 45.0, "lon": 2.0}} }}"""
|
397 |
-
|
398 |
-
message = self._create_message_with_history(
|
399 |
-
force_prompt, [screenshot_b64]
|
400 |
-
)
|
401 |
-
response = self.model.invoke(message)
|
402 |
-
decision = self._parse_agent_response(response)
|
403 |
-
else:
|
404 |
-
decision = self.execute_agent_step(
|
405 |
-
history, 1, screenshot_b64, available_actions
|
406 |
-
)
|
407 |
-
if (
|
408 |
-
decision
|
409 |
-
and decision.get("action_details", {}).get("action") == "GUESS"
|
410 |
-
):
|
411 |
-
return decision
|
412 |
-
print(f"AI didn't return GUESS, retry {retry + 1}/2")
|
413 |
-
except Exception as e:
|
414 |
-
print(f"AI call failed, retry {retry + 1}/2: {e}")
|
415 |
-
|
416 |
-
if retry == 0:
|
417 |
-
time.sleep(1)
|
418 |
-
|
419 |
-
# Fallback
|
420 |
-
return {
|
421 |
-
"reasoning": "AI failed to provide final guess after retries",
|
422 |
-
"action_details": {"action": "GUESS", "lat": -1.0, "lon": -1.0},
|
423 |
-
}
|
424 |
|
425 |
def analyze_image(self, image: Image.Image) -> Optional[Tuple[float, float]]:
|
426 |
image_b64 = self.pil_to_base64(image)
|
|
|
3 |
import re
|
4 |
from io import BytesIO
|
5 |
from typing import Tuple, List, Optional, Dict, Any, Type
|
|
|
6 |
|
7 |
from PIL import Image
|
8 |
from langchain_core.messages import HumanMessage, BaseMessage
|
|
|
37 |
|
38 |
4. **Be Decisive:** A unique, definitive clue (full address, rare town name, etc.) → `GUESS` immediately.
|
39 |
|
40 |
+
5. **Final-Step Rule:** If **Remaining Steps = 1**, you **MUST** `GUESS` and you should carefully check the image and the surroundings.
|
|
|
|
|
|
|
41 |
|
42 |
────────────────────────────────
|
43 |
**Context & Task:**
|
|
|
136 |
)
|
137 |
]
|
138 |
|
139 |
+
def _parse_agent_response(self, response: BaseMessage) -> Optional[Dict[str, Any]]:
|
|
|
|
|
140 |
"""
|
141 |
+
Robustly parses JSON from the LLM response, handling markdown code blocks.
|
142 |
"""
|
143 |
try:
|
144 |
assert isinstance(response.content, str), "Response content is not a string"
|
145 |
content = response.content.strip()
|
|
|
|
|
|
|
146 |
match = re.search(r"```json\s*(\{.*?\})\s*```", content, re.DOTALL)
|
147 |
if match:
|
148 |
json_str = match.group(1)
|
|
|
149 |
else:
|
150 |
json_str = content
|
151 |
+
return json.loads(json_str)
|
|
|
|
|
|
|
|
|
|
|
152 |
except (json.JSONDecodeError, AttributeError) as e:
|
153 |
+
print(f"Invalid JSON from LLM: {e}\nFull response was:\n{response.content}")
|
|
|
154 |
return None
|
155 |
|
156 |
def init_history(self) -> List[Dict[str, Any]]:
|
|
|
222 |
prompt, image_b64_for_prompt[-1:]
|
223 |
)
|
224 |
response = self.model.invoke(message)
|
225 |
+
decision = self._parse_agent_response(response)
|
|
|
226 |
except Exception as e:
|
227 |
print(f"Error during model invocation: {e}")
|
228 |
decision = None
|
|
|
259 |
self, max_steps: int = 10, step_callback=None
|
260 |
) -> Optional[Tuple[float, float]]:
|
261 |
"""
|
262 |
+
Enhanced agent loop that calls a callback function after each step for UI updates.
|
263 |
+
|
264 |
+
Args:
|
265 |
+
max_steps: Maximum number of steps to take
|
266 |
+
step_callback: Function called after each step with step info
|
267 |
+
Signature: callback(step_info: dict) -> None
|
268 |
+
|
269 |
+
Returns:
|
270 |
+
Final guess coordinates (lat, lon) or None if no guess made
|
271 |
"""
|
272 |
history = self.init_history()
|
273 |
|
|
|
275 |
step_num = max_steps - step + 1
|
276 |
print(f"\n--- Step {step_num}/{max_steps} ---")
|
277 |
|
278 |
+
# Setup and screenshot
|
279 |
+
self.controller.setup_clean_environment()
|
280 |
+
self.controller.label_arrows_on_screen()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
281 |
|
282 |
+
screenshot_bytes = self.controller.take_street_view_screenshot()
|
283 |
if not screenshot_bytes:
|
284 |
+
print("Failed to take screenshot. Ending agent loop.")
|
285 |
+
return None
|
286 |
|
287 |
current_screenshot_b64 = self.pil_to_base64(
|
288 |
image=Image.open(BytesIO(screenshot_bytes))
|
|
|
290 |
available_actions = self.controller.get_available_actions()
|
291 |
print(f"Available actions: {available_actions}")
|
292 |
|
293 |
+
# Force guess on final step or get AI decision
|
294 |
+
if step == 1: # Final step
|
295 |
+
# Force a guess with fallback logic
|
296 |
+
decision = {
|
297 |
+
"reasoning": "Maximum steps reached, forcing final guess.",
|
298 |
+
"action_details": {"action": "GUESS", "lat": 0.0, "lon": 0.0},
|
299 |
+
}
|
300 |
+
# Try to get a real guess from AI
|
301 |
+
try:
|
302 |
+
ai_decision = self.execute_agent_step(
|
303 |
+
history, step, current_screenshot_b64, available_actions
|
304 |
+
)
|
305 |
+
if (
|
306 |
+
ai_decision
|
307 |
+
and ai_decision.get("action_details", {}).get("action")
|
308 |
+
== "GUESS"
|
309 |
+
):
|
310 |
+
decision = ai_decision
|
311 |
+
except Exception as e:
|
312 |
+
print(
|
313 |
+
f"\nERROR: An exception occurred during the final GUESS attempt: {e}. Using fallback (0,0).\n"
|
314 |
+
)
|
315 |
else:
|
316 |
+
# Normal step execution
|
317 |
decision = self.execute_agent_step(
|
318 |
history, step, current_screenshot_b64, available_actions
|
319 |
)
|
320 |
|
321 |
+
# Create step_info with current history BEFORE adding current step
|
322 |
+
# This shows the history up to (but not including) the current step
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
323 |
step_info = {
|
324 |
"step_num": step_num,
|
325 |
"max_steps": max_steps,
|
|
|
330 |
"is_final_step": step == 1,
|
331 |
"reasoning": decision.get("reasoning", "N/A"),
|
332 |
"action_details": decision.get("action_details", {"action": "N/A"}),
|
333 |
+
"history": history.copy(), # History up to current step (excluding current)
|
334 |
}
|
335 |
|
336 |
action_details = decision.get("action_details", {})
|
|
|
338 |
print(f"AI Reasoning: {decision.get('reasoning', 'N/A')}")
|
339 |
print(f"AI Action: {action}")
|
340 |
|
341 |
+
# Call UI callback before executing action
|
342 |
if step_callback:
|
343 |
try:
|
344 |
step_callback(step_info)
|
345 |
except Exception as e:
|
346 |
+
print(f"Warning: UI callback failed: {e}")
|
347 |
|
348 |
+
# Add step to history AFTER callback (so next iteration has this step in history)
|
349 |
self.add_step_to_history(history, current_screenshot_b64, decision)
|
350 |
|
351 |
# Execute action
|
352 |
if action == "GUESS":
|
353 |
+
lat, lon = action_details.get("lat"), action_details.get("lon")
|
354 |
+
if lat is not None and lon is not None:
|
355 |
+
return lat, lon
|
356 |
+
else:
|
357 |
+
print("Invalid guess coordinates, using fallback")
|
358 |
+
return 0.0, 0.0 # Fallback coordinates
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
359 |
else:
|
360 |
self.execute_action(action)
|
361 |
|
362 |
+
print("Max steps reached. Agent did not make a final guess.")
|
363 |
+
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
364 |
|
365 |
def analyze_image(self, image: Image.Image) -> Optional[Tuple[float, float]]:
|
366 |
image_b64 = self.pil_to_base64(image)
|