# ========================================================= # CELLA 1: IMPORT E SETUP INIZIALE # ========================================================= import os import re import gradio as gr import pandas as pd import json from typing import List, Dict, Tuple, Any import spacy import torch from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline # Import Presidio from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer from presidio_analyzer.pattern_recognizer import Pattern from presidio_analyzer.nlp_engine import NlpEngine, NlpEngineProvider from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer from presidio_anonymizer import AnonymizerEngine from presidio_anonymizer.entities import OperatorConfig # Configurazione base print("✅ Import completati!") # ========================================================= # CELLA 2: CONFIGURAZIONE RICONOSCITORI PERSONALIZZATI (CORRETTA) # ========================================================= def create_italian_recognizers(): """ Crea riconoscitori personalizzati per il contesto italiano """ recognizers = [] # CODICE FISCALE cf_patterns = [Pattern(name="codice fiscale", regex=r"\b[A-Z]{6}[0-9]{2}[A-Z][0-9]{2}[A-Z][0-9]{3}[A-Z]\b", score=0.9)] cf_recognizer = PatternRecognizer( supported_entity="CODICE_FISCALE", patterns=cf_patterns, context=["codice", "fiscale", "cf", "c.f.", "cod.fisc.", "codice fiscale"], supported_language="en" # Aggiungiamo il supporto per l'inglese ) recognizers.append(cf_recognizer) # PARTITA IVA piva_patterns = [Pattern(name="partita iva", regex=r"\b(IT)?[0-9]{11}\b", score=0.85)] piva_recognizer = PatternRecognizer( supported_entity="PARTITA_IVA", patterns=piva_patterns, context=["partita", "iva", "p.iva", "p. iva", "piva", "partita iva"], supported_language="en" ) recognizers.append(piva_recognizer) # IBAN ITALIANO iban_patterns = [Pattern(name="iban", regex=r"\b[A-Z]{2}[0-9]{2}[A-Z0-9]{4}[0-9]{7}([A-Z0-9]?){0,16}\b", score=0.9)] iban_recognizer = PatternRecognizer( supported_entity="IBAN_CODE", patterns=iban_patterns, context=["iban", "bonifico", "bancario", "conto", "pagamento", "IBAN"], supported_language="en" ) recognizers.append(iban_recognizer) # TARGA ITALIANA targa_patterns = [Pattern(name="targa", regex=r"\b[A-Z]{2}[0-9]{3}[A-Z]{2}\b", score=0.85)] targa_recognizer = PatternRecognizer( supported_entity="TARGA", patterns=targa_patterns, context=["targa", "auto", "veicolo", "automobile", "macchina"], supported_language="en" ) recognizers.append(targa_recognizer) # TELEFONO ITALIANO telefono_patterns = [ Pattern(name="telefono (con prefisso)", regex=r"\b\+39\s?[0-9]{10}\b", score=0.9), Pattern(name="telefono (cellulare)", regex=r"\b[3][0-9]{9}\b", score=0.8), Pattern(name="telefono (fisso)", regex=r"\b0[0-9]{1,3}[-\s]?[0-9]{7}\b", score=0.7), Pattern(name="telefono (generico)", regex=r"\b[0-9]{10}\b", score=0.6) ] telefono_recognizer = PatternRecognizer( supported_entity="PHONE_NUMBER", patterns=telefono_patterns, context=["telefono", "cellulare", "tel", "chiamare", "contattare", "mobile"], supported_language="en" ) recognizers.append(telefono_recognizer) # DATA ITALIANA data_patterns = [ Pattern(name="data (dd/mm/yyyy)", regex=r"\b[0-3][0-9]/[0-1][0-9]/[1-2][0-9]{3}\b", score=0.9), Pattern(name="data (dd-mm-yyyy)", regex=r"\b[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}\b", score=0.9), Pattern(name="data (d/m/yyyy)", regex=r"\b[1-9]/[1-9]/[1-2][0-9]{3}\b", score=0.8), Pattern(name="data (dd/mm/yy)", regex=r"\b[0-3][0-9]/[0-1][0-9]/[0-9]{2}\b", score=0.8) ] data_recognizer = PatternRecognizer( supported_entity="DATE_TIME", patterns=data_patterns, context=["nato", "nata", "data di nascita", "nasce", "data", "nascita"], supported_language="en" ) recognizers.append(data_recognizer) print(f"✅ Creati {len(recognizers)} riconoscitori personalizzati") return recognizers # Crea i riconoscitori italian_recognizers = create_italian_recognizers() # ========================================================= # CELLA 3: STANFORD COME RECOGNIZER SEPARATO # ========================================================= from presidio_analyzer import EntityRecognizer, RecognizerResult from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline import torch class StanfordRecognizer(EntityRecognizer): def __init__(self): self.supported_entities = ["PERSON", "ORGANIZATION", "LOCATION", "DATE_TIME", "AGE", "PHONE_NUMBER", "EMAIL"] self.supported_language = "en" # Carica il modello Stanford try: self.tokenizer = AutoTokenizer.from_pretrained("StanfordAIMI/stanford-deidentifier-base") self.model = AutoModelForTokenClassification.from_pretrained("StanfordAIMI/stanford-deidentifier-base") self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.model.to(self.device) self.model.eval() # Crea una pipeline per gestire più facilmente il modello self.pipeline = pipeline( "token-classification", model=self.model, tokenizer=self.tokenizer, device=0 if torch.cuda.is_available() else -1, aggregation_strategy="max" ) print("✅ Modello Stanford caricato con successo!") except Exception as e: print(f"⚠️ Errore nel caricamento del modello Stanford: {e}") self.pipeline = None super().__init__(supported_entities=self.supported_entities, supported_language=self.supported_language) def analyze(self, text, entities, nlp_artifacts): """ Analizza il testo e restituisce RecognizerResult """ results = [] if self.pipeline is None: return results try: # Usa la pipeline per processare il testo outputs = self.pipeline(text) for output in outputs: # Mappa le etichette del modello Stanford a quelle di Presidio stanford_to_presidio = { "PATIENT": "PERSON", "STAFF": "PERSON", "HOSP": "ORGANIZATION", "HOSPITAL": "ORGANIZATION", "AGE": "AGE", "DATE": "DATE_TIME", "PHONE": "PHONE_NUMBER", "PER": "PERSON", "LOC": "LOCATION", "ORG": "ORGANIZATION", "PERSON": "PERSON", "LOCATION": "LOCATION", "ORGANIZATION": "ORGANIZATION" } entity_type = output.get("entity_group", "") # Rimuovi prefissi B-, I- se presenti if entity_type.startswith(("B-", "I-")): entity_type = entity_type[2:] # Mappa all'entità Presidio presidio_entity = stanford_to_presidio.get(entity_type, entity_type) # Crea RecognizerResult se l'entità è supportata if presidio_entity in self.supported_entities: result = RecognizerResult( entity_type=presidio_entity, start=output["start"], end=output["end"], score=output["score"] ) results.append(result) except Exception as e: print(f"Errore nell'analisi Stanford: {e}") return results def load(self): pass # Il caricamento è fatto nel costruttore # Crea un'istanza del recognizer Stanford stanford_recognizer = StanfordRecognizer() # Se l'analyzer è già stato creato, aggiungi il recognizer Stanford if 'analyzer' in globals(): try: analyzer.registry.add_recognizer(stanford_recognizer) print("✅ Stanford recognizer aggiunto a Presidio") except Exception as e: print(f"⚠️ Errore nell'aggiunta di Stanford recognizer: {e}") # ========================================================= # CELLA 4: SISTEMA DI REGEX FALLBACK # ========================================================= class RegexFallbackEngine: def __init__(self): self.patterns = { "PERSON": [ r"\b[A-Z][a-z]+\s+[A-Z][a-z]+\b", # Nome Cognome r"\b(?:Sig\.|Dott\.|Dr\.|Ing\.)\s+[A-Z][a-z]+\s+[A-Z][a-z]+\b", # Titolo + Nome Cognome ], "CODICE_FISCALE": [ r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b", ], "PARTITA_IVA": [ r"\b(?:IT)?\d{11}\b", ], "DATE_TIME": [ r"\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b", r"\b\d{1,2}\s+(?:gennaio|febbraio|marzo|aprile|maggio|giugno|luglio|agosto|settembre|ottobre|novembre|dicembre)\s+\d{4}\b", ], "PHONE_NUMBER": [ r"\b\+39\s?\d{10}\b", r"\b\d{10}\b", r"\b0\d{1,3}[-\.\s]?\d{7}\b", ], "EMAIL": [ r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", ], "IBAN_CODE": [ r"\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}[A-Z0-9]{12}\b", ], "TARGA": [ r"\b[A-Z]{2}\d{3}[A-Z]{2}\b", ] } def analyze(self, text): """ Analizza il testo utilizzando regex """ results = [] for entity_type, patterns in self.patterns.items(): for pattern in patterns: for match in re.finditer(pattern, text): results.append({ "entity_type": entity_type, "start": match.start(), "end": match.end(), "text": match.group(), "score": 0.9 # Assegna un punteggio fisso per regex }) return results # Inizializza il sistema di regex fallback regex_engine = RegexFallbackEngine() # ========================================================= # CELLA 5: CONFIGURAZIONE PRESIDIO SEMPLIFICATA # ========================================================= # Per ora, creiamo una configurazione base senza Stanford, che possiamo aggiungere come recognizer separato from presidio_analyzer import AnalyzerEngine from presidio_analyzer.predefined_recognizers import ( PhoneRecognizer, EmailRecognizer, CreditCardRecognizer, IbanRecognizer ) def setup_presidio_simple(): """ Configura Presidio con setup semplificato """ try: # Crea l'analyzer engine con configurazione di base analyzer = AnalyzerEngine() # Aggiungi riconoscitori predefiniti try: analyzer.registry.add_recognizer(PhoneRecognizer()) except: pass try: analyzer.registry.add_recognizer(EmailRecognizer()) except: pass try: analyzer.registry.add_recognizer(CreditCardRecognizer()) except: pass try: analyzer.registry.add_recognizer(IbanRecognizer()) except: pass # Aggiungi riconoscitori personalizzati se definiti if 'italian_recognizers' in globals(): for recognizer in italian_recognizers: try: analyzer.registry.add_recognizer(recognizer) except Exception as e: print(f"Errore aggiungendo recognizer: {e}") # Crea l'anonymizer engine anonymizer = AnonymizerEngine() print("✅ Presidio configurato con successo!") return analyzer, anonymizer except Exception as e: print(f"❌ Errore nella configurazione di Presidio: {e}") # Fallback a configurazione minima analyzer = AnalyzerEngine() anonymizer = AnonymizerEngine() print("⚠️ Usando configurazione di default") return analyzer, anonymizer # Inizializza Presidio analyzer, anonymizer = setup_presidio_simple() # ========================================================= # CELLA 6: SISTEMA DI ANONIMIZZAZIONE IBRIDO (CORRETTO) # ========================================================= class HybridAnonymizer: def __init__(self, presidio_analyzer, regex_engine, anonymizer): self.presidio_analyzer = presidio_analyzer self.regex_engine = regex_engine self.anonymizer = anonymizer def analyze_text(self, text, enable_stanford=True, enable_regex=True): """ Analizza il testo usando tutti i metodi disponibili """ all_entities = [] # Presidio ora include Stanford tramite il recognizer aggiunto presidio_results = self.presidio_analyzer.analyze( text=text, language="en", entities=None, # Usa tutti i recognizer disponibili allow_list=None ) # Converti risultati Presidio for result in presidio_results: all_entities.append({ "entity_type": result.entity_type, "start": result.start, "end": result.end, "text": text[result.start:result.end], "score": result.score, "source": "presidio" }) # Aggiungi regex se abilitato if enable_regex: try: regex_entities = self.regex_engine.analyze(text) for entity in regex_entities: all_entities.append({ "entity_type": entity["entity_type"], "start": entity["start"], "end": entity["end"], "text": entity["text"], "score": entity["score"], "source": "regex" }) except Exception as e: print(f"Errore in Regex: {e}") return self._merge_overlapping_entities(all_entities) def _merge_overlapping_entities(self, entities): if not entities: return [] entities.sort(key=lambda x: (x["start"], -x["score"])) merged = [] for entity in entities: if not merged or merged[-1]["end"] <= entity["start"]: merged.append(entity) elif entity["score"] > merged[-1]["score"]: merged[-1] = entity return merged def anonymize_text(self, text, entities, anonymization_type="replace"): """ Anonimizza il testo basandosi sulle entità trovate con diversi metodi Tipi di anonimizzazione: - replace: sostituisce con tag (es. ) - redact: oscura con asterischi (es. ******) - pseudonymize: sostituisce con valori fittizi (es. Persona1) """ if not entities: return text if anonymization_type == "replace": # Usa Presidio per sostituire le entità con tag presidio_results = [] for entity in entities: from presidio_analyzer import RecognizerResult presidio_results.append( RecognizerResult( entity_type=entity["entity_type"], start=entity["start"], end=entity["end"], score=entity["score"] ) ) # Configura l'anonimizzazione con tag operators = { "PERSON": OperatorConfig("replace", {"new_value": ""}), "LOCATION": OperatorConfig("replace", {"new_value": ""}), "ORGANIZATION": OperatorConfig("replace", {"new_value": ""}), "DATE_TIME": OperatorConfig("replace", {"new_value": ""}), "PHONE_NUMBER": OperatorConfig("replace", {"new_value": ""}), "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": ""}), "IBAN_CODE": OperatorConfig("replace", {"new_value": ""}), "CODICE_FISCALE": OperatorConfig("replace", {"new_value": ""}), "PARTITA_IVA": OperatorConfig("replace", {"new_value": ""}), "TARGA": OperatorConfig("replace", {"new_value": ""}), "AGE": OperatorConfig("replace", {"new_value": ""}) } anonymized_result = self.anonymizer.anonymize( text=text, analyzer_results=presidio_results, operators=operators ) return anonymized_result.text elif anonymization_type == "redact": # Sostituisce ogni entità con asterischi anonymized = text # Ordina le entità per posizione (dall'ultima alla prima) per non alterare gli indici sorted_entities = sorted(entities, key=lambda x: x["start"], reverse=True) for entity in sorted_entities: # Genera asterischi della stessa lunghezza dell'entità asterisks = "*" * (entity["end"] - entity["start"]) # Sostituisci il testo anonymized = anonymized[:entity["start"]] + asterisks + anonymized[entity["end"]:] return anonymized elif anonymization_type == "pseudonymize": # Sostituisce ogni entità con un valore fittizio anonymized = text # Dizionario per tenere traccia dei valori fittizi generati pseudonyms = {} type_counts = {} # Ordina le entità per posizione (dall'ultima alla prima) per non alterare gli indici sorted_entities = sorted(entities, key=lambda x: x["start"], reverse=True) for entity in sorted_entities: entity_type = entity["entity_type"] entity_text = entity["text"] # Se questa entità è già stata sostituita in precedenza, usa lo stesso valore if entity_text in pseudonyms: new_value = pseudonyms[entity_text] else: # Inizializza il contatore se non esiste if entity_type not in type_counts: type_counts[entity_type] = 0 # Incrementa il contatore type_counts[entity_type] += 1 # Genera un valore fittizio basato sul tipo di entità if entity_type == "PERSON": new_value = f"Persona{type_counts[entity_type]}" elif entity_type == "LOCATION": new_value = f"Luogo{type_counts[entity_type]}" elif entity_type == "ORGANIZATION": new_value = f"Organizzazione{type_counts[entity_type]}" elif entity_type == "DATE_TIME": new_value = f"Data{type_counts[entity_type]}" elif entity_type == "PHONE_NUMBER": new_value = f"+39-XXX-XXX-{1000+type_counts[entity_type]}" elif entity_type == "EMAIL_ADDRESS" or entity_type == "EMAIL": new_value = f"email{type_counts[entity_type]}@esempio.com" elif entity_type == "IBAN_CODE": new_value = f"IT00X0000000000000{type_counts[entity_type]}" elif entity_type == "CODICE_FISCALE" or entity_type == "CF": new_value = f"ABCDEF00G00H000{type_counts[entity_type]}" elif entity_type == "PARTITA_IVA" or entity_type == "PIVA": new_value = f"IT0000000000{type_counts[entity_type]}" elif entity_type == "TARGA": new_value = f"XX000{type_counts[entity_type]}" elif entity_type == "AGE": new_value = f"XX" else: new_value = f"{entity_type}{type_counts[entity_type]}" # Memorizza il valore generato per riusi futuri pseudonyms[entity_text] = new_value # Sostituisci il testo anonymized = anonymized[:entity["start"]] + new_value + anonymized[entity["end"]:] return anonymized else: # Tipo di anonimizzazione non supportato, usa replace come fallback print(f"Tipo di anonimizzazione non supportato: {anonymization_type}, usando 'replace'") return self.anonymize_text(text, entities, "replace") # Inizializza il sistema ibrido hybrid_anonymizer = HybridAnonymizer(analyzer, regex_engine, anonymizer) # ========================================================= # CELLA 7: UTILITÀ DI VISUALIZZAZIONE # ========================================================= # Colori per i diversi tipi di entità ENTITY_COLORS = { "PERSON": "#ff7f50", # Corallo "LOCATION": "#6495ed", # Azzurro "ORGANIZATION": "#9acd32", # Verde "DATE_TIME": "#ffa500", # Arancione "PHONE_NUMBER": "#da70d6", # Orchidea "EMAIL_ADDRESS": "#dda0dd", # Plum "IBAN_CODE": "#1e90ff", # Blu "CODICE_FISCALE": "#ff69b4", # Rosa "PARTITA_IVA": "#ff69b4", # Rosa "TARGA": "#bdb76b" # Kaki } def highlight_entities_html(text, entities): """ Evidenzia le entità trovate nel testo con colori """ if not entities: return text # Prepara HTML con span colorati chars = list(text) spans = [] for entity in entities: entity_type = entity["entity_type"] source = entity.get("source", "unknown") color = ENTITY_COLORS.get(entity_type, "#cccccc") score = int(entity["score"] * 100) # Tooltip con informazioni dettagliate tooltip = f"{entity_type} ({score}%) - detected by {source}" spans.append({ "index": entity["start"], "content": f'', "is_opening": True }) spans.append({ "index": entity["end"], "content": '', "is_opening": False }) # Ordina i span (chiusura prima dell'apertura se stesso indice) spans.sort(key=lambda x: (x["index"], not x["is_opening"])) # Inserisce i tag span nel testo offset = 0 for span in spans: adjusted_index = span["index"] + offset chars.insert(adjusted_index, span["content"]) offset += 1 return "".join(chars) def generate_statistics(entities): """ Genera statistiche sulle entità rilevate """ stats = { "total_entities": len(entities), "by_type": {}, "by_source": {}, "avg_confidence": 0, "all_detected_types": set() } for entity in entities: entity_type = entity["entity_type"] source = entity.get("source", "unknown") score = entity["score"] # Count by type stats["by_type"][entity_type] = stats["by_type"].get(entity_type, 0) + 1 # Count by source stats["by_source"][source] = stats["by_source"].get(source, 0) + 1 # Track all detected types stats["all_detected_types"].add(entity_type) # Update average confidence stats["avg_confidence"] += score if entities: stats["avg_confidence"] /= len(entities) stats["all_detected_types"] = list(stats["all_detected_types"]) return stats # ========================================================= # CELLA 8: INTERFACCIA GRADIO (MODIFICHE) # ========================================================= def process_text_gradio(text, anonymization_type, use_stanford, use_regex, confidence_threshold): """ Processa il testo con l'interfaccia Gradio """ # Verifica che il testo sia una stringa if not isinstance(text, str): return "Errore: input deve essere una stringa", "", "Errore: tipo di input non valido" if not text.strip(): return "", "", "Nessun testo fornito" try: # Analizza il testo entities = hybrid_anonymizer.analyze_text( text, enable_stanford=use_stanford, enable_regex=use_regex ) # Filtra per confidenza filtered_entities = [e for e in entities if e["score"] >= confidence_threshold] # Genera HTML evidenziato highlighted_html = highlight_entities_html(text, filtered_entities) # Anonimizza il testo anonymized_text = hybrid_anonymizer.anonymize_text(text, filtered_entities, anonymization_type) # Genera statistiche stats = generate_statistics(filtered_entities) # Formatta le statistiche per Gradio stats_str = f""" **Statistiche rilevamento:** - Entità totali trovate: {stats['total_entities']} - Confidenza media: {stats['avg_confidence']:.2%} - Tipi di entità rilevati: {', '.join(sorted(stats['all_detected_types']))} **Per tipo:** {chr(10).join([f"- {k}: {v}" for k, v in stats['by_type'].items()])} **Per sorgente:** {chr(10).join([f"- {k}: {v}" for k, v in stats['by_source'].items()])} """ return highlighted_html, anonymized_text, stats_str except Exception as e: import traceback error_msg = f"Errore: {str(e)}\n{traceback.format_exc()}" return error_msg, "", error_msg # ========================================================= # CELLA 9: INTERFACCIA DI CONTROLLO ENTITÀ (VERSIONE COMPLETA) # ========================================================= def process_text_with_entity_control( text, anonymization_type, use_stanford, use_regex, confidence_threshold, person_enabled, location_enabled, organization_enabled, date_time_enabled, phone_number_enabled, email_enabled, iban_enabled, codice_fiscale_enabled, partita_iva_enabled, targa_enabled, # Nuovi parametri di anonimizzazione tag_format="", redact_char="*", preserve_length=False, # Anonimizzazione per tipo specifico person_anon_method=None, location_anon_method=None, organization_anon_method=None, date_time_anon_method=None, phone_anon_method=None, email_anon_method=None, iban_anon_method=None, cf_anon_method=None, piva_anon_method=None, targa_anon_method=None, # Soglie di confidenza specifiche per tipo person_threshold=None, location_threshold=None, organization_threshold=None, date_time_threshold=None, phone_threshold=None, email_threshold=None, iban_threshold=None, cf_threshold=None, piva_threshold=None, targa_threshold=None, # Formati dei pseudonimi person_pseudo_format="Persona{num}", location_pseudo_format="Luogo{num}", organization_pseudo_format="Organizzazione{num}", date_pseudo_format="Data{num}", phone_pseudo_format="+39-XXX-XXX-{num}", email_pseudo_format="email{num}@esempio.com", iban_pseudo_format="IT00X0000000000000{num}", cf_pseudo_format="ABCDEF00G00H000{num}", piva_pseudo_format="IT0000000000{num}", targa_pseudo_format="XX000{num}" ): """ Processa il testo con controllo sulle entità da estrarre/anonimizzare e con parametri avanzati di anonimizzazione """ # Verifica che il testo sia una stringa if not isinstance(text, str): return "Errore: input deve essere una stringa", "", "Errore: tipo di input non valido" if not text.strip(): return "", "", "Nessun testo fornito" try: # Crea una lista di entità abilitate e mappa dei metodi per tipo enabled_entities = [] entity_anon_methods = {} entity_thresholds = {} entity_pseudo_formats = {} # Mappa degli entity types, abilitazione, metodi, soglie e formati entity_config = [ ("PERSON", person_enabled, person_anon_method, person_threshold, person_pseudo_format), ("LOCATION", location_enabled, location_anon_method, location_threshold, location_pseudo_format), ("ORGANIZATION", organization_enabled, organization_anon_method, organization_threshold, organization_pseudo_format), ("DATE_TIME", date_time_enabled, date_time_anon_method, date_time_threshold, date_pseudo_format), ("PHONE_NUMBER", phone_number_enabled, phone_anon_method, phone_threshold, phone_pseudo_format), ("EMAIL", email_enabled, email_anon_method, email_threshold, email_pseudo_format), ("EMAIL_ADDRESS", email_enabled, email_anon_method, email_threshold, email_pseudo_format), ("IBAN_CODE", iban_enabled, iban_anon_method, iban_threshold, iban_pseudo_format), ("CODICE_FISCALE", codice_fiscale_enabled, cf_anon_method, cf_threshold, cf_pseudo_format), ("PARTITA_IVA", partita_iva_enabled, piva_anon_method, piva_threshold, piva_pseudo_format), ("TARGA", targa_enabled, targa_anon_method, targa_threshold, targa_pseudo_format) ] # Popola gli array basandosi sulla configurazione for entity_type, is_enabled, anon_method, threshold, pseudo_format in entity_config: if is_enabled: enabled_entities.append(entity_type) # Se è specificato un metodo specifico per questo tipo, usalo if anon_method: entity_anon_methods[entity_type] = anon_method # Se è specificata una soglia specifica per questo tipo, usala if threshold is not None: entity_thresholds[entity_type] = threshold # Salva il formato del pseudonimo per questo tipo entity_pseudo_formats[entity_type] = pseudo_format # Se nessuna entità è abilitata, mostra il testo originale if not enabled_entities: return text, text, "Nessuna entità selezionata per l'anonimizzazione" # Analizza il testo entities = hybrid_anonymizer.analyze_text( text, enable_stanford=use_stanford, enable_regex=use_regex ) # Filtra per confidenza e per tipo di entità abilitato, usando soglie specifiche per tipo se disponibili filtered_entities = [] for e in entities: if e["entity_type"] in enabled_entities: # Determina la soglia da usare entity_threshold = entity_thresholds.get(e["entity_type"], confidence_threshold) if e["score"] >= entity_threshold: filtered_entities.append(e) # Genera HTML evidenziato highlighted_html = highlight_entities_html(text, filtered_entities) # Anonimizza il testo con i parametri avanzati anonymized_text = advanced_anonymize_text( text, filtered_entities, anonymization_type, tag_format=tag_format, redact_char=redact_char, preserve_length=preserve_length, entity_anon_methods=entity_anon_methods, entity_pseudo_formats=entity_pseudo_formats ) # Genera statistiche stats = generate_statistics(filtered_entities) # Formatta le statistiche per Gradio stats_str = f""" **Statistiche rilevamento:** - Entità totali trovate: {stats['total_entities']} - Confidenza media: {stats['avg_confidence']:.2%} - Tipi di entità rilevati: {', '.join(sorted(stats['all_detected_types']))} **Per tipo:** {chr(10).join([f"- {k}: {v}" for k, v in stats['by_type'].items()])} **Per sorgente:** {chr(10).join([f"- {k}: {v}" for k, v in stats['by_source'].items()])} **Parametri di anonimizzazione:** - Metodo globale: {anonymization_type} - Formato tag: {tag_format} - Preserva lunghezza: {"Sì" if preserve_length else "No"} """ # Aggiungi informazioni sui metodi specifici if entity_anon_methods: stats_str += "\n**Metodi specifici per tipo:**\n" stats_str += chr(10).join([f"- {k}: {v}" for k, v in entity_anon_methods.items()]) # Aggiungi informazioni sulle soglie specifiche if entity_thresholds: stats_str += "\n\n**Soglie di confidenza specifiche:**\n" stats_str += chr(10).join([f"- {k}: {v}" for k, v in entity_thresholds.items()]) return highlighted_html, anonymized_text, stats_str except Exception as e: import traceback error_msg = f"Errore: {str(e)}\n{traceback.format_exc()}" return error_msg, "", error_msg def advanced_anonymize_text(text, entities, global_anon_type, tag_format="", redact_char="*", preserve_length=False, entity_anon_methods={}, entity_pseudo_formats={}): """ Versione avanzata dell'anonimizzazione che supporta più parametri """ if not entities: return text # Ordina le entità per posizione (dall'ultima alla prima) per non alterare gli indici sorted_entities = sorted(entities, key=lambda x: x["start"], reverse=True) anonymized = text # Dizionario per tenere traccia dei valori sostituiti pseudonyms = {} type_counts = {} for entity in sorted_entities: entity_type = entity["entity_type"] entity_text = entity["text"] entity_start = entity["start"] entity_end = entity["end"] # Determina il metodo di anonimizzazione per questa entità specifica anon_type = entity_anon_methods.get(entity_type, global_anon_type) if anon_type == "replace": # Formatta il tag in base al formato scelto if tag_format == "": new_value = f"<{entity_type}>" elif tag_format == "[TAG]": new_value = f"[{entity_type}]" elif tag_format == "{TAG}": new_value = f"{{{entity_type}}}" elif tag_format == "TAG_": new_value = f"{entity_type}_" else: new_value = f"<{entity_type}>" elif anon_type == "redact": # Redact con il carattere scelto, mantenendo o meno la lunghezza originale if preserve_length: new_value = redact_char * (entity_end - entity_start) else: new_value = redact_char * 5 # Lunghezza fissa elif anon_type == "pseudonymize": # Pseudonimizzazione con nomi fittizi if entity_text in pseudonyms: new_value = pseudonyms[entity_text] else: # Inizializza il contatore se non esiste if entity_type not in type_counts: type_counts[entity_type] = 0 # Incrementa il contatore type_counts[entity_type] += 1 # Ottieni il formato del pseudonimo per questo tipo di entità pseudo_format = entity_pseudo_formats.get(entity_type, "") # Genera un valore fittizio basato sul tipo e formato if pseudo_format: try: # Prova a formattare usando il formato specificato new_value = pseudo_format.format( num=type_counts[entity_type], type=entity_type, orig=entity_text[:1] if entity_text else "X" ) except Exception: # Fallback in caso di errore di formattazione new_value = f"{entity_type}{type_counts[entity_type]}" else: # Formati predefiniti per ogni tipo se non specificato if entity_type == "PERSON": new_value = f"Persona{type_counts[entity_type]}" elif entity_type == "LOCATION": new_value = f"Luogo{type_counts[entity_type]}" elif entity_type == "ORGANIZATION": new_value = f"Organizzazione{type_counts[entity_type]}" elif entity_type == "DATE_TIME": new_value = f"Data{type_counts[entity_type]}" elif entity_type == "PHONE_NUMBER": new_value = f"+39-XXX-XXX-{1000+type_counts[entity_type]}" elif entity_type == "EMAIL_ADDRESS" or entity_type == "EMAIL": new_value = f"email{type_counts[entity_type]}@esempio.com" elif entity_type == "IBAN_CODE": new_value = f"IT00X0000000000000{type_counts[entity_type]}" elif entity_type == "CODICE_FISCALE" or entity_type == "CF": new_value = f"ABCDEF00G00H000{type_counts[entity_type]}" elif entity_type == "PARTITA_IVA" or entity_type == "PIVA": new_value = f"IT0000000000{type_counts[entity_type]}" elif entity_type == "TARGA": new_value = f"XX000{type_counts[entity_type]}" else: new_value = f"{entity_type}{type_counts[entity_type]}" # Memorizza il valore per riusi futuri pseudonyms[entity_text] = new_value # Adatta la lunghezza se necessario if preserve_length and len(new_value) < (entity_end - entity_start): new_value = new_value.ljust(entity_end - entity_start) elif preserve_length and len(new_value) > (entity_end - entity_start): # Troncamento con ellipsis new_value = new_value[:entity_end - entity_start - 1] + "…" else: # Tipo sconosciuto, usa il metodo replace come fallback new_value = f"<{entity_type}>" # Sostituisci il testo anonymized = anonymized[:entity_start] + new_value + anonymized[entity_end:] return anonymized # Esempi per la nuova interfaccia entity_control_examples = [ [ "Il signor Mario Rossi, nato il 15/04/1980, CF: RSSMRC80D15H501V, residente in Via Roma 123, Milano, possiede la partita IVA IT12345678901.", "replace", True, False, 0.5, True, # person_enabled True, # location_enabled True, # organization_enabled True, # date_time_enabled True, # phone_number_enabled True, # email_enabled True, # iban_enabled True, # codice_fiscale_enabled True, # partita_iva_enabled True, # targa_enabled ], [ "Per contattare il cliente Giovanni Bianchi utilizzare l'email giovanni.bianchi@example.com o il numero +39 333-123-4567.", "replace", False, True, 0.6, True, # person_enabled False, # location_enabled False, # organization_enabled False, # date_time_enabled True, # phone_number_enabled True, # email_enabled False, # iban_enabled False, # codice_fiscale_enabled False, # partita_iva_enabled False, # targa_enabled ], [ "Il veicolo targato AB123CD appartiene a Maria Verdi, titolare del conto bancario IT12K1234567890123456789012.", "replace", True, True, 0.7, True, # person_enabled False, # location_enabled False, # organization_enabled False, # date_time_enabled False, # phone_number_enabled False, # email_enabled True, # iban_enabled False, # codice_fiscale_enabled False, # partita_iva_enabled True, # targa_enabled ] ] def process_text_with_entity_control_wrapper( text, anonymization_type, use_stanford, use_regex, confidence_threshold, person_enabled, location_enabled, organization_enabled, date_time_enabled, phone_number_enabled, email_enabled, iban_enabled, codice_fiscale_enabled, partita_iva_enabled, targa_enabled ): """ Funzione wrapper che passa i parametri predefiniti ai nuovi parametri della funzione originale """ return process_text_with_entity_control( text=text, anonymization_type=anonymization_type, use_stanford=use_stanford, use_regex=use_regex, confidence_threshold=confidence_threshold, person_enabled=person_enabled, location_enabled=location_enabled, organization_enabled=organization_enabled, date_time_enabled=date_time_enabled, phone_number_enabled=phone_number_enabled, email_enabled=email_enabled, iban_enabled=iban_enabled, codice_fiscale_enabled=codice_fiscale_enabled, partita_iva_enabled=partita_iva_enabled, targa_enabled=targa_enabled, # Valori predefiniti per i nuovi parametri tag_format="", redact_char="*", preserve_length=False, # Metodi specifici per tipo (tutti None = usa metodo globale) person_anon_method=None, location_anon_method=None, organization_anon_method=None, date_time_anon_method=None, phone_anon_method=None, email_anon_method=None, iban_anon_method=None, cf_anon_method=None, piva_anon_method=None, targa_anon_method=None, # Soglie specifiche (tutte None = usa soglia globale) person_threshold=None, location_threshold=None, organization_threshold=None, date_time_threshold=None, phone_threshold=None, email_threshold=None, iban_threshold=None, cf_threshold=None, piva_threshold=None, targa_threshold=None, # Formati predefiniti per i pseudonimi person_pseudo_format="Persona{num}", location_pseudo_format="Luogo{num}", organization_pseudo_format="Organizzazione{num}", date_pseudo_format="Data{num}", phone_pseudo_format="+39-XXX-XXX-{num}", email_pseudo_format="email{num}@esempio.com", iban_pseudo_format="IT00X0000000000000{num}", cf_pseudo_format="ABCDEF00G00H000{num}", piva_pseudo_format="IT0000000000{num}", targa_pseudo_format="XX000{num}" ) # Crea l'interfaccia Gradio con controllo entità demo_advanced = gr.Interface( fn=process_text_with_entity_control_wrapper, # Usa la funzione wrapper inputs=[ gr.Textbox( label="Testo da analizzare", lines=5, placeholder="Inserisci il testo contenente dati sensibili...", value="Il signor Marco Rossi, nato il 15/04/1978, CF: RSSMRC78D15H501T, può essere contattato al numero +39 333-1234567 o all'email marco.rossi@example.com." ), gr.Radio( ["replace", "redact", "pseudonymize"], label="Tipo di anonimizzazione", value="replace" ), gr.Checkbox( label="Usa modello Stanford", value=True ), gr.Checkbox( label="Usa Regex Fallback", value=True ), gr.Slider( minimum=0.1, maximum=1.0, value=0.5, step=0.05, label="Soglia di confidenza minima" ), # Controlli per i tipi di entità gr.Checkbox(label="Persone (PERSON)", value=True), gr.Checkbox(label="Luoghi (LOCATION)", value=True), gr.Checkbox(label="Organizzazioni (ORGANIZATION)", value=True), gr.Checkbox(label="Date (DATE_TIME)", value=True), gr.Checkbox(label="Numeri di telefono (PHONE_NUMBER)", value=True), gr.Checkbox(label="Email (EMAIL)", value=True), gr.Checkbox(label="IBAN (IBAN_CODE)", value=True), gr.Checkbox(label="Codici Fiscali (CODICE_FISCALE)", value=True), gr.Checkbox(label="Partite IVA (PARTITA_IVA)", value=True), gr.Checkbox(label="Targhe (TARGA)", value=True) ], outputs=[ gr.HTML(label="Testo con entità evidenziate"), gr.Textbox(label="Testo anonimizzato", lines=5), gr.Markdown(label="Statistiche di rilevamento") ], title="🔒 Sistema Ibrido di Anonimizzazione Dati - Controllo Entità", description="Analizza e anonimizza testi selezionando i tipi di entità da processare.\n" "I diversi colori indicano i tipi di entità rilevate.", examples=entity_control_examples, theme=gr.themes.Soft(), allow_flagging="never" ) # Avvia l'interfaccia migliorata # demo_advanced.launch(share=True, debug=True) # ========================================================= # CELLA 10: INTERFACCIA AVANZATA CON PARAMETRI DI ANONIMIZZAZIONE # ========================================================= with gr.Blocks(theme=gr.themes.Soft()) as demo_blocks: gr.Markdown("# 🔒 Sistema Ibrido di Anonimizzazione Dati") gr.Markdown("Analizza e anonimizza testi in italiano con controllo avanzato dei parametri.") with gr.Row(): with gr.Column(scale=2): text_input = gr.Textbox( label="Testo da analizzare", lines=6, placeholder="Inserisci il testo contenente dati sensibili...", value="Il signor Marco Rossi, nato il 15/04/1978, CF: RSSMRC78D15H501T, può essere contattato al numero +39 333-1234567 o all'email marco.rossi@example.com." ) with gr.Tabs(): with gr.TabItem("Impostazioni Base"): with gr.Row(): with gr.Column(): anon_type = gr.Radio( ["replace", "redact", "pseudonymize"], label="Tipo di anonimizzazione globale", value="replace" ) confidence = gr.Slider( minimum=0.1, maximum=1.0, value=0.5, step=0.05, label="Soglia di confidenza globale" ) with gr.Column(): use_stanford = gr.Checkbox(label="Usa modello Stanford", value=True) use_regex = gr.Checkbox(label="Usa Regex Fallback", value=True) with gr.TabItem("Parametri di Anonimizzazione"): with gr.Row(): with gr.Column(): tag_format = gr.Radio( ["", "[TAG]", "{TAG}", "TAG_"], label="Formato dei tag di sostituzione", value="" ) redact_char = gr.Radio( ["*", "X", "#", "_"], label="Carattere di oscuramento (per redact)", value="*" ) with gr.Column(): preserve_length = gr.Checkbox( label="Preserva lunghezza originale nelle sostituzioni", value=False ) gr.Markdown("### Anteprima formati di tag") anteprima_html = gr.HTML(value="

