rahul7star commited on
Commit
f73dd64
·
verified ·
1 Parent(s): 2830b98

Update ui/queue.py

Browse files
Files changed (1) hide show
  1. ui/queue.py +398 -397
ui/queue.py CHANGED
@@ -1,397 +1,398 @@
1
- # ui/queue.py
2
- import gradio as gr
3
- import numpy as np
4
- from PIL import Image
5
- import os
6
- import json
7
- import base64
8
- import io
9
- import zipfile
10
- import tempfile
11
- import atexit
12
- import traceback
13
- from pathlib import Path
14
-
15
- # Import shared state and constants from the dedicated module.
16
- from . import shared_state
17
- from generation_core import worker
18
- from diffusers_helper.thread_utils import AsyncStream, async_run
19
-
20
-
21
- # Configuration for the autosave feature.
22
- AUTOSAVE_FILENAME = "goan_autosave_queue.zip"
23
-
24
-
25
def np_to_base64_uri(np_array_or_tuple, format="png"):
    """Convert a NumPy image array (or a (array, caption) tuple) to a base64 data URI.

    Returns None when the input is missing, is not an ndarray, or encoding fails.
    """
    if np_array_or_tuple is None:
        return None
    try:
        # Accept either a bare ndarray or a tuple whose first element is the ndarray.
        if isinstance(np_array_or_tuple, tuple) and len(np_array_or_tuple) > 0 and isinstance(np_array_or_tuple[0], np.ndarray):
            np_array = np_array_or_tuple[0]
        elif isinstance(np_array_or_tuple, np.ndarray):
            np_array = np_array_or_tuple
        else:
            return None
        pil_image = Image.fromarray(np_array.astype(np.uint8))
        # JPEG cannot carry an alpha channel; drop it before encoding.
        if format.lower() == "jpeg" and pil_image.mode == "RGBA":
            pil_image = pil_image.convert("RGB")
        buffer = io.BytesIO()
        pil_image.save(buffer, format=format.upper())
        encoded = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return f"data:image/{format.lower()};base64,{encoded}"
    except Exception as e:
        print(f"Error converting NumPy to base64: {e}")
        return None
36
-
37
def get_queue_state(state_dict_gr_state):
    """Return the session's queue_state dict, creating a default one on first access."""
    # setdefault only inserts when the key is absent, matching the original
    # "not in" check exactly.
    return state_dict_gr_state.setdefault(
        "queue_state",
        {"queue": [], "next_id": 1, "processing": False, "editing_task_id": None},
    )
41
-
42
def update_queue_df_display(queue_state):
    """Build a gr.DataFrame update mirroring the current queue.

    Each row: [id, status, prompt (HTML), length, steps, thumbnail, ↑, ↓, ✖, ✎];
    the trailing glyph columns act as per-row action buttons (handled by
    handle_queue_action_on_select). The table is hidden when the queue is empty.
    """
    queue = queue_state.get("queue", [])
    processing = queue_state.get("processing", False)
    editing_task_id = queue_state.get("editing_task_id", None)
    data = []
    for i, task in enumerate(queue):
        params = task['params']
        task_id = task['id']
        prompt_display = (params['prompt'][:77] + '...') if len(params['prompt']) > 80 else params['prompt']
        # BUGFIX: escape double quotes so the full prompt survives inside the
        # title="" attribute (the previous replace('"', '"') was a no-op).
        prompt_title = params['prompt'].replace('"', '&quot;')
        prompt_cell = f'<span title="{prompt_title}">{prompt_display}</span>'
        img_uri = np_to_base64_uri(params.get('input_image'), format="png")
        thumbnail_size = "50px"
        img_md = f'<img src="{img_uri}" alt="Input" style="max-width:{thumbnail_size}; max-height:{thumbnail_size}; display:block; margin:auto; object-fit:contain;" />' if img_uri else ""
        task_status_val = task.get("status", "pending")
        if processing and i == 0:
            status_display = "⏳ Processing"
        elif editing_task_id == task_id:
            status_display = "✏️ Editing"
        elif task_status_val == "done":
            status_display = "✅ Done"
        elif task_status_val == "error":
            status_display = f"❌ Error: {task.get('error_message', 'Unknown')}"
        elif task_status_val == "aborted":
            status_display = "⏹️ Aborted"
        else:
            # BUGFIX: final else covers "pending" and any unknown status; the
            # original chain left status_display unbound (NameError) for
            # unrecognized statuses.
            status_display = "⏸️ Pending"
        data.append([task_id, status_display, prompt_cell, f"{params.get('total_second_length', 0):.1f}s", params.get('steps', 0), img_md, "↑", "↓", "✖", "✎"])
    return gr.DataFrame(value=data, visible=len(data) > 0)
55
-
56
def add_or_update_task_in_queue(state_dict_gr_state, *args_from_ui_controls_tuple):
    """Adds a new task to the queue or updates an existing one if in edit mode.

    args_from_ui_controls_tuple[0] is the gallery's image list; the remaining
    values line up positionally with shared_state.ALL_TASK_UI_KEYS.
    Returns (state, queue-DataFrame update, add/update-button update,
    cancel-edit-button visibility update).
    """
    queue_state = get_queue_state(state_dict_gr_state)
    editing_task_id = queue_state.get("editing_task_id", None)

    input_images_pil_list = args_from_ui_controls_tuple[0]
    all_ui_values_tuple = args_from_ui_controls_tuple[1:]
    if not input_images_pil_list:
        gr.Warning("Input image is required!")
        return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Add Task to Queue" if editing_task_id is None else "Update Task"), gr.update(visible=editing_task_id is not None)

    # Map the positional UI values onto worker-parameter names.
    temp_params_from_ui = dict(zip(shared_state.ALL_TASK_UI_KEYS, all_ui_values_tuple))
    base_params_for_worker_dict = {}
    for ui_key, worker_key in shared_state.UI_TO_WORKER_PARAM_MAP.items():
        if ui_key == 'gs_schedule_shape_ui':
            # The UI exposes a dropdown; the worker only needs "is scheduling on".
            base_params_for_worker_dict[worker_key] = temp_params_from_ui.get(ui_key) != 'Off'
        else:
            base_params_for_worker_dict[worker_key] = temp_params_from_ui.get(ui_key)

    if editing_task_id is not None:
        # Edit mode: replace the edited task's params in place (single image only).
        if len(input_images_pil_list) > 1:
            gr.Warning("Cannot update task with multiple images. Cancel edit.")
            return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Update Task"), gr.update(visible=True)
        # Gallery items may be (image, caption) tuples.
        pil_img_for_update = input_images_pil_list[0][0] if isinstance(input_images_pil_list[0], tuple) else input_images_pil_list[0]
        if not isinstance(pil_img_for_update, Image.Image):
            gr.Warning("Invalid image format for update.")
            return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Update Task"), gr.update(visible=True)
        img_np_for_update = np.array(pil_img_for_update)
        with shared_state.queue_lock:
            task_found = False
            for task in queue_state["queue"]:
                if task["id"] == editing_task_id:
                    task["params"] = {**base_params_for_worker_dict, 'input_image': img_np_for_update}
                    task["status"] = "pending"  # an edited task is re-queued
                    task_found = True
                    break
            if not task_found:
                gr.Warning(f"Task {editing_task_id} not found for update.")
            else:
                gr.Info(f"Task {editing_task_id} updated.")
            queue_state["editing_task_id"] = None
    else:
        # Add mode: one task per gallery image.
        tasks_added_count = 0
        first_new_task_id = -1
        with shared_state.queue_lock:
            for img_obj in input_images_pil_list:
                pil_image = img_obj[0] if isinstance(img_obj, tuple) else img_obj
                if not isinstance(pil_image, Image.Image):
                    gr.Warning("Skipping invalid image input.")
                    continue
                img_np_data = np.array(pil_image)
                next_id = queue_state["next_id"]
                if first_new_task_id == -1:
                    first_new_task_id = next_id
                task = {"id": next_id, "params": {**base_params_for_worker_dict, 'input_image': img_np_data}, "status": "pending"}
                queue_state["queue"].append(task)
                queue_state["next_id"] += 1
                tasks_added_count += 1
            if tasks_added_count > 0:
                gr.Info(f"Added {tasks_added_count} task(s) (start ID: {first_new_task_id}).")
            else:
                gr.Warning("No valid tasks added.")

    return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Add Task(s) to Queue", variant="secondary"), gr.update(visible=False)
