GenAIDevTOProd commited on
Commit
dc69696
·
verified ·
1 Parent(s): 6d81bb7

Upload tools_py.py

Browse files
Files changed (1) hide show
  1. tools_py.py +83 -0
tools_py.py ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # -*- coding: utf-8 -*-
2
+ """tools.py
3
+
4
+ Automatically generated by Colab.
5
+
6
+ Original file is located at
7
+ https://colab.research.google.com/drive/1Srm2oS1q5iYwlo4PoMx6TBc2uNoSVao3
8
+ """
9
+
10
+ from typing import Dict, Any, List
11
+ from langchain.tools import Tool
12
+
13
+ """**It's purely deterministic Python, no LLM inside the tool, so the reasoning stays transparent.**"""
14
+
15
+ def modernize_stack(legacy_stack: Dict[str, Any]) -> List[str]:
16
+ """Return concrete modernization actions based on a given legacy stack."""
17
+ actions: List[str] = []
18
+
19
+ scheduler = str(legacy_stack.get("scheduler", "")).lower()
20
+ language = str(legacy_stack.get("language", "")).lower()
21
+ architecture = str(legacy_stack.get("architecture", "")).lower()
22
+ dependencies = [d.lower() for d in legacy_stack.get("dependencies", [])]
23
+
24
+ if "autosys" in scheduler:
25
+ actions.append("Replace Autosys with Apache Airflow or AWS Step Functions.")
26
+
27
+ if "java 8" in language:
28
+ actions.append("Upgrade Java 8 → Java 17 and containerize the application.")
29
+
30
+ if "monolith" in architecture:
31
+ actions.append("Refactor monolith into microservices and add an API Gateway.")
32
+
33
+ if any("ftp" in dep for dep in dependencies):
34
+ actions.append("Replace FTP with S3 and event-driven ingestion via Lambda.")
35
+
36
+ actions.append("Adopt Terraform for Infrastructure-as-Code.")
37
+
38
+ return actions
39
+
40
+ """**This tool takes both the legacy_stack and an outage_scenario string to generate resilience test ideas.**"""
41
+
42
+ def generate_resilience_plan(legacy_stack: Dict[str, Any], outage_scenario: str) -> List[str]:
43
+ """Return a resilience test plan for a given outage scenario."""
44
+ plan: List[str] = []
45
+
46
+ if "oracle" in outage_scenario.lower():
47
+ plan.append("Test RDS Multi-AZ failover for Oracle DB.")
48
+ plan.append("Implement circuit breaker around DB calls.")
49
+
50
+ if "kafka" in outage_scenario.lower():
51
+ plan.append("Simulate Kafka broker outage and validate consumer recovery.")
52
+
53
+ plan.append("Perform chaos engineering tests for network partition.")
54
+ plan.append("Run DR simulation and validate RTO/RPO.")
55
+
56
+ return plan
57
+
58
+ """**This step is critical, it's how the agent knows these Python functions exist and what they do.**
59
+
60
+ - Deterministic: No AI hallucinations in the tools, pure logic.
61
+
62
+ - Composable: We can add more tools later (cost analysis, migration path scorer).
63
+
64
+ - Agent-Controlled: The agent can decide when to call each tool based on the request.
65
+ """
66
+
67
+ def get_tools() -> List[Tool]:
68
+ """Return a list of LangChain Tools for the agent."""
69
+ return [
70
+ Tool(
71
+ name="modernize_stack",
72
+ func=lambda x: "\n".join(modernize_stack(x)),
73
+ description="Recommend modernization actions for a given legacy stack."
74
+ ),
75
+ Tool(
76
+ name="generate_resilience_plan",
77
+ func=lambda x: "\n".join(generate_resilience_plan(
78
+ x.get("legacy_stack", {}),
79
+ x.get("outage_scenario", "")
80
+ )),
81
+ description="Generate a resilience test plan for the given legacy stack and outage scenario."
82
+ )
83
+ ]