|
import gradio as gr |
|
import yara |
|
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings |
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
import os |
|
|
|
|
|
# Register the HuggingFace "BAAI/bge-small-en-v1.5" embedding model on
# LlamaIndex's shared Settings object; this runs at import time, before the
# index is built further down in this module.
Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5",
)
|
|
|
|
|
def dsatp_parse_log(text: str) -> dict:
    """Classify a log line or alert by looking for known IoT threat keywords.

    The text is lower-cased and each keyword is tested as a substring, in the
    order the table below is written; the first keyword found decides the
    result.

    Args:
        text: Raw log text or alert message. ``None`` is treated as empty
            text instead of raising.

    Returns:
        A dict with the keys ``classification``, ``severity``, ``mitigation``
        and ``confidence``. A keyword hit is reported with confidence 0.9;
        when no keyword is present the result is "No Threat" with
        confidence 0.5.
    """
    # An empty or omitted input may arrive as None; without this guard
    # ``.lower()`` would raise AttributeError.
    log = "" if text is None else str(text).lower()

    # Table order is the match priority: earlier entries win when several
    # keywords occur in the same text.
    threats = {
        "compromised": {
            "classification": "Threat Detected",
            "severity": "Critical",
            "mitigation": "Isolate process, run port scan",
        },
        "unauthorized": {
            "classification": "Threat Detected",
            "severity": "High",
            "mitigation": "Quarantine MAC address",
        },
        "high cpu": {
            "classification": "Threat Detected",
            "severity": "Medium",
            "mitigation": "Check for crypto-miner or DoS",
        },
        "inbound traffic": {
            "classification": "Threat Detected",
            "severity": "Medium",
            "mitigation": "Block closed ports",
        },
        "firmware mismatch": {
            "classification": "Threat Detected",
            "severity": "High",
            "mitigation": "Validate OTA or rollback",
        },
    }

    for key, value in threats.items():
        if key in log:
            # Copy the table entry so callers never mutate the template.
            return {**value, "confidence": 0.9}

    return {
        "classification": "No Threat",
        "severity": "Safe",
        "mitigation": "None",
        "confidence": 0.5,
    }
|
|
|
|
|
def dsatp_yara_scan(file_path: str, rules_path: str = "dsatp/rules.yar") -> dict:
    """Scan a file against a YARA rule file and summarise the outcome.

    Args:
        file_path: Path of the file to scan.
        rules_path: Path of the YARA rule file to compile. Defaults to
            ``"dsatp/rules.yar"``, the path this function has always used.

    Returns:
        A dict that always contains ``classification``, ``severity``,
        ``mitigation`` and ``confidence``. When compiling the rules or
        matching the file fails, the dict additionally carries ``error``
        with the exception text; no exception is raised to the caller.
    """
    try:
        rules = yara.compile(filepath=rules_path)
        matches = rules.match(file_path)
    except Exception as e:
        # Deliberately broad: any failure while compiling or matching is
        # reported as a result instead of propagating. The result keeps the
        # same keys as the success paths so callers that read
        # ``classification`` / ``confidence`` do not hit a KeyError.
        return {
            "error": str(e),
            "classification": "Scan Error",
            "severity": "Unknown",
            "mitigation": "Check file format",
            "confidence": 0.0,
        }

    if matches:
        return {
            "classification": "Malware Detected",
            "severity": "Critical",
            "mitigation": "Quarantine file, run antivirus",
            "confidence": 0.95,
        }

    return {
        "classification": "No Malware",
        "severity": "Safe",
        "mitigation": "None",
        "confidence": 0.7,
    }
|
|
|
|
|
def init_llama_index():
    """Build a vector index from the documents in the ``corpus`` directory.

    Returns:
        The ``VectorStoreIndex`` built from the loaded documents, or ``None``
        when reading the directory or building the index raises; in that case
        the error is printed.
    """
    try:
        reader = SimpleDirectoryReader("corpus", filename_as_id=True)
        corpus_docs = reader.load_data()
        vector_index = VectorStoreIndex.from_documents(corpus_docs)
    except Exception as exc:
        # Best effort: a missing or unreadable corpus leaves the app without
        # an index rather than stopping the module from loading.
        print(f"Error loading corpus: {exc}")
        return None
    return vector_index
|
|
|
# Build the index once at import time; the query engine stays None when no
# index is available, and callers are expected to check for that.
index = init_llama_index()
if index:
    query_engine = index.as_query_engine()
else:
    query_engine = None