105
-
106
def cancel_edit_mode_action(state_dict_gr_state):
    """Leave edit mode (if active) and restore the add-task button state."""
    queue_state = get_queue_state(state_dict_gr_state)
    if queue_state.get("editing_task_id") is not None:
        gr.Info("Edit cancelled.")
        queue_state["editing_task_id"] = None
    return (
        state_dict_gr_state,
        update_queue_df_display(queue_state),
        gr.update(value="Add Task(s) to Queue", variant="secondary"),
        gr.update(visible=False),
    )
111
-
112
def move_task_in_queue(state_dict_gr_state, direction: str, selected_indices_list: list):
    """Swap the selected task with its neighbour ('up' or 'down') in the queue."""
    if not selected_indices_list or not selected_indices_list[0]:
        return state_dict_gr_state, update_queue_df_display(get_queue_state(state_dict_gr_state))
    row = int(selected_indices_list[0][0])
    queue_state = get_queue_state(state_dict_gr_state)
    tasks = queue_state["queue"]
    with shared_state.queue_lock:
        # Resolve the neighbour index first; None means the move is a no-op
        # (top row moving up / bottom row moving down / unknown direction).
        if direction == 'up' and row > 0:
            other = row - 1
        elif direction == 'down' and row < len(tasks) - 1:
            other = row + 1
        else:
            other = None
        if other is not None:
            tasks[row], tasks[other] = tasks[other], tasks[row]
    return state_dict_gr_state, update_queue_df_display(queue_state)
120
-
121
def remove_task_from_queue(state_dict_gr_state, selected_indices_list: list):
    """Delete the selected task; returns (state, df update, removed task id or None)."""
    removed_task_id = None
    if not selected_indices_list or not selected_indices_list[0]:
        return state_dict_gr_state, update_queue_df_display(get_queue_state(state_dict_gr_state)), removed_task_id
    row = int(selected_indices_list[0][0])
    queue_state = get_queue_state(state_dict_gr_state)
    tasks = queue_state["queue"]
    with shared_state.queue_lock:
        if not (0 <= row < len(tasks)):
            gr.Warning("Invalid index for removal.")
        else:
            removed_task_id = tasks.pop(row)['id']
            gr.Info(f"Removed task {removed_task_id}.")
    return state_dict_gr_state, update_queue_df_display(queue_state), removed_task_id
130
-
131
def handle_queue_action_on_select(evt: gr.SelectData, state_dict_gr_state, *ui_param_controls_tuple):
    """Handles clicks on the action buttons (↑, ↓, ✖, ✎) in the queue DataFrame.

    Output list layout (positional):
      [0] app state, [1] queue DataFrame, [2] input image gallery,
      [3 .. 3+len(ALL_TASK_UI_KEYS)-1] task parameter controls,
      [3+len] add/update button, [4+len] cancel-edit button.
    """
    if evt.index is None or evt.value not in ["↑", "↓", "✖", "✎"]:
        # Click landed outside an action column: refresh the table, no-op the rest.
        return [state_dict_gr_state, update_queue_df_display(get_queue_state(state_dict_gr_state))] + [gr.update()] * (len(shared_state.ALL_TASK_UI_KEYS) + 4)

    row_index, col_index = evt.index
    button_clicked = evt.value
    queue_state = get_queue_state(state_dict_gr_state)
    queue = queue_state["queue"]
    processing = queue_state.get("processing", False)
    # Default outputs: no-op updates everywhere except state and the table.
    outputs_list = [state_dict_gr_state, update_queue_df_display(queue_state)] + [gr.update()] * (len(shared_state.ALL_TASK_UI_KEYS) + 4)

    if button_clicked == "↑":
        if processing and row_index == 0:
            gr.Warning("Cannot move processing task.")
            return outputs_list
        if processing and row_index == 1:
            # BUGFIX: moving row 1 up swaps it with row 0, the task currently
            # processing. The original placed this guard on the "down" branch,
            # where row 1 moving down (swap with row 2) never touches row 0.
            gr.Warning("Cannot move below processing task.")
            return outputs_list
        new_state, new_df = move_task_in_queue(state_dict_gr_state, 'up', [[row_index, col_index]])
        outputs_list[0], outputs_list[1] = new_state, new_df
    elif button_clicked == "↓":
        if processing and row_index == 0:
            gr.Warning("Cannot move processing task.")
            return outputs_list
        new_state, new_df = move_task_in_queue(state_dict_gr_state, 'down', [[row_index, col_index]])
        outputs_list[0], outputs_list[1] = new_state, new_df
    elif button_clicked == "✖":
        if processing and row_index == 0:
            gr.Warning("Cannot remove processing task.")
            return outputs_list
        new_state, new_df, removed_id = remove_task_from_queue(state_dict_gr_state, [[row_index, col_index]])
        outputs_list[0], outputs_list[1] = new_state, new_df
        if removed_id is not None and queue_state.get("editing_task_id", None) == removed_id:
            # The task under edit was just deleted: drop out of edit mode.
            queue_state["editing_task_id"] = None
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS)] = gr.update(value="Add Task(s) to Queue", variant="secondary")
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS) + 1] = gr.update(visible=False)
    elif button_clicked == "✎":
        if processing and row_index == 0:
            gr.Warning("Cannot edit processing task.")
            return outputs_list
        if 0 <= row_index < len(queue):
            task_to_edit = queue[row_index]
            task_id_to_edit = task_to_edit['id']
            params_to_load_to_ui = task_to_edit['params']
            queue_state["editing_task_id"] = task_id_to_edit
            gr.Info(f"Editing Task {task_id_to_edit}.")
            img_np_from_task = params_to_load_to_ui.get('input_image')
            outputs_list[2] = gr.update(value=[(Image.fromarray(img_np_from_task), "loaded_image")]) if isinstance(img_np_from_task, np.ndarray) else gr.update(value=None)
            for i, ui_key in enumerate(shared_state.ALL_TASK_UI_KEYS):
                worker_key = shared_state.UI_TO_WORKER_PARAM_MAP.get(ui_key)
                if worker_key in params_to_load_to_ui:
                    value_from_task = params_to_load_to_ui[worker_key]
                    # gs_schedule_shape_ui is stored as a bool; map back to dropdown labels.
                    outputs_list[3 + i] = gr.update(value="Linear" if value_from_task else "Off") if ui_key == 'gs_schedule_shape_ui' else gr.update(value=value_from_task)
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS)] = gr.update(value="Update Task", variant="secondary")
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS) + 1] = gr.update(visible=True)
        else:
            gr.Warning("Invalid index for edit.")
    return outputs_list
169
-
170
def clear_task_queue_action(state_dict_gr_state):
    """Remove every pending task, keeping the one currently processing (if any)."""
    queue_state = get_queue_state(state_dict_gr_state)
    queue = queue_state["queue"]
    processing = queue_state["processing"]
    cleared_count = 0
    with shared_state.queue_lock:
        if processing:
            # Preserve only the in-flight task at the head of the queue.
            if len(queue) > 1:
                cleared_count = len(queue) - 1
                queue_state["queue"] = [queue[0]]
                gr.Info(f"Cleared {cleared_count} pending tasks.")
            else:
                gr.Info("Only processing task in queue.")
        elif queue:
            cleared_count = len(queue)
            queue.clear()
            gr.Info(f"Cleared {cleared_count} tasks.")
        else:
            gr.Info("Queue empty.")
    # A fully-cleared idle queue also discards the autosave, so old tasks
    # are not resurrected on the next launch.
    if not processing and cleared_count > 0 and os.path.isfile(AUTOSAVE_FILENAME):
        try:
            os.remove(AUTOSAVE_FILENAME)
            print(f"Cleared autosave: {AUTOSAVE_FILENAME}.")
        except OSError as e:
            print(f"Error deleting autosave: {e}")
    return state_dict_gr_state, update_queue_df_display(queue_state)