Replace: <PERSON>, [PERSON], {PERSON}, PERSON_

Redact: *****, XXXXX, #####, _____

Pseudonymize: Persona1, Luogo1, Data1...

") with gr.Accordion("Metodi specifici per tipo di entità", open=False): gr.Markdown("Seleziona un metodo specifico per ogni tipo di entità, o lascia 'Globale' per usare il metodo globale") with gr.Row(): with gr.Column(): person_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per PERSON", value=None ) location_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per LOCATION", value=None ) organization_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per ORGANIZATION", value=None ) date_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per DATE_TIME", value=None ) phone_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per PHONE_NUMBER", value=None ) with gr.Column(): email_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per EMAIL", value=None ) iban_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per IBAN_CODE", value=None ) cf_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per CODICE_FISCALE", value=None ) piva_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per PARTITA_IVA", value=None ) targa_method = gr.Dropdown( [None, "replace", "redact", "pseudonymize"], label="Metodo per TARGA", value=None ) with gr.TabItem("Soglie di Confidenza"): gr.Markdown("### Imposta soglie di confidenza specifiche per tipo di entità") gr.Markdown("Lascia vuoto per usare la soglia globale") with gr.Row(): with gr.Column(): person_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per PERSON", value=None ) location_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per LOCATION", value=None ) organization_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per ORGANIZATION", value=None ) date_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per DATE_TIME", value=None ) phone_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per PHONE_NUMBER", value=None ) with gr.Column(): email_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per EMAIL", value=None ) iban_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per IBAN_CODE", value=None ) cf_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per CODICE_FISCALE", value=None ) piva_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per PARTITA_IVA", value=None ) targa_threshold = gr.Slider( minimum=0.1, maximum=1.0, step=0.05, label="Soglia per TARGA", value=None ) with gr.TabItem("Formati Pseudonimi"): gr.Markdown("### Personalizza i formati dei pseudonimi") gr.Markdown("Usa {num} per inserire il numero progressivo, {type} per il tipo di entità, {orig} per l'iniziale dell'originale") with gr.Row(): with gr.Column(): person_format = gr.Textbox( label="Formato per PERSON", value="Persona{num}", placeholder="es. Persona{num}, P{num}, {orig}..." ) location_format = gr.Textbox( label="Formato per LOCATION", value="Luogo{num}", placeholder="es. Luogo{num}, L{num}..." ) organization_format = gr.Textbox( label="Formato per ORGANIZATION", value="Organizzazione{num}", placeholder="es. Org{num}, Azienda{num}..." ) date_format = gr.Textbox( label="Formato per DATE_TIME", value="Data{num}", placeholder="es. GG/MM/AAAA, Data{num}..." ) phone_format = gr.Textbox( label="Formato per PHONE_NUMBER", value="+39-XXX-XXX-{num}", placeholder="es. +39-XXX-XXX-{num}..." ) with gr.Column(): email_format = gr.Textbox( label="Formato per EMAIL", value="email{num}@esempio.com", placeholder="es. user{num}@domain.com..." ) iban_format = gr.Textbox( label="Formato per IBAN_CODE", value="IT00X0000000000000{num}", placeholder="es. IT00X0000..." ) cf_format = gr.Textbox( label="Formato per CODICE_FISCALE", value="ABCDEF00G00H000{num}", placeholder="es. ABCDEF00G00H000{num}..." ) piva_format = gr.Textbox( label="Formato per PARTITA_IVA", value="IT0000000000{num}", placeholder="es. IT0000000000{num}..." ) targa_format = gr.Textbox( label="Formato per TARGA", value="XX000{num}", placeholder="es. XX000{num}..." ) process_btn = gr.Button("Analizza e Anonimizza", variant="primary") with gr.Column(scale=1): gr.Markdown("### Seleziona i tipi di entità da anonimizzare") with gr.Group(): person_enabled = gr.Checkbox(label="👤 Persone (PERSON)", value=True) location_enabled = gr.Checkbox(label="📍 Luoghi (LOCATION)", value=True) organization_enabled = gr.Checkbox(label="🏢 Organizzazioni (ORGANIZATION)", value=True) date_time_enabled = gr.Checkbox(label="📅 Date (DATE_TIME)", value=True) phone_number_enabled = gr.Checkbox(label="📞 Numeri di telefono (PHONE_NUMBER)", value=True) email_enabled = gr.Checkbox(label="📧 Email (EMAIL)", value=True) iban_enabled = gr.Checkbox(label="💳 IBAN (IBAN_CODE)", value=True) codice_fiscale_enabled = gr.Checkbox(label="🪪 Codici Fiscali (CODICE_FISCALE)", value=True) partita_iva_enabled = gr.Checkbox(label="🏷️ Partite IVA (PARTITA_IVA)", value=True) targa_enabled = gr.Checkbox(label="🚗 Targhe (TARGA)", value=True) with gr.Row(): select_all_btn = gr.Button("Seleziona tutti") clear_all_btn = gr.Button("Deseleziona tutti") with gr.Accordion("Guida rapida", open=False): gr.Markdown(""" **Tipi di anonimizzazione:** - **Replace**: sostituisce l'entità con un tag (es. ) - **Redact**: oscura l'entità con caratteri (es. *****) - **Pseudonymize**: sostituisce con valori fittizi (es. Persona1) **Formato tag:** - ``: usa tag HTML (es. ) - `[TAG]`: usa parentesi quadre (es. [PERSON]) - `{TAG}`: usa parentesi graffe (es. {PERSON}) - `TAG_`: usa underscore (es. PERSON_) **Preserva lunghezza:** - Se attivo, mantiene la lunghezza originale dell'entità - Utile per mantenere il formato del documento """) with gr.Tabs(): with gr.TabItem("Risultati"): html_output = gr.HTML(label="Testo con entità evidenziate") anon_output = gr.Textbox(label="Testo anonimizzato", lines=5) stats_output = gr.Markdown(label="Statistiche di rilevamento") # Funzione per aggiornare l'anteprima dei formati di tag def update_preview(tag_format, redact_char, preserve_length): replace_examples = { "": "<PERSON>", "[TAG]": "[PERSON]", "{TAG}": "{PERSON}", "TAG_": "PERSON_" } redact_example = redact_char * 5 if preserve_length: redact_note = " (mantenendo lunghezza originale)" else: redact_note = " (lunghezza fissa)" return f"""

