# -*- coding: utf-8 -*-
"""
seed_kb_examples.py
Create prompt→AttackPlan examples for RAG from train_attackplan.jsonl

Usage (from repo root):
    %run scripts/seed_kb_examples.py
    # or choose a different source / count
    %run scripts/seed_kb_examples.py --src scripts/train_attackplan.jsonl --k 40
"""
from __future__ import annotations

import argparse
import json
import random
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


# ----------------------
# Helpers
# ----------------------
def load_plans(src: Path) -> List[Dict[str, Any]]:
    """Load AttackPlan objects from a JSONL file (one JSON document per line).

    Blank lines, lines that are not valid JSON, and rows that are not a dict
    holding a list under the ``"plan"`` key (e.g. chat rows that ended up in
    the file by accident) are skipped.

    Args:
        src: Path of the UTF-8 encoded JSONL file.

    Returns:
        The accepted plan dicts, in file order.

    Raises:
        OSError: If ``src`` cannot be opened.
    """
    plans: List[Dict[str, Any]] = []
    # Iterate the file by real newlines. str.splitlines() would also split on
    # U+2028/U+2029/\x0b/\x0c, which may legally appear unescaped inside a JSON
    # string and would tear a valid record into two unparsable halves.
    with src.open(encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(obj, dict) and isinstance(obj.get("plan"), list):
                plans.append(obj)
    return plans


def infer_device_name(item_name: str) -> str:
    """Extract the device part from a dotted item name.

    ``"MIM2.mg1microgrid_switch2.status"`` -> ``"mg1microgrid_switch2"``
    ``"mg1load_41.constant_power_A"``      -> ``"mg1load_41"``

    When the first chunk starts with ``MIM`` and there are at least three
    chunks, the second chunk is returned; otherwise the first chunk is.
    """
    chunks = item_name.split(".")
    if len(chunks) >= 3 and chunks[0].startswith("MIM"):
        return chunks[1]
    return chunks[0]


def infer_device_type(dev: str) -> str:
    """Classify a device name into a coarse type by case-insensitive keywords.

    Rules are checked in order and the first hit wins. Returns one of
    ``"switch"``, ``"inverter"``, ``"generator"``, ``"capacitor"``,
    ``"regulator"``, ``"load"`` or ``"other"``.
    """
    s = dev.lower()
    if "switch" in s:
        return "switch"
    if "inverter" in s:
        return "inverter"
    # "generator" is matched as a plain substring, like the other kinds. The
    # former pattern r"\bgen|generator\b" parsed as (\bgen)|(generator\b) and
    # \b never fires next to "_" or a digit, so names such as
    # "mg1generator_2" fell through to "other".
    # NOTE(review): r"\bgen" only matches "gen" at the start of a word, so
    # e.g. "mg1_gen1" is not recognised — confirm against real device names.
    if "diesel" in s or "generator" in s or re.search(r"\bgen", s):
        return "generator"
    if "capacitor" in s or s.startswith("cap_"):
        return "capacitor"
    if "regulator" in s or s.startswith("reg_"):
        return "regulator"
    if "load" in s:
        return "load"
    return "other"


def collect_tags(plan: Dict[str, Any]) -> Dict[str, List[str]]:
    """Summarise a plan's items as sorted, de-duplicated tag lists.

    Returns a dict with the keys ``"ops"``, ``"points"``, ``"apply"``,
    ``"mims"`` and ``"device_types"``. A missing ``op`` counts as ``"set"``
    and a missing ``scope.apply`` as ``"both"``; empty values are dropped
    from ``ops``, ``points`` and ``apply``.
    """
    ops, points, applies, mims, dev_types = set(), set(), set(), set(), set()
    for item in plan.get("plan", []):
        scope = item.get("scope") or {}
        ops.add(item.get("op", "set"))
        points.add(item.get("point", ""))
        applies.add(scope.get("apply", "both"))
        mim = scope.get("mim")
        if mim:
            mims.add(mim)
        dev_types.add(infer_device_type(infer_device_name(item.get("name", ""))))
    return {
        "ops": sorted(x for x in ops if x),
        "points": sorted(x for x in points if x),
        "apply": sorted(x for x in applies if x),
        "mims": sorted(mims),
        "device_types": sorted(dev_types),
    }


def item_to_phrase(it: Dict[str, Any]) -> str:
    """Render one plan item as a concise, human-style prompt fragment for RAG.

    The verb template depends on ``op``:
      * open / close / trip        -> "<op> <device type> <device>"
      * increase / decrease / scale -> "<op> <point> of <device> by <value>"
      * anything else (set/default) -> "set <point> of <device> to <value>"
    " in <mim>" is appended when the item's scope names a MIM.
    """
    op = it.get("op", "set")
    point = it.get("point", "")
    value = it.get("attack_value", "")
    device = infer_device_name(it.get("name", ""))
    mim = (it.get("scope") or {}).get("mim")

    # Normalize value strings a bit: whole floats print without ".0".
    value_text = str(value)
    if isinstance(value, float) and value_text.endswith(".0"):
        value_text = value_text[:-2]

    if op in {"open", "close", "trip"}:
        phrase = f"{op} {infer_device_type(device)} {device}"
    elif op in {"increase", "decrease", "scale"}:
        phrase = f"{op} {point} of {device} by {value_text}"
    else:  # set/default
        phrase = f"set {point} of {device} to {value_text}"

    if mim:
        phrase += f" in {mim}"
    return phrase


def plan_to_prompt(plan: Dict[str, Any], max_items: int = 6) -> str:
    """Build a prompt from the first ``max_items`` items of a plan.

    Item phrases are joined with "; ". A plan without items yields a fixed
    placeholder sentence.
    """
    items = plan.get("plan", [])[:max_items]
    if not items:
        return "Generate an AttackPlan JSON v1.1 (no items)."
    return "; ".join(item_to_phrase(it) for it in items)


def score(plan: Dict[str, Any]) -> Tuple[int, int, int, int]:
    """Sort key to promote diversity: favor both/apply, more mims, more ops, more device types."""
    tags = collect_tags(plan)
    return (
        1 if "both" in tags["apply"] else 0,
        len(tags["mims"]),
        len(tags["ops"]),
        len(tags["device_types"]),
    )


def pick_diverse(plans: List[Dict[str, Any]], k: int, seed: int = 7) -> List[Dict[str, Any]]:
    """Pick up to ``k`` plans, preferring varied ones.

    The plans are shuffled with ``seed`` and then stably sorted by ``score``
    (descending). A greedy pass takes plans subject to per-``apply`` caps and
    skips any plan whose signature (op, point, mim of its first four items)
    was already taken. If that leaves fewer than ``k`` plans, the remainder
    is topped up in score order, ignoring caps and signatures.

    Args:
        plans: Candidate plans. The list is NOT modified, so repeated calls
            with the same arguments return the same selection.
        k: Maximum number of plans to return; ``k <= 0`` yields ``[]``.
        seed: Seed for the shuffle.

    Returns:
        A new list holding at most ``k`` of the given plan objects.
    """
    if k <= 0:
        return []

    # Work on a copy: shuffling/sorting the caller's list in place made the
    # outcome depend on how often the function had been called before.
    ordered = list(plans)
    random.Random(seed).shuffle(ordered)
    ordered.sort(key=score, reverse=True)

    # Bucketing caps so that neither apply mode dominates the first pass.
    caps = {
        "apply:glm_only": max(1, k // 6),
        "apply:both": max(1, k // 3),
    }
    taken = {bucket: 0 for bucket in caps}
    seen_sigs = set()
    picked: List[Dict[str, Any]] = []

    for plan in ordered:
        tags = collect_tags(plan)
        # Anything that is not glm_only is counted in the "both" bucket.
        bucket = "apply:glm_only" if "glm_only" in tags["apply"] else "apply:both"
        if taken[bucket] >= caps[bucket]:
            continue
        # De-dup by items signature.
        sig = tuple(
            (it.get("op"), it.get("point"), (it.get("scope") or {}).get("mim"))
            for it in plan.get("plan", [])[:4]
        )
        if sig in seen_sigs:
            continue
        seen_sigs.add(sig)
        picked.append(plan)
        taken[bucket] += 1
        if len(picked) >= k:
            break

    # If still short, top up ignoring caps.
    for plan in ordered:
        if len(picked) >= k:
            break
        if plan not in picked:
            picked.append(plan)

    return picked[:k]


def write_examples(plans: List[Dict[str, Any]], outdir: Path) -> None:
    """Write one ``ex-NNNN.json`` file per plan into ``outdir``.

    Each file holds the generated prompt, the plan itself and its tags.
    ``outdir`` is created if needed; numbering starts at 1.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    for idx, plan in enumerate(plans, 1):
        example = {
            "prompt": plan_to_prompt(plan),
            "attack_plan": plan,
            "tags": collect_tags(plan),
        }
        text = json.dumps(example, ensure_ascii=False, indent=2) + "\n"
        Path(outdir, f"ex-{idx:04d}.json").write_text(text, encoding="utf-8")


def write_canonical_snippets(outdir: Path) -> None:
    """A couple of tiny single-item plans as structural references.

    Writes ``<title>.json`` for each built-in mini plan into ``outdir``
    (created if needed).
    """
    outdir.mkdir(parents=True, exist_ok=True)
    mini = [
        {
            "title": "set_inverter_Pref",
            "plan": {
                "version": "1.1",
                "time": {"start_s": 0, "end_s": 30},
                "mim": {"active": True, "selected": ["MIM2"]},
                "plan": [{
                    "name": "MIM2.mg1inverter_XXX.Pref",
                    "scope": {"mg": "mg1", "mim": "MIM2", "apply": "both"},
                    "op": "set",
                    "point": "Pref",
                    "attack_value": 10000,
                    "real_value": 0,
                    "phase": None,
                    "window": {"point_start_s": 1, "point_stop_s": 20},
                }],
            },
        },
        {
            "title": "open_switch_status",
            "plan": {
                "version": "1.1",
                "time": {"start_s": 0, "end_s": 30},
                "mim": {"active": True, "selected": ["MIM1"]},
                "plan": [{
                    "name": "MIM1.mg2microgrid_switch_YYY.status",
                    "scope": {"mg": "mg2", "mim": "MIM1", "apply": "both"},
                    "op": "set",
                    "point": "status",
                    "attack_value": "OPEN",
                    "real_value": "CLOSED",
                    "phase": None,
                    "window": {"point_start_s": 2, "point_stop_s": 10},
                }],
            },
        },
        {
            "title": "glm_only_unmapped_load",
            "plan": {
                "version": "1.1",
                "time": {"start_s": 0, "end_s": 30},
                "mim": {"active": True, "selected": ["MIM3"]},
                "plan": [{
                    "name": "load_42.constant_power_A",
                    "scope": {"mg": "unmapped", "mim": None, "apply": "glm_only"},
                    "op": "set",
                    "point": "constant_power_A",
                    "attack_value": 25000,
                    "real_value": 20000,
                    "phase": None,
                    "window": {"point_start_s": 5, "point_stop_s": 25},
                }],
            },
        },
    ]
    for m in mini:
        text = json.dumps(m["plan"], ensure_ascii=False, indent=2) + "\n"
        Path(outdir, f"{m['title']}.json").write_text(text, encoding="utf-8")


def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: read plans, pick a diverse subset, write it.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Raises:
        SystemExit: If no source file can be found or it holds no valid plan.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--src", type=str, default="scripts/train_attackplan.jsonl",
                    help="Path to your AttackPlan JSONL")
    ap.add_argument("--out", type=str, default="kb/examples",
                    help="Output folder for RAG examples")
    ap.add_argument("--k", type=int, default=40,
                    help="How many examples to write")
    ap.add_argument("--seed", type=int, default=7)
    ap.add_argument("--write_snippets", action="store_true",
                    help="Also write a few canonical mini-plans to kb/snippets/json/")
    args = ap.parse_args(argv)

    src = Path(args.src)
    if not src.exists():
        # Try a couple of common alternate locations
        candidates = [
            Path("..") / "EditGlm" / "scripts" / "train_attackplan.jsonl",
            Path("scripts") / "train_attackplan.jsonl",
        ]
        src = next((c for c in candidates if c.exists()), src)
    if not src.is_file():
        # Fail with a readable message instead of a traceback from the read.
        raise SystemExit(f"Source JSONL not found: {src}")

    print("[seed] reading", src.resolve())
    plans = load_plans(src)
    if not plans:
        raise SystemExit("No valid plans found in JSONL.")

    out_dir = Path(args.out)
    snippets_dir = Path("kb/snippets/json")

    picked = pick_diverse(plans, k=args.k, seed=args.seed)
    write_examples(picked, out_dir)
    if args.write_snippets:
        write_canonical_snippets(snippets_dir)

    print(f"[seed] wrote {len(picked)} examples to {out_dir.resolve()}")
    if args.write_snippets:
        print(f"[seed] wrote canonical mini snippets to {snippets_dir.resolve()}")


if __name__ == "__main__":
    main()