183
-
184
def save_queue_to_zip(state_dict_gr_state):
    """Serialize the queue (manifest + input images) into an in-memory zip.

    Returns (state, base64-encoded zip bytes); the string is empty when the
    queue is empty or zipping fails. Identical input images (same pixel
    bytes) are stored once and shared between tasks via 'image_ref'.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    queue = queue_state.get("queue", [])
    if not queue:
        gr.Info("Queue is empty. Nothing to save.")
        return state_dict_gr_state, ""
    zip_buffer = io.BytesIO()
    saved_files_count = 0
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            queue_manifest = []
            image_paths_in_zip = {}  # pixel-hash -> filename already written to tmpdir
            for task in queue:
                params_copy = task['params'].copy()
                task_id_s = task['id']
                input_image_np_data = params_copy.pop('input_image', None)
                manifest_entry = {"id": task_id_s, "params": params_copy, "status": task.get("status", "pending")}
                if input_image_np_data is not None:
                    img_hash = hash(input_image_np_data.tobytes())
                    if img_hash in image_paths_in_zip:
                        # BUGFIX: duplicates must reference the file actually stored
                        # in the zip; the original wrote a fresh per-task filename
                        # into the manifest without ever saving that file.
                        manifest_entry['image_ref'] = image_paths_in_zip[img_hash]
                    else:
                        img_filename_in_zip = f"task_{task_id_s}_input.png"
                        img_save_path = os.path.join(tmpdir, img_filename_in_zip)
                        try:
                            Image.fromarray(input_image_np_data).save(img_save_path, "PNG")
                            image_paths_in_zip[img_hash] = img_filename_in_zip
                            saved_files_count += 1
                            # Only reference the image once it was saved successfully,
                            # so the manifest never points at a missing file.
                            manifest_entry['image_ref'] = img_filename_in_zip
                        except Exception as e:
                            print(f"Error saving image for task {task_id_s} in zip: {e}")
                queue_manifest.append(manifest_entry)
            manifest_path = os.path.join(tmpdir, "queue_manifest.json")
            with open(manifest_path, 'w', encoding='utf-8') as f:
                json.dump(queue_manifest, f, indent=4)
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
                zf.write(manifest_path, arcname="queue_manifest.json")
                for img_filename_rel in image_paths_in_zip.values():
                    zf.write(os.path.join(tmpdir, img_filename_rel), arcname=img_filename_rel)
        zip_buffer.seek(0)
        zip_base64 = base64.b64encode(zip_buffer.getvalue()).decode('utf-8')
        gr.Info(f"Queue with {len(queue)} tasks ({saved_files_count} images) prepared for download.")
        return state_dict_gr_state, zip_base64
    except Exception as e:
        print(f"Error creating zip for queue: {e}")
        traceback.print_exc()
        gr.Warning("Failed to create zip data.")
        return state_dict_gr_state, ""
    finally:
        zip_buffer.close()
212
-
213
def load_queue_from_zip(state_dict_gr_state, uploaded_zip_file_obj):
    """Loads a task queue from an uploaded zip file.

    Expects the zip to contain queue_manifest.json plus the input images it
    references. Replaces the current queue wholesale and bumps next_id past
    the highest id found. Returns (state, queue-DataFrame update).
    """
    if not uploaded_zip_file_obj or not hasattr(uploaded_zip_file_obj, 'name') or not Path(uploaded_zip_file_obj.name).is_file():
        gr.Warning("No valid file selected.")
        return state_dict_gr_state, update_queue_df_display(get_queue_state(state_dict_gr_state))
    queue_state = get_queue_state(state_dict_gr_state)
    newly_loaded_queue = []
    max_id_in_file = 0
    loaded_image_count = 0
    error_messages = []
    try:
        with tempfile.TemporaryDirectory() as tmpdir_extract:
            with zipfile.ZipFile(uploaded_zip_file_obj.name, 'r') as zf:
                if "queue_manifest.json" not in zf.namelist():
                    raise ValueError("queue_manifest.json not found in zip")
                # NOTE(review): extractall on a user-supplied zip is vulnerable to
                # zip-slip (entries like "../x"); consider validating member names.
                zf.extractall(tmpdir_extract)
            manifest_path = os.path.join(tmpdir_extract, "queue_manifest.json")
            with open(manifest_path, 'r', encoding='utf-8') as f:
                loaded_manifest = json.load(f)

            for task_data in loaded_manifest:
                params_from_manifest = task_data.get('params', {})
                task_id_loaded = task_data.get('id', 0)
                max_id_in_file = max(max_id_in_file, task_id_loaded)
                image_ref_from_manifest = task_data.get('image_ref')
                input_image_np_data = None
                if image_ref_from_manifest:
                    img_path_in_extract = os.path.join(tmpdir_extract, image_ref_from_manifest)
                    if os.path.exists(img_path_in_extract):
                        try:
                            input_image_np_data = np.array(Image.open(img_path_in_extract))
                            loaded_image_count += 1
                        except Exception as img_e:
                            error_messages.append(f"Err loading img for task {task_id_loaded}: {img_e}")
                    else:
                        error_messages.append(f"Missing img file for task {task_id_loaded}: {image_ref_from_manifest}")
                # Loaded tasks are always re-queued as "pending", whatever status was saved.
                runtime_task = {"id": task_id_loaded, "params": {**params_from_manifest, 'input_image': input_image_np_data}, "status": "pending"}
                newly_loaded_queue.append(runtime_task)
        with shared_state.queue_lock:
            queue_state["queue"] = newly_loaded_queue
            queue_state["next_id"] = max(max_id_in_file + 1, queue_state.get("next_id", 1))
        gr.Info(f"Loaded {len(newly_loaded_queue)} tasks ({loaded_image_count} images).")
        if error_messages:
            gr.Warning(" ".join(error_messages))
    except Exception as e:
        print(f"Error loading queue: {e}")
        traceback.print_exc()
        gr.Warning(f"Failed to load queue: {str(e)[:200]}")
    finally:
        # Remove Gradio's temp upload copy, but only when it really lives in the
        # system temp dir (autoload passes a real path that must be preserved here).
        if uploaded_zip_file_obj and hasattr(uploaded_zip_file_obj, 'name') and uploaded_zip_file_obj.name and tempfile.gettempdir() in os.path.abspath(uploaded_zip_file_obj.name):
            try:
                os.remove(uploaded_zip_file_obj.name)
            except OSError:
                pass
    return state_dict_gr_state, update_queue_df_display(queue_state)
245
-
246
def autosave_queue_on_exit_action(state_dict_gr_state_ref):
    """Exit hook: persist any remaining queued tasks to AUTOSAVE_FILENAME."""
    print("Attempting to autosave queue on exit...")
    queue_state = get_queue_state(state_dict_gr_state_ref)
    if not queue_state.get("queue"):
        print("Autosave: Queue is empty.")
        return
    try:
        # Reuse the download serializer; the returned state is irrelevant here.
        _, zip_b64_for_save = save_queue_to_zip(state_dict_gr_state_ref)
        if not zip_b64_for_save:
            print("Autosave failed: Could not generate zip data.")
            return
        with open(AUTOSAVE_FILENAME, "wb") as f:
            f.write(base64.b64decode(zip_b64_for_save))
        print(f"Autosave successful: Queue saved to {AUTOSAVE_FILENAME}")
    except Exception as e:
        print(f"Error during autosave: {e}")
        traceback.print_exc()
258
-
259
def autoload_queue_on_start_action(state_dict_gr_state):
    """Loads a previously autosaved queue when the application starts.

    Returns (state, df update, process-button interactivity, abort-button
    interactivity, last-video value update) for the initial UI render.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    df_update = update_queue_df_display(queue_state)

    # Only autoload into an empty queue, and only if an autosave file exists.
    if not queue_state["queue"] and Path(AUTOSAVE_FILENAME).is_file():
        print(f"Autoloading queue from {AUTOSAVE_FILENAME}...")
        # load_queue_from_zip expects a Gradio-file-like object with a .name attribute.
        class MockFilepath:
            def __init__(self, name): self.name = name

        # Load into a throwaway state copy first so a failed load leaves the
        # live queue_state untouched.
        temp_state_for_load = {"queue_state": queue_state.copy()}
        loaded_state_result, df_update_after_load = load_queue_from_zip(temp_state_for_load, MockFilepath(AUTOSAVE_FILENAME))

        if loaded_state_result["queue_state"]["queue"]:
            queue_state.update(loaded_state_result["queue_state"])
            df_update = df_update_after_load
            print(f"Autoload successful. Loaded {len(queue_state['queue'])} tasks.")
            try:
                # Consume the autosave so a stale queue isn't re-imported next launch.
                os.remove(AUTOSAVE_FILENAME)
                print(f"Removed autosave file: {AUTOSAVE_FILENAME}")
            except OSError as e:
                print(f"Error removing autosave file '{AUTOSAVE_FILENAME}': {e}")
        else:
            print("Autoload: File existed but queue remains empty. Resetting queue.")
            queue_state["queue"] = []
            queue_state["next_id"] = 1
            df_update = update_queue_df_display(queue_state)

    is_processing_on_load = queue_state.get("processing", False) and bool(queue_state.get("queue"))
    initial_video_path = state_dict_gr_state.get("last_completed_video_path")
    # Drop a stale reference if the video file has since been deleted.
    if initial_video_path and not os.path.exists(initial_video_path):
        print(f"Warning: Last completed video file not found at {initial_video_path}. Clearing reference.")
        initial_video_path = None
        state_dict_gr_state["last_completed_video_path"] = None

    return (state_dict_gr_state, df_update, gr.update(interactive=not is_processing_on_load), gr.update(interactive=is_processing_on_load), gr.update(value=initial_video_path))