Replace: {replace_examples[tag_format]}

Redact: {redact_example}{redact_note}

Pseudonymize: Persona1, Luogo1, Data1...

""" # Aggiorna l'anteprima quando cambiano i parametri tag_format.change( update_preview, inputs=[tag_format, redact_char, preserve_length], outputs=anteprima_html ) redact_char.change( update_preview, inputs=[tag_format, redact_char, preserve_length], outputs=anteprima_html ) preserve_length.change( update_preview, inputs=[tag_format, redact_char, preserve_length], outputs=anteprima_html ) # Logica per i pulsanti di selezione def select_all(): return [True] * 10 def clear_all(): return [False] * 10 select_all_btn.click( select_all, inputs=None, outputs=[ person_enabled, location_enabled, organization_enabled, date_time_enabled, phone_number_enabled, email_enabled, iban_enabled, codice_fiscale_enabled, partita_iva_enabled, targa_enabled ] ) clear_all_btn.click( clear_all, inputs=None, outputs=[ person_enabled, location_enabled, organization_enabled, date_time_enabled, phone_number_enabled, email_enabled, iban_enabled, codice_fiscale_enabled, partita_iva_enabled, targa_enabled ] ) # Callback per il pulsante di processo process_btn.click( process_text_with_entity_control, inputs=[ text_input, anon_type, use_stanford, use_regex, confidence, person_enabled, location_enabled, organization_enabled, date_time_enabled, phone_number_enabled, email_enabled, iban_enabled, codice_fiscale_enabled, partita_iva_enabled, targa_enabled, # Parametri di anonimizzazione avanzati tag_format, redact_char, preserve_length, # Metodi specifici per tipo person_method, location_method, organization_method, date_method, phone_method, email_method, iban_method, cf_method, piva_method, targa_method, # Soglie specifiche per tipo person_threshold, location_threshold, organization_threshold, date_threshold, phone_threshold, email_threshold, iban_threshold, cf_threshold, piva_threshold, targa_threshold, # Formati dei pseudonimi person_format, location_format, organization_format, date_format, phone_format, email_format, iban_format, cf_format, piva_format, targa_format ], outputs=[html_output, anon_output, stats_output] ) # Avvia l'interfaccia a blocchi (commenta la linea launch della cella 11 se la usi) if __name__ == "__main__": demo_blocks.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )