from datetime import datetime
import json
import os
import time
from pathlib import Path
import streamlit as st
from utils import (
load_json,
load_json_no_cache,
parse_arguments,
format_chat_message,
find_screenshot,
gather_chat_history,
get_screenshot,
load_page,
)
def show_selectbox(demonstration_dir):
    """Render a sidebar dropdown listing the recordings in a directory.

    Every sub-directory of ``demonstration_dir`` is treated as one recording.
    The entries are ordered by modification time, newest first, and the
    newest one is pre-selected.

    Returns:
        The name picked in the dropdown, or ``None`` (after displaying a
        "No recordings found." title) when there is no sub-directory.
    """

    def path_of(name):
        return f"{demonstration_dir}/{name}"

    # Keep only sub-directories; plain files are not recordings.
    recordings = list(
        filter(lambda name: os.path.isdir(path_of(name)), os.listdir(demonstration_dir))
    )

    if len(recordings) == 0:
        st.title("No recordings found.")
        return None

    # Most recently modified recording first, so it becomes the default choice.
    newest_first = sorted(
        recordings,
        key=lambda name: os.path.getmtime(path_of(name)),
        reverse=True,
    )

    return st.sidebar.selectbox("Recording", newest_first, index=0)
def show_overview(data, recording_name, basedir):
    """Render the turn-by-turn overview of one recording.

    Each entry of ``data`` becomes one row made of: the time elapsed since
    the first entry, the turn index (a download button for the saved page
    when HTML download is enabled and the turn has a page), and either the
    chat message or the browser action together with its screenshot.

    Display options are read from ``st.session_state``:
    ``screenshot_size_view_mode`` ("small" / "regular" / anything else is
    treated as large), ``show_advanced_information`` and
    ``enable_html_download``.

    Args:
        data: Sequence of turn dicts. Every turn is read for ``"timestamp"``
            and ``"type"``; non-chat turns are also read for ``"action"``
            (including ``"intent"``) and ``"state"``.
        recording_name: Name shown in the header and used as the prefix of
            downloaded page files.
        basedir: Directory of the recording; pages are looked up under
            ``{basedir}/pages``.
    """
    st.title('[WebLINX](https://mcgill-nlp.github.io/weblinx) Explorer')
    st.header(f"Recording: `{recording_name}`")

    screenshot_size = st.session_state.get("screenshot_size_view_mode", "regular")
    show_advanced_info = st.session_state.get("show_advanced_information", False)

    # Relative column widths: time, index, action text[, screenshot].
    if screenshot_size == "regular":
        col_layout = [1.5, 1.5, 7, 3.5]
    elif screenshot_size == "small":
        col_layout = [1.5, 1.5, 7, 2]
    else:  # "large": no dedicated screenshot column, see below
        col_layout = [1.5, 1.5, 11]

    for i, d in enumerate(data):
        if i > 0 and show_advanced_info:
            # Use html to add a horizontal line with minimal gap between rows.
            # NOTE(review): the original markup of this literal was lost (the
            # string was left unterminated); this thin rule is a
            # reconstruction from the comment above -- confirm the styling.
            st.markdown(
                "<hr style='margin-top: 0.1rem; margin-bottom: 0.1rem;'/>",
                unsafe_allow_html=True,
            )

        if screenshot_size == "large":
            col_time, col_i, col_act = st.columns(col_layout)
            # In large mode the screenshot is drawn in the action column.
            col_actvis = col_act
        else:
            col_time, col_i, col_act, col_actvis = st.columns(col_layout)

        secs_from_start = d["timestamp"] - data[0]["timestamp"]
        # Shown as MM:SS: the sub-second part is dropped and, because only
        # minutes and seconds are formatted, the value wraps after an hour.
        # `time.gmtime` replaces the deprecated `datetime.utcfromtimestamp`.
        elapsed = time.strftime("%M:%S", time.gmtime(secs_from_start))
        col_time.markdown(f"**{elapsed}**")

        if not st.session_state.get("enable_html_download", True):
            col_i.markdown(f"**#{i}**")
        elif d["type"] == "browser" and (page_filename := d["state"]["page"]):
            page_path = f"{basedir}/pages/{page_filename}"
            col_i.download_button(
                label="#" + str(i),
                data=load_page(page_path),
                file_name=recording_name + "-" + page_filename,
                mime="multipart/related",
                key=f"page{i}",
            )
        else:
            col_i.button(f"#{i}", type='secondary')

        if d["type"] == "chat":
            # Chat turns have no action or screenshot to display.
            col_act.markdown(format_chat_message(d), unsafe_allow_html=True)
            continue

        img = get_screenshot(d, basedir)
        arguments = parse_arguments(d["action"])
        event_type = d["action"]["intent"]
        action_str = f"**{event_type}**({arguments})"

        if img:
            col_actvis.image(img)
        col_act.markdown(action_str)

        if show_advanced_info:
            status = d["state"].get("screenshot_status", "unknown")

            text = ""
            if status == "good":
                text += '**:green[Used in demo]**\n\n'
            # The trailing backslash before the newline is a Markdown hard
            # line break, keeping "Page" on its own line.
            text += f'Screenshot: `{d["state"]["screenshot"]}`\\\n'
            text += f'Page: `{d["state"]["page"]}`\n'

            col_act.markdown(text)