295
-
296
def process_task_queue_main_loop(state_dict_gr_state):
    """The main loop that processes tasks from the queue one by one.

    Generator used as a Gradio event handler. Every yield is a 9-tuple of
    updates: (state, queue DataFrame, result video, live preview, status
    text, progress HTML, process-queue button, abort button, reset button).
    """
    queue_state = get_queue_state(state_dict_gr_state)
    shared_state.abort_event.clear()

    # Stream shared with the worker thread; kept in state so a reloaded UI
    # session can re-attach to an already-running queue.
    output_stream_for_ui = state_dict_gr_state.get("active_output_stream_queue")

    if queue_state["processing"]:
        gr.Info("Queue processing is already active. Attempting to re-attach UI to live updates...")
        if output_stream_for_ui is None:
            # "processing" flag is set but the stream is gone: stale state.
            gr.Warning("No active stream found in state. Queue processing may have been interrupted. Please clear queue or restart.")
            queue_state["processing"] = False
            yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True))
            return

        # --- MODIFIED: Initial yield for re-attachment path ---
        # Provide placeholder updates to progress/preview UI elements
        yield (
            state_dict_gr_state,
            update_queue_df_display(queue_state),
            gr.update(value=state_dict_gr_state.get("last_completed_video_path", None)),  # Keep last video if available
            gr.update(visible=True, value=None),  # Make preview visible but start blank until new data
            gr.update(value=f"Re-attaching to processing Task {queue_state['queue'][0]['id']}... Awaiting next preview."),
            gr.update(value="<div style='text-align: center;'>Re-connecting...</div>"),  # Generic HTML for progress bar
            gr.update(interactive=False),  # Process Queue button
            gr.update(interactive=True),  # Abort button
            gr.update(interactive=True)  # Reset button
        )
    elif not queue_state["queue"]:
        gr.Info("Queue is empty. Add tasks to process.")
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True))
        return
    else:
        # Fresh run: flag processing and publish a new stream for the workers.
        queue_state["processing"] = True
        output_stream_for_ui = AsyncStream()
        state_dict_gr_state["active_output_stream_queue"] = output_stream_for_ui
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(visible=False), gr.update(value="Queue processing started..."), gr.update(value=""), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))

    actual_output_queue = output_stream_for_ui.output_queue if output_stream_for_ui else None
    if not actual_output_queue:
        gr.Warning("Internal error: Output queue not available. Aborting.")
        queue_state["processing"] = False
        state_dict_gr_state["active_output_stream_queue"] = None
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True))
        return

    # One iteration per task; the head of the queue is always the active task.
    while queue_state["queue"] and not shared_state.abort_event.is_set():
        with shared_state.queue_lock:
            if not queue_state["queue"]:
                break
            current_task_obj = queue_state["queue"][0]
            task_parameters_for_worker = current_task_obj["params"]
            current_task_id = current_task_obj["id"]

        if task_parameters_for_worker.get('input_image') is None:
            print(f"Skipping task {current_task_id}: Missing input image data.")
            gr.Warning(f"Task {current_task_id} skipped: Input image is missing.")
            with shared_state.queue_lock:
                current_task_obj["status"] = "error"
                current_task_obj["error_message"] = "Missing Image"
            yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(visible=False), gr.update(), gr.update(), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
            break

        # seed == -1 means "randomize per run".
        if task_parameters_for_worker.get('seed') == -1:
            task_parameters_for_worker['seed'] = np.random.randint(0, 2**32 - 1)

        print(f"Starting task {current_task_id} (Prompt: {task_parameters_for_worker.get('prompt', '')[:30]}...).")
        current_task_obj["status"] = "processing"
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(visible=False), gr.update(value=f"Processing Task {current_task_id}..."), gr.update(value=""), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))

        # The worker runs on its own thread and reports back over the queue.
        worker_args = {
            **task_parameters_for_worker,
            'task_id': current_task_id, 'output_queue_ref': actual_output_queue, 'abort_event': shared_state.abort_event,
            **shared_state.models
        }
        async_run(worker, **worker_args)

        last_known_output_filename = state_dict_gr_state.get("last_completed_video_path", None)
        task_completed_successfully = False
        # Pump worker messages until this task terminates (aborted/error/end).
        while True:
            flag, data_from_worker = actual_output_queue.next()
            if flag == 'progress':
                msg_task_id, preview_np_array, desc_str, html_str = data_from_worker
                # Ignore progress from any task other than the current one.
                if msg_task_id == current_task_id:
                    yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=last_known_output_filename), gr.update(visible=(preview_np_array is not None), value=preview_np_array), desc_str, html_str, gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
            elif flag == 'file':
                # A finished video segment is available for display.
                msg_task_id, segment_file_path, segment_info = data_from_worker
                if msg_task_id == current_task_id:
                    last_known_output_filename = segment_file_path
                    gr.Info(f"Task {current_task_id}: {segment_info}")
                yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=last_known_output_filename), gr.update(), gr.update(), gr.update(), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
            elif flag == 'aborted':
                current_task_obj["status"] = "aborted"
                task_completed_successfully = False
                break
            elif flag == 'error':
                _, error_message_str = data_from_worker
                gr.Warning(f"Task {current_task_id} Error: {error_message_str}")
                current_task_obj["status"] = "error"
                current_task_obj["error_message"] = str(error_message_str)[:100]
                task_completed_successfully = False
                break
            elif flag == 'end':
                _, success_bool, final_video_path = data_from_worker
                task_completed_successfully = success_bool
                last_known_output_filename = final_video_path if success_bool else last_known_output_filename
                current_task_obj["status"] = "done" if success_bool else "error"
                break

        # Dequeue the finished task (if it is still at the head) and surface the result.
        with shared_state.queue_lock:
            if queue_state["queue"] and queue_state["queue"][0]["id"] == current_task_id:
                queue_state["queue"].pop(0)
        state_dict_gr_state["last_completed_video_path"] = last_known_output_filename if task_completed_successfully else None
        final_desc = f"Task {current_task_id} {'completed' if task_completed_successfully else 'finished with issues'}."
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=state_dict_gr_state["last_completed_video_path"]), gr.update(visible=False), gr.update(value=final_desc), gr.update(value=""), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
        if shared_state.abort_event.is_set():
            gr.Info("Queue processing halted by user.")
            break

    # Queue drained or aborted: clear the processing flag and the shared stream.
    queue_state["processing"] = False
    state_dict_gr_state["active_output_stream_queue"] = None
    final_status_msg = "All tasks processed." if not shared_state.abort_event.is_set() else "Queue processing aborted."
    yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=state_dict_gr_state["last_completed_video_path"]), gr.update(visible=False), gr.update(value=final_status_msg), gr.update(value=""), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True))
388
-
389
def abort_current_task_processing_action(state_dict_gr_state):
    """Signal the worker to abort the in-flight task; no-op when idle."""
    queue_state = get_queue_state(state_dict_gr_state)
    processing = queue_state["processing"]
    if processing:
        gr.Info("Abort signal sent. Current task will attempt to stop shortly.")
        shared_state.abort_event.set()
    else:
        gr.Info("Nothing is currently processing.")
    # Abort button stays disabled while processing continues winding down.
    return state_dict_gr_state, gr.update(interactive=not processing)
 
 
