Spaces:
Runtime error
Runtime error
Upload action_pipeline.py
Browse files- action_pipeline.py +19 -0
action_pipeline.py
ADDED
|
@@ -0,0 +1,19 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
|
| 2 |
+
class ActionPipeline:
|
| 3 |
+
def __init__(self):
|
| 4 |
+
self.registry = {}
|
| 5 |
+
|
| 6 |
+
def register(self, action_name: str, handler):
|
| 7 |
+
self.registry[action_name] = handler
|
| 8 |
+
|
| 9 |
+
def execute(self, action_name: str, *args, **kwargs):
|
| 10 |
+
if action_name in self.registry:
|
| 11 |
+
try:
|
| 12 |
+
result = self.registry[action_name](*args, **kwargs)
|
| 13 |
+
return f"✅ Action '{action_name}' executed successfully: {result}"
|
| 14 |
+
except Exception as e:
|
| 15 |
+
return f"❌ Error during '{action_name}': {str(e)}"
|
| 16 |
+
return f"⚠️ Unknown action: '{action_name}'"
|
| 17 |
+
|
| 18 |
+
def list_actions(self):
|
| 19 |
+
return list(self.registry.keys())
|