def load_recording(basedir):
    """Load one recording and render its details in the sidebar.

    Lets the user choose among the ``replay*.json`` files found in
    ``basedir``, adds the "Advanced Screenshot Info" and "Enable HTML
    download" checkboxes, and shows the start date, duration, selected form
    fields, validation status and start frame of the recording.

    Missing ``metadata.json``, replay or ``form.json`` files are reported
    with ``st.error`` followed by ``st.stop()``.

    Args:
        basedir: Directory holding the files of a single recording.

    Returns:
        The list stored under ``"data"`` in the selected replay file, or
        ``None`` when the replay content is empty or holds no events.
    """
    # Offer every replay variant (e.g. replay.json or replay_orig.json).
    replay_files = sorted(
        f
        for f in os.listdir(basedir)
        if f.startswith("replay") and f.endswith(".json")
    )
    if not replay_files:
        # Without options the selectbox yields None, which would crash below.
        st.error(f"No replay file found in {basedir}. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.")
        st.stop()

    replay_file = st.sidebar.selectbox("Select replay", replay_files, index=0)

    st.sidebar.checkbox(
        "Advanced Screenshot Info", False, key="show_advanced_information"
    )
    st.sidebar.checkbox(
        "Enable HTML download", False, key="enable_html_download"
    )

    # The loader is called with extension-less names (cf. "metadata", "form").
    # `stem` drops only the final ".json", unlike a global string replace.
    replay_name = Path(replay_file).stem

    if not Path(basedir).joinpath('metadata.json').exists():
        st.error(f"Metadata file not found at {basedir}/metadata.json. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.")
        st.stop()

    metadata = load_json_no_cache(basedir, "metadata")

    # `recordingStart` is divided by 1000 before conversion, i.e. it is
    # handled as a millisecond timestamp.
    recording_start_timestamp = metadata["recordingStart"]
    recording_start_date = datetime.fromtimestamp(
        int(recording_start_timestamp) / 1000
    ).strftime("%Y-%m-%d %H:%M:%S")
    st.sidebar.markdown(f"**started**: {recording_start_date}")

    # Read in the JSON data
    replay_dict = load_json_no_cache(basedir, replay_name)
    form = load_json_no_cache(basedir, "form")

    if replay_dict is None:
        st.error(f"Replay file not found at {basedir}/{replay_name}.json. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.")
        st.stop()

    if form is None:
        st.error(f"Form file not found at {basedir}/form.json. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.")
        st.stop()

    # Validate before indexing: the duration below needs at least one event.
    data = replay_dict.get("data")
    if not data:
        st.warning("This replay contains no events.")
        return None

    duration = data[-1]["timestamp"] - data[0]["timestamp"]
    duration = time.strftime("%M:%S", time.gmtime(duration))
    st.sidebar.markdown(f"**duration**: {duration}")

    for key in [
        "annotator",
        "description",
        "tasks",
        "upload_date",
        "instructor_sees_screen",
        "uses_ai_generated_output",
    ]:
        if key not in form:
            continue

        # Normalize the key to be more human-readable
        key_name = key.replace("_", " ").title()
        value = form[key]

        if isinstance(value, list):
            # str() keeps the join safe if an item is not a string.
            st.sidebar.markdown(f"**{key_name}**: {', '.join(map(str, value))}")
        else:
            st.sidebar.markdown(f"**{key_name}**: {value}")

    st.sidebar.markdown("---")
    if "status" in replay_dict:
        st.sidebar.markdown(f"**Validation status**: {replay_dict['status']}")

    processed_meta_path = Path(basedir).joinpath('processed_metadata.json')
    start_frame = 'file not found'

    if processed_meta_path.exists():
        with open(processed_meta_path, encoding="utf-8") as f:
            processed_meta = json.load(f)
        start_frame = processed_meta.get('start_frame', 'info not in file')

    st.sidebar.markdown(f"**Recording start frame**: {start_frame}")

    return data