1
+ # ui/queue.py
2
+ import gradio as gr
3
+ import numpy as np
4
+ from PIL import Image
5
+ import os
6
+ import json
7
+ import base64
8
+ import io
9
+ import zipfile
10
+ import tempfile
11
+ import atexit
12
+ import traceback
13
+ from pathlib import Path
14
+ import spaces
15
+ # Import shared state and constants from the dedicated module.
16
+ from . import shared_state
17
+ from generation_core import worker
18
+ from diffusers_helper.thread_utils import AsyncStream, async_run
19
+
20
+
21
+ # Configuration for the autosave feature.
22
+ AUTOSAVE_FILENAME = "goan_autosave_queue.zip"
23
+
24
+
25
+ def np_to_base64_uri(np_array_or_tuple, format="png"):
26
+ """Converts a NumPy array representing an image to a base64 data URI."""
27
+ if np_array_or_tuple is None: return None
28
+ try:
29
+ np_array = np_array_or_tuple[0] if isinstance(np_array_or_tuple, tuple) and len(np_array_or_tuple) > 0 and isinstance(np_array_or_tuple[0], np.ndarray) else np_array_or_tuple if isinstance(np_array_or_tuple, np.ndarray) else None
30
+ if np_array is None: return None
31
+ pil_image = Image.fromarray(np_array.astype(np.uint8))
32
+ if format.lower() == "jpeg" and pil_image.mode == "RGBA": pil_image = pil_image.convert("RGB")
33
+ buffer = io.BytesIO(); pil_image.save(buffer, format=format.upper()); img_bytes = buffer.getvalue()
34
+ return f"data:image/{format.lower()};base64,{base64.b64encode(img_bytes).decode('utf-8')}"
35
+ except Exception as e: print(f"Error converting NumPy to base64: {e}"); return None
36
+
37
+ def get_queue_state(state_dict_gr_state):
38
+ """Safely retrieves the queue_state dictionary from the main application state."""
39
+ if "queue_state" not in state_dict_gr_state: state_dict_gr_state["queue_state"] = {"queue": [], "next_id": 1, "processing": False, "editing_task_id": None}
40
+ return state_dict_gr_state["queue_state"]
41
+
42
def update_queue_df_display(queue_state):
    """Generates a Gradio DataFrame update from the current queue state.

    Builds one row per queued task: id, status glyph, HTML-escaped prompt
    snippet, duration, steps, an inline thumbnail, and the four action glyphs
    (move up/down, remove, edit).

    Args:
        queue_state: The ``queue_state`` sub-dictionary (see get_queue_state).

    Returns:
        gr.DataFrame update; hidden when the queue is empty.
    """
    import html  # local import: escape user-supplied prompt text for HTML

    queue = queue_state.get("queue", [])
    processing = queue_state.get("processing", False)
    editing_task_id = queue_state.get("editing_task_id", None)
    data = []
    for i, task in enumerate(queue):
        params = task['params']
        task_id = task['id']
        prompt = params['prompt']
        prompt_display = (prompt[:77] + '...') if len(prompt) > 80 else prompt
        # BUGFIX: the original `.replace('"', '"')` was a no-op (mangled
        # &quot; escape); escape properly so quotes/angle brackets in a prompt
        # cannot break out of the title attribute or inject markup.
        prompt_title = html.escape(prompt, quote=True)
        prompt_cell = f'<span title="{prompt_title}">{html.escape(prompt_display)}</span>'
        img_uri = np_to_base64_uri(params.get('input_image'), format="png")
        thumbnail_size = "50px"
        img_md = f'<img src="{img_uri}" alt="Input" style="max-width:{thumbnail_size}; max-height:{thumbnail_size}; display:block; margin:auto; object-fit:contain;" />' if img_uri else ""
        task_status_val = task.get("status", "pending")
        if processing and i == 0:
            # Row 0 is the in-flight task while the queue is processing.
            status_display = "⏳ Processing"
        elif editing_task_id == task_id:
            status_display = "✏️ Editing"
        elif task_status_val == "done":
            status_display = "✅ Done"
        elif task_status_val == "error":
            status_display = f"❌ Error: {task.get('error_message', 'Unknown')}"
        elif task_status_val == "aborted":
            status_display = "⏹️ Aborted"
        else:
            # BUGFIX: covers "pending" and any unexpected status value; the
            # original had no fallback, leaving status_display unbound
            # (NameError) for unknown statuses.
            status_display = "⏸️ Pending"
        data.append([task_id, status_display, prompt_cell,
                     f"{params.get('total_second_length', 0):.1f}s",
                     params.get('steps', 0), img_md, "↑", "↓", "✖", "✎"])
    return gr.DataFrame(value=data, visible=len(data) > 0)
+
56
def add_or_update_task_in_queue(state_dict_gr_state, *args_from_ui_controls_tuple):
    """Adds a new task to the queue or updates an existing one if in edit mode.

    args_from_ui_controls_tuple[0] is the gallery's list of input images
    (PIL images or (PIL, caption) tuples); the remaining values line up
    positionally with shared_state.ALL_TASK_UI_KEYS.

    Returns:
        (state, queue DataFrame update, add/update button update,
         cancel-edit button visibility update)
    """
    queue_state = get_queue_state(state_dict_gr_state); editing_task_id = queue_state.get("editing_task_id", None)

    input_images_pil_list = args_from_ui_controls_tuple[0]
    all_ui_values_tuple = args_from_ui_controls_tuple[1:]
    # An input image is mandatory; bail out keeping the current edit-mode button labels.
    if not input_images_pil_list:
        gr.Warning("Input image is required!")
        return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Add Task to Queue" if editing_task_id is None else "Update Task"), gr.update(visible=editing_task_id is not None)

    # Map positional UI values to worker-parameter names.
    temp_params_from_ui = dict(zip(shared_state.ALL_TASK_UI_KEYS, all_ui_values_tuple))
    base_params_for_worker_dict = {}
    for ui_key, worker_key in shared_state.UI_TO_WORKER_PARAM_MAP.items():
        if ui_key == 'gs_schedule_shape_ui':
            # The schedule radio is stored as a bool: anything other than 'Off' means active.
            base_params_for_worker_dict[worker_key] = temp_params_from_ui.get(ui_key) != 'Off'
        else:
            base_params_for_worker_dict[worker_key] = temp_params_from_ui.get(ui_key)

    if editing_task_id is not None:
        # --- Edit mode: replace the params of a single existing task. ---
        if len(input_images_pil_list) > 1: gr.Warning("Cannot update task with multiple images. Cancel edit."); return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Update Task"), gr.update(visible=True)
        # Gallery items may be (image, caption) tuples or bare images.
        pil_img_for_update = input_images_pil_list[0][0] if isinstance(input_images_pil_list[0], tuple) else input_images_pil_list[0]
        if not isinstance(pil_img_for_update, Image.Image): gr.Warning("Invalid image format for update."); return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Update Task"), gr.update(visible=True)
        img_np_for_update = np.array(pil_img_for_update)
        with shared_state.queue_lock:
            task_found = False
            for task in queue_state["queue"]:
                if task["id"] == editing_task_id:
                    # Edited tasks are reset to pending regardless of prior status.
                    task["params"] = {**base_params_for_worker_dict, 'input_image': img_np_for_update}
                    task["status"] = "pending"
                    task_found = True
                    break
            if not task_found: gr.Warning(f"Task {editing_task_id} not found for update.")
            else: gr.Info(f"Task {editing_task_id} updated.")
            # Leave edit mode whether or not the task was found.
            queue_state["editing_task_id"] = None
    else:
        # --- Add mode: create one task per valid image, sharing the same params. ---
        tasks_added_count = 0; first_new_task_id = -1
        with shared_state.queue_lock:
            for img_obj in input_images_pil_list:
                pil_image = img_obj[0] if isinstance(img_obj, tuple) else img_obj
                if not isinstance(pil_image, Image.Image): gr.Warning("Skipping invalid image input."); continue
                img_np_data = np.array(pil_image)
                next_id = queue_state["next_id"]
                if first_new_task_id == -1: first_new_task_id = next_id
                task = {"id": next_id, "params": {**base_params_for_worker_dict, 'input_image': img_np_data}, "status": "pending"}
                queue_state["queue"].append(task); queue_state["next_id"] += 1; tasks_added_count += 1
        if tasks_added_count > 0: gr.Info(f"Added {tasks_added_count} task(s) (start ID: {first_new_task_id}).")
        else: gr.Warning("No valid tasks added.")

    # Both paths end out of edit mode: restore the default button labels.
    return state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value="Add Task(s) to Queue", variant="secondary"), gr.update(visible=False)