|
|
|
|
|
def chatbot_response(user_input, file, history):
    """Analyse a typed alert or an uploaded file and append the verdict to the chat.

    When a file is supplied, its text is used for the context query and the
    file is scanned with ``dsatp_yara_scan``; otherwise the typed text is
    classified with ``dsatp_parse_log``. If a query engine is available it is
    asked for mitigation context, and a query failure is reported inside the
    reply instead of being raised.

    Args:
        user_input: Text typed by the user. Not analysed when a file is
            supplied. ``None`` is treated as empty text.
        file: The upload, either a path string or an object whose ``name``
            attribute holds the path; any falsy value means "no upload".
        history: Existing chat history as a list of ``(user, bot)`` pairs,
            or ``None`` to start a new one.

    Returns:
        A tuple ``(updated_history, scan_result)``: a new history list with
        one more ``(user, bot)`` pair, and the raw scan result dict.
    """
    if history is None:
        history = []
    # Normalise a missing message so the parser and the query get a string.
    input_text = user_input or ""

    if file:
        # Accept both a plain path string and a file-like wrapper with .name.
        file_path = getattr(file, "name", file)
        # Context manager closes the handle; undecodable bytes are replaced
        # so an odd log file cannot abort the request while being read.
        with open(file_path, "r", encoding="utf-8", errors="replace") as handle:
            input_text = handle.read()
        scan_result = dsatp_yara_scan(file_path)
    else:
        scan_result = dsatp_parse_log(input_text)

    context_str = "No context available."
    if query_engine:
        try:
            context_str = str(query_engine.query(f"Mitigation for: {input_text}"))
        except Exception as e:
            context_str = f"Context error: {e}"

    # The scanner's failure result may lack some keys; read defensively so a
    # scan error is reported in the chat instead of raising KeyError.
    classification = scan_result.get("classification", "Scan Error")
    severity = scan_result.get("severity", "Unknown")
    mitigation = scan_result.get("mitigation", "None")
    confidence = scan_result.get("confidence", 0.0)

    response = (
        f"Security Analyst: {classification}. Severity: {severity}. "
        f"Mitigation: {mitigation}. Confidence: {confidence:.1f}. "
        f"Context: {context_str}"
    )
    # Build a new list rather than mutating the caller's history in place.
    updated_history = history + [(user_input or "File uploaded", response)]
    return updated_history, scan_result
|
|
|
|
|
def _render_threat_card(scan_result=None) -> str:
    """Build the HTML of the "Threat Visualizer" card for one scan result.

    Args:
        scan_result: Result dict with the keys ``classification``,
            ``severity``, ``mitigation`` and ``confidence``. ``None``, a
            non-dict value, or missing keys are rendered as "N/A".

    Returns:
        An HTML fragment using the ``threat-card`` / ``severity-*`` classes
        defined in the stylesheet passed to ``gr.Blocks`` below.
    """
    import html  # local import keeps this block self-contained

    result = scan_result if isinstance(scan_result, dict) else {}

    def field(key):
        value = result.get(key)
        # Escape because values can carry arbitrary text (e.g. error messages).
        return "N/A" if value is None else html.escape(str(value))

    severity_name = str(result.get("severity", "")).lower()
    # Only severities that have a rule in the stylesheet get a colour class.
    if severity_name in ("critical", "high", "medium", "safe"):
        severity_class = f"severity-{severity_name}"
    else:
        severity_class = ""

    confidence = result.get("confidence")
    if isinstance(confidence, (int, float)):
        confidence_text = f"{confidence:.2f}"
    else:
        confidence_text = "N/A"

    return (
        '<div class="threat-card" id="threat-card">'
        f'<p><strong>Classification:</strong> <span id="classification">{field("classification")}</span></p>'
        f'<p><strong>Severity:</strong> <span class="severity-label {severity_class}" id="severity">{field("severity")}</span></p>'
        f'<p><strong>Mitigation:</strong> <span id="mitigation">{field("mitigation")}</span></p>'
        f'<p><strong>Confidence:</strong> <span id="confidence">{confidence_text}</span></p>'
        "</div>"
    )


# Two-column layout: chat, text box, upload and button on the left; raw scan
# result and the rendered threat card on the right.
with gr.Blocks(css="""
.threat-card { background-color: #1f2937; color: white; padding: 16px; border-radius: 8px; margin-bottom: 16px; }
.severity-critical { background-color: #dc2626; }
.severity-high { background-color: #f59e0b; }
.severity-medium { background-color: #eab308; }
.severity-safe { background-color: #10b981; }
.severity-label { padding: 4px 8px; border-radius: 4px; }
""") as demo:
    gr.Markdown("# AI Cybersecurity Agent")
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(label="Security Analyst Chat")
            user_input = gr.Textbox(placeholder="Enter log data or alert (e.g., 'System compromised!')")
            file_input = gr.File(label="Upload .txt/.log file", file_types=[".txt", ".log"])
            submit_btn = gr.Button("Analyze")
        with gr.Column():
            gr.Markdown("### Threat Analysis Results")
            output_json = gr.JSON(label="Scan Results")
            gr.Markdown("### Threat Visualizer")
            # Previously a static Markdown block whose {{...}} placeholders
            # were never substituted; the card is now rendered from the scan
            # result and starts out with "N/A" fields.
            threat_card = gr.HTML(value=_render_threat_card())

    # First run the analysis, then refresh the card from the JSON it produced.
    # NOTE(review): the follow-up step is registered as its own Gradio event;
    # confirm whether it should be hidden from the generated API / MCP tool
    # list for the Gradio version in use.
    submit_btn.click(
        fn=chatbot_response,
        inputs=[user_input, file_input, chatbot],
        outputs=[chatbot, output_json],
    ).then(
        fn=_render_threat_card,
        inputs=output_json,
        outputs=threat_card,
    )
|
|
|
|
|
# Start the app only when this file is executed as a script, not on import.
# mcp_server=True is forwarded unchanged to Gradio's launch().
if __name__ == "__main__":
    demo.launch(
        mcp_server=True,
    )