def run():
    """Select a recording and render it.

    The selected recording is kept in the ``recording`` URL query parameter
    so that a page can be linked to directly; the sidebar dropdown writes to
    that parameter whenever its value changes. Stops the script with a
    message when no recording exists or when loading yields no data.
    """
    demonstration_dir = "./demonstrations"

    # Only directories are recordings. Sorting makes the default (first)
    # entry deterministic, since `os.listdir` returns an arbitrary order.
    entries = os.listdir(demonstration_dir) if os.path.isdir(demonstration_dir) else []
    demo_names = sorted(
        name
        for name in entries
        if os.path.isdir(os.path.join(demonstration_dir, name))
    )

    if not demo_names:
        st.title("No recordings found.")
        st.stop()

    def update_recording_name():
        # Fall back to the first recording when nothing is selected yet.
        selected = st.session_state.get("recording_name") or demo_names[0]
        st.query_params["recording"] = selected

    # For initial run, set the query parameter to the selected recording
    if not st.query_params.get("recording"):
        update_recording_name()

    recording_name = st.query_params.get("recording")

    # The query parameter is user-controlled: only accept known recordings
    # before using it to build a path.
    if recording_name not in demo_names:
        st.error(f"Recording `{recording_name}` not found. Please select another recording.")
        # Render the picker before stopping, otherwise there is nothing to
        # select from. `index=None` starts empty so that any choice,
        # including the first entry, triggers `on_change`.
        st.sidebar.selectbox(
            "Recordings",
            demo_names,
            index=None,
            on_change=update_recording_name,
            key="recording_name",
        )
        st.stop()

    recording_idx = demo_names.index(recording_name)
    st.sidebar.selectbox(
        "Recordings",
        demo_names,
        on_change=update_recording_name,
        key="recording_name",
        index=recording_idx,
    )

    with st.sidebar:
        st.selectbox(
            "Screenshot size",
            ["small", "regular", "large"],
            index=1,
            key="screenshot_size_view_mode",
        )

    basedir = f"{demonstration_dir}/{recording_name}"
    data = load_recording(basedir=basedir)

    if not data:
        st.stop()

    show_overview(data, recording_name=recording_name, basedir=basedir)
if __name__ == "__main__":
    # Script entry point: configure the page with the "wide" layout before
    # `run()` emits any other Streamlit element, then render the app.
    st.set_page_config(layout="wide")
    run()