+
106
def cancel_edit_mode_action(state_dict_gr_state):
    """Cancels the current task editing session.

    Clears the edit marker (if one is set) and restores the default
    add-task button label while hiding the cancel-edit button.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    if queue_state.get("editing_task_id") is not None:
        gr.Info("Edit cancelled.")
        queue_state["editing_task_id"] = None
    add_button_update = gr.update(value="Add Task(s) to Queue", variant="secondary")
    cancel_button_update = gr.update(visible=False)
    return (
        state_dict_gr_state,
        update_queue_df_display(queue_state),
        add_button_update,
        cancel_button_update,
    )
+
112
def move_task_in_queue(state_dict_gr_state, direction: str, selected_indices_list: list):
    """Moves a selected task up or down in the queue.

    Args:
        state_dict_gr_state: Top-level application state.
        direction: Either 'up' or 'down'.
        selected_indices_list: DataFrame selection as [[row, col]].

    Returns:
        (state, queue DataFrame update)
    """
    queue_state = get_queue_state(state_dict_gr_state)
    # Nothing selected: return the display unchanged.
    if not selected_indices_list or not selected_indices_list[0]:
        return state_dict_gr_state, update_queue_df_display(queue_state)
    row = int(selected_indices_list[0][0])
    tasks = queue_state["queue"]
    with shared_state.queue_lock:
        # Swap with the neighbor only when the swap stays inside the list.
        if direction == 'up' and row > 0:
            tasks[row - 1], tasks[row] = tasks[row], tasks[row - 1]
        elif direction == 'down' and row < len(tasks) - 1:
            tasks[row + 1], tasks[row] = tasks[row], tasks[row + 1]
    return state_dict_gr_state, update_queue_df_display(queue_state)
+
121
def remove_task_from_queue(state_dict_gr_state, selected_indices_list: list):
    """Removes a selected task from the queue.

    Args:
        state_dict_gr_state: Top-level application state.
        selected_indices_list: DataFrame selection as [[row, col]].

    Returns:
        (state, queue DataFrame update, removed task id or None)
    """
    queue_state = get_queue_state(state_dict_gr_state)
    # No usable selection: report that nothing was removed.
    if not selected_indices_list or not selected_indices_list[0]:
        return state_dict_gr_state, update_queue_df_display(queue_state), None
    row = int(selected_indices_list[0][0])
    removed_task_id = None
    with shared_state.queue_lock:
        tasks = queue_state["queue"]
        if 0 <= row < len(tasks):
            removed_task_id = tasks.pop(row)['id']
            gr.Info(f"Removed task {removed_task_id}.")
        else:
            gr.Warning("Invalid index for removal.")
    return state_dict_gr_state, update_queue_df_display(queue_state), removed_task_id
+
131
def handle_queue_action_on_select(evt: gr.SelectData, state_dict_gr_state, *ui_param_controls_tuple):
    """Handles clicks on the action buttons (↑, ↓, ✖, ✎) in the queue DataFrame.

    Output list layout (positional, must match the event's declared outputs):
      [0] application state
      [1] queue DataFrame update
      [2] input image gallery update
      [3 .. 3+len(ALL_TASK_UI_KEYS)-1] one update per task UI control
      [3+len(ALL_TASK_UI_KEYS)]        add/update button update
      [3+len(ALL_TASK_UI_KEYS)+1]      cancel-edit button visibility update
    """
    # Ignore selections that are not one of the four action glyphs.
    if evt.index is None or evt.value not in ["↑", "↓", "✖", "✎"]:
        return [state_dict_gr_state, update_queue_df_display(get_queue_state(state_dict_gr_state))] + [gr.update()] * (len(shared_state.ALL_TASK_UI_KEYS) + 4)

    row_index, col_index = evt.index; button_clicked = evt.value; queue_state = get_queue_state(state_dict_gr_state); queue = queue_state["queue"]; processing = queue_state.get("processing", False)
    # Default response: refresh state + DataFrame, leave every other output untouched.
    outputs_list = [state_dict_gr_state, update_queue_df_display(queue_state)] + [gr.update()] * (len(shared_state.ALL_TASK_UI_KEYS) + 4)

    if button_clicked == "↑":
        # Row 0 is the in-flight task while processing; it must not move.
        if processing and row_index == 0: gr.Warning("Cannot move processing task."); return outputs_list
        new_state, new_df = move_task_in_queue(state_dict_gr_state, 'up', [[row_index, col_index]]); outputs_list[0], outputs_list[1] = new_state, new_df
    elif button_clicked == "↓":
        if processing and row_index == 0: gr.Warning("Cannot move processing task."); return outputs_list
        # NOTE(review): this guard looks misplaced — moving row 1 *down* swaps
        # rows 1/2 and never touches the processing row 0; it is moving row 1
        # *up* (unguarded in the "↑" branch) that would displace the processing
        # task. Verify the intended behavior.
        if processing and row_index == 1: gr.Warning("Cannot move below processing task."); return outputs_list
        new_state, new_df = move_task_in_queue(state_dict_gr_state, 'down', [[row_index, col_index]]); outputs_list[0], outputs_list[1] = new_state, new_df
    elif button_clicked == "✖":
        if processing and row_index == 0: gr.Warning("Cannot remove processing task."); return outputs_list
        new_state, new_df, removed_id = remove_task_from_queue(state_dict_gr_state, [[row_index, col_index]]); outputs_list[0], outputs_list[1] = new_state, new_df
        # If the removed task was the one being edited, leave edit mode and reset the buttons.
        if removed_id is not None and queue_state.get("editing_task_id", None) == removed_id:
            queue_state["editing_task_id"] = None
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS)] = gr.update(value="Add Task(s) to Queue", variant="secondary")
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS) + 1] = gr.update(visible=False)
    elif button_clicked == "✎":
        if processing and row_index == 0: gr.Warning("Cannot edit processing task."); return outputs_list
        if 0 <= row_index < len(queue):
            task_to_edit = queue[row_index]; task_id_to_edit = task_to_edit['id']; params_to_load_to_ui = task_to_edit['params']
            queue_state["editing_task_id"] = task_id_to_edit; gr.Info(f"Editing Task {task_id_to_edit}.")
            img_np_from_task = params_to_load_to_ui.get('input_image')
            # Slot 2 is the gallery: re-wrap the stored numpy image as a (PIL, caption) item.
            outputs_list[2] = gr.update(value=[(Image.fromarray(img_np_from_task), "loaded_image")]) if isinstance(img_np_from_task, np.ndarray) else gr.update(value=None)
            for i, ui_key in enumerate(shared_state.ALL_TASK_UI_KEYS):
                worker_key = shared_state.UI_TO_WORKER_PARAM_MAP.get(ui_key)
                if worker_key in params_to_load_to_ui:
                    value_from_task = params_to_load_to_ui[worker_key]
                    # The gs schedule is stored as a bool; map it back to its radio label.
                    outputs_list[3 + i] = gr.update(value="Linear" if value_from_task else "Off") if ui_key == 'gs_schedule_shape_ui' else gr.update(value=value_from_task)
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS)] = gr.update(value="Update Task", variant="secondary")
            outputs_list[2 + 1 + len(shared_state.ALL_TASK_UI_KEYS) + 1] = gr.update(visible=True)
        else: gr.Warning("Invalid index for edit.")
    return outputs_list
+
170
def clear_task_queue_action(state_dict_gr_state):
    """Clears all non-processing tasks from the queue.

    While a task is running, the running task (row 0) is preserved and only
    the pending ones are dropped; otherwise the queue is emptied entirely and
    the on-disk autosave file is deleted as well.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    tasks = queue_state["queue"]
    is_processing = queue_state["processing"]
    num_cleared = 0
    with shared_state.queue_lock:
        if is_processing:
            if len(tasks) > 1:
                num_cleared = len(tasks) - 1
                queue_state["queue"] = [tasks[0]]
                gr.Info(f"Cleared {num_cleared} pending tasks.")
            else:
                gr.Info("Only processing task in queue.")
        elif tasks:
            num_cleared = len(tasks)
            tasks.clear()
            gr.Info(f"Cleared {num_cleared} tasks.")
        else:
            gr.Info("Queue empty.")
    # A fully cleared idle queue invalidates the autosave snapshot too.
    if not is_processing and num_cleared > 0 and os.path.isfile(AUTOSAVE_FILENAME):
        try:
            os.remove(AUTOSAVE_FILENAME)
            print(f"Cleared autosave: {AUTOSAVE_FILENAME}.")
        except OSError as e:
            print(f"Error deleting autosave: {e}")
    return state_dict_gr_state, update_queue_df_display(queue_state)
+
184
def save_queue_to_zip(state_dict_gr_state):
    """Saves the current task queue to a zip file for download.

    Writes a "queue_manifest.json" describing every task plus one PNG per
    *unique* input image (identical images are deduplicated by content hash).

    Args:
        state_dict_gr_state: Top-level application state.

    Returns:
        (state_dict_gr_state, base64-encoded zip string) — the string is empty
        when the queue is empty or zipping failed.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    queue = queue_state.get("queue", [])
    if not queue:
        gr.Info("Queue is empty. Nothing to save.")
        return state_dict_gr_state, ""
    zip_buffer = io.BytesIO()
    saved_files_count = 0
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            queue_manifest = []
            image_paths_in_zip = {}  # content hash -> filename actually written
            for task in queue:
                params_copy = task['params'].copy()
                task_id_s = task['id']
                input_image_np_data = params_copy.pop('input_image', None)
                manifest_entry = {"id": task_id_s, "params": params_copy, "status": task.get("status", "pending")}
                if input_image_np_data is not None:
                    img_hash = hash(input_image_np_data.tobytes())
                    if img_hash not in image_paths_in_zip:
                        img_filename_in_zip = f"task_{task_id_s}_input.png"
                        img_save_path = os.path.join(tmpdir, img_filename_in_zip)
                        try:
                            Image.fromarray(input_image_np_data).save(img_save_path, "PNG")
                            image_paths_in_zip[img_hash] = img_filename_in_zip
                            saved_files_count += 1
                        except Exception as e:
                            print(f"Error saving image for task {task_id_s} in zip: {e}")
                    # BUGFIX: reference the deduplicated file that was actually
                    # written. Previously every task referenced its own
                    # "task_<id>_input.png" even when its image had been saved
                    # under another task's name (or failed to save at all),
                    # which made load_queue_from_zip report missing images.
                    if img_hash in image_paths_in_zip:
                        manifest_entry['image_ref'] = image_paths_in_zip[img_hash]
                queue_manifest.append(manifest_entry)
            manifest_path = os.path.join(tmpdir, "queue_manifest.json")
            with open(manifest_path, 'w', encoding='utf-8') as f:
                json.dump(queue_manifest, f, indent=4)
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
                zf.write(manifest_path, arcname="queue_manifest.json")
                for img_filename_rel in image_paths_in_zip.values():
                    zf.write(os.path.join(tmpdir, img_filename_rel), arcname=img_filename_rel)
        zip_buffer.seek(0)
        zip_base64 = base64.b64encode(zip_buffer.getvalue()).decode('utf-8')
        gr.Info(f"Queue with {len(queue)} tasks ({saved_files_count} images) prepared for download.")
        return state_dict_gr_state, zip_base64
    except Exception as e:
        print(f"Error creating zip for queue: {e}")
        traceback.print_exc()
        gr.Warning("Failed to create zip data.")
        return state_dict_gr_state, ""
    finally:
        # Release the in-memory buffer on every exit path.
        zip_buffer.close()
+
213
def load_queue_from_zip(state_dict_gr_state, uploaded_zip_file_obj):
    """Loads a task queue from an uploaded zip file.

    Expects a zip produced by save_queue_to_zip: a "queue_manifest.json" plus
    the PNG input images it references. The current queue is replaced
    wholesale and the id counter is bumped past the largest loaded task id.

    Args:
        state_dict_gr_state: Top-level application state.
        uploaded_zip_file_obj: Object with a ``.name`` filesystem path
            (a Gradio upload, or any path-carrying wrapper).

    Returns:
        (state_dict_gr_state, queue DataFrame update)
    """
    # Reject missing/invalid uploads up front.
    if not uploaded_zip_file_obj or not hasattr(uploaded_zip_file_obj, 'name') or not Path(uploaded_zip_file_obj.name).is_file(): gr.Warning("No valid file selected."); return state_dict_gr_state, update_queue_df_display(get_queue_state(state_dict_gr_state))
    queue_state = get_queue_state(state_dict_gr_state); newly_loaded_queue = []; max_id_in_file = 0; loaded_image_count = 0; error_messages = []
    try:
        with tempfile.TemporaryDirectory() as tmpdir_extract:
            with zipfile.ZipFile(uploaded_zip_file_obj.name, 'r') as zf:
                if "queue_manifest.json" not in zf.namelist(): raise ValueError("queue_manifest.json not found in zip")
                zf.extractall(tmpdir_extract)
            manifest_path = os.path.join(tmpdir_extract, "queue_manifest.json")
            with open(manifest_path, 'r', encoding='utf-8') as f: loaded_manifest = json.load(f)

            for task_data in loaded_manifest:
                params_from_manifest = task_data.get('params', {}); task_id_loaded = task_data.get('id', 0); max_id_in_file = max(max_id_in_file, task_id_loaded)
                image_ref_from_manifest = task_data.get('image_ref'); input_image_np_data = None
                if image_ref_from_manifest:
                    img_path_in_extract = os.path.join(tmpdir_extract, image_ref_from_manifest)
                    if os.path.exists(img_path_in_extract):
                        # Image problems are collected, not fatal: the task is still loaded.
                        try: input_image_np_data = np.array(Image.open(img_path_in_extract)); loaded_image_count +=1
                        except Exception as img_e: error_messages.append(f"Err loading img for task {task_id_loaded}: {img_e}")
                    else: error_messages.append(f"Missing img file for task {task_id_loaded}: {image_ref_from_manifest}")
                # Loaded tasks always restart as "pending" regardless of their saved status.
                runtime_task = {"id": task_id_loaded, "params": {**params_from_manifest, 'input_image': input_image_np_data}, "status": "pending"}
                newly_loaded_queue.append(runtime_task)
        # Swap the whole queue in under the lock and keep the id counter monotonic.
        with shared_state.queue_lock: queue_state["queue"] = newly_loaded_queue; queue_state["next_id"] = max(max_id_in_file + 1, queue_state.get("next_id", 1))
        gr.Info(f"Loaded {len(newly_loaded_queue)} tasks ({loaded_image_count} images).")
        if error_messages: gr.Warning(" ".join(error_messages))
    except Exception as e: print(f"Error loading queue: {e}"); traceback.print_exc(); gr.Warning(f"Failed to load queue: {str(e)[:200]}")
    finally:
        # Best-effort cleanup of the temporary upload copy — only when the path
        # actually lives under the system temp dir (so e.g. the autosave file
        # in the working directory is never deleted here).
        if uploaded_zip_file_obj and hasattr(uploaded_zip_file_obj, 'name') and uploaded_zip_file_obj.name and tempfile.gettempdir() in os.path.abspath(uploaded_zip_file_obj.name):
            try: os.remove(uploaded_zip_file_obj.name)
            except OSError: pass
    return state_dict_gr_state, update_queue_df_display(queue_state)
+
246
def autosave_queue_on_exit_action(state_dict_gr_state_ref):
    """Saves the queue to a zip file on application exit.

    Best-effort: every failure is logged and swallowed so shutdown is never
    interrupted. An empty queue writes nothing.
    """
    print("Attempting to autosave queue on exit...")
    queue_state = get_queue_state(state_dict_gr_state_ref)
    if not queue_state.get("queue"):
        print("Autosave: Queue is empty.")
        return
    try:
        # Reuse the download path: build the zip in memory, then persist it.
        _, zip_b64_for_save = save_queue_to_zip(state_dict_gr_state_ref)
        if not zip_b64_for_save:
            print("Autosave failed: Could not generate zip data.")
            return
        with open(AUTOSAVE_FILENAME, "wb") as f:
            f.write(base64.b64decode(zip_b64_for_save))
        print(f"Autosave successful: Queue saved to {AUTOSAVE_FILENAME}")
    except Exception as e:
        print(f"Error during autosave: {e}")
        traceback.print_exc()
+
259
def autoload_queue_on_start_action(state_dict_gr_state):
    """Loads a previously autosaved queue when the application starts.

    If the in-memory queue is empty and an autosave zip exists on disk, loads
    it via load_queue_from_zip and then deletes the autosave file. Also drops
    the reference to the last completed video when that file no longer exists.

    Returns:
        (state, queue DataFrame update, process-button interactivity update,
         abort-button interactivity update, result-video value update)
    """
    queue_state = get_queue_state(state_dict_gr_state)
    df_update = update_queue_df_display(queue_state)

    if not queue_state["queue"] and Path(AUTOSAVE_FILENAME).is_file():
        print(f"Autoloading queue from {AUTOSAVE_FILENAME}...")
        # load_queue_from_zip expects an object with a .name path attribute
        # (the shape Gradio uploads have), so wrap the plain filename.
        class MockFilepath:
            def __init__(self, name): self.name = name

        # Shallow copy: load_queue_from_zip mutates the nested queue_state of
        # this wrapper, whose result is merged back below on success.
        temp_state_for_load = {"queue_state": queue_state.copy()}
        loaded_state_result, df_update_after_load = load_queue_from_zip(temp_state_for_load, MockFilepath(AUTOSAVE_FILENAME))

        if loaded_state_result["queue_state"]["queue"]:
            queue_state.update(loaded_state_result["queue_state"])
            df_update = df_update_after_load
            print(f"Autoload successful. Loaded {len(queue_state['queue'])} tasks.")
            # The autosave is a one-shot snapshot; remove it once restored.
            try:
                os.remove(AUTOSAVE_FILENAME)
                print(f"Removed autosave file: {AUTOSAVE_FILENAME}")
            except OSError as e:
                print(f"Error removing autosave file '{AUTOSAVE_FILENAME}': {e}")
        else:
            print("Autoload: File existed but queue remains empty. Resetting queue.")
            queue_state["queue"] = []
            queue_state["next_id"] = 1
            df_update = update_queue_df_display(queue_state)

    # "Processing" is only meaningful if there are still tasks to process.
    is_processing_on_load = queue_state.get("processing", False) and bool(queue_state.get("queue"))
    initial_video_path = state_dict_gr_state.get("last_completed_video_path")
    if initial_video_path and not os.path.exists(initial_video_path):
        print(f"Warning: Last completed video file not found at {initial_video_path}. Clearing reference.")
        initial_video_path = None
        state_dict_gr_state["last_completed_video_path"] = None

    return (state_dict_gr_state, df_update, gr.update(interactive=not is_processing_on_load), gr.update(interactive=is_processing_on_load), gr.update(value=initial_video_path))
+
296
@spaces.GPU(duration=200)
# NOTE(review): `spaces` (Hugging Face Spaces GPU decorator) is not among this
# file's visible imports — confirm it is imported at module level.
def process_task_queue_main_loop(state_dict_gr_state):
    """The main loop that processes tasks from the queue one by one.

    A Gradio generator: each ``yield`` is a 9-tuple of updates —
    (state, queue DataFrame, result video, preview image, progress description,
     progress HTML, process-button, abort-button, reset-button). It drains the
    queue front-to-front, launching `worker` asynchronously per task and
    relaying its progress/file/end messages to the UI until the queue empties
    or the shared abort event is set.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    shared_state.abort_event.clear()

    # A stream left in state means a previous invocation may still be running.
    output_stream_for_ui = state_dict_gr_state.get("active_output_stream_queue")

    if queue_state["processing"]:
        gr.Info("Queue processing is already active. Attempting to re-attach UI to live updates...")
        if output_stream_for_ui is None:
            # Stale "processing" flag with no stream: recover by resetting.
            gr.Warning("No active stream found in state. Queue processing may have been interrupted. Please clear queue or restart."); queue_state["processing"] = False
            yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True)); return

        # --- MODIFIED: Initial yield for re-attachment path ---
        # Provide placeholder updates to progress/preview UI elements
        yield (
            state_dict_gr_state,
            update_queue_df_display(queue_state),
            gr.update(value=state_dict_gr_state.get("last_completed_video_path", None)), # Keep last video if available
            gr.update(visible=True, value=None), # Make preview visible but start blank until new data
            gr.update(value=f"Re-attaching to processing Task {queue_state['queue'][0]['id']}... Awaiting next preview."),
            gr.update(value="<div style='text-align: center;'>Re-connecting...</div>"), # Generic HTML for progress bar
            gr.update(interactive=False), # Process Queue button
            gr.update(interactive=True), # Abort button
            gr.update(interactive=True) # Reset button
        )
    elif not queue_state["queue"]:
        gr.Info("Queue is empty. Add tasks to process.")
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True)); return
    else:
        # Fresh run: create the worker<->UI message stream.
        queue_state["processing"] = True
        output_stream_for_ui = AsyncStream()
        state_dict_gr_state["active_output_stream_queue"] = output_stream_for_ui
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(visible=False), gr.update(value="Queue processing started..."), gr.update(value=""), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))

    actual_output_queue = output_stream_for_ui.output_queue if output_stream_for_ui else None
    if not actual_output_queue:
        gr.Warning("Internal error: Output queue not available. Aborting."); queue_state["processing"] = False; state_dict_gr_state["active_output_stream_queue"] = None
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True)); return

    # Process tasks until the queue empties or the user aborts.
    while queue_state["queue"] and not shared_state.abort_event.is_set():
        with shared_state.queue_lock:
            if not queue_state["queue"]: break
            current_task_obj = queue_state["queue"][0]
            task_parameters_for_worker = current_task_obj["params"]
            current_task_id = current_task_obj["id"]

        if task_parameters_for_worker.get('input_image') is None:
            # A task without image data cannot run; mark it errored and stop the loop.
            print(f"Skipping task {current_task_id}: Missing input image data.")
            gr.Warning(f"Task {current_task_id} skipped: Input image is missing.")
            with shared_state.queue_lock:
                current_task_obj["status"] = "error"; current_task_obj["error_message"] = "Missing Image"
            yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(visible=False), gr.update(), gr.update(), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True)); break

        # Seed -1 means "randomize per run".
        if task_parameters_for_worker.get('seed') == -1: task_parameters_for_worker['seed'] = np.random.randint(0, 2**32 - 1)

        print(f"Starting task {current_task_id} (Prompt: {task_parameters_for_worker.get('prompt', '')[:30]}...).")
        current_task_obj["status"] = "processing"
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(), gr.update(visible=False), gr.update(value=f"Processing Task {current_task_id}..."), gr.update(value=""), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))

        # Hand the task params, the message queue, the abort event, and the
        # loaded models to the worker, then run it off-thread.
        worker_args = {
            **task_parameters_for_worker,
            'task_id': current_task_id, 'output_queue_ref': actual_output_queue, 'abort_event': shared_state.abort_event,
            **shared_state.models
        }
        async_run(worker, **worker_args)

        last_known_output_filename = state_dict_gr_state.get("last_completed_video_path", None)
        task_completed_successfully = False
        # Relay worker messages to the UI until this task ends one way or another.
        while True:
            flag, data_from_worker = actual_output_queue.next()
            if flag == 'progress':
                msg_task_id, preview_np_array, desc_str, html_str = data_from_worker
                # Only surface progress for the task we are currently running.
                if msg_task_id == current_task_id: yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=last_known_output_filename), gr.update(visible=(preview_np_array is not None), value=preview_np_array), desc_str, html_str, gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
            elif flag == 'file':
                msg_task_id, segment_file_path, segment_info = data_from_worker
                if msg_task_id == current_task_id: last_known_output_filename = segment_file_path; gr.Info(f"Task {current_task_id}: {segment_info}")
                yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=last_known_output_filename), gr.update(), gr.update(), gr.update(), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
            elif flag == 'aborted': current_task_obj["status"] = "aborted"; task_completed_successfully = False; break
            elif flag == 'error': _, error_message_str = data_from_worker; gr.Warning(f"Task {current_task_id} Error: {error_message_str}"); current_task_obj["status"] = "error"; current_task_obj["error_message"] = str(error_message_str)[:100]; task_completed_successfully = False; break
            elif flag == 'end': _, success_bool, final_video_path = data_from_worker; task_completed_successfully = success_bool; last_known_output_filename = final_video_path if success_bool else last_known_output_filename; current_task_obj["status"] = "done" if success_bool else "error"; break

        # Pop the finished task only if it is still at the head of the queue.
        with shared_state.queue_lock:
            if queue_state["queue"] and queue_state["queue"][0]["id"] == current_task_id: queue_state["queue"].pop(0)
        state_dict_gr_state["last_completed_video_path"] = last_known_output_filename if task_completed_successfully else None
        final_desc = f"Task {current_task_id} {'completed' if task_completed_successfully else 'finished with issues'}."
        yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=state_dict_gr_state["last_completed_video_path"]), gr.update(visible=False), gr.update(value=final_desc), gr.update(value=""), gr.update(interactive=False), gr.update(interactive=True), gr.update(interactive=True))
        if shared_state.abort_event.is_set(): gr.Info("Queue processing halted by user."); break

    # Loop finished (empty queue or abort): clear flags and re-enable the UI.
    queue_state["processing"] = False; state_dict_gr_state["active_output_stream_queue"] = None
    final_status_msg = "All tasks processed." if not shared_state.abort_event.is_set() else "Queue processing aborted."
    yield (state_dict_gr_state, update_queue_df_display(queue_state), gr.update(value=state_dict_gr_state["last_completed_video_path"]), gr.update(visible=False), gr.update(value=final_status_msg), gr.update(value=""), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=True))
+
390
def abort_current_task_processing_action(state_dict_gr_state):
    """Sends the abort signal to the currently processing task.

    Sets the shared abort event (polled by the worker loop) when something is
    running; otherwise just informs the user.
    """
    queue_state = get_queue_state(state_dict_gr_state)
    is_processing = queue_state["processing"]
    if is_processing:
        gr.Info("Abort signal sent. Current task will attempt to stop shortly.")
        shared_state.abort_event.set()
    else:
        gr.Info("Nothing is currently processing.")
    # Keep the abort button disabled only while nothing is processing.
    return state_dict_gr_state, gr.update(interactive=not is_processing)